From ec4cd7e19abfc323d15c2d3d27d352af39b770fe Mon Sep 17 00:00:00 2001 From: Dimava Date: Wed, 16 Oct 2024 23:37:47 +0300 Subject: [PATCH] feat: implement Runner proxy --- src/common.ts | 12 +++++++++ src/entry/process.ts | 3 +++ src/entry/worker.ts | 4 ++- src/index.ts | 31 +++++++++++++++++++++ test/fixtures/multiple.d.ts | 15 +++++++++++ test/fixtures/multiple.js | 22 +++++++++++++++ test/runner.test.ts | 54 +++++++++++++++++++++++++++++++++++++ 7 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 test/fixtures/multiple.d.ts create mode 100644 test/runner.test.ts diff --git a/src/common.ts b/src/common.ts index 70ba648..1f58a48 100644 --- a/src/common.ts +++ b/src/common.ts @@ -30,6 +30,18 @@ export interface TinypoolWorker { threadId: number } +export type TinypoolRunner = { + [K in keyof T as T[K] extends (...args: any[]) => any + ? K + : never]: T[K] extends (...args: infer A) => infer R + ? [R] extends [Promise] + ? true extends R & 'any' + ? (...args: A) => Promise + : T[K] + : (...args: A) => Promise + : never +} + /** * Tinypool's internal messaging between main thread and workers. * - Utilizers can use `__tinypool_worker_message__` property to identify diff --git a/src/entry/process.ts b/src/entry/process.ts index f620f14..36206e3 100644 --- a/src/entry/process.ts +++ b/src/entry/process.ts @@ -71,6 +71,9 @@ async function onMessage(message: IncomingMessage & { source: 'port' }) { if (handler === null) { throw new Error(`No handler function exported from ${filename}`) } + + // doesn't work with child_process + // Array.isArray(task) && '__tinypool_args__' in task ? task : [task] const result = await handler(task) response = { source: 'port', diff --git a/src/entry/worker.ts b/src/entry/worker.ts index 6e41f8e..7100217 100644 --- a/src/entry/worker.ts +++ b/src/entry/worker.ts @@ -102,7 +102,9 @@ function onMessage( if (handler === null) { throw new Error(`No handler function exported from ${filename}`) } - let result = await handler(task) + const args = + Array.isArray(task) && '__tinypool_args__' in task ? task : [task] + let result = await handler(...args) if (isMovable(result)) { transferList = transferList.concat(result[kTransferable]) result = result[kValue] diff --git a/src/index.ts b/src/index.ts index 2309ed8..bf880f0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -32,6 +32,7 @@ import { type TinypoolData, type TinypoolWorker, type TinypoolChannel, + type TinypoolRunner, } from './common' import ThreadWorker from './runtime/thread-worker' import ProcessWorker from './runtime/process-worker' @@ -1159,6 +1160,36 @@ class Tinypool extends EventEmitterAsyncResource { }) } + withRunner(): this & { runner: TinypoolRunner } { + return this as any + } + + get runner(): TinypoolRunner<{ [K in never]: never }> { + const run_worker_thread = + (_: any, name: string) => + (...a: any[]) => { + return this.run(Object.assign(a, { __tinypool_args__: true }), { + name: name, + }) + } + const run_child_process = + (_: any, name: string) => + (...a: any[]) => { + if (a.length > 1) + throw new Error( + 'TinypoolRunner doesn’t support args array in child_process runtime' + ) + return this.run(a[0], { name: name }) + } + + return new Proxy({} as TinypoolRunner<{ [K in never]: never }>, { + get: + this.#pool.options.runtime === 'child_process' + ? run_child_process + : run_worker_thread, + }) + } + async destroy() { await this.#pool.destroy() this.emitDestroy() diff --git a/test/fixtures/multiple.d.ts b/test/fixtures/multiple.d.ts new file mode 100644 index 0000000..69788d2 --- /dev/null +++ b/test/fixtures/multiple.d.ts @@ -0,0 +1,15 @@ +export function a(): 'a' + +export function b(): 'b' + +export default a + +export function foobar(o: { foobar: V }): V +export function asyncFoobar(o: { foobar: V }): Promise + +export function args(...args: A[]): A +export function asyncArgs(...args: A): Promise + +export const digit: 4 + +export function returnsAny(): any diff --git a/test/fixtures/multiple.js b/test/fixtures/multiple.js index 916871f..7ed7416 100644 --- a/test/fixtures/multiple.js +++ b/test/fixtures/multiple.js @@ -9,3 +9,25 @@ export function b() { } export default a + +export function foobar({ foobar }) { + return foobar +} +export function asyncFoobar({ foobar }) { + return foobar +} + +export function args(...args) { + return args +} + +// eslint-disable-next-line @typescript-eslint/require-await +export async function asyncArgs(...args) { + return args +} + +export const digit = 4 + +export function returnsAny() { + return 'any' +} diff --git a/test/runner.test.ts b/test/runner.test.ts new file mode 100644 index 0000000..b115f5b --- /dev/null +++ b/test/runner.test.ts @@ -0,0 +1,54 @@ +import { dirname, resolve } from 'node:path' +import Tinypool from 'tinypool' +import { fileURLToPath } from 'node:url' + +const __dirname = dirname(fileURLToPath(import.meta.url)) + +test('runner worker_threads test', async () => { + const { runner } = new Tinypool({ + filename: resolve(__dirname, 'fixtures/multiple.js'), + runtime: 'worker_threads', + }).withRunner() + + expect((await runner.a()) satisfies 'a').toBe('a') + expect((await runner.b()) satisfies 'b').toBe('b') + expect(await runner.foobar({ foobar: 1 })).toBe(1) + expect((await runner.asyncFoobar({ foobar: 1 })) satisfies 1).toBe(1) + expect(await runner.args(1, 2, 3, { foobar: 1 })).toEqual([ + 1, + 2, + 3, + { foobar: 1 }, + ]) + expect( + (await runner.asyncArgs(1, 2, 3, { foobar: 1 })) satisfies [ + 1, + 2, + 3, + { foobar: 1 }, + ] + ).toEqual([1, 2, 3, { foobar: 1 }]) +}) + +test('runner child_process test', async () => { + const { runner } = new Tinypool({ + filename: resolve(__dirname, 'fixtures/multiple.js'), + runtime: 'child_process', + }).withRunner() + + expect((await runner.a()) satisfies 'a').toBe('a') + expect((await runner.b()) satisfies 'b').toBe('b') + expect(await runner.foobar({ foobar: 1 })).toBe(1) + expect((await runner.asyncFoobar({ foobar: 1 })) satisfies 1).toBe(1) + expect(await runner.args('baz')).toStrictEqual(['baz']) + expect( + (await runner.asyncArgs('baz' as const)) satisfies ['baz'] + ).toStrictEqual(['baz']) + + expect(() => runner.args(1, 2, 3)).toThrow( + 'doesn’t support args array in child_process runtime' + ) + expect(() => runner.asyncArgs(1, 2, 3)).toThrow( + 'doesn’t support args array in child_process runtime' + ) +})