Skip to content

Commit

Permalink
Merge pull request #1556 from canalplus/fix/samsung-multi-thread-timi…
Browse files Browse the repository at this point in the history
…ng-issue

Fix MULTI_THREAD timing issue on Samsung TV
  • Loading branch information
peaBerberian authored Sep 25, 2024
2 parents f5e5ccb + 158f5cb commit c80e239
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 43 deletions.
3 changes: 3 additions & 0 deletions src/core/main/worker/worker_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ export default function initializeWorkerMain() {
*/
let playbackObservationRef: SharedReference<IWorkerPlaybackObservation> | null = null;

onmessageerror = (_msg: MessageEvent) => {
log.error("MTCI: Error when receiving message from main thread.");
};
onmessage = function (e: MessageEvent<IMainThreadMessage>) {
log.debug("Worker: received message", e.data.type);

Expand Down
137 changes: 94 additions & 43 deletions src/main_thread/init/multi_thread_content_initializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,20 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
/** Constructor settings associated to this `MultiThreadContentInitializer`. */
private _settings: IInitializeArguments;

/**
* The WebWorker may be sending messages as soon as we're preparing the
* content but the `MultiThreadContentInitializer` is only able to handle all of
* them only once `start`ed.
*
* As such `_queuedWorkerMessages` is set to an Array when `prepare` has been
* called but not `start` yet, and contains all worker messages that have to
* be processed when `start` is called.
*
* It is set to `null` when there's no need to rely on that queue (either not
* yet `prepare`d or already `start`ed).
*/
private _queuedWorkerMessages: MessageEvent[] | null;

/**
* Information relative to the current loaded content.
*
Expand Down Expand Up @@ -123,6 +137,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
lastMessageId: 0,
resolvers: {},
};
this._queuedWorkerMessages = null;
}

/**
Expand Down Expand Up @@ -176,6 +191,66 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
if (this._initCanceller.isUsed()) {
return;
}
this._queuedWorkerMessages = [];
log.debug("MTCI: addEventListener prepare buffering worker messages");
const onmessage = (evt: MessageEvent): void => {
const msgData = evt.data as unknown as IWorkerMessage;
const type = msgData.type;
switch (type) {
case WorkerMessageType.LogMessage: {
const formatted = msgData.value.logs.map((l) => {
switch (typeof l) {
case "string":
case "number":
case "boolean":
case "undefined":
return l;
case "object":
if (l === null) {
return null;
}
return formatWorkerError(l);
default:
assertUnreachable(l);
}
});
switch (msgData.value.logLevel) {
case "NONE":
break;
case "ERROR":
log.error(...formatted);
break;
case "WARNING":
log.warn(...formatted);
break;
case "INFO":
log.info(...formatted);
break;
case "DEBUG":
log.debug(...formatted);
break;
default:
assertUnreachable(msgData.value.logLevel);
}
break;
}
default:
if (this._queuedWorkerMessages !== null) {
this._queuedWorkerMessages.push(evt);
}
break;
}
};
this._settings.worker.addEventListener("message", onmessage);
const onmessageerror = (_msg: MessageEvent) => {
log.error("MTCI: Error when receiving message from worker.");
};
this._settings.worker.addEventListener("messageerror", onmessageerror);
this._initCanceller.signal.register(() => {
log.debug("MTCI: removeEventListener prepare for worker message");
this._settings.worker.removeEventListener("message", onmessage);
this._settings.worker.removeEventListener("messageerror", onmessageerror);
});

// Also bind all `SharedReference` objects:

Expand Down Expand Up @@ -1042,49 +1117,6 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
}
break;

case WorkerMessageType.LogMessage: {
const formatted = msgData.value.logs.map((l) => {
switch (typeof l) {
case "string":
case "number":
case "boolean":
case "undefined":
return l;
case "object":
if (l === null) {
return null;
}
return formatWorkerError(l);
default:
assertUnreachable(l);
}
});
switch (msgData.value.logLevel) {
case "NONE":
break;
case "ERROR":
log.error(...formatted);
break;
case "WARNING":
log.warn(...formatted);
break;
case "INFO":
log.info(...formatted);
break;
case "DEBUG":
log.debug(...formatted);
break;
default:
assertUnreachable(msgData.value.logLevel);
}
break;
}

case WorkerMessageType.InitSuccess:
case WorkerMessageType.InitError:
// Should already be handled by the API
break;

case WorkerMessageType.SegmentSinkStoreUpdate: {
if (this._currentContentInfo?.contentId !== msgData.contentId) {
return;
Expand All @@ -1098,13 +1130,32 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
}
break;
}

case WorkerMessageType.InitSuccess:
case WorkerMessageType.InitError:
// Should already be handled by the API
break;

case WorkerMessageType.LogMessage:
// Already handled by prepare's handler
break;
default:
assertUnreachable(msgData);
}
};

log.debug("MTCI: addEventListener for worker message");
if (this._queuedWorkerMessages !== null) {
const bufferedMessages = this._queuedWorkerMessages.slice();
log.debug("MTCI: Processing buffered messages", bufferedMessages.length);
for (const message of bufferedMessages) {
onmessage(message);
}
this._queuedWorkerMessages = null;
}
this._settings.worker.addEventListener("message", onmessage);
this._initCanceller.signal.register(() => {
log.debug("MTCI: removeEventListener for worker message");
this._settings.worker.removeEventListener("message", onmessage);
});
}
Expand Down

0 comments on commit c80e239

Please sign in to comment.