Skip to content

Commit

Permalink
feat: typed rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
ulivz committed Jun 4, 2024
1 parent bf9aa91 commit ca6910f
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 11 deletions.
5 changes: 3 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"markdown"
],
"editor.codeActionsOnSave": {
"source.fixAll.eslint": true
"source.fixAll.eslint": "explicit"
},
"search.exclude": {
"**/.git": true,
Expand Down Expand Up @@ -44,6 +44,7 @@
"js/ts.implicitProjectConfig.strictNullChecks": false,
"cSpell.words": [
"insx",
"Unport"
"Unport",
"Unrpc"
]
}
29 changes: 29 additions & 0 deletions examples/child-process-rpc/child.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Unport, Unrpc, ChannelMessage } from '../../src';
import { ChildPort } from './port';

// 1. Initialize a port
const childPort: ChildPort = new Unport();

// 2. Implement a Channel based on underlying IPC capabilities
childPort.implementChannel({
send(message) {
process.send && process.send(message);
},
accept(pipe) {
process.on('message', (message: ChannelMessage) => {
pipe(message);
});
},
});

// 3. Initialize a rpc client
async function main() {
const childRpc = new Unrpc(childPort);
childRpc.implement('getChildInfo', request => ({
clientKey: `${request.name} ${request.path}`,
}));
const response = childRpc.call('getInfo', { id: '<child>' });
console.log('[child] [getInfo] [response]', response);
}

main();
30 changes: 30 additions & 0 deletions examples/child-process-rpc/parent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { join } from 'path';
import { fork } from 'child_process';
import { Unport, ChannelMessage } from '../../src';
import { ParentPort } from './port';

// 1. Initialize a port
const parentPort: ParentPort = new Unport();

// 2. Implement a Channel based on underlying IPC capabilities
const childProcess = fork(join(__dirname, './child.js'));
parentPort.implementChannel({
send(message) {
childProcess.send(message);
},
accept(pipe) {
childProcess.on('message', (message: ChannelMessage) => {
pipe(message);
});
},
});

// 3. Initialize a rpc client
async function main() {
const parentRpc = parentPort.rpc();
parentRpc.implement('getInfo', request => ({
user: `parent (${request.id})`,
}));
}

main();
25 changes: 25 additions & 0 deletions examples/child-process-rpc/port.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* eslint-disable camelcase */
import { Unport } from '../../src';

export type Definition = {
parent2child: {
getInfo__CALLBACK: {
user: string;
};
getChildInfo: {
name: string;
path: string;
}
};
child2parent: {
getInfo: {
id: string;
};
getChildInfo__CALLBACK: {
clientKey: string;
}
};
};

export type ChildPort = Unport<Definition, 'child'>;
export type ParentPort = Unport<Definition, 'parent'>;
155 changes: 146 additions & 9 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable max-classes-per-file */
/**
* @license
* Copyright (c) ULIVZ. All Rights Reserved.
Expand Down Expand Up @@ -100,30 +101,74 @@ type ReverseDirection<
* `Payload` type is a utility to extract the payload type of a specific message, given its
* direction and the name.
*/
type Payload<T extends MessageDefinition, D extends Direction<T>, U extends keyof T[D]> = T[D][U];
export type Payload<T extends MessageDefinition, D extends Direction<T>, U extends keyof T[D]> = T[D][U];
/**
* `Callback` is a type representing a generic function
*/
type Callback<T extends unknown[] = [], U = unknown> = (...args: T) => U;

interface Port<T extends MessageDefinition, D extends Direction<T>> {
postMessage<U extends keyof T[D]>(t: U, p: Payload<T, D, U>): void;
/**
* A base interface used to describe a Message Port
*/
interface Port<T extends MessageDefinition, D extends Direction<T>, > {
// eslint-disable-next-line no-use-before-define
postMessage<U extends keyof T[D]>(t: U, p?: Payload<T, D, U>, extra?: Pick<ChannelMessage, 'd' | 'c' | 'e'>): void;
onMessage<U extends keyof T[ReverseDirection<T, D>]>(
t: U,
handler: Callback<[Payload<T, ReverseDirection<T, D>, U>]>,
): void;
}

export type EnsureString<T> = T extends string ? T : never;
export type CallbackSuffix = '__CALLBACK';

/**
* A generic type used to infer the return value type of an Rpc call. For example, when you call
* "foo" on one end of the port, the return value is of the type defined by "foo__CALLBACK" on
* the other end.
*/
export type CallbackPayload<
T extends MessageDefinition,
D extends Direction<T>,
U extends keyof T[D],
S extends EnsureString<U> = EnsureString<U>
> =
`${S}${CallbackSuffix}` extends keyof T[ReverseDirection<T, D>]
? Payload<T, ReverseDirection<T, D>, `${S}${CallbackSuffix}`> : unknown;

/**
* A base interface used to describe a Rpc client instance.
*/
export interface Rpc<T extends MessageDefinition, D extends Direction<T>, > {
call<U extends keyof T[D]>(t: U, p: Payload<T, D, U>): Promise<CallbackPayload<T, D, U>>;
implement<R extends keyof T[ReverseDirection<T, D>]>(
t: R,
handler: Callback<
[Payload<T, ReverseDirection<T, D>, R>],
CallbackPayload<T, ReverseDirection<T, D>, R> | Promise<CallbackPayload<T, ReverseDirection<T, D>, R>>
>,
): void;
}

// eslint-disable-next-line no-shadow
export const enum ChannelMessageErrorCode {
NotImplemented = 'NOT_IMPLEMENTED',
ExecutionError = 'EXECUTION_ERROR',
}

/**
* `ChannelMessage` interface defines the structure of a message that can be sent
* or received through an `Channel`.
*
* It contains a `t` field for the name of the message, and a `p` field for the payload
* of the message.
* of the message, `d` for the message id.
*/
export interface ChannelMessage {
t: string | number | symbol;
p: any;
_$: 'un';
t: string | number | symbol; /* message key */
p?: any; /* message payload */
d?: number; /* message id */
e?: string; /* error code */
c?: ChannelMessageErrorCode; /* error message */
}

/**
Expand Down Expand Up @@ -153,11 +198,26 @@ implements Port<T, InferDirectionByPort<T, U>> {

public channel?: EnhancedChannel;

public channelReceiveMessageListener?: (message: ChannelMessage) => unknown;

public setChannelReceiveMessageListener(listener: (message: ChannelMessage) => unknown) {
if (typeof listener === 'function') {
this.channelReceiveMessageListener = listener;
}
}

public rpc() {
return new Unrpc(this);
}

implementChannel(channel: Channel | (() => Channel)): EnhancedChannel {
// @ts-expect-error We will assign it immediately
this.channel = typeof channel === 'function' ? channel() : channel;
if (typeof this.channel === 'object' && typeof this.channel.send === 'function') {
this.channel.pipe = (message: ChannelMessage) => {
if (typeof this.channelReceiveMessageListener === 'function') {
this.channelReceiveMessageListener(message);
}
if (typeof message === 'object' && message._$ === 'un') {
const { t, p } = message;
const handler = this.handlers[t];
Expand All @@ -175,11 +235,11 @@ implements Port<T, InferDirectionByPort<T, U>> {
return this.channel;
}

postMessage: Port<T, InferDirectionByPort<T, U>>['postMessage'] = (t, p) => {
postMessage: Port<T, InferDirectionByPort<T, U>>['postMessage'] = (t, p, extra) => {
if (!this.channel) {
throw new Error('[2] Port is not implemented or has been destroyed');
}
this.channel.send({ t, p, _$: 'un' });
this.channel.send({ ...(extra || {}), t, p, _$: 'un' });
};

onMessage: Port<T, InferDirectionByPort<T, U>>['onMessage'] = (t, handler) => {
Expand All @@ -195,3 +255,80 @@ implements Port<T, InferDirectionByPort<T, U>> {
delete this.channel;
}
}

const CALLBACK_SUFFIX: CallbackSuffix = '__CALLBACK';

/**
* Expose Unrpc class
*/
export class Unrpc<T extends MessageDefinition, U extends InferPorts<T>> implements Rpc<T, InferDirectionByPort<T, U>> {
private callbackMap = new Map<number, Callback<[any]>>();

private currentCallbackId = 0;

private implementations = new Map<string | number | symbol, Callback<[any]>>();

constructor(private port: Unport<T, U>) {
this.port.setChannelReceiveMessageListener(message => {
if (typeof message === 'object' && message._$ === 'un') {
const { t, p, d } = message;
const messageKey = String(t);
if (typeof d === 'number') {
// Receive callback
if (messageKey.endsWith(CALLBACK_SUFFIX)) {
const callback = this.callbackMap.get(d);
if (callback) {
callback(p);
}
} else {
// Receive request
const handler = this.implementations.get(t);
const callbackMessageKey = `${messageKey}${CALLBACK_SUFFIX}` as keyof T[InferDirectionByPort<T, U>];
if (handler) {
const handleCallback = (result: any) => {
this.port.postMessage(callbackMessageKey, result, {
d,
});
};
const handleExecutionError = (e: any) => {
this.port.postMessage(callbackMessageKey, undefined, {
d,
c: ChannelMessageErrorCode.ExecutionError,
e: e instanceof Error ? e.message : String(e),
});
};
let result: any;
try {
result = handler(p);
} catch (e) {
handleExecutionError(e);
}
if (result instanceof Promise) {
result.then(handleCallback).catch(handleExecutionError);
}
handleCallback(result);
} else {
this.port.postMessage(callbackMessageKey, undefined, {
d,
c: ChannelMessageErrorCode.NotImplemented,
});
}
}
}
}
});
}

call: Rpc<T, InferDirectionByPort<T, U>>['call'] = async (t, p) => {
const callbackId = this.currentCallbackId++;
const res = new Promise<CallbackPayload<T, InferDirectionByPort<T, U>, typeof t>>(resolve => {
this.callbackMap.set(callbackId, resolve);
});
this.port.postMessage(t, p, { d: callbackId });
return res;
};

implement: Rpc<T, InferDirectionByPort<T, U>>['implement'] = (t, p) => {
this.implementations.set(t, p);
};
}

0 comments on commit ca6910f

Please sign in to comment.