diff --git a/packages/p2p-media-loader-core/src/enums.ts b/packages/p2p-media-loader-core/src/enums.ts index d260000e..62114ad3 100644 --- a/packages/p2p-media-loader-core/src/enums.ts +++ b/packages/p2p-media-loader-core/src/enums.ts @@ -1,6 +1,8 @@ export enum PeerCommandType { SegmentsAnnouncement, SegmentRequest, + SegmentData, + SegmentAbsent, } export enum PeerSegmentStatus { diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index 7233eae6..c9ceca27 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -45,6 +45,15 @@ export class HybridLoader { this.streamManifestUrl = url; } + private createP2PLoader(stream: StreamWithSegments) { + if (!this.streamManifestUrl) return; + this.p2pLoader = new P2PLoader( + this.streamManifestUrl, + stream, + this.segmentStorage + ); + } + async loadSegment( segment: Readonly, stream: Readonly @@ -54,14 +63,7 @@ export class HybridLoader { } if (stream !== this.activeStream) { this.activeStream = stream; - if (this.streamManifestUrl) { - const streamExternalId = Utils.getStreamExternalId( - stream, - this.streamManifestUrl - ); - this.p2pLoader = new P2PLoader(streamExternalId); - void this.updateSegmentsLoadingState(); - } + this.createP2PLoader(stream); } this.lastRequestedSegment = segment; void this.processQueue(); @@ -154,7 +156,6 @@ export class HybridLoader { if (!data) return; this.bandwidthApproximator.addBytes(data.byteLength); void this.segmentStorage.storeSegment(segment, data); - void this.updateSegmentsLoadingState(); const request = this.pluginRequests.get(segment.localId); if (request) { request.onSuccess({ @@ -207,23 +208,6 @@ export class HybridLoader { return request; } - private async updateSegmentsLoadingState() { - if (!this.streamManifestUrl || !this.activeStream || !this.p2pLoader) { - return; - } - const storedSegmentIds = await this.segmentStorage.getStoredSegmentIds(); - const loaded: Segment[] = []; - - for (const id of storedSegmentIds) { - const segment = this.activeStream.segments.get(id); - if (!segment) continue; - - loaded.push(segment); - } - - void this.p2pLoader.updateSegmentsLoadingState(loaded, []); - } - destroy() { clearInterval(this.storageCleanUpIntervalId); this.storageCleanUpIntervalId = undefined; diff --git a/packages/p2p-media-loader-core/src/internal-types.ts b/packages/p2p-media-loader-core/src/internal-types.ts index 752396cf..e55126f9 100644 --- a/packages/p2p-media-loader-core/src/internal-types.ts +++ b/packages/p2p-media-loader-core/src/internal-types.ts @@ -28,21 +28,29 @@ export type BasePeerCommand = { c: T; }; -export type PeerSegmentRequestCommand = - BasePeerCommand & { - i: string; - }; - // {[streamId]: [segmentIds[]; segmentStatuses[]]} export type JsonSegmentAnnouncementMap = { [key: string]: [number[], number[]]; }; +export type PeerSegmentCommand = BasePeerCommand< + PeerCommandType.SegmentRequest | PeerCommandType.SegmentAbsent +> & { + i: string; +}; + export type PeerSegmentAnnouncementCommand = BasePeerCommand & { m: JsonSegmentAnnouncementMap; }; +export type PeerSendSegmentCommand = + BasePeerCommand & { + i: string; + s: number; + }; + export type PeerCommand = - | PeerSegmentRequestCommand - | PeerSegmentAnnouncementCommand; + | PeerSegmentCommand + | PeerSegmentAnnouncementCommand + | PeerSendSegmentCommand; diff --git a/packages/p2p-media-loader-core/src/p2p-loader.ts b/packages/p2p-media-loader-core/src/p2p-loader.ts index 0771ba1a..007a6fb6 100644 --- a/packages/p2p-media-loader-core/src/p2p-loader.ts +++ b/packages/p2p-media-loader-core/src/p2p-loader.ts @@ -1,82 +1,97 @@ -import TrackerClient, { TrackerEventHandler } from "bittorrent-tracker"; +import TrackerClient, { PeerCandidate } from "bittorrent-tracker"; import * as RIPEMD160 from "ripemd160"; import { Peer } from "./peer"; import * as PeerUtil from "./peer-utils"; -import { Segment } from "./types"; +import { Segment, StreamWithSegments } from "./types"; import { JsonSegmentAnnouncementMap } from "./internal-types"; +import { SegmentsMemoryStorage } from "./segments-storage"; +import * as Utils from "./utils"; export class P2PLoader { private readonly streamExternalId: string; private readonly streamHash: string; private readonly peerHash: string; - private trackerClient: TrackerClient; + private readonly trackerClient: TrackerClient; private readonly peers = new Map(); private announcementMap: JsonSegmentAnnouncementMap = {}; - constructor(streamExternalId: string) { - this.streamExternalId = streamExternalId; + constructor( + private streamManifestUrl: string, + private readonly stream: StreamWithSegments, + private readonly segmentStorage: SegmentsMemoryStorage + ) { const peerId = PeerUtil.generatePeerId(); + this.streamExternalId = Utils.getStreamExternalId( + this.stream, + this.streamManifestUrl + ); this.streamHash = getHash(this.streamExternalId); this.peerHash = getHash(peerId); - this.trackerClient = new TrackerClient({ - infoHash: this.streamHash, - peerId: this.peerHash, - port: 6881, - announce: [ - "wss://tracker.novage.com.ua", - "wss://tracker.openwebtorrent.com", - ], - rtcConfig: { - iceServers: [ - { - urls: [ - "stun:stun.l.google.com:19302", - "stun:global.stun.twilio.com:3478", - ], - }, - ], - }, + this.trackerClient = createTrackerClient({ + streamHash: this.streamHash, + peerHash: this.peerHash, }); + this.subscribeOnTrackerEvents(this.trackerClient); + this.segmentStorage.subscribeOnUpdate( + this.onSegmentStorageUpdate.bind(this) + ); + this.trackerClient.start(); + } - this.trackerClient.on("update", this.onTrackerUpdate); - this.trackerClient.on("peer", this.onTrackerPeerConnect); - this.trackerClient.on("warning", this.onTrackerWarning); - this.trackerClient.on("error", this.onTrackerError); + private subscribeOnTrackerEvents(trackerClient: TrackerClient) { + // TODO: tracker event handlers + trackerClient.on("update", () => {}); + trackerClient.on("peer", (candidate) => { + const peer = this.peers.get(candidate.id); + if (peer) peer.addCandidate(candidate); + else this.createPeer(candidate); + }); + trackerClient.on("warning", (warning) => {}); + trackerClient.on("error", (error) => {}); + } - this.trackerClient.start(); + private createPeer(candidate: PeerCandidate) { + const peer = new Peer(candidate, { + onPeerConnected: this.onPeerConnected.bind(this), + onSegmentRequested: this.onSegmentRequested.bind(this), + }); + this.peers.set(candidate.id, peer); } - private onTrackerUpdate: TrackerEventHandler<"update"> = (data) => {}; - private onTrackerPeerConnect: TrackerEventHandler<"peer"> = (candidate) => { - const peer = this.peers.get(candidate.id); - if (peer) { - peer.addCandidate(candidate); - } else { - const peer = new Peer(this.streamExternalId, candidate); - this.peers.set(candidate.id, peer); + private async onSegmentStorageUpdate() { + const storedSegmentIds = await this.segmentStorage.getStoredSegmentIds(); + const loaded: Segment[] = []; + + for (const id of storedSegmentIds) { + const segment = this.stream.segments.get(id); + if (!segment) continue; + + loaded.push(segment); } - }; - private onTrackerWarning: TrackerEventHandler<"warning"> = (warning) => {}; - private onTrackerError: TrackerEventHandler<"error"> = (error) => {}; - updateSegmentsLoadingState(loaded: Segment[], loading: Segment[]) { this.announcementMap = PeerUtil.getJsonSegmentsAnnouncementMap( this.streamExternalId, loaded, - loading + [] ); this.broadcastSegmentAnnouncement(); } - sendSegmentsAnnouncementToPeer(peer: Peer) { - if (!peer?.isConnected) return; + private onPeerConnected(peer: Peer) { peer.sendSegmentsAnnouncement(this.announcementMap); } - broadcastSegmentAnnouncement() { + private async onSegmentRequested(peer: Peer, segmentExternalId: string) { + const segmentData = await this.segmentStorage.getSegment(segmentExternalId); + if (segmentData) peer.sendSegmentData(segmentExternalId, segmentData); + else peer.sendSegmentAbsent(segmentExternalId); + } + + private broadcastSegmentAnnouncement() { for (const peer of this.peers.values()) { - this.sendSegmentsAnnouncementToPeer(peer); + if (!peer.isConnected) continue; + peer.sendSegmentsAnnouncement(this.announcementMap); } } } @@ -84,3 +99,31 @@ export class P2PLoader { function getHash(data: string) { return new RIPEMD160().update(data).digest("hex"); } + +function createTrackerClient({ + streamHash, + peerHash, +}: { + streamHash: string; + peerHash: string; +}) { + return new TrackerClient({ + infoHash: streamHash, + peerId: peerHash, + port: 6881, + announce: [ + "wss://tracker.novage.com.ua", + "wss://tracker.openwebtorrent.com", + ], + rtcConfig: { + iceServers: [ + { + urls: [ + "stun:stun.l.google.com:19302", + "stun:global.stun.twilio.com:3478", + ], + }, + ], + }, + }); +} diff --git a/packages/p2p-media-loader-core/src/peer.ts b/packages/p2p-media-loader-core/src/peer.ts index a3090216..09e869f7 100644 --- a/packages/p2p-media-loader-core/src/peer.ts +++ b/packages/p2p-media-loader-core/src/peer.ts @@ -2,22 +2,30 @@ import { PeerCandidate } from "bittorrent-tracker"; import { JsonSegmentAnnouncementMap, PeerCommand, - PeerSegmentRequestCommand, PeerSegmentAnnouncementCommand, + PeerSegmentCommand, + PeerSendSegmentCommand, } from "./internal-types"; import { PeerCommandType, PeerSegmentStatus } from "./enums"; import * as PeerUtil from "./peer-utils"; +const webRtcMaxMessageSize: number = 64 * 1024 - 1; + +type PeerEventHandlers = { + onPeerConnected: (peer: Peer) => void; + onSegmentRequested: (peer: Peer, segmentId: string) => void; +}; + export class Peer { readonly id: string; - private readonly streamExternalId: string; private readonly candidates = new Set(); private connection?: PeerCandidate; + private readonly eventHandlers: PeerEventHandlers; private segments = new Map(); - constructor(streamExternalId: string, candidate: PeerCandidate) { - this.streamExternalId = streamExternalId; + constructor(candidate: PeerCandidate, eventHandlers: PeerEventHandlers) { this.id = candidate.id; + this.eventHandlers = eventHandlers; this.addCandidate(candidate); } @@ -34,6 +42,7 @@ export class Peer { private onCandidateConnect(candidate: PeerCandidate) { this.connection = candidate; + this.eventHandlers.onPeerConnected(this); } private onCandidateClose(candidate: PeerCandidate) { @@ -56,6 +65,7 @@ export class Peer { break; case PeerCommandType.SegmentRequest: + this.eventHandlers.onSegmentRequested(this, command.i); break; } } @@ -66,7 +76,7 @@ export class Peer { } requestSegment(segmentExternalId: string) { - const command: PeerSegmentRequestCommand = { + const command: PeerSegmentCommand = { c: PeerCommandType.SegmentRequest, i: segmentExternalId, }; @@ -80,4 +90,37 @@ export class Peer { }; this.sendCommand(command); } + + sendSegmentData(segmentExternalId: string, data: ArrayBuffer) { + if (!this.connection) return; + const command: PeerSendSegmentCommand = { + c: PeerCommandType.SegmentData, + i: segmentExternalId, + s: data.byteLength, + }; + + this.sendCommand(command); + + let bytesLeft = data.byteLength; + while (bytesLeft > 0) { + const bytesToSend = + bytesLeft >= webRtcMaxMessageSize ? webRtcMaxMessageSize : bytesLeft; + const buffer = Buffer.from( + data, + data.byteLength - bytesLeft, + bytesToSend + ); + + this.connection.send(buffer); + bytesLeft -= bytesToSend; + } + } + + sendSegmentAbsent(segmentExternalId: string) { + const command: PeerSegmentCommand = { + c: PeerCommandType.SegmentAbsent, + i: segmentExternalId, + }; + this.sendCommand(command); + } } diff --git a/packages/p2p-media-loader-core/src/segments-storage.ts b/packages/p2p-media-loader-core/src/segments-storage.ts index 1c2833a8..3ec10acc 100644 --- a/packages/p2p-media-loader-core/src/segments-storage.ts +++ b/packages/p2p-media-loader-core/src/segments-storage.ts @@ -6,6 +6,7 @@ export class SegmentsMemoryStorage { { segment: Segment; data: ArrayBuffer; lastAccessed: number } >(); private isSegmentLockedPredicate?: (segment: Segment) => boolean; + private onUpdateSubscriptions: (() => void)[] = []; constructor( private settings: { @@ -18,12 +19,17 @@ export class SegmentsMemoryStorage { this.isSegmentLockedPredicate = predicate; } + subscribeOnUpdate(callback: () => void) { + this.onUpdateSubscriptions.push(callback); + } + async storeSegment(segment: Segment, data: ArrayBuffer) { this.cache.set(segment.localId, { segment, data, lastAccessed: performance.now(), }); + this.onUpdateSubscriptions.forEach((c) => c()); } async getSegment(segmentId: string): Promise { @@ -78,10 +84,14 @@ export class SegmentsMemoryStorage { } segmentsToDelete.forEach((id) => this.cache.delete(id)); + if (segmentsToDelete.length) { + this.onUpdateSubscriptions.forEach((c) => c()); + } return segmentsToDelete.length > 0; } public async destroy() { this.cache.clear(); + this.onUpdateSubscriptions = []; } }