Skip to content

Commit

Permalink
Add random http loading.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Oct 2, 2023
1 parent 2b59ee2 commit 15cfdda
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 53 deletions.
105 changes: 61 additions & 44 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export class HybridLoader {
private readonly playback: Playback;
private lastQueueProcessingTimeStamp?: number;
private readonly segmentAvgDuration: number;
private readonly randomHttpDownloadInterval: number;

constructor(
private streamManifestUrl: string,
Expand Down Expand Up @@ -48,6 +49,11 @@ export class HybridLoader {
this.segmentStorage,
this.settings
);

this.randomHttpDownloadInterval = window.setInterval(
() => this.loadRandomThroughHttp(),
1000
);
}

// api method for engines
Expand Down Expand Up @@ -86,7 +92,7 @@ export class HybridLoader {
lastRequestedSegment: this.lastRequestedSegment,
playback: this.playback,
settings: this.settings,
isSegmentLoaded: (segment) => this.segmentStorage.hasSegment(segment),
skipSegment: (segment) => this.segmentStorage.hasSegment(segment),
});

this.requests.abortAllNotRequestedByEngine((segment) =>
Expand All @@ -96,17 +102,6 @@ export class HybridLoader {
const { simultaneousHttpDownloads, simultaneousP2PDownloads } =
this.settings;

for (const request of this.requests.engineRequests()) {
const { segment, loaderRequest } = request;
if (
!queueSegmentIds.has(segment.localId) &&
!loaderRequest &&
segment.startTime < this.lastRequestedSegment.startTime
) {
request.engineCallbacks.onError(new RequestAbortError());
}
}

for (const { segment, statuses } of queue) {
// const timeToPlayback = getTimeToSegmentPlayback(segment, this.playback);
if (statuses.isHighDemand) {
Expand All @@ -130,27 +125,27 @@ export class HybridLoader {
continue;
}

// this.abortLastHttpLoadingAfter(queue, segment.localId);
// if (this.requests.httpRequestsCount < simultaneousHttpDownloads) {
// void this.loadThroughHttp(segment);
// continue;
// }
//
// if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) {
// void this.loadThroughP2P(segment);
// }
//
// this.abortLastP2PLoadingAfter(queue, segment.localId);
// if (this.requests.p2pRequestsCount < simultaneousHttpDownloads) {
// void this.loadThroughHttp(segment);
// continue;
// }
this.abortLastHttpLoadingAfter(queue, segment.localId);
if (this.requests.httpRequestsCount < simultaneousHttpDownloads) {
void this.loadThroughHttp(segment);
continue;
}

if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) {
void this.loadThroughP2P(segment);
}

this.abortLastP2PLoadingAfter(queue, segment.localId);
if (this.requests.p2pRequestsCount < simultaneousHttpDownloads) {
void this.loadThroughHttp(segment);
continue;
}
}
if (statuses.isP2PDownloadable) {
if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) {
void this.loadThroughP2P(segment);
}
}
// if (statuses.isP2PDownloadable) {
// if (this.requests.p2pRequestsCount < simultaneousP2PDownloads) {
// void this.loadThroughP2P(segment);
// }
// }
break;
}
}
Expand Down Expand Up @@ -180,6 +175,25 @@ export class HybridLoader {
if (data) this.onSegmentLoaded(segment, data);
}

private loadRandomThroughHttp() {
const { simultaneousHttpDownloads } = this.settings;
if (this.requests.httpRequestsCount >= simultaneousHttpDownloads) return;
const { queue } = QueueUtils.generateQueue({
lastRequestedSegment: this.lastRequestedSegment,
playback: this.playback,
settings: this.settings,
skipSegment: (segment, statuses) =>
!statuses.isHttpDownloadable ||
this.segmentStorage.hasSegment(segment) ||
this.requests.isHybridLoaderRequested(segment),
});
if (!queue.length) return;

const { segment } = queue[Math.floor(Math.random() * queue.length)];
// console.log("load random: ", getSegmentStringId(segment));
void this.loadThroughHttp(segment);
}

private onSegmentLoaded(segment: Segment, data: ArrayBuffer) {
this.bandwidthApproximator.addBytes(data.byteLength);
void this.segmentStorage.storeSegment(segment, data);
Expand All @@ -191,24 +205,20 @@ export class HybridLoader {
}

private abortLastHttpLoadingAfter(queue: QueueItem[], segmentId: string) {
for (const {
segment: { localId: queueSegmentId },
} of arrayBackwards(queue)) {
if (queueSegmentId === segmentId) break;
if (this.requests.isHttpRequested(queueSegmentId)) {
this.requests.abortLoaderRequest(queueSegmentId);
for (const { segment } of arrayBackwards(queue)) {
if (segment.localId === segmentId) break;
if (this.requests.isHttpRequested(segment)) {
this.requests.abortLoaderRequest(segment);
break;
}
}
}

private abortLastP2PLoadingAfter(queue: QueueItem[], segmentId: string) {
for (const {
segment: { localId: queueSegmentId },
} of arrayBackwards(queue)) {
if (queueSegmentId === segmentId) break;
if (this.requests.isP2PRequested(queueSegmentId)) {
this.requests.abortLoaderRequest(queueSegmentId);
for (const { segment } of arrayBackwards(queue)) {
if (segment.localId === segmentId) break;
if (this.requests.isP2PRequested(segment)) {
this.requests.abortLoaderRequest(segment);
break;
}
}
Expand All @@ -231,6 +241,7 @@ export class HybridLoader {

destroy() {
clearInterval(this.storageCleanUpIntervalId);
clearInterval(this.randomHttpDownloadInterval);
this.storageCleanUpIntervalId = undefined;
void this.segmentStorage.destroy();
this.requests.destroy();
Expand Down Expand Up @@ -341,3 +352,9 @@ function getSegmentAvgDuration(stream: StreamWithSegments) {

return sumDuration / size;
}

function getSegmentStringId(segment: Segment) {
const { index } = segment.stream;
const { externalId } = segment;
return `${index}-${externalId}`;
}
20 changes: 14 additions & 6 deletions packages/p2p-media-loader-core/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,19 @@ export class RequestContainer {
return !!this.requests.get(segmentId)?.engineCallbacks;
}

isHttpRequested(segmentId: string): boolean {
return this.requests.get(segmentId)?.loaderRequest?.type === "http";
isHttpRequested(segment: Segment): boolean {
const id = getRequestItemId(segment);
return this.requests.get(id)?.loaderRequest?.type === "http";
}

isP2PRequested(segment: Segment): boolean {
const id = getRequestItemId(segment);
return this.requests.get(id)?.loaderRequest?.type === "p2p";
}

isP2PRequested(segmentId: string): boolean {
return this.requests.get(segmentId)?.loaderRequest?.type === "p2p";
isHybridLoaderRequested(segment: Segment): boolean {
const id = getRequestItemId(segment);
return !!this.requests.get(id)?.loaderRequest;
}

abortEngineRequest(segmentId: string) {
Expand All @@ -166,8 +173,9 @@ export class RequestContainer {
request.loaderRequest?.abort();
}

abortLoaderRequest(segmentId: string) {
this.requests.get(segmentId)?.loaderRequest?.abort();
abortLoaderRequest(segment: Segment) {
const id = getRequestItemId(segment);
this.requests.get(id)?.loaderRequest?.abort();
}

private clearRequestItem(
Expand Down
6 changes: 3 additions & 3 deletions packages/p2p-media-loader-core/src/utils/queue-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ export function generateQueue({
lastRequestedSegment,
playback,
settings,
isSegmentLoaded,
skipSegment,
}: {
lastRequestedSegment: Readonly<Segment>;
playback: Readonly<Playback>;
isSegmentLoaded: (segment: Segment) => boolean;
skipSegment: (segment: Segment, statuses: QueueItemStatuses) => boolean;
settings: Pick<
Settings,
"highDemandTimeWindow" | "httpDownloadTimeWindow" | "p2pDownloadTimeWindow"
Expand All @@ -42,7 +42,7 @@ export function generateQueue({
if (isNotActual && !(i === 0 && isNextSegmentHighDemand)) {
break;
}
if (isSegmentLoaded(segment)) continue;
if (skipSegment(segment, statuses)) continue;

queueSegmentIds.add(segment.localId);
if (isNotActual) statuses.isHighDemand = true;
Expand Down

0 comments on commit 15cfdda

Please sign in to comment.