Skip to content

Commit

Permalink
feat: typed rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
ulivz committed Jun 4, 2024
1 parent bf9aa91 commit f4a71a4
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 62 deletions.
5 changes: 3 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"markdown"
],
"editor.codeActionsOnSave": {
"source.fixAll.eslint": true
"source.fixAll.eslint": "explicit"
},
"search.exclude": {
"**/.git": true,
Expand Down Expand Up @@ -44,6 +44,7 @@
"js/ts.implicitProjectConfig.strictNullChecks": false,
"cSpell.words": [
"insx",
"Unport"
"Unport",
"Unrpc"
]
}
29 changes: 29 additions & 0 deletions examples/child-process-rpc/child.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Unport, Unrpc, ChannelMessage } from '../../lib';
import { ChildPort } from './port';

// 1. Initialize a port
const childPort: ChildPort = new Unport();

// 2. Implement a Channel based on underlying IPC capabilities
childPort.implementChannel({
send(message) {
process.send && process.send(message);
},
accept(pipe) {
process.on('message', (message: ChannelMessage) => {
pipe(message);
});
},
});

// 3. Initialize a rpc client
async function main() {
const childRpc = new Unrpc(childPort);
childRpc.implement('getChildInfo', request => ({
clientKey: `[child] ${request.name}`,
}));
const response = await childRpc.call('getInfo', { id: '<child>' });
console.log('[child] [getInfo] [response]', response);
}

main();
43 changes: 43 additions & 0 deletions examples/child-process-rpc/parent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { join } from 'path';
import { fork } from 'child_process';
import { Unport, Unrpc, ChannelMessage } from '../../lib';
import { ParentPort } from './port';

// 1. Initialize a port
const parentPort: ParentPort = new Unport();

// 2. Implement a Channel based on underlying IPC capabilities
const childProcess = fork(join(__dirname, './child.js'));
parentPort.implementChannel({
send(message) {
childProcess.send(message);
},
accept(pipe) {
childProcess.on('message', (message: ChannelMessage) => {
pipe(message);
});
},
destroy() {
childProcess.removeAllListeners('message');
childProcess.kill();
},
});

// 3. Initialize a rpc client from port.
async function main() {
const parentRpcClient = new Unrpc(parentPort);
parentRpcClient.implement('getInfo', request => ({
user: `parent (${request.id})`,
}));
const response = await parentRpcClient.call('getChildInfo', {
name: 'parent',
});
console.log('[parent] [getChildInfo] [response]', response);

setTimeout(() => {
console.log('destroy');
parentPort.destroy();
}, 1000);
}

main();
24 changes: 24 additions & 0 deletions examples/child-process-rpc/port.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/* eslint-disable camelcase */
import { Unport } from '../../lib';

export type Definition = {
parent2child: {
getInfo__CALLBACK: {
user: string;
};
getChildInfo: {
name: string;
}
};
child2parent: {
getInfo: {
id: string;
};
getChildInfo__CALLBACK: {
clientKey: string;
}
};
};

export type ChildPort = Unport<Definition, 'child'>;
export type ParentPort = Unport<Definition, 'parent'>;
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
"build": "run-s build:cjs build:esm build:example",
"dev:cjs": "npm run build:cjs -- --watch",
"dev:esm": "npm run build:esm -- --watch",
"build:cjs": "tsc -p tsconfig.json --module commonjs --outDir lib",
"build:esm": "tsc -p tsconfig.json --module ES2015 --outDir esm",
"build:cjs": "tsc -p tsconfig.src.json --module commonjs --outDir lib",
"build:esm": "tsc -p tsconfig.src.json --module ES2015 --outDir esm",
"dev:example": "tsc -p tsconfig.examples.json --watch",
"build:example": "tsc -p tsconfig.examples.json",
"prepublishOnly": "npm run build",
Expand Down
161 changes: 148 additions & 13 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable max-classes-per-file */
/**
* @license
* Copyright (c) ULIVZ. All Rights Reserved.
Expand Down Expand Up @@ -100,30 +101,74 @@ type ReverseDirection<
* `Payload` type is a utility to extract the payload type of a specific message, given its
* direction and the name.
*/
type Payload<T extends MessageDefinition, D extends Direction<T>, U extends keyof T[D]> = T[D][U];
export type Payload<T extends MessageDefinition, D extends Direction<T>, U extends keyof T[D]> = T[D][U];
/**
* `Callback` is a type representing a generic function
*/
type Callback<T extends unknown[] = [], U = unknown> = (...args: T) => U;

/**
* A base interface used to describe a Message Port
*/
interface Port<T extends MessageDefinition, D extends Direction<T>> {
postMessage<U extends keyof T[D]>(t: U, p: Payload<T, D, U>): void;
// eslint-disable-next-line no-use-before-define
postMessage<U extends keyof T[D]>(t: U, p?: Payload<T, D, U>, extra?: Pick<ChannelMessage, 'd' | 'c' | 'e'>): void;
onMessage<U extends keyof T[ReverseDirection<T, D>]>(
t: U,
handler: Callback<[Payload<T, ReverseDirection<T, D>, U>]>,
): void;
}

export type EnsureString<T> = T extends string ? T : never;
export type CallbackSuffix = '__CALLBACK';

/**
* A generic type used to infer the return value type of an Rpc call. For example, when you call
* "foo" on one end of the port, the return value is of the type defined by "foo__CALLBACK" on
* the other end.
*/
export type CallbackPayload<
T extends MessageDefinition,
D extends Direction<T>,
U extends keyof T[D],
S extends EnsureString<U> = EnsureString<U>
> =
`${S}${CallbackSuffix}` extends keyof T[ReverseDirection<T, D>]
? Payload<T, ReverseDirection<T, D>, `${S}${CallbackSuffix}`> : unknown;

/**
* A base interface used to describe a Rpc client instance.
*/
export interface Rpc<T extends MessageDefinition, D extends Direction<T>, > {
call<U extends keyof T[D]>(t: U, p: Payload<T, D, U>): Promise<CallbackPayload<T, D, U>>;
implement<R extends keyof T[ReverseDirection<T, D>]>(
t: R,
handler: Callback<
[Payload<T, ReverseDirection<T, D>, R>],
CallbackPayload<T, ReverseDirection<T, D>, R> | Promise<CallbackPayload<T, ReverseDirection<T, D>, R>>
>,
): void;
}

// eslint-disable-next-line no-shadow
export const enum ChannelMessageErrorCode {
NotImplemented = 'NOT_IMPLEMENTED',
ExecutionError = 'EXECUTION_ERROR',
}

/**
* `ChannelMessage` interface defines the structure of a message that can be sent
* or received through an `Channel`.
*
* It contains a `t` field for the name of the message, and a `p` field for the payload
* of the message.
* of the message, `d` for the message id.
*/
export interface ChannelMessage {
t: string | number | symbol;
p: any;
_$: 'un';
t: string | number | symbol; /* message key */
p?: any; /* message payload */
d?: number; /* message id */
e?: string; /* error code */
c?: ChannelMessageErrorCode; /* error message */
}

/**
Expand All @@ -147,17 +192,30 @@ export interface EnhancedChannel extends Channel {
/**
* Expose Unport class
*/
export class Unport<T extends MessageDefinition, U extends InferPorts<T>>
implements Port<T, InferDirectionByPort<T, U>> {
export class Unport<
T extends MessageDefinition,
U extends InferPorts<T>
> implements Port<T, InferDirectionByPort<T, U>> {
private handlers: Record<string | number | symbol, Callback<[any]>[]> = {};

public channel?: EnhancedChannel;

implementChannel(channel: Channel | (() => Channel)): EnhancedChannel {
public channelReceiveMessageListener?: (message: ChannelMessage) => unknown;

public setChannelReceiveMessageListener(listener: (message: ChannelMessage) => unknown) {
if (typeof listener === 'function') {
this.channelReceiveMessageListener = listener;
}
}

public implementChannel(channel: Channel | (() => Channel)): EnhancedChannel {
// @ts-expect-error We will assign it immediately
this.channel = typeof channel === 'function' ? channel() : channel;
if (typeof this.channel === 'object' && typeof this.channel.send === 'function') {
this.channel.pipe = (message: ChannelMessage) => {
if (typeof this.channelReceiveMessageListener === 'function') {
this.channelReceiveMessageListener(message);
}
if (typeof message === 'object' && message._$ === 'un') {
const { t, p } = message;
const handler = this.handlers[t];
Expand All @@ -175,23 +233,100 @@ implements Port<T, InferDirectionByPort<T, U>> {
return this.channel;
}

postMessage: Port<T, InferDirectionByPort<T, U>>['postMessage'] = (t, p) => {
public postMessage: Port<T, InferDirectionByPort<T, U>>['postMessage'] = (t, p, extra) => {
if (!this.channel) {
throw new Error('[2] Port is not implemented or has been destroyed');
}
this.channel.send({ t, p, _$: 'un' });
this.channel.send({ ...(extra || {}), t, p, _$: 'un' });
};

onMessage: Port<T, InferDirectionByPort<T, U>>['onMessage'] = (t, handler) => {
public onMessage: Port<T, InferDirectionByPort<T, U>>['onMessage'] = (t, handler) => {
if (!this.handlers[t]) {
this.handlers[t] = [];
}
this.handlers[t].push(handler);
};

destroy() {
public destroy() {
this.handlers = {};
this.channel?.destroy && this.channel.destroy();
delete this.channel;
}
}

const CALLBACK_SUFFIX: CallbackSuffix = '__CALLBACK';

/**
* Expose Unrpc class
*/
export class Unrpc<T extends MessageDefinition, U extends InferPorts<T>> implements Rpc<T, InferDirectionByPort<T, U>> {
private callbackMap = new Map<number, Callback<[any]>>();

private currentCallbackId = 0;

private implementations = new Map<string | number | symbol, Callback<[any]>>();

constructor(public readonly port: Unport<T, U>) {
this.port.setChannelReceiveMessageListener(message => {
if (typeof message === 'object' && message._$ === 'un') {
const { t, p, d } = message;
const messageKey = String(t);
if (typeof d === 'number') {
// Receive callback
if (messageKey.endsWith(CALLBACK_SUFFIX)) {
const callback = this.callbackMap.get(d);
if (callback) {
callback(p);
}
} else {
// Receive request
const handler = this.implementations.get(t);
const callbackMessageKey = `${messageKey}${CALLBACK_SUFFIX}` as keyof T[InferDirectionByPort<T, U>];
if (handler) {
const handleCallback = (result: any) => {
this.port.postMessage(callbackMessageKey, result, {
d,
});
};
const handleExecutionError = (e: any) => {
this.port.postMessage(callbackMessageKey, undefined, {
d,
c: ChannelMessageErrorCode.ExecutionError,
e: e instanceof Error ? e.message : String(e),
});
};
let result: any;
try {
result = handler(p);
} catch (e) {
handleExecutionError(e);
}
if (result instanceof Promise) {
result.then(handleCallback).catch(handleExecutionError);
}
handleCallback(result);
} else {
this.port.postMessage(callbackMessageKey, undefined, {
d,
c: ChannelMessageErrorCode.NotImplemented,
});
}
}
}
}
});
}

public call: Rpc<T, InferDirectionByPort<T, U>>['call'] = async (t, p) => {
const callbackId = this.currentCallbackId++;
const res = new Promise<CallbackPayload<T, InferDirectionByPort<T, U>, typeof t>>(resolve => {
this.callbackMap.set(callbackId, resolve);
});
this.port.postMessage(t, p, { d: callbackId });
return res;
};

public implement: Rpc<T, InferDirectionByPort<T, U>>['implement'] = (t, p) => {
this.implementations.set(t, p);
};
}
3 changes: 1 addition & 2 deletions tsconfig.examples.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"composite": true,
"rootDir": "examples",
Expand All @@ -13,7 +12,7 @@
],
"references": [
{
"path": "./tsconfig.json"
"path": "./tsconfig.src.json"
}
]
}
Loading

0 comments on commit f4a71a4

Please sign in to comment.