diff --git a/.eslintrc.js b/.eslintrc.js index a433465..f29a9b2 100755 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -1,3 +1,6 @@ module.exports = { - extends: "eslint-config-typescript-library", + extends: 'eslint-config-typescript-library', + rules: { + camelcase: 'off', + }, }; diff --git a/.vscode/settings.json b/.vscode/settings.json index db75cae..90b4b73 100755 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,7 +8,7 @@ "markdown" ], "editor.codeActionsOnSave": { - "source.fixAll.eslint": true + "source.fixAll.eslint": "explicit" }, "search.exclude": { "**/.git": true, @@ -43,7 +43,9 @@ "liveServer.settings.port": 5501, "js/ts.implicitProjectConfig.strictNullChecks": false, "cSpell.words": [ + "camelcase", "insx", - "Unport" + "Unport", + "Unrpc" ] } diff --git a/README.md b/README.md index b89cf1c..59123bc 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ Each of these JSContexts exhibits distinct methods of communicating with the ext - [Channel](#channel-1) - [.pipe()](#pipe) - [ChannelMessage](#channelmessage) + - [Unrpc (Experimental)](#unrpc-experimental) - [🤝 Contributing](#-contributing) - [🤝 Credits](#-credits) - [LICENSE](#license) @@ -299,6 +300,133 @@ The `ChannelMessage` type is used for the message in the `onMessage` method. import { ChannelMessage } from 'unport'; ``` +### Unrpc (Experimental) + +Starting with the 0.6.0 release, we are experimentally introducing support for Typed [RPC (Remote Procedure Call)](https://en.wikipedia.org/wiki/Remote_procedure_call). + +When dealing with a single Port that requires RPC definition, we encounter a problem related to the programming paradigm. It's necessary to define `Request` and `Response` messages such as: + +```ts +export type AppletIpcDefinition = { + a2b: { + callFoo: { + input: string; + }; + }; + b2a: { + callFooCallback: { + result: string; + }; + }; +}; +``` + +In the case where an RPC call needs to be encapsulated, the API might look like this: + +```ts +function rpcCall(request: { input: string; }): Promise<{ result: string; }>; +``` + +Consequently, it becomes a requirement to include a `CallbackId` at the **application layer** for every RPC method: + +```diff + export type AppletIpcDefinition = { + a2b: { + callFoo: { + input: string; ++ callbackId: string; + }; + }; + b2a: { + callFooCallback: { + result: string; ++ callbackId: string; + }; + }; + }; +``` + +`Unrpc` is provided to address this issue, enabling support for Typed RPC starting from the **protocol layer**: + +```ts +import { Unrpc } from 'unport'; + +// "parentPort" is a Port defined based on Unport in the previous example. +const parent = new Unrpc(parentPort); + +// Implementing an RPC method. +parent.implement('callFoo', request => ({ + user: `parent (${request.id})`, +})); + +// Emit a SYN event. +parent.port.postMessage('syn', { pid: 'parent' }); + +// Listen for the ACK message. +parent.port.onMessage('ack', async payload => { + // Call an RPC method as defined by the "child" port. + const response = await parent.call('getChildInfo', { + name: 'parent', + }); +}); +``` + +The implementation on the `child` side is as follows: + +```ts +import { Unrpc } from 'unport'; + +// "parentPort" is a Port also defined based on Unport. +const child = new Unrpc(childPort); + +child.implement('getChildInfo', request => ({ + clientKey: `[child] ${request.name}`, +})); + +// Listen for the SYN message. +child.port.onMessage('syn', async payload => { + const response = await child.call('getInfo', { id: '' }); + // Acknowledge the SYN event. + child.port.postMessage('ack', { pid: 'child' }); +}); +``` + +The types are defined as such: + +```ts +import { Unport } from 'unport'; + +export type Definition = { + parent2child: { + syn: { + pid: string; + }; + getInfo__callback: { + user: string; + }; + getChildInfo: { + name: string; + } + }; + child2parent: { + getInfo: { + id: string; + }; + getChildInfo__callback: { + clientKey: string; + }; + ack: { + pid: string; + }; + }; +}; + +export type ChildPort = Unport; +export type ParentPort = Unport; +``` + +In comparison to Unport, the only new concept to grasp is that the RPC response message key must end with `__callback`. Other than that, no additional changes are necessary! `Unrpc` also offers comprehensive type inference based on this convention; for instance, you won't be able to implement an RPC method that is meant to serve as a response. + ## 🤝 Contributing Contributions, issues and feature requests are welcome! diff --git a/__tests__/rpc.test.ts b/__tests__/rpc.test.ts new file mode 100644 index 0000000..3e22396 --- /dev/null +++ b/__tests__/rpc.test.ts @@ -0,0 +1,112 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { ChannelMessage, Unport, Unrpc, UnrpcExecutionErrorError, UnrpcNotImplementationError } from '../src'; + +export type Definition = { + parent2child: { + syn: { + pid: string; + }; + getInfo__callback: { + user: string; + }; + getChildInfo: { + name: string; + } + }; + child2parent: { + getInfo: { + id: string; + }; + getChildInfo__callback: { + clientKey: string; + }; + ack: { + pid: string; + }; + }; +}; + +describe('Unrpc', () => { + let childPort: Unport; + let parentPort: Unport; + let child: Unrpc; + let parent: Unrpc; + + beforeEach(() => { + const messageChannel = new MessageChannel(); + childPort = new Unport(); + childPort.implementChannel({ + send(message) { + messageChannel.port1.postMessage(message); + }, + accept(pipe) { + messageChannel.port1.onmessage = (message: MessageEvent) => pipe(message.data); + }, + }); + child = new Unrpc(childPort); + + parentPort = new Unport(); + parentPort.implementChannel({ + send(message) { + console.log(message); + messageChannel.port2.postMessage(message); + }, + accept(pipe) { + messageChannel.port2.onmessage = (message: MessageEvent) => pipe(message.data); + }, + }); + + parent = new Unrpc(parentPort); + }); + + it('implemented method - asynchronous implementation', async () => { + parent.implement('getInfo', async ({ id }) => ({ user: id })); + const response = child.call('getInfo', { id: 'name' }); + expect(response).resolves.toMatchObject({ user: 'name' }); + }); + + it('implemented method - synchronous implementation', async () => { + parent.implement('getInfo', ({ id }) => ({ user: id })); + const response = child.call('getInfo', { id: 'name' }); + expect(response).resolves.toMatchObject({ user: 'name' }); + }); + + it('Error: UnrpcNotImplementationError', async () => { + expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject( + new UnrpcNotImplementationError('Method getInfo is not implemented'), + ); + }); + + it('Error: UnrpcExecutionErrorError - asynchronous implementation', async () => { + parent.implement('getInfo', async () => { + // @ts-expect-error mock execution error here. + const result = foo; + return result; + }); + expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject( + new UnrpcExecutionErrorError('foo is not defined'), + ); + }); + + it('Error: UnrpcExecutionErrorError - synchronous implementation', async () => { + parent.implement('getInfo', () => { + // @ts-expect-error mock execution error here. + const result = foo; + return result; + }); + expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject( + new UnrpcExecutionErrorError('foo is not defined'), + ); + }); + + it('complicated case', async () => { + parent.implement('getInfo', async ({ id }) => ({ user: id })); + child.implement('getChildInfo', async ({ name }) => ({ clientKey: name })); + const [response1, response2] = await Promise.all([ + child.call('getInfo', { id: 'child' }), + parent.call('getChildInfo', { name: 'parent' }), + ]); + expect(response1).toMatchObject({ user: 'child' }); + expect(response2).toMatchObject({ clientKey: 'parent' }); + }); +}); diff --git a/examples/child-process-rpc/child.ts b/examples/child-process-rpc/child.ts new file mode 100644 index 0000000..3091255 --- /dev/null +++ b/examples/child-process-rpc/child.ts @@ -0,0 +1,29 @@ +import { Unport, Unrpc, ChannelMessage } from '../../lib'; +import { ChildPort } from './port'; + +// 1. Initialize a port +const childPort: ChildPort = new Unport(); + +// 2. Implement a Channel based on underlying IPC capabilities +childPort.implementChannel({ + send(message) { + process.send && process.send(message); + }, + accept(pipe) { + process.on('message', (message: ChannelMessage) => { + pipe(message); + }); + }, +}); + +// 3. Initialize a rpc client +const childRpcClient = new Unrpc(childPort); +childRpcClient.implement('getChildInfo', request => ({ + clientKey: `[child] ${request.name}`, +})); +childRpcClient.port.onMessage('syn', async payload => { + console.log('[child] [event] [syn] [result]', payload); + const response = await childRpcClient.call('getInfo', { id: '' }); + console.log('[child] [rpc] [getInfo] [response]', response); + childPort.postMessage('ack', { pid: 'child' }); +}); diff --git a/examples/child-process-rpc/parent.ts b/examples/child-process-rpc/parent.ts new file mode 100644 index 0000000..986c0c4 --- /dev/null +++ b/examples/child-process-rpc/parent.ts @@ -0,0 +1,43 @@ +import { join } from 'path'; +import { fork } from 'child_process'; +import { Unport, Unrpc, ChannelMessage } from '../../lib'; +import { ParentPort } from './port'; + +// 1. Initialize a port +const parentPort: ParentPort = new Unport(); + +// 2. Implement a Channel based on underlying IPC capabilities +const childProcess = fork(join(__dirname, './child.js')); +parentPort.implementChannel({ + send(message) { + childProcess.send(message); + }, + accept(pipe) { + childProcess.on('message', (message: ChannelMessage) => { + pipe(message); + }); + }, + destroy() { + childProcess.removeAllListeners('message'); + childProcess.kill(); + }, +}); + +// 3. Initialize a rpc client from port. +const parentRpcClient = new Unrpc(parentPort); + +parentRpcClient.implement('getInfo', request => ({ + user: `parent (${request.id})`, +})); +parentRpcClient.port.postMessage('syn', { pid: 'parent' }); +parentRpcClient.port.onMessage('ack', async payload => { + console.log('[parent] [event] [ack] [result]', payload); + const response = await parentRpcClient.call('getChildInfo', { + name: 'parent', + }); + console.log('[parent] [rpc] [getChildInfo] [response]', response); + setTimeout(() => { + console.log('destroy'); + parentPort.destroy(); + }, 1000); +}); diff --git a/examples/child-process-rpc/port.ts b/examples/child-process-rpc/port.ts new file mode 100644 index 0000000..aa20929 --- /dev/null +++ b/examples/child-process-rpc/port.ts @@ -0,0 +1,30 @@ +/* eslint-disable camelcase */ +import { Unport } from '../../lib'; + +export type Definition = { + parent2child: { + syn: { + pid: string; + }; + getInfo__callback: { + user: string; + }; + getChildInfo: { + name: string; + } + }; + child2parent: { + getInfo: { + id: string; + }; + getChildInfo__callback: { + clientKey: string; + }; + ack: { + pid: string; + }; + }; +}; + +export type ChildPort = Unport; +export type ParentPort = Unport; diff --git a/package.json b/package.json index 6e0bde6..32ab6aa 100644 --- a/package.json +++ b/package.json @@ -20,8 +20,8 @@ "build": "run-s build:cjs build:esm build:example", "dev:cjs": "npm run build:cjs -- --watch", "dev:esm": "npm run build:esm -- --watch", - "build:cjs": "tsc -p tsconfig.json --module commonjs --outDir lib", - "build:esm": "tsc -p tsconfig.json --module ES2015 --outDir esm", + "build:cjs": "tsc -p tsconfig.src.json --module commonjs --outDir lib", + "build:esm": "tsc -p tsconfig.src.json --module ES2015 --outDir esm", "dev:example": "tsc -p tsconfig.examples.json --watch", "build:example": "tsc -p tsconfig.examples.json", "prepublishOnly": "npm run build", diff --git a/src/index.ts b/src/index.ts index 5669b1d..b6f5810 100755 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ +/* eslint-disable max-classes-per-file */ /** * @license * Copyright (c) ULIVZ. All Rights Reserved. @@ -100,30 +101,85 @@ type ReverseDirection< * `Payload` type is a utility to extract the payload type of a specific message, given its * direction and the name. */ -type Payload, U extends keyof T[D]> = T[D][U]; +export type Payload, U extends keyof T[D]> = T[D][U]; /** * `Callback` is a type representing a generic function */ type Callback = (...args: T) => U; - +/** + * A base interface used to describe a Message Port + */ interface Port> { - postMessage(t: U, p: Payload): void; + // eslint-disable-next-line no-use-before-define + postMessage(t: U, p?: Payload, extra?: Pick): void; onMessage]>( t: U, handler: Callback<[Payload, U>]>, ): void; } + +export type EnsureString = T extends string ? T : never; +export type CallbackSuffix = '__callback'; + +/** + * A generic type used to infer the return value type of an Rpc call. For example, when you call + * "foo" on one end of the port, the return value is of the type defined by "foo__callback" on + * the other end. + */ +export type CallbackPayload< + T extends MessageDefinition, + D extends Direction, + U extends keyof T[D], + S extends EnsureString = EnsureString +> = + `${S}${CallbackSuffix}` extends keyof T[ReverseDirection] + ? Payload, `${S}${CallbackSuffix}`> : unknown; + +/** + * We filtered the messages, only the message without {@type {CallbackSuffix}} is defined rpc method. + */ +export type RpcMethod, U extends keyof T[D]> + = U extends `${infer A}${CallbackSuffix}` ? never : U; + +/** + * A base interface used to describe a Rpc client instance. + */ +export interface Rpc, > { + call(t: RpcMethod, p: Payload): Promise>; + implement]>( + t: RpcMethod, R>, + handler: Callback< + [Payload, R>], + CallbackPayload, R> | Promise, R>> + >, + ): void; +} + +// eslint-disable-next-line no-shadow +export const enum ChannelMessageErrorCode { + NotImplemented = 'NOT_IMPLEMENTED', + ExecutionError = 'EXECUTION_ERROR', +} + +/** + * Different messages or methods define different Responses, so it is an any + */ +export type Result = any; + /** * `ChannelMessage` interface defines the structure of a message that can be sent * or received through an `Channel`. * * It contains a `t` field for the name of the message, and a `p` field for the payload - * of the message. + * of the message, `d` for the message id. */ export interface ChannelMessage { - t: string | number | symbol; - p: any; _$: 'un'; + t: string | number | symbol; /* message key */ + p?: Result; /* message payload */ + d?: number; /* message id */ + e?: string; /* error message */ + c?: ChannelMessageErrorCode; /* error code */ } /** @@ -147,17 +203,30 @@ export interface EnhancedChannel extends Channel { /** * Expose Unport class */ -export class Unport> -implements Port> { +export class Unport< + T extends MessageDefinition, + U extends InferPorts +> implements Port> { private handlers: Record[]> = {}; public channel?: EnhancedChannel; - implementChannel(channel: Channel | (() => Channel)): EnhancedChannel { + public channelReceiveMessageListener?: (message: ChannelMessage) => unknown; + + public setChannelReceiveMessageListener(listener: (message: ChannelMessage) => unknown) { + if (typeof listener === 'function') { + this.channelReceiveMessageListener = listener; + } + } + + public implementChannel(channel: Channel | (() => Channel)): EnhancedChannel { // @ts-expect-error We will assign it immediately this.channel = typeof channel === 'function' ? channel() : channel; if (typeof this.channel === 'object' && typeof this.channel.send === 'function') { this.channel.pipe = (message: ChannelMessage) => { + if (typeof this.channelReceiveMessageListener === 'function') { + this.channelReceiveMessageListener(message); + } if (typeof message === 'object' && message._$ === 'un') { const { t, p } = message; const handler = this.handlers[t]; @@ -175,23 +244,150 @@ implements Port> { return this.channel; } - postMessage: Port>['postMessage'] = (t, p) => { + public postMessage: Port>['postMessage'] = (t, p, extra) => { if (!this.channel) { throw new Error('[2] Port is not implemented or has been destroyed'); } - this.channel.send({ t, p, _$: 'un' }); + this.channel.send({ ...(extra || {}), t, p, _$: 'un' }); }; - onMessage: Port>['onMessage'] = (t, handler) => { + public onMessage: Port>['onMessage'] = (t, handler) => { if (!this.handlers[t]) { this.handlers[t] = []; } this.handlers[t].push(handler); }; - destroy() { + public destroy() { this.handlers = {}; this.channel?.destroy && this.channel.destroy(); delete this.channel; } } + +const CALLBACK_SUFFIX: CallbackSuffix = '__callback'; + +export class UnrpcNotImplementationError extends Error { + constructor(message?: string) { + super(message); + this.name = ChannelMessageErrorCode.NotImplemented; + } +} + +export class UnrpcExecutionErrorError extends Error { + constructor(message?: string) { + super(message); + this.name = ChannelMessageErrorCode.ExecutionError; + } +} + +/** + * Check if the given object is a Promise or PromiseLike. + * + * @param value - The object to check. + * @returns True if the object is a Promise or PromiseLike, otherwise false. + */ +function isPromise(value: any): value is Promise { + // Check if the value is an object and not null, then check if it has a 'then' function + return !!value && (typeof value === 'object' || typeof value === 'function') && typeof value.then === 'function'; +} + +/** + * Expose Unrpc class + */ +export class Unrpc> implements Rpc> { + private callbackMap = new Map, Callback<[any]>]>(); + + private currentCallbackId = 0; + + private implementations = new Map>(); + + constructor(public readonly port: Unport) { + /** + * The implementation of Rpc is based on the message protocol layer {@type {ChannelMessage}} at {@type {Unport}}. + */ + this.port.setChannelReceiveMessageListener(message => { + if (typeof message === 'object' && message._$ === 'un') { + const { t, p, d } = message; + const messageKey = String(t); + /** + * If a message contains "d" field, it is considered a message sent by Rpc. + * Therefore, messages sent directly by {@type {Unport#postMessage}} will not be affected in any way. + */ + if (typeof d === 'number') { + /** + * If a message ends with {@type {CALLBACK_SUFFIX}}, it is considered a Response message of Rpc + */ + if (messageKey.endsWith(CALLBACK_SUFFIX)) { + const callbacks = this.callbackMap.get(d); + if (callbacks) { + const [resolve, reject] = callbacks; + if (message.c) { + console.log('message.c', message.c); + + switch (message.c) { + case ChannelMessageErrorCode.NotImplemented: + reject(new UnrpcNotImplementationError(message.e)); break; + case ChannelMessageErrorCode.ExecutionError: + reject(new UnrpcExecutionErrorError(message.e)); break; + default: + reject(new Error(message.e)); + } + } else { + resolve(p); + } + } + } else { + // If a message is not a Callback, it is considered a Request, so we need to handle it. + const handler = this.implementations.get(t); + const callbackMessageKey = `${messageKey}${CALLBACK_SUFFIX}` as keyof T[InferDirectionByPort]; + if (handler) { + const handleCallback = (result: Result) => { + this.port.postMessage(callbackMessageKey, result, { + d, + }); + }; + const handleExecutionError = (e: Result) => { + this.port.postMessage(callbackMessageKey, undefined, { + d, + c: ChannelMessageErrorCode.ExecutionError, + e: e instanceof Error ? e.message : String(e), + }); + }; + let result: Result; + try { + result = handler(p); + } catch (e) { + handleExecutionError(e); + } + if (isPromise(result)) { + result.then(handleCallback).catch(handleExecutionError); + } else { + handleCallback(result); + } + } else { + this.port.postMessage(callbackMessageKey, undefined, { + d, + c: ChannelMessageErrorCode.NotImplemented, + e: `Method ${messageKey} is not implemented`, + }); + } + } + } + } + }); + } + + public call: Rpc>['call'] = async (t, p) => { + const callbackId = this.currentCallbackId++; + const response = new Promise, typeof t>>((resolve, reject) => { + this.callbackMap.set(callbackId, [resolve, reject]); + }); + this.port.postMessage(t, p, { d: callbackId }); + return response; + }; + + public implement: Rpc>['implement'] = (t, p) => { + this.implementations.set(t, p); + }; +} diff --git a/tsconfig.examples.json b/tsconfig.examples.json index 23789e3..7297604 100755 --- a/tsconfig.examples.json +++ b/tsconfig.examples.json @@ -1,5 +1,4 @@ { - "extends": "./tsconfig.json", "compilerOptions": { "composite": true, "rootDir": "examples", @@ -13,7 +12,7 @@ ], "references": [ { - "path": "./tsconfig.json" + "path": "./tsconfig.src.json" } ] } \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json index e20b024..2ad3610 100755 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,47 +1,17 @@ { "compilerOptions": { - "strict": true, - "strictFunctionTypes": true, - "incremental": true, - "noUncheckedIndexedAccess": true, - "allowSyntheticDefaultImports": true, - "esModuleInterop": true, - "experimentalDecorators": true, - "allowJs": false, - "alwaysStrict": true, - "skipLibCheck": true, - "module": "commonjs", - "moduleResolution": "node", - "noEmitOnError": false, - "noImplicitThis": true, - "noImplicitAny": true, - "rootDir": "src", - "outDir": "./lib", - "sourceMap": true, - "target": "ES2019", - "typeRoots": [ - "node_modules/@types", - "types" - ], - "declaration": true, - "declarationMap": true, - "resolveJsonModule": true, - "downlevelIteration": true, - "lib": [ - "es5", - "es6" - ], - "composite": true, "paths": { - "unport": ["./src/index.ts"] + "unport": [ + "./src/index.ts" + ] } }, - "include": [ - "src", - "types", - "*.d.ts" - ], - "exclude": [ - "node_modules" + "references": [ + { + "path": "./tsconfig.src.json" + }, + { + "path": "./tsconfig.examples.json" + } ] } \ No newline at end of file diff --git a/tsconfig.src.json b/tsconfig.src.json new file mode 100755 index 0000000..eb4d7f8 --- /dev/null +++ b/tsconfig.src.json @@ -0,0 +1,44 @@ +{ + "compilerOptions": { + "strict": true, + "strictFunctionTypes": true, + "incremental": true, + "noUncheckedIndexedAccess": true, + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "experimentalDecorators": true, + "allowJs": false, + "alwaysStrict": true, + "skipLibCheck": true, + "module": "commonjs", + "moduleResolution": "node", + "noEmitOnError": false, + "noImplicitThis": true, + "noImplicitAny": true, + "rootDir": "src", + "outDir": "./lib", + "sourceMap": true, + "target": "ES2019", + "typeRoots": [ + "node_modules/@types", + "types" + ], + "declaration": true, + "declarationMap": true, + "resolveJsonModule": true, + "downlevelIteration": true, + "lib": [ + "es5", + "es6" + ], + "composite": true + }, + "include": [ + "src", + "types", + "*.d.ts" + ], + "exclude": [ + "node_modules" + ] +} \ No newline at end of file