diff --git a/.vscode/settings.json b/.vscode/settings.json index db75cae..ba8728e 100755 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,7 +8,7 @@ "markdown" ], "editor.codeActionsOnSave": { - "source.fixAll.eslint": true + "source.fixAll.eslint": "explicit" }, "search.exclude": { "**/.git": true, @@ -44,6 +44,7 @@ "js/ts.implicitProjectConfig.strictNullChecks": false, "cSpell.words": [ "insx", - "Unport" + "Unport", + "Unrpc" ] } diff --git a/examples/child-process-rpc/child.ts b/examples/child-process-rpc/child.ts new file mode 100644 index 0000000..3091255 --- /dev/null +++ b/examples/child-process-rpc/child.ts @@ -0,0 +1,29 @@ +import { Unport, Unrpc, ChannelMessage } from '../../lib'; +import { ChildPort } from './port'; + +// 1. Initialize a port +const childPort: ChildPort = new Unport(); + +// 2. Implement a Channel based on underlying IPC capabilities +childPort.implementChannel({ + send(message) { + process.send && process.send(message); + }, + accept(pipe) { + process.on('message', (message: ChannelMessage) => { + pipe(message); + }); + }, +}); + +// 3. Initialize a rpc client +const childRpcClient = new Unrpc(childPort); +childRpcClient.implement('getChildInfo', request => ({ + clientKey: `[child] ${request.name}`, +})); +childRpcClient.port.onMessage('syn', async payload => { + console.log('[child] [event] [syn] [result]', payload); + const response = await childRpcClient.call('getInfo', { id: '' }); + console.log('[child] [rpc] [getInfo] [response]', response); + childPort.postMessage('ack', { pid: 'child' }); +}); diff --git a/examples/child-process-rpc/parent.ts b/examples/child-process-rpc/parent.ts new file mode 100644 index 0000000..986c0c4 --- /dev/null +++ b/examples/child-process-rpc/parent.ts @@ -0,0 +1,43 @@ +import { join } from 'path'; +import { fork } from 'child_process'; +import { Unport, Unrpc, ChannelMessage } from '../../lib'; +import { ParentPort } from './port'; + +// 1. Initialize a port +const parentPort: ParentPort = new Unport(); + +// 2. Implement a Channel based on underlying IPC capabilities +const childProcess = fork(join(__dirname, './child.js')); +parentPort.implementChannel({ + send(message) { + childProcess.send(message); + }, + accept(pipe) { + childProcess.on('message', (message: ChannelMessage) => { + pipe(message); + }); + }, + destroy() { + childProcess.removeAllListeners('message'); + childProcess.kill(); + }, +}); + +// 3. Initialize a rpc client from port. +const parentRpcClient = new Unrpc(parentPort); + +parentRpcClient.implement('getInfo', request => ({ + user: `parent (${request.id})`, +})); +parentRpcClient.port.postMessage('syn', { pid: 'parent' }); +parentRpcClient.port.onMessage('ack', async payload => { + console.log('[parent] [event] [ack] [result]', payload); + const response = await parentRpcClient.call('getChildInfo', { + name: 'parent', + }); + console.log('[parent] [rpc] [getChildInfo] [response]', response); + setTimeout(() => { + console.log('destroy'); + parentPort.destroy(); + }, 1000); +}); diff --git a/examples/child-process-rpc/port.ts b/examples/child-process-rpc/port.ts new file mode 100644 index 0000000..aa20929 --- /dev/null +++ b/examples/child-process-rpc/port.ts @@ -0,0 +1,30 @@ +/* eslint-disable camelcase */ +import { Unport } from '../../lib'; + +export type Definition = { + parent2child: { + syn: { + pid: string; + }; + getInfo__callback: { + user: string; + }; + getChildInfo: { + name: string; + } + }; + child2parent: { + getInfo: { + id: string; + }; + getChildInfo__callback: { + clientKey: string; + }; + ack: { + pid: string; + }; + }; +}; + +export type ChildPort = Unport; +export type ParentPort = Unport; diff --git a/package.json b/package.json index 6e0bde6..32ab6aa 100644 --- a/package.json +++ b/package.json @@ -20,8 +20,8 @@ "build": "run-s build:cjs build:esm build:example", "dev:cjs": "npm run build:cjs -- --watch", "dev:esm": "npm run build:esm -- --watch", - "build:cjs": "tsc -p tsconfig.json --module commonjs --outDir lib", - "build:esm": "tsc -p tsconfig.json --module ES2015 --outDir esm", + "build:cjs": "tsc -p tsconfig.src.json --module commonjs --outDir lib", + "build:esm": "tsc -p tsconfig.src.json --module ES2015 --outDir esm", "dev:example": "tsc -p tsconfig.examples.json --watch", "build:example": "tsc -p tsconfig.examples.json", "prepublishOnly": "npm run build", diff --git a/src/index.ts b/src/index.ts index 5669b1d..4da5e77 100755 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ +/* eslint-disable max-classes-per-file */ /** * @license * Copyright (c) ULIVZ. All Rights Reserved. @@ -100,30 +101,80 @@ type ReverseDirection< * `Payload` type is a utility to extract the payload type of a specific message, given its * direction and the name. */ -type Payload, U extends keyof T[D]> = T[D][U]; +export type Payload, U extends keyof T[D]> = T[D][U]; /** * `Callback` is a type representing a generic function */ type Callback = (...args: T) => U; - +/** + * A base interface used to describe a Message Port + */ interface Port> { - postMessage(t: U, p: Payload): void; + // eslint-disable-next-line no-use-before-define + postMessage(t: U, p?: Payload, extra?: Pick): void; onMessage]>( t: U, handler: Callback<[Payload, U>]>, ): void; } + +export type EnsureString = T extends string ? T : never; +export type CallbackSuffix = '__callback'; + +/** + * A generic type used to infer the return value type of an Rpc call. For example, when you call + * "foo" on one end of the port, the return value is of the type defined by "foo__callback" on + * the other end. + */ +export type CallbackPayload< + T extends MessageDefinition, + D extends Direction, + U extends keyof T[D], + S extends EnsureString = EnsureString +> = + `${S}${CallbackSuffix}` extends keyof T[ReverseDirection] + ? Payload, `${S}${CallbackSuffix}`> : unknown; + +/** + * We filtered the messages, only the message without {@type {CallbackSuffix}} is defined rpc method. + */ +export type RpcMethod, U extends keyof T[D]> + = U extends `${infer A}${CallbackSuffix}` ? never : U; + +/** + * A base interface used to describe a Rpc client instance. + */ +export interface Rpc, > { + call(t: RpcMethod, p: Payload): Promise>; + implement]>( + t: RpcMethod, R>, + handler: Callback< + [Payload, R>], + CallbackPayload, R> | Promise, R>> + >, + ): void; +} + +// eslint-disable-next-line no-shadow +export const enum ChannelMessageErrorCode { + NotImplemented = 'NOT_IMPLEMENTED', + ExecutionError = 'EXECUTION_ERROR', +} + /** * `ChannelMessage` interface defines the structure of a message that can be sent * or received through an `Channel`. * * It contains a `t` field for the name of the message, and a `p` field for the payload - * of the message. + * of the message, `d` for the message id. */ export interface ChannelMessage { - t: string | number | symbol; - p: any; _$: 'un'; + t: string | number | symbol; /* message key */ + p?: any; /* message payload */ + d?: number; /* message id */ + e?: string; /* error code */ + c?: ChannelMessageErrorCode; /* error message */ } /** @@ -147,17 +198,30 @@ export interface EnhancedChannel extends Channel { /** * Expose Unport class */ -export class Unport> -implements Port> { +export class Unport< + T extends MessageDefinition, + U extends InferPorts +> implements Port> { private handlers: Record[]> = {}; public channel?: EnhancedChannel; - implementChannel(channel: Channel | (() => Channel)): EnhancedChannel { + public channelReceiveMessageListener?: (message: ChannelMessage) => unknown; + + public setChannelReceiveMessageListener(listener: (message: ChannelMessage) => unknown) { + if (typeof listener === 'function') { + this.channelReceiveMessageListener = listener; + } + } + + public implementChannel(channel: Channel | (() => Channel)): EnhancedChannel { // @ts-expect-error We will assign it immediately this.channel = typeof channel === 'function' ? channel() : channel; if (typeof this.channel === 'object' && typeof this.channel.send === 'function') { this.channel.pipe = (message: ChannelMessage) => { + if (typeof this.channelReceiveMessageListener === 'function') { + this.channelReceiveMessageListener(message); + } if (typeof message === 'object' && message._$ === 'un') { const { t, p } = message; const handler = this.handlers[t]; @@ -175,23 +239,100 @@ implements Port> { return this.channel; } - postMessage: Port>['postMessage'] = (t, p) => { + public postMessage: Port>['postMessage'] = (t, p, extra) => { if (!this.channel) { throw new Error('[2] Port is not implemented or has been destroyed'); } - this.channel.send({ t, p, _$: 'un' }); + this.channel.send({ ...(extra || {}), t, p, _$: 'un' }); }; - onMessage: Port>['onMessage'] = (t, handler) => { + public onMessage: Port>['onMessage'] = (t, handler) => { if (!this.handlers[t]) { this.handlers[t] = []; } this.handlers[t].push(handler); }; - destroy() { + public destroy() { this.handlers = {}; this.channel?.destroy && this.channel.destroy(); delete this.channel; } } + +const CALLBACK_SUFFIX: CallbackSuffix = '__callback'; + +/** + * Expose Unrpc class + */ +export class Unrpc> implements Rpc> { + private callbackMap = new Map>(); + + private currentCallbackId = 0; + + private implementations = new Map>(); + + constructor(public readonly port: Unport) { + this.port.setChannelReceiveMessageListener(message => { + if (typeof message === 'object' && message._$ === 'un') { + const { t, p, d } = message; + const messageKey = String(t); + if (typeof d === 'number') { + // Receive callback + if (messageKey.endsWith(CALLBACK_SUFFIX)) { + const callback = this.callbackMap.get(d); + if (callback) { + callback(p); + } + } else { + // Receive request + const handler = this.implementations.get(t); + const callbackMessageKey = `${messageKey}${CALLBACK_SUFFIX}` as keyof T[InferDirectionByPort]; + if (handler) { + const handleCallback = (result: any) => { + this.port.postMessage(callbackMessageKey, result, { + d, + }); + }; + const handleExecutionError = (e: any) => { + this.port.postMessage(callbackMessageKey, undefined, { + d, + c: ChannelMessageErrorCode.ExecutionError, + e: e instanceof Error ? e.message : String(e), + }); + }; + let result: any; + try { + result = handler(p); + } catch (e) { + handleExecutionError(e); + } + if (result instanceof Promise) { + result.then(handleCallback).catch(handleExecutionError); + } + handleCallback(result); + } else { + this.port.postMessage(callbackMessageKey, undefined, { + d, + c: ChannelMessageErrorCode.NotImplemented, + }); + } + } + } + } + }); + } + + public call: Rpc>['call'] = async (t, p) => { + const callbackId = this.currentCallbackId++; + const res = new Promise, typeof t>>(resolve => { + this.callbackMap.set(callbackId, resolve); + }); + this.port.postMessage(t, p, { d: callbackId }); + return res; + }; + + public implement: Rpc>['implement'] = (t, p) => { + this.implementations.set(t, p); + }; +} diff --git a/tsconfig.examples.json b/tsconfig.examples.json index 23789e3..7297604 100755 --- a/tsconfig.examples.json +++ b/tsconfig.examples.json @@ -1,5 +1,4 @@ { - "extends": "./tsconfig.json", "compilerOptions": { "composite": true, "rootDir": "examples", @@ -13,7 +12,7 @@ ], "references": [ { - "path": "./tsconfig.json" + "path": "./tsconfig.src.json" } ] } \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json index e20b024..2acf385 100755 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,47 +1,10 @@ { - "compilerOptions": { - "strict": true, - "strictFunctionTypes": true, - "incremental": true, - "noUncheckedIndexedAccess": true, - "allowSyntheticDefaultImports": true, - "esModuleInterop": true, - "experimentalDecorators": true, - "allowJs": false, - "alwaysStrict": true, - "skipLibCheck": true, - "module": "commonjs", - "moduleResolution": "node", - "noEmitOnError": false, - "noImplicitThis": true, - "noImplicitAny": true, - "rootDir": "src", - "outDir": "./lib", - "sourceMap": true, - "target": "ES2019", - "typeRoots": [ - "node_modules/@types", - "types" - ], - "declaration": true, - "declarationMap": true, - "resolveJsonModule": true, - "downlevelIteration": true, - "lib": [ - "es5", - "es6" - ], - "composite": true, - "paths": { - "unport": ["./src/index.ts"] + "references": [ + { + "path": "./tsconfig.src.json" + }, + { + "path": "./tsconfig.examples.json" } - }, - "include": [ - "src", - "types", - "*.d.ts" - ], - "exclude": [ - "node_modules" ] } \ No newline at end of file diff --git a/tsconfig.src.json b/tsconfig.src.json new file mode 100755 index 0000000..ec784df --- /dev/null +++ b/tsconfig.src.json @@ -0,0 +1,49 @@ +{ + "compilerOptions": { + "strict": true, + "strictFunctionTypes": true, + "incremental": true, + "noUncheckedIndexedAccess": true, + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "experimentalDecorators": true, + "allowJs": false, + "alwaysStrict": true, + "skipLibCheck": true, + "module": "commonjs", + "moduleResolution": "node", + "noEmitOnError": false, + "noImplicitThis": true, + "noImplicitAny": true, + "rootDir": "src", + "outDir": "./lib", + "sourceMap": true, + "target": "ES2019", + "typeRoots": [ + "node_modules/@types", + "types" + ], + "declaration": true, + "declarationMap": true, + "resolveJsonModule": true, + "downlevelIteration": true, + "lib": [ + "es5", + "es6" + ], + "composite": true, + "paths": { + "unport": [ + "./src/index.ts" + ] + } + }, + "include": [ + "src", + "types", + "*.d.ts" + ], + "exclude": [ + "node_modules" + ] +} \ No newline at end of file