Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MULTI_THREAD timing issue on Samsung TV #1556

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/core/main/worker/worker_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ export default function initializeWorkerMain() {
*/
let playbackObservationRef: SharedReference<IWorkerPlaybackObservation> | null = null;

onmessageerror = (_msg: MessageEvent) => {
log.error("MTCI: Error when receiving message from main thread.");
};
onmessage = function (e: MessageEvent<IMainThreadMessage>) {
log.debug("Worker: received message", e.data.type);

Expand Down
137 changes: 94 additions & 43 deletions src/main_thread/init/multi_thread_content_initializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,20 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
/** Constructor settings associated to this `MultiThreadContentInitializer`. */
private _settings: IInitializeArguments;

/**
* The WebWorker may be sending messages as soon as we're preparing the
* content but the `MultiThreadContentInitializer` is only able to handle all of
* them only once `start`ed.
*
* As such `_queuedWorkerMessages` is set to an Array when `prepare` has been
* called but not `start` yet, and contains all worker messages that have to
* be processed when `start` is called.
*
* It is set to `null` when there's no need to rely on that queue (either not
* yet `prepare`d or already `start`ed).
*/
private _queuedWorkerMessages: MessageEvent[] | null;

/**
* Information relative to the current loaded content.
*
Expand Down Expand Up @@ -123,6 +137,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
lastMessageId: 0,
resolvers: {},
};
this._queuedWorkerMessages = null;
}

/**
Expand Down Expand Up @@ -176,6 +191,66 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
if (this._initCanceller.isUsed()) {
return;
}
this._queuedWorkerMessages = [];
log.debug("MTCI: addEventListener prepare buffering worker messages");
const onmessage = (evt: MessageEvent): void => {
const msgData = evt.data as unknown as IWorkerMessage;
const type = msgData.type;
switch (type) {
case WorkerMessageType.LogMessage: {
const formatted = msgData.value.logs.map((l) => {
switch (typeof l) {
case "string":
case "number":
case "boolean":
case "undefined":
return l;
case "object":
if (l === null) {
return null;
}
return formatWorkerError(l);
default:
assertUnreachable(l);
}
});
switch (msgData.value.logLevel) {
case "NONE":
break;
case "ERROR":
log.error(...formatted);
break;
case "WARNING":
log.warn(...formatted);
break;
case "INFO":
log.info(...formatted);
break;
case "DEBUG":
log.debug(...formatted);
break;
default:
assertUnreachable(msgData.value.logLevel);
}
break;
}
default:
if (this._queuedWorkerMessages !== null) {
this._queuedWorkerMessages.push(evt);
}
break;
}
};
this._settings.worker.addEventListener("message", onmessage);
const onmessageerror = (_msg: MessageEvent) => {
log.error("MTCI: Error when receiving message from worker.");
};
this._settings.worker.addEventListener("messageerror", onmessageerror);
this._initCanceller.signal.register(() => {
log.debug("MTCI: removeEventListener prepare for worker message");
this._settings.worker.removeEventListener("message", onmessage);
this._settings.worker.removeEventListener("messageerror", onmessageerror);
});

// Also bind all `SharedReference` objects:

Expand Down Expand Up @@ -1042,49 +1117,6 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
}
break;

case WorkerMessageType.LogMessage: {
const formatted = msgData.value.logs.map((l) => {
switch (typeof l) {
case "string":
case "number":
case "boolean":
case "undefined":
return l;
case "object":
if (l === null) {
return null;
}
return formatWorkerError(l);
default:
assertUnreachable(l);
}
});
switch (msgData.value.logLevel) {
case "NONE":
break;
case "ERROR":
log.error(...formatted);
break;
case "WARNING":
log.warn(...formatted);
break;
case "INFO":
log.info(...formatted);
break;
case "DEBUG":
log.debug(...formatted);
break;
default:
assertUnreachable(msgData.value.logLevel);
}
break;
}

case WorkerMessageType.InitSuccess:
case WorkerMessageType.InitError:
// Should already be handled by the API
break;

case WorkerMessageType.SegmentSinkStoreUpdate: {
if (this._currentContentInfo?.contentId !== msgData.contentId) {
return;
Expand All @@ -1098,13 +1130,32 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
}
break;
}

case WorkerMessageType.InitSuccess:
case WorkerMessageType.InitError:
// Should already be handled by the API
break;

case WorkerMessageType.LogMessage:
// Already handled by prepare's handler
break;
default:
assertUnreachable(msgData);
}
};

log.debug("MTCI: addEventListener for worker message");
if (this._queuedWorkerMessages !== null) {
const bufferedMessages = this._queuedWorkerMessages.slice();
log.debug("MTCI: Processing buffered messages", bufferedMessages.length);
for (const message of bufferedMessages) {
onmessage(message);
}
this._queuedWorkerMessages = null;
}
this._settings.worker.addEventListener("message", onmessage);
this._initCanceller.signal.register(() => {
log.debug("MTCI: removeEventListener for worker message");
this._settings.worker.removeEventListener("message", onmessage);
});
}
Expand Down
Loading