Skip to content

Commit

Permalink
Bandwidth calculator. (#323)
Browse files Browse the repository at this point in the history
* Rename file.

* Add bandwidth calculator.

* Add bandwidth calculator.

* Git use another approach. Suppress loading intervals together.

* Use time shift instead of loading intervals.

---------

Co-authored-by: Igor Zolotarenko <[email protected]>
  • Loading branch information
i-zolotarenko and i-zolotarenko authored Jan 2, 2024
1 parent 0508d9f commit 39ec83d
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 104 deletions.
57 changes: 0 additions & 57 deletions packages/p2p-media-loader-core/src/bandwidth-approximator.ts

This file was deleted.

63 changes: 63 additions & 0 deletions packages/p2p-media-loader-core/src/bandwidth-calculator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
const CLEAR_THRESHOLD_MS = 3000;

export class BandwidthCalculator {
private simultaneousLoadingsCount = 0;
private readonly bytes: number[] = [];
private readonly timestamps: number[] = [];
private noLoadingsTotalTime = 0;
private allLoadingsStoppedTimestamp = 0;

addBytes(bytesLength: number, now = performance.now()) {
this.bytes.push(bytesLength);
this.timestamps.push(now - this.noLoadingsTotalTime);
}

startLoading(now = performance.now()) {
this.clearStale();
if (this.simultaneousLoadingsCount === 0) {
this.noLoadingsTotalTime += now - this.allLoadingsStoppedTimestamp;
}
this.simultaneousLoadingsCount++;
}

// in bits per second
stopLoading(now = performance.now()) {
if (this.simultaneousLoadingsCount <= 0) return;
this.simultaneousLoadingsCount--;
if (this.simultaneousLoadingsCount !== 0) return;
this.allLoadingsStoppedTimestamp = now;
}

getBandwidthForLastNSeconds(seconds: number) {
if (!this.timestamps.length) return 0;
const milliseconds = seconds * 1000;
const lastItemTimestamp = this.timestamps[this.timestamps.length - 1];
let lastCountedTimestamp = lastItemTimestamp;
const threshold = lastItemTimestamp - milliseconds;
let totalBytes = 0;

for (let i = this.bytes.length - 1; i >= 0; i--) {
const timestamp = this.timestamps[i];
if (timestamp < threshold) break;
lastCountedTimestamp = timestamp;
totalBytes += this.bytes[i];
}

return (totalBytes * 8000) / (lastItemTimestamp - lastCountedTimestamp);
}

clearStale() {
if (!this.timestamps.length) return;
const threshold =
this.timestamps[this.timestamps.length - 1] - CLEAR_THRESHOLD_MS;

let samplesToRemove = 0;
for (const timestamp of this.timestamps) {
if (timestamp > threshold) break;
samplesToRemove++;
}

this.bytes.splice(0, samplesToRemove);
this.timestamps.splice(0, samplesToRemove);
}
}
7 changes: 3 additions & 4 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from "./types";
import * as StreamUtils from "./utils/stream";
import { LinkedMap } from "./linked-map";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { BandwidthCalculator } from "./bandwidth-calculator";
import { EngineCallbacks } from "./request";
import { SegmentsMemoryStorage } from "./segments-storage";

Expand All @@ -31,7 +31,7 @@ export class Core<TStream extends Stream = Stream> {
httpErrorRetries: 3,
p2pErrorRetries: 3,
};
private readonly bandwidthApproximator = new BandwidthApproximator();
private readonly bandwidthCalculator = new BandwidthCalculator();
private segmentStorage?: SegmentsMemoryStorage;
private mainStreamLoader?: HybridLoader;
private secondaryStreamLoader?: HybridLoader;
Expand Down Expand Up @@ -113,7 +113,6 @@ export class Core<TStream extends Stream = Stream> {
this.mainStreamLoader = undefined;
this.secondaryStreamLoader = undefined;
this.segmentStorage = undefined;
this.bandwidthApproximator.destroy();
this.manifestResponseUrl = undefined;
}

Expand Down Expand Up @@ -145,7 +144,7 @@ export class Core<TStream extends Stream = Stream> {
manifestResponseUrl,
segment,
this.settings,
this.bandwidthApproximator,
this.bandwidthCalculator,
this.segmentStorage,
this.eventHandlers
);
Expand Down
18 changes: 6 additions & 12 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Segment, StreamWithSegments } from "./index";
import { HttpRequestExecutor } from "./http-loader";
import { SegmentsMemoryStorage } from "./segments-storage";
import { Settings, CoreEventHandlers, Playback } from "./types";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { BandwidthCalculator } from "./bandwidth-calculator";
import { P2PLoadersContainer } from "./p2p/loaders-container";
import { RequestsContainer } from "./request-container";
import { EngineCallbacks } from "./request";
Expand Down Expand Up @@ -30,7 +30,7 @@ export class HybridLoader {
private streamManifestUrl: string,
requestedSegment: Segment,
private readonly settings: Settings,
private readonly bandwidthApproximator: BandwidthApproximator,
private readonly bandwidthCalculator: BandwidthCalculator,
private readonly segmentStorage: SegmentsMemoryStorage,
private readonly eventHandlers?: Pick<CoreEventHandlers, "onSegmentLoaded">
) {
Expand All @@ -40,7 +40,7 @@ export class HybridLoader {
this.segmentAvgDuration = StreamUtils.getSegmentAvgDuration(activeStream);
this.requests = new RequestsContainer(
this.requestProcessQueueMicrotask,
this.bandwidthApproximator,
this.bandwidthCalculator,
this.playback,
this.settings
);
Expand Down Expand Up @@ -94,7 +94,7 @@ export class HybridLoader {
if (data) {
callbacks.onSuccess({
data,
bandwidth: this.bandwidthApproximator.getBandwidth(),
bandwidth: this.bandwidthCalculator.getBandwidthForLastNSeconds(3),
});
}
} else {
Expand Down Expand Up @@ -344,7 +344,7 @@ export class HybridLoader {
queue: QueueUtils.QueueItem[],
segment: Segment
): boolean {
for (const { segment: itemSegment } of arrayBackwards(queue)) {
for (const { segment: itemSegment } of Utils.arrayBackwards(queue)) {
if (itemSegment === segment) break;
const request = this.requests.get(itemSegment);
if (request?.type === "http" && request.status === "loading") {
Expand All @@ -359,7 +359,7 @@ export class HybridLoader {
queue: QueueUtils.QueueItem[],
segment: Segment
): boolean {
for (const { segment: itemSegment } of arrayBackwards(queue)) {
for (const { segment: itemSegment } of Utils.arrayBackwards(queue)) {
if (itemSegment === segment) break;
const request = this.requests.get(itemSegment);
if (request?.type === "p2p" && request.status === "loading") {
Expand Down Expand Up @@ -403,9 +403,3 @@ export class HybridLoader {
this.logger.destroy();
}
}

function* arrayBackwards<T>(arr: T[]) {
for (let i = arr.length - 1; i >= 0; i--) {
yield arr[i];
}
}
14 changes: 0 additions & 14 deletions packages/p2p-media-loader-core/src/linked-map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,6 @@ export class LinkedMap<K, V extends object> {
private _first?: LinkedObject<K, V>;
private _last?: LinkedObject<K, V>;

get first() {
return this._first?.item;
}

get last() {
return this._last?.item;
}

get size() {
return this.map.size;
}
Expand Down Expand Up @@ -53,12 +45,6 @@ export class LinkedMap<K, V extends object> {
this.map.delete(key);
}

clear() {
this._first = undefined;
this._last = undefined;
this.map.clear();
}

*values(key?: K) {
let value = key ? this.map.get(key) : this._first;
if (value === undefined) return;
Expand Down
6 changes: 3 additions & 3 deletions packages/p2p-media-loader-core/src/request-container.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Segment, Settings, Playback } from "./types";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { BandwidthCalculator } from "./bandwidth-calculator";
import { Request } from "./request";

export class RequestsContainer {
private readonly requests = new Map<Segment, Request>();

constructor(
private readonly requestProcessQueueCallback: () => void,
private readonly bandwidthApproximator: BandwidthApproximator,
private readonly bandwidthCalculator: BandwidthCalculator,
private readonly playback: Playback,
private readonly settings: Settings
) {}
Expand Down Expand Up @@ -44,7 +44,7 @@ export class RequestsContainer {
request = new Request(
segment,
this.requestProcessQueueCallback,
this.bandwidthApproximator,
this.bandwidthCalculator,
this.playback,
this.settings
);
Expand Down
24 changes: 10 additions & 14 deletions packages/p2p-media-loader-core/src/request.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Segment, SegmentResponse, Playback } from "./types";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { BandwidthCalculator } from "./bandwidth-calculator";
import * as StreamUtils from "./utils/stream";
import * as Utils from "./utils/utils";
import * as LoggerUtils from "./utils/logger";
Expand Down Expand Up @@ -73,7 +73,7 @@ export class Request {
constructor(
readonly segment: Segment,
private readonly requestProcessQueueCallback: () => void,
private readonly bandwidthApproximator: BandwidthApproximator,
private readonly bandwidthCalculator: BandwidthCalculator,
private readonly playback: Playback,
private readonly settings: StreamUtils.PlaybackTimeWindowsSettings
) {
Expand Down Expand Up @@ -179,7 +179,7 @@ export class Request {
loadedBytes: 0,
startTimestamp: performance.now(),
};
this.bandwidthApproximator.addLoading(this.progress);
this.bandwidthCalculator.startLoading();
const { notReceivingBytesTimeoutMs, abort } = controls;
this._abortRequestCallback = abort;

Expand Down Expand Up @@ -207,10 +207,8 @@ export class Request {

resolveEngineCallbacksSuccessfully() {
if (!this.finalData) return;
this._engineCallbacks?.onSuccess({
data: this.finalData,
bandwidth: this.bandwidthApproximator.getBandwidth(),
});
const bandwidth = this.bandwidthCalculator.getBandwidthForLastNSeconds(3);
this._engineCallbacks?.onSuccess({ data: this.finalData, bandwidth });
this._engineCallbacks = undefined;
}

Expand All @@ -236,6 +234,7 @@ export class Request {
this._abortRequestCallback = undefined;
this.currentAttempt = undefined;
this.notReceivingBytesTimeout.clear();
this.bandwidthCalculator.stopLoading();
}

private abortOnTimeout = () => {
Expand All @@ -252,6 +251,7 @@ export class Request {
error,
});
this.notReceivingBytesTimeout.clear();
this.bandwidthCalculator.stopLoading();
this.requestProcessQueueCallback();
};

Expand All @@ -266,19 +266,20 @@ export class Request {
error,
});
this.notReceivingBytesTimeout.clear();
this.bandwidthCalculator.stopLoading();
this.requestProcessQueueCallback();
};

private completeOnSuccess = () => {
this.throwErrorIfNotLoadingStatus();
if (!this.currentAttempt) return;

this.bandwidthCalculator.stopLoading();
this.notReceivingBytesTimeout.clear();
this.finalData = Utils.joinChunks(this.bytes);
this.setStatus("succeed");
this._totalBytes = this._loadedBytes;

this.resolveEngineCallbacksSuccessfully();
this.logger(
`${this.currentAttempt.type} ${this.segment.externalId} succeed`
);
Expand All @@ -290,6 +291,7 @@ export class Request {
if (!this.currentAttempt || !this.progress) return;
this.notReceivingBytesTimeout.restart();

this.bandwidthCalculator.addBytes(chunk.length);
this.bytes.push(chunk);
this.progress.lastLoadedChunkTimestamp = performance.now();
this.progress.loadedBytes += chunk.length;
Expand Down Expand Up @@ -320,11 +322,6 @@ export class Request {

class FailedRequestAttempts {
private attempts: Required<RequestAttempt>[] = [];
private _lastClearTimestamp = performance.now();

get lastClearTimestamp() {
return this._lastClearTimestamp;
}

add(attempt: Required<RequestAttempt>) {
this.attempts.push(attempt);
Expand All @@ -343,7 +340,6 @@ class FailedRequestAttempts {

clear() {
this.attempts = [];
this._lastClearTimestamp = performance.now();
}
}

Expand Down
6 changes: 6 additions & 0 deletions packages/p2p-media-loader-core/src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ export function hexToUtf8(hexString: string) {
const decoder = new TextDecoder();
return decoder.decode(bytes);
}

export function* arrayBackwards<T>(arr: T[]) {
for (let i = arr.length - 1; i >= 0; i--) {
yield arr[i];
}
}

0 comments on commit 39ec83d

Please sign in to comment.