Skip to content

Commit

Permalink
utils: Reference mapping finishes when source finishes
Browse files Browse the repository at this point in the history
"Shared References" is an RxPlayer structure which allows to pass a
value around and making receivers able to register a callback to listen
and react for when that value changes, with added niceties like being
able through type only to enforce that some code block are only able to
read that value, not update it.

It is used a lot to implement RxPlayer API which can change the
RxPlayer's behavior during playback such as `wantedBufferAhead`,
`setPlaybackRate` but also for things like track switching. It was
originally created when removing RxJS from the RxPlayer, and it is
similar to RxJS' `BehaviorSubject` in some ways.

It turned out that we also needed some kind of mapping function for some
edge cases where we want to communicate a value based on a Shared
Reference. An example is the "buffer goal", the size of the buffer
constructed by the RxPlayer, which is based on the `wantedBufferAhead`
option but with a `<=1` factor applied depending on past buffer issues.

Such mapping function was simple enough to implement but there was still
a potential for a (light) memory leak: when/if the source "Shared
Reference" was "finished" (all its listeners are removed and it cannot
emit anymore), the mapped reference wasn't (yet still coudn't emit
anymore).

This is not a real isue right now but it could become one, so I chose to
fix that problem. Sadly, this meant implementing a listener for when a
shared reference is finished, which I do not find particularly elegant.
As such, I made it clear that it is not supposed to be used frequently by
calling it `_onFinished` (so, prepended with an underscore).
  • Loading branch information
peaBerberian committed Sep 26, 2023
1 parent 2b928bb commit f6095fd
Showing 1 changed file with 70 additions and 4 deletions.
74 changes: 70 additions & 4 deletions src/utils/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

import arrayFindIndex from "./array_find_index";
import noop from "./noop";
import { CancellationSignal } from "./task_canceller";

/**
Expand Down Expand Up @@ -131,6 +133,19 @@ export interface ISharedReference<T> {
* `ISharedReference`.
*/
finish() : void;

/**
* Allows to register a callback for when the Shared Reference is "finished".
*
* This function is mostly there for implementing operators on the shared
* reference and isn't meant to be used by regular code, hence it being
* prefixed by `_`.
* @param {Function} cb - Callback to be called once the reference is
* finished.
* @param {Object} cancelSignal - Allows to provide a CancellationSignal which
* will unregister the callback when it emits.
*/
_onFinished(cb: () => void, cancelSignal: CancellationSignal) : () => void;
}

/**
Expand Down Expand Up @@ -159,7 +174,8 @@ export type IReadOnlySharedReference<T> =
Pick<ISharedReference<T>,
"getValue" |
"onUpdate" |
"waitUntilDefined">;
"waitUntilDefined" |
"_onFinished">;

/**
* Create an `ISharedReference` object encapsulating the mutable `initialValue`
Expand Down Expand Up @@ -204,6 +220,9 @@ export default function createSharedReference<T>(

let isFinished = false;

const onFinishCbs : Array<{ trigger : () => void;
hasBeenCleared : boolean; }> = [];

if (cancelSignal !== undefined) {
cancelSignal.register(finish);
}
Expand Down Expand Up @@ -335,6 +354,38 @@ export default function createSharedReference<T>(
}, { clearSignal: options?.clearSignal, emitCurrentValue: true });
},

/**
* Allows to register a callback for when the Shared Reference is "finished".
*
* This function is mostly there for implementing operators on the shared
* reference and isn't meant to be used by regular code, hence it being
* prefixed by `_`.
* @param {Function} cb - Callback to be called once the reference is
* finished.
* @param {Object} onFinishCancelSignal - Allows to provide a
* CancellationSignal which will unregister the callback when it emits.
*/
_onFinished(cb: () => void, onFinishCancelSignal: CancellationSignal) : () => void {
if (onFinishCancelSignal.isCancelled()) {
return noop;
}
const trigger = () => {
cleanUp();
cb();
};
const deregisterCancellation = onFinishCancelSignal.register(cleanUp);
onFinishCbs.push({ trigger, hasBeenCleared: false });
return deregisterCancellation;

function cleanUp() {
const indexOf = arrayFindIndex(onFinishCbs, (x) => x.trigger === trigger);
if (indexOf >= 0) {
onFinishCbs[indexOf].hasBeenCleared = true;
onFinishCbs.splice(indexOf, 1);
}
}
},

/**
* Indicate that no new values will be emitted.
* Allows to automatically free all listeners linked to this reference.
Expand All @@ -358,12 +409,26 @@ export default function createSharedReference<T>(
}
}
cbs.length = 0;
if (onFinishCbs.length > 0) {
const clonedFinishedCbs = onFinishCbs.slice();
for (const cbObj of clonedFinishedCbs) {
try {
if (!cbObj.hasBeenCleared) {
cbObj.trigger();
cbObj.hasBeenCleared = true;
}
} catch (_) {
/* nothing */
}
}
onFinishCbs.length = 0;
}
}
}

/**
* Create a new `ISharedReference` based on another one by mapping over its
* referenced value each time it is updated.
* referenced value each time it is updated and finishing once it finishes.
* @param {Object} originalRef - The Original `ISharedReference` you wish to map
* over.
* @param {Function} mappingFn - The mapping function which will receives
Expand All @@ -383,8 +448,9 @@ export function createMappedReference<T, U>(
newRef.setValue(mappingFn(x));
}, { clearSignal: cancellationSignal });

// TODO nothing is done if `originalRef` is finished, though the returned
// reference could also be finished in that case. To do?
originalRef._onFinished(() => {
newRef.finish();
}, cancellationSignal);

return newRef;
}
Expand Down

0 comments on commit f6095fd

Please sign in to comment.