From da11b3d29a9d62b7f38d27987709454ced435651 Mon Sep 17 00:00:00 2001 From: Dirk Heinke Date: Sat, 6 Mar 2021 16:54:42 +0100 Subject: [PATCH] Add timeout as option to dataconnection --- index.d.ts | 1 + lib/dataconnection.ts | 46 ++++++++++++++++++++++++++++++++++++++++++- lib/enums.ts | 4 ++++ lib/negotiator.ts | 3 ++- lib/peer.ts | 3 ++- 5 files changed, 54 insertions(+), 3 deletions(-) diff --git a/index.d.ts b/index.d.ts index 3f81dc3b1..88305b574 100644 --- a/index.d.ts +++ b/index.d.ts @@ -145,6 +145,7 @@ declare namespace Peer { metadata?: any; serialization?: string; reliable?: boolean; + heartbeatInterval?: number; } interface CallOption { diff --git a/lib/dataconnection.ts b/lib/dataconnection.ts index aec3c054b..f2565844d 100644 --- a/lib/dataconnection.ts +++ b/lib/dataconnection.ts @@ -5,7 +5,8 @@ import { ConnectionType, ConnectionEventType, SerializationType, - ServerMessageType + ServerMessageType, + SocketSpecialMessagePrefix } from "./enums"; import { Peer } from "./peer"; import { BaseConnection } from "./baseconnection"; @@ -24,6 +25,7 @@ export class DataConnection extends BaseConnection implements IDataConnection { readonly label: string; readonly serialization: SerializationType; readonly reliable: boolean; + readonly heartbeatInterval: number; stringify: (data: any) => string = JSON.stringify; parse: (data: string) => any = JSON.parse; @@ -44,6 +46,8 @@ export class DataConnection extends BaseConnection implements IDataConnection { private _dc: RTCDataChannel; private _encodingQueue = new EncodingQueue(); + private _heartbeatSendTimer?: number; + private _heartbeatReceiveTimer?: number; get dataChannel(): RTCDataChannel { return this._dc; @@ -60,6 +64,7 @@ export class DataConnection extends BaseConnection implements IDataConnection { this.label = this.options.label || this.connectionId; this.serialization = this.options.serialization || SerializationType.Binary; this.reliable = !!this.options.reliable; + this.heartbeatInterval = this.options.heartbeatInterval || 0; this._encodingQueue.on('done', (ab: ArrayBuffer) => { this._bufferedSend(ab); @@ -94,6 +99,9 @@ export class DataConnection extends BaseConnection implements IDataConnection { logger.log(`DC#${this.connectionId} dc connection success`); this._open = true; this.emit(ConnectionEventType.Open); + if (this.heartbeatInterval > 0) { + this._enabledHeartbeat() + } }; this.dataChannel.onmessage = (e) => { @@ -135,6 +143,11 @@ export class DataConnection extends BaseConnection implements IDataConnection { deserializedData = this.parse(data as string); } + if (this.heartbeatInterval > 0 && deserializedData === `${SocketSpecialMessagePrefix.Heartbeat}${this.connectionId}`) { + this._restartHeartbeatReceiveTimeout() + return; + } + // Check if we've chunked--if so, piece things back together. // We're guaranteed that this isn't 0. if (deserializedData.__peerData) { @@ -201,6 +214,8 @@ export class DataConnection extends BaseConnection implements IDataConnection { this._encodingQueue = null; } + this._disableHeartbeat(); + if (!this.open) { return; } @@ -312,6 +327,35 @@ export class DataConnection extends BaseConnection implements IDataConnection { } } + private _enabledHeartbeat() { + logger.log(`DC#${this.connectionId} Heartbeat Timer enabled`); + this._heartbeatSendTimer = window.setInterval(() => { + this.send(`${SocketSpecialMessagePrefix.Heartbeat}${this.connectionId}`) + }, this.heartbeatInterval) + + this._restartHeartbeatReceiveTimeout(); + } + + private _restartHeartbeatReceiveTimeout() { + if (this._heartbeatReceiveTimer != null) { + window.clearTimeout(this._heartbeatReceiveTimer); + } + this._heartbeatReceiveTimer = window.setTimeout(() => { + logger.log(`DC#${this.connectionId} Disconnected due to heartbeat timeout`); + this.close(); + }, this.heartbeatInterval * 2) + } + + private _disableHeartbeat() { + if (this._heartbeatReceiveTimer != null) { + window.clearTimeout(this._heartbeatReceiveTimer); + } + + if (this._heartbeatSendTimer != null) { + window.clearTimeout(this._heartbeatSendTimer); + } + } + handleMessage(message: ServerMessage): void { const payload = message.payload; diff --git a/lib/enums.ts b/lib/enums.ts index 4b86e2201..0dffca99f 100644 --- a/lib/enums.ts +++ b/lib/enums.ts @@ -61,4 +61,8 @@ export enum ServerMessageType { Leave = "LEAVE", // Another peer has closed its connection to this peer. Expire = "EXPIRE" // The offer sent to a peer has expired without response. +} + +export enum SocketSpecialMessagePrefix { + Heartbeat = "HEARTBEAT_", } \ No newline at end of file diff --git a/lib/negotiator.ts b/lib/negotiator.ts index 26e7044e1..3bad62000 100644 --- a/lib/negotiator.ts +++ b/lib/negotiator.ts @@ -217,7 +217,8 @@ export class Negotiator { ...payload, label: dataConnection.label, reliable: dataConnection.reliable, - serialization: dataConnection.serialization + serialization: dataConnection.serialization, + heartbeatInterval: dataConnection.heartbeatInterval, }; } diff --git a/lib/peer.ts b/lib/peer.ts index 8f11659db..1cb4d95e9 100644 --- a/lib/peer.ts +++ b/lib/peer.ts @@ -266,7 +266,8 @@ export class Peer extends EventEmitter { metadata: payload.metadata, label: payload.label, serialization: payload.serialization, - reliable: payload.reliable + reliable: payload.reliable, + heartbeatInterval: payload.heartbeatInterval, }); this._addConnection(peerId, connection); this.emit(PeerEventType.Connection, connection);