Skip to content

Commit

Permalink
feat: implement Runner proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
Dimava committed Oct 16, 2024
1 parent e8f6143 commit ec4cd7e
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 1 deletion.
12 changes: 12 additions & 0 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ export interface TinypoolWorker {
threadId: number
}

export type TinypoolRunner<T> = {
[K in keyof T as T[K] extends (...args: any[]) => any
? K
: never]: T[K] extends (...args: infer A) => infer R
? [R] extends [Promise<any>]
? true extends R & 'any'
? (...args: A) => Promise<any>
: T[K]
: (...args: A) => Promise<R>
: never
}

/**
* Tinypool's internal messaging between main thread and workers.
* - Utilizers can use `__tinypool_worker_message__` property to identify
Expand Down
3 changes: 3 additions & 0 deletions src/entry/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ async function onMessage(message: IncomingMessage & { source: 'port' }) {
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`)
}

// doesn't work with child_process
// Array.isArray(task) && '__tinypool_args__' in task ? task : [task]
const result = await handler(task)
response = {
source: 'port',
Expand Down
4 changes: 3 additions & 1 deletion src/entry/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ function onMessage(
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`)
}
let result = await handler(task)
const args =
Array.isArray(task) && '__tinypool_args__' in task ? task : [task]
let result = await handler(...args)
if (isMovable(result)) {
transferList = transferList.concat(result[kTransferable])
result = result[kValue]
Expand Down
31 changes: 31 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
type TinypoolData,
type TinypoolWorker,
type TinypoolChannel,
type TinypoolRunner,
} from './common'
import ThreadWorker from './runtime/thread-worker'
import ProcessWorker from './runtime/process-worker'
Expand Down Expand Up @@ -1159,6 +1160,36 @@ class Tinypool extends EventEmitterAsyncResource {
})
}

withRunner<Module>(): this & { runner: TinypoolRunner<Module> } {
return this as any
}

get runner(): TinypoolRunner<{ [K in never]: never }> {
const run_worker_thread =
(_: any, name: string) =>
(...a: any[]) => {
return this.run(Object.assign(a, { __tinypool_args__: true }), {
name: name,
})
}
const run_child_process =
(_: any, name: string) =>
(...a: any[]) => {
if (a.length > 1)
throw new Error(
'TinypoolRunner doesn’t support args array in child_process runtime'
)
return this.run(a[0], { name: name })
}

return new Proxy({} as TinypoolRunner<{ [K in never]: never }>, {
get:
this.#pool.options.runtime === 'child_process'
? run_child_process
: run_worker_thread,
})
}

async destroy() {
await this.#pool.destroy()
this.emitDestroy()
Expand Down
15 changes: 15 additions & 0 deletions test/fixtures/multiple.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export function a(): 'a'

export function b(): 'b'

export default a

export function foobar<V>(o: { foobar: V }): V
export function asyncFoobar<V>(o: { foobar: V }): Promise<V>

export function args<A>(...args: A[]): A
export function asyncArgs<A extends any[]>(...args: A): Promise<A>

export const digit: 4

export function returnsAny(): any
22 changes: 22 additions & 0 deletions test/fixtures/multiple.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,25 @@ export function b() {
}

export default a

export function foobar({ foobar }) {
return foobar
}
export function asyncFoobar({ foobar }) {
return foobar
}

export function args(...args) {
return args
}

// eslint-disable-next-line @typescript-eslint/require-await
export async function asyncArgs(...args) {
return args
}

export const digit = 4

export function returnsAny() {
return 'any'
}
54 changes: 54 additions & 0 deletions test/runner.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { dirname, resolve } from 'node:path'
import Tinypool from 'tinypool'
import { fileURLToPath } from 'node:url'

const __dirname = dirname(fileURLToPath(import.meta.url))

test('runner worker_threads test', async () => {
const { runner } = new Tinypool({
filename: resolve(__dirname, 'fixtures/multiple.js'),
runtime: 'worker_threads',
}).withRunner<typeof import('./fixtures/multiple.js')>()

expect((await runner.a()) satisfies 'a').toBe('a')
expect((await runner.b()) satisfies 'b').toBe('b')
expect(await runner.foobar({ foobar: 1 })).toBe(1)
expect((await runner.asyncFoobar({ foobar: 1 })) satisfies 1).toBe(1)
expect(await runner.args(1, 2, 3, { foobar: 1 })).toEqual([
1,
2,
3,
{ foobar: 1 },
])
expect(
(await runner.asyncArgs(1, 2, 3, { foobar: 1 })) satisfies [
1,
2,
3,
{ foobar: 1 },
]
).toEqual([1, 2, 3, { foobar: 1 }])
})

test('runner child_process test', async () => {
const { runner } = new Tinypool({
filename: resolve(__dirname, 'fixtures/multiple.js'),
runtime: 'child_process',
}).withRunner<typeof import('./fixtures/multiple.js')>()

expect((await runner.a()) satisfies 'a').toBe('a')
expect((await runner.b()) satisfies 'b').toBe('b')
expect(await runner.foobar({ foobar: 1 })).toBe(1)
expect((await runner.asyncFoobar({ foobar: 1 })) satisfies 1).toBe(1)
expect(await runner.args('baz')).toStrictEqual(['baz'])
expect(
(await runner.asyncArgs('baz' as const)) satisfies ['baz']
).toStrictEqual(['baz'])

expect(() => runner.args(1, 2, 3)).toThrow(
'doesn’t support args array in child_process runtime'
)
expect(() => runner.asyncArgs(1, 2, 3)).toThrow(
'doesn’t support args array in child_process runtime'
)
})

0 comments on commit ec4cd7e

Please sign in to comment.