Skip to content

Commit

Permalink
Feat: add new config param isP2PUploadDisabled (#440)
Browse files Browse the repository at this point in the history
* feat: add new config param isP2PUploadDisabled

* Update types.ts

* refactor: broadcastAnnouncement

* fix: destroy loader only when isP2PDisabled is true

* fix: sendEmptyAnnouncement

---------

Co-authored-by: Andriy Lysnevych <[email protected]>
  • Loading branch information
DimaDemchenko and mrlika authored Nov 20, 2024
1 parent 2e1e703 commit 205191d
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 15 deletions.
70 changes: 63 additions & 7 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
CommonCoreConfig,
StreamConfig,
DefinedCoreConfig,
StreamType,
DynamicStreamConfig,
} from "./types.js";
import { BandwidthCalculators, StreamDetails } from "./internal-types.js";
import * as StreamUtils from "./utils/stream.js";
Expand All @@ -36,6 +38,7 @@ export class Core<TStream extends Stream = Stream> {

/** Default configuration for stream settings. */
static readonly DEFAULT_STREAM_CONFIG: StreamConfig = {
isP2PUploadDisabled: false,
isP2PDisabled: false,
simultaneousHttpDownloads: 2,
simultaneousP2PDownloads: 3,
Expand Down Expand Up @@ -143,8 +146,8 @@ export class Core<TStream extends Stream = Stream> {
* @example
* // Example of dynamically updating the download time windows and timeout settings.
* const dynamicConfig = {
* httpDownloadTimeWindowMs: 60, // Set HTTP download time window to 60 seconds
* p2pDownloadTimeWindowMs: 60, // Set P2P download time window to 60 seconds
* httpDownloadTimeWindow: 60, // Set HTTP download time window to 60 seconds
* p2pDownloadTimeWindow: 60, // Set P2P download time window to 60 seconds
* httpNotReceivingBytesTimeoutMs: 1500, // Set HTTP timeout to 1500 milliseconds
* p2pNotReceivingBytesTimeoutMs: 1500 // Set P2P timeout to 1500 milliseconds
* };
Expand All @@ -153,17 +156,70 @@ export class Core<TStream extends Stream = Stream> {
applyDynamicConfig(dynamicConfig: DynamicCoreConfig) {
const { mainStream, secondaryStream } = dynamicConfig;

const mainStreamConfigCopy = deepCopy(this.mainStreamConfig);
const secondaryStreamConfigCopy = deepCopy(this.secondaryStreamConfig);

this.overrideAllConfigs(dynamicConfig, mainStream, secondaryStream);

if (this.mainStreamConfig.isP2PDisabled) {
this.destroyStreamLoader("main");
this.processSpecificDynamicConfigParams(
mainStreamConfigCopy,
dynamicConfig,
"main",
);
this.processSpecificDynamicConfigParams(
secondaryStreamConfigCopy,
dynamicConfig,
"secondary",
);
}

private processSpecificDynamicConfigParams(
prevConfig: StreamConfig,
updatedConfig: DynamicCoreConfig,
streamType: StreamType,
) {
const isP2PDisabled = this.getUpdatedStreamProperty(
"isP2PDisabled",
updatedConfig,
streamType,
);

if (isP2PDisabled && prevConfig.isP2PDisabled !== isP2PDisabled) {
this.destroyStreamLoader(streamType);
}

if (this.secondaryStreamConfig.isP2PDisabled) {
this.destroyStreamLoader("secondary");
const isP2PUploadDisabled = this.getUpdatedStreamProperty(
"isP2PUploadDisabled",
updatedConfig,
streamType,
);

if (
isP2PUploadDisabled !== undefined &&
prevConfig.isP2PUploadDisabled !== isP2PUploadDisabled
) {
const streamLoader =
streamType === "main"
? this.mainStreamLoader
: this.secondaryStreamLoader;

streamLoader?.sendBroadcastAnnouncement(isP2PUploadDisabled);
}
}

private getUpdatedStreamProperty<K extends keyof DynamicStreamConfig>(
propertyName: K,
updatedConfig: DynamicCoreConfig,
streamType: StreamType,
): DynamicStreamConfig[K] | undefined {
const updatedStreamConfig =
streamType === "main"
? updatedConfig.mainStream
: updatedConfig.secondaryStream;

return updatedStreamConfig?.[propertyName] ?? updatedConfig[propertyName];
}

/**
* Adds an event listener for the specified event type on the core event target.
*
Expand Down Expand Up @@ -436,7 +492,7 @@ export class Core<TStream extends Stream = Stream> {
}
}

private destroyStreamLoader(streamType: "main" | "secondary") {
private destroyStreamLoader(streamType: StreamType) {
if (streamType === "main") {
this.mainStreamLoader?.destroy();
this.mainStreamLoader = undefined;
Expand Down
6 changes: 6 additions & 0 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,12 @@ export class HybridLoader {
this.levelChangedTimestamp = performance.now();
}

sendBroadcastAnnouncement(sendEmptySegmentsAnnouncement = false) {
this.p2pLoaders.currentLoader.broadcastAnnouncement(
sendEmptySegmentsAnnouncement,
);
}

updatePlayback(position: number, rate: number) {
const isRateChanged = this.playback.rate !== rate;
const isPositionChanged = this.playback.position !== position;
Expand Down
27 changes: 24 additions & 3 deletions packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,33 @@ export class P2PLoader {
}

private onPeerConnected = (peer: Peer) => {
if (this.config.isP2PUploadDisabled) return;

const { httpLoading, loaded } = this.getSegmentsAnnouncement();
peer.sendSegmentsAnnouncementCommand(loaded, httpLoading);
};

broadcastAnnouncement = () => {
if (this.isAnnounceMicrotaskCreated) return;
broadcastAnnouncement = (sendEmptyAnnouncement = false) => {
if (sendEmptyAnnouncement) {
this.sendSegmentsAnnouncement([], []);
return;
}

if (this.isAnnounceMicrotaskCreated || this.config.isP2PUploadDisabled) {
return;
}

const { loaded, httpLoading } = this.getSegmentsAnnouncement();
this.sendSegmentsAnnouncement(loaded, httpLoading);
};

private sendSegmentsAnnouncement = (
loaded: number[],
httpLoading: number[],
) => {
this.isAnnounceMicrotaskCreated = true;

queueMicrotask(() => {
const { httpLoading, loaded } = this.getSegmentsAnnouncement();
for (const peer of this.trackerClient.peers()) {
peer.sendSegmentsAnnouncementCommand(loaded, httpLoading);
}
Expand All @@ -142,6 +159,10 @@ export class P2PLoader {
segmentExternalId,
);
if (!segment) return;
if (this.config.isP2PUploadDisabled) {
peer.sendSegmentAbsentCommand(segmentExternalId, requestId);
return;
}

const swarmId = this.config.swarmId ?? this.streamManifestUrl;
const streamSwarmId = StreamUtils.getStreamSwarmId(swarmId, this.stream);
Expand Down
25 changes: 20 additions & 5 deletions packages/p2p-media-loader-core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ export type DynamicStreamProperties =
| "p2pErrorRetries"
| "validateP2PSegment"
| "httpRequestSetup"
| "isP2PDisabled";
| "isP2PDisabled"
| "isP2PUploadDisabled";

/**
* Represents a dynamically modifiable configuration, allowing updates to selected CoreConfig properties at runtime.
Expand All @@ -107,11 +108,16 @@ export type DynamicCoreConfig = Partial<
> &
Partial<CommonCoreConfig> & {
/** Optional dynamic configuration for the main stream. */
mainStream?: Partial<Pick<StreamConfig, DynamicStreamProperties>>;
mainStream?: DynamicStreamConfig;
/** Optional dynamic configuration for the secondary stream. */
secondaryStream?: Partial<Pick<StreamConfig, DynamicStreamProperties>>;
secondaryStream?: DynamicStreamConfig;
};

/** Represents a partial configuration for a stream with dynamic properties. */
export type DynamicStreamConfig = Partial<
Pick<StreamConfig, DynamicStreamProperties>
>;

/** Represents the configuration for the Core functionality that is common to all streams. */
export type CommonCoreConfig = {
/**
Expand Down Expand Up @@ -187,8 +193,17 @@ export type CoreConfig = Partial<StreamConfig> &
/** Configuration options for the Core functionality, including network and processing parameters. */
export type StreamConfig = {
/**
* Indicates whether Peer-to-Peer (P2P) functionality is disabled for the stream.
* If set to true, P2P functionality is disabled for the stream.
* Controls if peer-to-peer upload is disabled for the stream.
* If `true`, the stream only downloads segments without uploading to peers.
*
* @default
* ```typescript
* isP2PUploadDisabled: false
* ```
*/
isP2PUploadDisabled: boolean;
/**
* Controls whether peer-to-peer functionality is disabled for the stream.
*
* @default
* ```typescript
Expand Down

0 comments on commit 205191d

Please sign in to comment.