Skip to content

Commit

Permalink
ws.setIncomingDataListener
Browse files Browse the repository at this point in the history
  • Loading branch information
bangtoven committed Sep 29, 2023
1 parent 9e261b7 commit 922fb3b
Showing 1 changed file with 59 additions and 66 deletions.
125 changes: 59 additions & 66 deletions packages/wallet-sdk/src/connection/WalletLinkConnection.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2018-2023 Coinbase, Inc. <https://www.coinbase.com/>
// Licensed under the Apache License, version 2.0
import { merge, Observable, Subject, Subscription, throwError } from 'rxjs';
import { filter, map, take, timeoutWith } from 'rxjs/operators';
import { Observable, throwError } from 'rxjs';
import { filter, take, timeoutWith } from 'rxjs/operators';

import { Session } from '../relay/Session';
import { IntNumber } from '../types';
Expand Down Expand Up @@ -37,13 +37,11 @@ const REQUEST_TIMEOUT = 60000;
*/
export class WalletLinkConnection {
private ws: RxWebSocket;
private subscriptions = new Subscription();
private destroyed = false;
private lastHeartbeatResponse = 0;
private nextReqId = IntNumber(1);
private connected = false;
private linked = false;
private unseenEventsSubject = new Subject<ServerMessageEvent>();

private sessionConfigListener?: (_: SessionConfig) => void;
setSessionConfigListener(listener: (_: SessionConfig) => void): void {
Expand All @@ -60,8 +58,9 @@ export class WalletLinkConnection {
this.connectedListener = listener;
}

private incomingEventListener?: (_: ServerMessageEvent) => void;
setIncomingEventListener(listener: (_: ServerMessageEvent) => void): void {
this.subscribeIncomingEvent(listener);
this.incomingEventListener = listener;
}

/**
Expand Down Expand Up @@ -146,16 +145,16 @@ export class WalletLinkConnection {
this.connectedListener?.(connected);
});

// handle server's heartbeat responses
this.subscriptions.add(
ws.incomingData$.pipe(filter((m) => m === 'h')).subscribe((_) => this.updateLastHeartbeat())
);
ws.setIncomingDataListener((m) => {
switch (m.type) {
// handle server's heartbeat responses
case 'Heartbeat':
this.updateLastHeartbeat();
return;

// handle link status updates
this.subscriptions.add(
ws.incomingJSONData$
.pipe(filter((m) => ['IsLinkedOK', 'Linked'].includes(m.type)))
.subscribe((m) => {
// handle link status updates
case 'IsLinkedOK':
case 'Linked': {
const msg = m as Omit<ServerMessageIsLinkedOK, 'type'> & ServerMessageLinked;
this.diagnostic?.log(EVENTS.LINKED, {
sessionIdHash: Session.hash(sessionId),
Expand All @@ -166,14 +165,12 @@ export class WalletLinkConnection {

this.linked = msg.linked || msg.onlineGuests > 0;
this.linkedListener?.(this.linked);
})
);

// handle session config updates
this.subscriptions.add(
ws.incomingJSONData$
.pipe(filter((m) => ['GetSessionConfigOK', 'SessionConfigUpdated'].includes(m.type)))
.subscribe((m) => {
break;
}

// handle session config updates
case 'GetSessionConfigOK':
case 'SessionConfigUpdated': {
const msg = m as Omit<ServerMessageGetSessionConfigOK, 'type'> &
ServerMessageSessionConfigUpdated;
this.diagnostic?.log(EVENTS.SESSION_CONFIG_RECEIVED, {
Expand All @@ -185,23 +182,15 @@ export class WalletLinkConnection {
webhookUrl: msg.webhookUrl,
metadata: msg.metadata,
});
})
);

// mark unseen events as seen
this.subscriptions.add(
this.unseenEventsSubject.subscribe((e) => {
const credentials = `${this.sessionId}:${this.sessionKey}`;
const auth = `Basic ${btoa(credentials)}`;

fetch(`${this.linkAPIUrl}/events/${e.eventId}/seen`, {
method: 'POST',
headers: {
Authorization: auth,
},
}).catch((error) => console.error('Unabled to mark event as failed:', error));
})
);
break;
}

case 'Event': {
this.handleIncomingEvent(m);
break;
}
}
});
}

/**
Expand All @@ -222,7 +211,6 @@ export class WalletLinkConnection {
* instance of WalletSDKConnection
*/
public destroy(): void {
this.subscriptions.unsubscribe();
this.ws.disconnect();
this.diagnostic?.log(EVENTS.DISCONNECTED, {
sessionIdHash: Session.hash(this.sessionId),
Expand All @@ -234,32 +222,25 @@ export class WalletLinkConnection {
return this.destroyed;
}

/**
* Subscribe to incoming Event messages
*/
private incomingEventSub?: Subscription;
private subscribeIncomingEvent(listener: (_: ServerMessageEvent) => void): void {
this.incomingEventSub?.unsubscribe();
private handleIncomingEvent(m: ServerMessage) {
function isServerMessageEvent(msg: ServerMessage): msg is ServerMessageEvent {
if (m.type !== 'Event') {
return false;
}
const sme = m as ServerMessageEvent;
return (
typeof sme.sessionId === 'string' &&
typeof sme.eventId === 'string' &&
typeof sme.event === 'string' &&
typeof sme.data === 'string'
);
}

this.incomingEventSub = merge(this.ws.incomingJSONData$, this.unseenEventsSubject)
.pipe(
filter((m) => {
if (m.type !== 'Event') {
return false;
}
const sme = m as ServerMessageEvent;
return (
typeof sme.sessionId === 'string' &&
typeof sme.eventId === 'string' &&
typeof sme.event === 'string' &&
typeof sme.data === 'string'
);
}),
map((m) => m as ServerMessageEvent)
)
.subscribe((m) => {
listener(m);
});
if (!isServerMessageEvent(m)) {
return;
}

this.incomingEventListener?.(m);
}

public async checkUnseenEvents() {
Expand Down Expand Up @@ -310,7 +291,19 @@ export class WalletLinkConnection {
event: e.event,
data: e.data,
})) ?? [];
responseEvents.forEach((e) => this.unseenEventsSubject.next(e));
responseEvents.forEach((e) => this.handleIncomingEvent(e));

// mark unseen events as seen
Promise.all(
responseEvents.map((e) =>
fetch(`${this.linkAPIUrl}/events/${e.eventId}/seen`, {
method: 'POST',
headers: {
Authorization: auth,
},
})
)
).catch((error) => console.error('Unabled to mark event as failed:', error));
} else {
throw new Error(`Check unseen events failed: ${response.status}`);
}
Expand Down

0 comments on commit 922fb3b

Please sign in to comment.