Skip to content

Commit

Permalink
feat: typed rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
ulivz committed Jun 4, 2024
1 parent bf9aa91 commit 9981c69
Show file tree
Hide file tree
Showing 12 changed files with 657 additions and 60 deletions.
5 changes: 4 additions & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
module.exports = {
extends: "eslint-config-typescript-library",
extends: 'eslint-config-typescript-library',
rules: {
camelcase: 'off',
},
};
6 changes: 4 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"markdown"
],
"editor.codeActionsOnSave": {
"source.fixAll.eslint": true
"source.fixAll.eslint": "explicit"
},
"search.exclude": {
"**/.git": true,
Expand Down Expand Up @@ -43,7 +43,9 @@
"liveServer.settings.port": 5501,
"js/ts.implicitProjectConfig.strictNullChecks": false,
"cSpell.words": [
"camelcase",
"insx",
"Unport"
"Unport",
"Unrpc"
]
}
128 changes: 128 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Each of these JSContexts exhibits distinct methods of communicating with the ext
- [Channel](#channel-1)
- [.pipe()](#pipe)
- [ChannelMessage](#channelmessage)
- [Unrpc (Experimental)](#unrpc-experimental)
- [🤝 Contributing](#-contributing)
- [🤝 Credits](#-credits)
- [LICENSE](#license)
Expand Down Expand Up @@ -299,6 +300,133 @@ The `ChannelMessage` type is used for the message in the `onMessage` method.
import { ChannelMessage } from 'unport';
```

### Unrpc (Experimental)

Starting with the 0.6.0 release, we are experimentally introducing support for Typed [RPC (Remote Procedure Call)](https://en.wikipedia.org/wiki/Remote_procedure_call).

When dealing with a single Port that requires RPC definition, we encounter a problem related to the programming paradigm. It's necessary to define `Request` and `Response` messages such as:

```ts
export type IpcDefinition = {
a2b: {
callFoo: {
input: string;
};
};
b2a: {
callFooCallback: {
result: string;
};
};
};
```

In the case where an RPC call needs to be encapsulated, the API might look like this:

```ts
function rpcCall(request: { input: string; }): Promise<{ result: string; }>;
```

Consequently, to associate a callback function, it becomes a requirement to include a `CallbackId` at the **application layer** for every RPC method:

```diff
export type IpcDefinition = {
a2b: {
callFoo: {
input: string;
+ callbackId: string;
};
};
b2a: {
callFooCallback: {
result: string;
+ callbackId: string;
};
};
};
```

`Unrpc` is provided to address this issue, enabling support for Typed RPC starting from the **protocol layer**:

```ts
import { Unrpc } from 'unport';

// "parentPort" is a Port defined based on Unport in the previous example.
const parent = new Unrpc(parentPort);

// Implementing an RPC method.
parent.implement('callFoo', request => ({
user: `parent (${request.id})`,
}));

// Emit a SYN event.
parent.port.postMessage('syn', { pid: 'parent' });

// Listen for the ACK message.
parent.port.onMessage('ack', async payload => {
// Call an RPC method as defined by the "child" port.
const response = await parent.call('getChildInfo', {
name: 'parent',
});
});
```

The implementation on the `child` side is as follows:

```ts
import { Unrpc } from 'unport';

// "parentPort" is a Port also defined based on Unport.
const child = new Unrpc(childPort);

child.implement('getChildInfo', request => ({
clientKey: `[child] ${request.name}`,
}));

// Listen for the SYN message.
child.port.onMessage('syn', async payload => {
const response = await child.call('getInfo', { id: '<child>' });
// Acknowledge the SYN event.
child.port.postMessage('ack', { pid: 'child' });
});
```

The types are defined as such:

```ts
import { Unport } from 'unport';

export type Definition = {
parent2child: {
syn: {
pid: string;
};
getInfo__callback: {
user: string;
};
getChildInfo: {
name: string;
}
};
child2parent: {
getInfo: {
id: string;
};
getChildInfo__callback: {
clientKey: string;
};
ack: {
pid: string;
};
};
};

export type ChildPort = Unport<Definition, 'child'>;
export type ParentPort = Unport<Definition, 'parent'>;
```

In comparison to Unport, the only new concept to grasp is that the RPC response message key must end with `__callback`. Other than that, no additional changes are necessary! `Unrpc` also offers comprehensive type inference based on this convention; for instance, you won't be able to implement an RPC method that is meant to serve as a response.

## 🤝 Contributing

Contributions, issues and feature requests are welcome!
Expand Down
152 changes: 152 additions & 0 deletions __tests__/rpc.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { ChannelMessage, Unport, Unrpc, UnrpcExecutionErrorError, UnrpcNotImplementationError } from '../src';

export type Definition = {
parent2child: {
syn: {
pid: string;
};
getInfo__callback: {
user: string;
};
getChildInfo: {
name: string;
}
};
child2parent: {
getInfo: {
id: string;
};
getChildInfo__callback: {
clientKey: string;
};
ack: {
pid: string;
};
};
};

describe('Unrpc', () => {
let childPort: Unport<Definition, 'child'>;
let parentPort: Unport<Definition, 'parent'>;
let child: Unrpc<Definition, 'child'>;
let parent: Unrpc<Definition, 'parent'>;

beforeEach(() => {
const messageChannel = new MessageChannel();
if (childPort) childPort.destroy();
childPort = new Unport();
childPort.implementChannel({
send(message) {
messageChannel.port1.postMessage(message);
},
accept(pipe) {
messageChannel.port1.onmessage = (message: MessageEvent<ChannelMessage>) => pipe(message.data);
},
destroy() {
messageChannel.port1.close();
},
});
child = new Unrpc(childPort);

parentPort = new Unport();
parentPort.implementChannel({
send(message) {
console.log(message);
messageChannel.port2.postMessage(message);
},
accept(pipe) {
messageChannel.port2.onmessage = (message: MessageEvent<ChannelMessage>) => pipe(message.data);
},
destroy() {
messageChannel.port2.close();
},
});

parent = new Unrpc(parentPort);
});

it('implemented method - asynchronous implementation', async () => {
parent.implement('getInfo', async ({ id }) => ({ user: id }));
const response = child.call('getInfo', { id: 'name' });
expect(response).resolves.toMatchObject({ user: 'name' });
});

it('implemented method - synchronous implementation', async () => {
parent.implement('getInfo', ({ id }) => ({ user: id }));
const response = child.call('getInfo', { id: 'name' });
expect(response).resolves.toMatchObject({ user: 'name' });
});

it('Error: UnrpcNotImplementationError', async () => {
expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject(
new UnrpcNotImplementationError('Method getInfo is not implemented'),
);
});

it('Error: UnrpcExecutionErrorError - script error - asynchronous implementation', async () => {
parent.implement('getInfo', async () => {
// @ts-expect-error mock execution error here.
const result = foo;
return result;
});
expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject(
new UnrpcExecutionErrorError('foo is not defined'),
);
});

it('Error: UnrpcExecutionErrorError - script error - synchronous implementation', async () => {
parent.implement('getInfo', () => {
// @ts-expect-error mock execution error here.
const result = foo;
return result;
});
expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject(
new UnrpcExecutionErrorError('foo is not defined'),
);
});

it('Error: UnrpcExecutionErrorError - user throws error', async () => {
parent.implement('getInfo', () => {
throw new Error('mock error');
});
expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject(
new UnrpcExecutionErrorError('mock error'),
);
});

it('complicated case', async () => {
parent.implement('getInfo', async ({ id }) => ({ user: id }));
child.implement('getChildInfo', async ({ name }) => ({ clientKey: name }));

let finishHandshake: (value?: unknown) => void;
const handshakePromise = new Promise(resolve => {
finishHandshake = resolve;
});

/**
* Simulates a handshake
*/
parent.port.postMessage('syn', { pid: 'parent' });
parent.port.onMessage('ack', async payload => {
expect(payload.pid).toBe('child');
finishHandshake();
});
child.port.onMessage('syn', async payload => {
expect(payload.pid).toBe('parent');
child.port.postMessage('ack', { pid: 'child' });
});

/**
* Wait handshake finished
*/
await handshakePromise;

const [response1, response2] = await Promise.all([
child.call('getInfo', { id: 'child' }),
parent.call('getChildInfo', { name: 'parent' }),
]);
expect(response1).toMatchObject({ user: 'child' });
expect(response2).toMatchObject({ clientKey: 'parent' });
});
});
29 changes: 29 additions & 0 deletions examples/child-process-rpc/child.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Unport, Unrpc, ChannelMessage } from '../../lib';
import { ChildPort } from './port';

// 1. Initialize a port
const childPort: ChildPort = new Unport();

// 2. Implement a Channel based on underlying IPC capabilities
childPort.implementChannel({
send(message) {
process.send && process.send(message);
},
accept(pipe) {
process.on('message', (message: ChannelMessage) => {
pipe(message);
});
},
});

// 3. Initialize a rpc client
const childRpcClient = new Unrpc(childPort);
childRpcClient.implement('getChildInfo', request => ({
clientKey: `[child] ${request.name}`,
}));
childRpcClient.port.onMessage('syn', async payload => {
console.log('[child] [event] [syn] [result]', payload);
const response = await childRpcClient.call('getInfo', { id: '<child>' });
console.log('[child] [rpc] [getInfo] [response]', response);
childPort.postMessage('ack', { pid: 'child' });
});
43 changes: 43 additions & 0 deletions examples/child-process-rpc/parent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { join } from 'path';
import { fork } from 'child_process';
import { Unport, Unrpc, ChannelMessage } from '../../lib';
import { ParentPort } from './port';

// 1. Initialize a port
const parentPort: ParentPort = new Unport();

// 2. Implement a Channel based on underlying IPC capabilities
const childProcess = fork(join(__dirname, './child.js'));
parentPort.implementChannel({
send(message) {
childProcess.send(message);
},
accept(pipe) {
childProcess.on('message', (message: ChannelMessage) => {
pipe(message);
});
},
destroy() {
childProcess.removeAllListeners('message');
childProcess.kill();
},
});

// 3. Initialize a rpc client from port.
const parentRpcClient = new Unrpc(parentPort);

parentRpcClient.implement('getInfo', request => ({
user: `parent (${request.id})`,
}));
parentRpcClient.port.postMessage('syn', { pid: 'parent' });
parentRpcClient.port.onMessage('ack', async payload => {
console.log('[parent] [event] [ack] [result]', payload);
const response = await parentRpcClient.call('getChildInfo', {
name: 'parent',
});
console.log('[parent] [rpc] [getChildInfo] [response]', response);
setTimeout(() => {
console.log('destroy');
parentPort.destroy();
}, 1000);
});
Loading

0 comments on commit 9981c69

Please sign in to comment.