Skip to content

Commit

Permalink
[WALL-25371] Remove RxJS (#1018)
Browse files Browse the repository at this point in the history
* [WALL-25371] Remove RxJS WalletLinkConnection

* authenticate

* onceConnected onceLinked

* remove connectionStateSubject

* adding delay function to replace rxjs delay

* applying ws.connectionState$ changes

* remove delay func

* comment

* this.connected

* replace this.onceConnected$

* replace linkedSubject

* cleanup

* refactor RxWebSocket class

* fix tests

* remove rxjs from test files

* cleanup

* ws.setIncomingDataListener

* keep markUnseenEventsAsSeen as it used to be

* implement request resolve mapping

* fix tests

* rename RxWebSocket -> WalletLinkWebSocket

* Revert "rename RxWebSocket -> WalletLinkWebSocket"

This reverts commit c3ae669.

* rename only the class

* remove rxjs package

* duh

* cleanup

* reimplement this.linked update pipeline

* separate out once***ed logic

* cleanup

* this.shouldFetchUnseenEventsOnConnect

* cleanup
  • Loading branch information
bangtoven authored Oct 11, 2023
1 parent 9fd9792 commit a757aa5
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 394 deletions.
1 change: 0 additions & 1 deletion packages/wallet-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
"eventemitter3": "^5.0.1",
"keccak": "^3.0.3",
"preact": "^10.16.0",
"rxjs": "^6.6.3",
"sha.js": "^2.4.11"
},
"devDependencies": {
Expand Down
84 changes: 39 additions & 45 deletions packages/wallet-sdk/src/connection/RxWebSocket.test.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import WS from 'jest-websocket-mock';
import { Observable } from 'rxjs';

import { ConnectionState, RxWebSocket } from './RxWebSocket';
import { ConnectionState, WalletLinkWebSocket } from './RxWebSocket';

describe('RxWebSocket', () => {
describe('WalletLinkWebSocket', () => {
let server: WS;
let rxWS: RxWebSocket;
let rxWS: WalletLinkWebSocket;
beforeEach(() => {
server = new WS('ws://localhost:1234');
rxWS = new RxWebSocket('http://localhost:1234');
rxWS = new WalletLinkWebSocket('http://localhost:1234');
});

afterEach(() => {
Expand All @@ -17,27 +16,13 @@ describe('RxWebSocket', () => {

describe('is connected', () => {
test('@connect & @disconnect', async () => {
const client = rxWS.connect();
const connectionStateListener = jest.fn();
rxWS.setConnectionStateListener(connectionStateListener);

expect(client).toBeInstanceOf(Observable);
await client.toPromise();
await rxWS.connect();
await server.connected;

// @ts-expect-error test private methods
expect(rxWS.webSocket).toBeInstanceOf(WebSocket);
// @ts-expect-error test private methods
rxWS.connectionStateSubject
.subscribe({
next: (val) => {
// Connected state
expect(val).toEqual(ConnectionState.CONNECTED);
},
closed: (val: ConnectionState) => {
// Disconnected state
expect(val).toEqual(ConnectionState.DISCONNECTED);
},
})
.unsubscribe();
expect(connectionStateListener).toHaveBeenCalledWith(ConnectionState.CONNECTED);

// Sends data
const webSocketSendMock = jest
Expand All @@ -49,54 +34,63 @@ describe('RxWebSocket', () => {

// Disconnects
rxWS.disconnect();
expect(connectionStateListener).toHaveBeenCalledWith(ConnectionState.DISCONNECTED);
// @ts-expect-error test private methods
expect(rxWS.webSocket).toBe(null);
});

test('@connectionState$ & @incomingData$', () => {
expect(rxWS.connectionState$).toBeInstanceOf(Observable);
expect(rxWS.incomingData$).toBeInstanceOf(Observable);
});

test('@incomingJSONData$', () => {
expect(rxWS.incomingJSONData$).toBeInstanceOf(Observable);
});

describe('errors & event listeners', () => {
afterEach(() => rxWS.disconnect());

test('@connect throws error when connecting again', async () => {
const client = rxWS.connect();
await client.toPromise();
await rxWS.connect();

await expect(rxWS.connect().toPromise()).rejects.toThrow('webSocket object is not null');
await expect(rxWS.connect()).rejects.toThrow('webSocket object is not null');
});

test('@connect throws error & fails to set websocket instance', async () => {
const errorConnect = new RxWebSocket('');
const errorConnect = new WalletLinkWebSocket('');

await expect(errorConnect.connect().toPromise()).rejects.toThrow(
await expect(errorConnect.connect()).rejects.toThrow(
"Failed to construct 'WebSocket': 1 argument required, but only 0 present."
);
});

test('onclose event throws error', async () => {
const client = rxWS.connect();
await client.toPromise();
await rxWS.connect();
await server.connected;
server.error();

await expect(rxWS.connect().toPromise()).rejects.toThrow('websocket error 1000: ');
await expect(rxWS.connect()).rejects.toThrow('websocket error 1000: ');
});

test('onmessage event emits message', async () => {
const client = rxWS.connect();
await client.toPromise();
const incomingDataListener = jest.fn();
rxWS.setIncomingDataListener(incomingDataListener);

await rxWS.connect();
await server.connected;

const message = {
type: 'ServerMessageType',
data: 'hello world',
};

server.send(JSON.stringify(message));
expect(incomingDataListener).toHaveBeenCalledWith(message);
});

test('onmessage event emits heartbeat message', async () => {
const incomingDataListener = jest.fn();
rxWS.setIncomingDataListener(incomingDataListener);

await rxWS.connect();
await server.connected;

// @ts-expect-error test private methods
rxWS.incomingDataSubject.subscribe((val) => expect(val).toEqual('hello world'));
server.send('hello world');
server.send('h');
expect(incomingDataListener).toHaveBeenCalledWith({
type: 'Heartbeat',
});
});
});
});
Expand Down
98 changes: 38 additions & 60 deletions packages/wallet-sdk/src/connection/RxWebSocket.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
// Copyright (c) 2018-2023 Coinbase, Inc. <https://www.coinbase.com/>
// Licensed under the Apache License, version 2.0

import { BehaviorSubject, empty, Observable, of, Subject, throwError } from 'rxjs';
import { flatMap, take } from 'rxjs/operators';
import { ServerMessage } from './ServerMessage';

export enum ConnectionState {
DISCONNECTED,
CONNECTING,
CONNECTED,
}

/**
* Rx-ified WebSocket
*/
export class RxWebSocket<T = object> {
export class WalletLinkWebSocket {
private readonly url: string;
private webSocket: WebSocket | null = null;
private connectionStateSubject = new BehaviorSubject<ConnectionState>(
ConnectionState.DISCONNECTED
);
private incomingDataSubject = new Subject<string>();
private pendingData: string[] = [];

private connectionStateListener?: (_: ConnectionState) => void;
setConnectionStateListener(listener: (_: ConnectionState) => void): void {
this.connectionStateListener = listener;
}

private incomingDataListener?: (_: ServerMessage) => void;
setIncomingDataListener(listener: (_: ServerMessage) => void): void {
this.incomingDataListener = listener;
}

/**
* Constructor
* @param url WebSocket server URL
Expand All @@ -36,41 +38,51 @@ export class RxWebSocket<T = object> {

/**
* Make a websocket connection
* @returns an Observable that completes when connected
* @returns a Promise that resolves when connected
*/
public connect(): Observable<void> {
public async connect() {
if (this.webSocket) {
return throwError(new Error('webSocket object is not null'));
throw new Error('webSocket object is not null');
}
return new Observable<void>((obs) => {
return new Promise<void>((resolve, reject) => {
let webSocket: WebSocket;
try {
this.webSocket = webSocket = new this.WebSocketClass(this.url);
} catch (err) {
obs.error(err);
reject(err);
return;
}
this.connectionStateSubject.next(ConnectionState.CONNECTING);
this.connectionStateListener?.(ConnectionState.CONNECTING);
webSocket.onclose = (evt) => {
this.clearWebSocket();
obs.error(new Error(`websocket error ${evt.code}: ${evt.reason}`));
this.connectionStateSubject.next(ConnectionState.DISCONNECTED);
reject(new Error(`websocket error ${evt.code}: ${evt.reason}`));
this.connectionStateListener?.(ConnectionState.DISCONNECTED);
};
webSocket.onopen = (_) => {
obs.next();
obs.complete();
this.connectionStateSubject.next(ConnectionState.CONNECTED);
resolve();
this.connectionStateListener?.(ConnectionState.CONNECTED);

if (this.pendingData.length > 0) {
const pending = [...this.pendingData];
pending.forEach(data => this.sendData(data));
pending.forEach((data) => this.sendData(data));
this.pendingData = [];
}
};
webSocket.onmessage = (evt) => {
this.incomingDataSubject.next(evt.data as string);
if (evt.data === 'h') {
this.incomingDataListener?.({
type: 'Heartbeat',
});
} else {
try {
const message = JSON.parse(evt.data) as ServerMessage;
this.incomingDataListener?.(message);
} catch {
/* empty */
}
}
};
}).pipe(take(1));
});
}

/**
Expand All @@ -82,48 +94,14 @@ export class RxWebSocket<T = object> {
return;
}
this.clearWebSocket();
this.connectionStateSubject.next(ConnectionState.DISCONNECTED);
this.connectionStateListener?.(ConnectionState.DISCONNECTED);
try {
webSocket.close();
} catch {
// noop
}
}

/**
* Emit current connection state and subsequent changes
* @returns an Observable for the connection state
*/
public get connectionState$(): Observable<ConnectionState> {
return this.connectionStateSubject.asObservable();
}

/**
* Emit incoming data from server
* @returns an Observable for the data received
*/
public get incomingData$(): Observable<string> {
return this.incomingDataSubject.asObservable();
}

/**
* Emit incoming JSON data from server. non-JSON data are ignored
* @returns an Observable for parsed JSON data
*/
public get incomingJSONData$(): Observable<T> {
return this.incomingData$.pipe(
flatMap((m) => {
let j: any;
try {
j = JSON.parse(m);
} catch (err) {
return empty();
}
return of(j);
})
);
}

/**
* Send data to server
* @param data text to send
Expand All @@ -132,7 +110,7 @@ export class RxWebSocket<T = object> {
const { webSocket } = this;
if (!webSocket) {
this.pendingData.push(data);
this.connect().subscribe();
this.connect();
return;
}
webSocket.send(data);
Expand Down
Loading

0 comments on commit a757aa5

Please sign in to comment.