Skip to content

Commit

Permalink
Add peer events.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Sep 18, 2023
1 parent f61b852 commit 3305c19
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 83 deletions.
2 changes: 2 additions & 0 deletions packages/p2p-media-loader-core/src/enums.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
export enum PeerCommandType {
SegmentsAnnouncement,
SegmentRequest,
SegmentData,
SegmentAbsent,
}

export enum PeerSegmentStatus {
Expand Down
36 changes: 10 additions & 26 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ export class HybridLoader {
this.streamManifestUrl = url;
}

private createP2PLoader(stream: StreamWithSegments) {
if (!this.streamManifestUrl) return;
this.p2pLoader = new P2PLoader(
this.streamManifestUrl,
stream,
this.segmentStorage
);
}

async loadSegment(
segment: Readonly<Segment>,
stream: Readonly<StreamWithSegments>
Expand All @@ -54,14 +63,7 @@ export class HybridLoader {
}
if (stream !== this.activeStream) {
this.activeStream = stream;
if (this.streamManifestUrl) {
const streamExternalId = Utils.getStreamExternalId(
stream,
this.streamManifestUrl
);
this.p2pLoader = new P2PLoader(streamExternalId);
void this.updateSegmentsLoadingState();
}
this.createP2PLoader(stream);
}
this.lastRequestedSegment = segment;
void this.processQueue();
Expand Down Expand Up @@ -154,7 +156,6 @@ export class HybridLoader {
if (!data) return;
this.bandwidthApproximator.addBytes(data.byteLength);
void this.segmentStorage.storeSegment(segment, data);
void this.updateSegmentsLoadingState();
const request = this.pluginRequests.get(segment.localId);
if (request) {
request.onSuccess({
Expand Down Expand Up @@ -207,23 +208,6 @@ export class HybridLoader {
return request;
}

private async updateSegmentsLoadingState() {
if (!this.streamManifestUrl || !this.activeStream || !this.p2pLoader) {
return;
}
const storedSegmentIds = await this.segmentStorage.getStoredSegmentIds();
const loaded: Segment[] = [];

for (const id of storedSegmentIds) {
const segment = this.activeStream.segments.get(id);
if (!segment) continue;

loaded.push(segment);
}

void this.p2pLoader.updateSegmentsLoadingState(loaded, []);
}

destroy() {
clearInterval(this.storageCleanUpIntervalId);
this.storageCleanUpIntervalId = undefined;
Expand Down
22 changes: 15 additions & 7 deletions packages/p2p-media-loader-core/src/internal-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,29 @@ export type BasePeerCommand<T extends PeerCommandType = PeerCommandType> = {
c: T;
};

export type PeerSegmentRequestCommand =
BasePeerCommand<PeerCommandType.SegmentRequest> & {
i: string;
};

// {[streamId]: [segmentIds[]; segmentStatuses[]]}
export type JsonSegmentAnnouncementMap = {
[key: string]: [number[], number[]];
};

export type PeerSegmentCommand = BasePeerCommand<
PeerCommandType.SegmentRequest | PeerCommandType.SegmentAbsent
> & {
i: string;
};

export type PeerSegmentAnnouncementCommand =
BasePeerCommand<PeerCommandType.SegmentsAnnouncement> & {
m: JsonSegmentAnnouncementMap;
};

export type PeerSendSegmentCommand =
BasePeerCommand<PeerCommandType.SegmentData> & {
i: string;
s: number;
};

export type PeerCommand =
| PeerSegmentRequestCommand
| PeerSegmentAnnouncementCommand;
| PeerSegmentCommand
| PeerSegmentAnnouncementCommand
| PeerSendSegmentCommand;
133 changes: 88 additions & 45 deletions packages/p2p-media-loader-core/src/p2p-loader.ts
Original file line number Diff line number Diff line change
@@ -1,86 +1,129 @@
import TrackerClient, { TrackerEventHandler } from "bittorrent-tracker";
import TrackerClient, { PeerCandidate } from "bittorrent-tracker";
import * as RIPEMD160 from "ripemd160";
import { Peer } from "./peer";
import * as PeerUtil from "./peer-utils";
import { Segment } from "./types";
import { Segment, StreamWithSegments } from "./types";
import { JsonSegmentAnnouncementMap } from "./internal-types";
import { SegmentsMemoryStorage } from "./segments-storage";
import * as Utils from "./utils";

export class P2PLoader {
private readonly streamExternalId: string;
private readonly streamHash: string;
private readonly peerHash: string;
private trackerClient: TrackerClient;
private readonly trackerClient: TrackerClient;
private readonly peers = new Map<string, Peer>();
private announcementMap: JsonSegmentAnnouncementMap = {};

constructor(streamExternalId: string) {
this.streamExternalId = streamExternalId;
constructor(
private streamManifestUrl: string,
private readonly stream: StreamWithSegments,
private readonly segmentStorage: SegmentsMemoryStorage
) {
const peerId = PeerUtil.generatePeerId();
this.streamExternalId = Utils.getStreamExternalId(
this.stream,
this.streamManifestUrl
);
this.streamHash = getHash(this.streamExternalId);
this.peerHash = getHash(peerId);

this.trackerClient = new TrackerClient({
infoHash: this.streamHash,
peerId: this.peerHash,
port: 6881,
announce: [
"wss://tracker.novage.com.ua",
"wss://tracker.openwebtorrent.com",
],
rtcConfig: {
iceServers: [
{
urls: [
"stun:stun.l.google.com:19302",
"stun:global.stun.twilio.com:3478",
],
},
],
},
this.trackerClient = createTrackerClient({
streamHash: this.streamHash,
peerHash: this.peerHash,
});
this.subscribeOnTrackerEvents(this.trackerClient);
this.segmentStorage.subscribeOnUpdate(
this.onSegmentStorageUpdate.bind(this)
);
this.trackerClient.start();
}

this.trackerClient.on("update", this.onTrackerUpdate);
this.trackerClient.on("peer", this.onTrackerPeerConnect);
this.trackerClient.on("warning", this.onTrackerWarning);
this.trackerClient.on("error", this.onTrackerError);
private subscribeOnTrackerEvents(trackerClient: TrackerClient) {
// TODO: tracker event handlers
trackerClient.on("update", () => {});
trackerClient.on("peer", (candidate) => {
const peer = this.peers.get(candidate.id);
if (peer) peer.addCandidate(candidate);
else this.createPeer(candidate);
});
trackerClient.on("warning", (warning) => {});
trackerClient.on("error", (error) => {});
}

this.trackerClient.start();
private createPeer(candidate: PeerCandidate) {
const peer = new Peer(candidate, {
onPeerConnected: this.onPeerConnected.bind(this),
onSegmentRequested: this.onSegmentRequested.bind(this),
});
this.peers.set(candidate.id, peer);
}

private onTrackerUpdate: TrackerEventHandler<"update"> = (data) => {};
private onTrackerPeerConnect: TrackerEventHandler<"peer"> = (candidate) => {
const peer = this.peers.get(candidate.id);
if (peer) {
peer.addCandidate(candidate);
} else {
const peer = new Peer(this.streamExternalId, candidate);
this.peers.set(candidate.id, peer);
private async onSegmentStorageUpdate() {
const storedSegmentIds = await this.segmentStorage.getStoredSegmentIds();
const loaded: Segment[] = [];

for (const id of storedSegmentIds) {
const segment = this.stream.segments.get(id);
if (!segment) continue;

loaded.push(segment);
}
};
private onTrackerWarning: TrackerEventHandler<"warning"> = (warning) => {};
private onTrackerError: TrackerEventHandler<"error"> = (error) => {};

updateSegmentsLoadingState(loaded: Segment[], loading: Segment[]) {
this.announcementMap = PeerUtil.getJsonSegmentsAnnouncementMap(
this.streamExternalId,
loaded,
loading
[]
);
this.broadcastSegmentAnnouncement();
}

sendSegmentsAnnouncementToPeer(peer: Peer) {
if (!peer?.isConnected) return;
private onPeerConnected(peer: Peer) {
peer.sendSegmentsAnnouncement(this.announcementMap);
}

broadcastSegmentAnnouncement() {
private async onSegmentRequested(peer: Peer, segmentExternalId: string) {
const segmentData = await this.segmentStorage.getSegment(segmentExternalId);
if (segmentData) peer.sendSegmentData(segmentExternalId, segmentData);
else peer.sendSegmentAbsent(segmentExternalId);
}

private broadcastSegmentAnnouncement() {
for (const peer of this.peers.values()) {
this.sendSegmentsAnnouncementToPeer(peer);
if (!peer.isConnected) continue;
peer.sendSegmentsAnnouncement(this.announcementMap);
}
}
}

function getHash(data: string) {
return new RIPEMD160().update(data).digest("hex");
}

function createTrackerClient({
streamHash,
peerHash,
}: {
streamHash: string;
peerHash: string;
}) {
return new TrackerClient({
infoHash: streamHash,
peerId: peerHash,
port: 6881,
announce: [
"wss://tracker.novage.com.ua",
"wss://tracker.openwebtorrent.com",
],
rtcConfig: {
iceServers: [
{
urls: [
"stun:stun.l.google.com:19302",
"stun:global.stun.twilio.com:3478",
],
},
],
},
});
}
53 changes: 48 additions & 5 deletions packages/p2p-media-loader-core/src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,30 @@ import { PeerCandidate } from "bittorrent-tracker";
import {
JsonSegmentAnnouncementMap,
PeerCommand,
PeerSegmentRequestCommand,
PeerSegmentAnnouncementCommand,
PeerSegmentCommand,
PeerSendSegmentCommand,
} from "./internal-types";
import { PeerCommandType, PeerSegmentStatus } from "./enums";
import * as PeerUtil from "./peer-utils";

const webRtcMaxMessageSize: number = 64 * 1024 - 1;

type PeerEventHandlers = {
onPeerConnected: (peer: Peer) => void;
onSegmentRequested: (peer: Peer, segmentId: string) => void;
};

export class Peer {
readonly id: string;
private readonly streamExternalId: string;
private readonly candidates = new Set<PeerCandidate>();
private connection?: PeerCandidate;
private readonly eventHandlers: PeerEventHandlers;
private segments = new Map<string, PeerSegmentStatus>();

constructor(streamExternalId: string, candidate: PeerCandidate) {
this.streamExternalId = streamExternalId;
constructor(candidate: PeerCandidate, eventHandlers: PeerEventHandlers) {
this.id = candidate.id;
this.eventHandlers = eventHandlers;
this.addCandidate(candidate);
}

Expand All @@ -34,6 +42,7 @@ export class Peer {

private onCandidateConnect(candidate: PeerCandidate) {
this.connection = candidate;
this.eventHandlers.onPeerConnected(this);
}

private onCandidateClose(candidate: PeerCandidate) {
Expand All @@ -56,6 +65,7 @@ export class Peer {
break;

case PeerCommandType.SegmentRequest:
this.eventHandlers.onSegmentRequested(this, command.i);
break;
}
}
Expand All @@ -66,7 +76,7 @@ export class Peer {
}

requestSegment(segmentExternalId: string) {
const command: PeerSegmentRequestCommand = {
const command: PeerSegmentCommand = {
c: PeerCommandType.SegmentRequest,
i: segmentExternalId,
};
Expand All @@ -80,4 +90,37 @@ export class Peer {
};
this.sendCommand(command);
}

sendSegmentData(segmentExternalId: string, data: ArrayBuffer) {
if (!this.connection) return;
const command: PeerSendSegmentCommand = {
c: PeerCommandType.SegmentData,
i: segmentExternalId,
s: data.byteLength,
};

this.sendCommand(command);

let bytesLeft = data.byteLength;
while (bytesLeft > 0) {
const bytesToSend =
bytesLeft >= webRtcMaxMessageSize ? webRtcMaxMessageSize : bytesLeft;
const buffer = Buffer.from(
data,
data.byteLength - bytesLeft,
bytesToSend
);

this.connection.send(buffer);
bytesLeft -= bytesToSend;
}
}

sendSegmentAbsent(segmentExternalId: string) {
const command: PeerSegmentCommand = {
c: PeerCommandType.SegmentAbsent,
i: segmentExternalId,
};
this.sendCommand(command);
}
}
Loading

0 comments on commit 3305c19

Please sign in to comment.