Parallelize task workers
Close #106
dahlia committed Sep 21, 2024
1 parent 276c412 commit 0cc5ffe
Showing 6 changed files with 202 additions and 17 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
@@ -109,10 +109,14 @@ To be released.
the activity to be sent only once. It had added Object Integrity Proofs
to the activity for every recipient before.

- Added `ParallelMessageQueue` class. [[#106]]

- WebFinger responses now include <http://webfinger.net/rel/avatar> links
  if the `Actor` object returned by the actor dispatcher has an `icon`/`icons`
  property.

- `DenoKvMessageQueue` now implements `Disposable` interface.

- The `fedify inbox` command now sends `Delete(Application)` activities when
it's terminated so that the peers can clean up data related to the temporary
actor. [[#135]]
@@ -123,6 +127,7 @@ To be released.
- `["fedify", "sig", "ld"]`

[Linked Data Signatures]: https://web.archive.org/web/20170923124140/https://w3c-dvcg.github.io/ld-signatures/
[#106]: https://github.com/dahlia/fedify/issues/106
[#135]: https://github.com/dahlia/fedify/issues/135
[#137]: https://github.com/dahlia/fedify/issues/137

10 changes: 10 additions & 0 deletions docs/manual/inbox.md
@@ -241,6 +241,16 @@ the `createFederation()` function.
> [!NOTE]
> Activities with invalid signatures/proofs are silently ignored and not queued.

> [!TIP]
> If your inbox listeners are mostly I/O-bound, consider parallelizing
> message processing by using the `ParallelMessageQueue` class. For more
> information, see the [*Parallel message processing*
> section](./mq.md#parallel-message-processing).
>
> If your inbox listeners are CPU-bound, consider running multiple nodes of
> your application so that each node can process messages in parallel with
> the shared message queue.
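>
> A minimal sketch of the first approach (the `InProcessMessageQueue` and the
> worker count of 5 below are only illustrative; any `MessageQueue`
> implementation can be wrapped the same way):
>
> ~~~~ typescript
> import { InProcessMessageQueue, ParallelMessageQueue } from "@fedify/fedify";
>
> // Up to 5 messages are handled concurrently within this process:
> const queue = new ParallelMessageQueue(new InProcessMessageQueue(), 5);
> ~~~~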

[`RedisMessageQueue`]: https://jsr.io/@fedify/redis/doc/mq/~/RedisMessageQueue
[`@fedify/redis`]: https://github.com/dahlia/fedify-redis

49 changes: 46 additions & 3 deletions docs/manual/mq.md
@@ -52,7 +52,7 @@ const federation = createFederation<void>({
// ---cut-start---
kv: null as unknown as KvStore,
// ---cut-end---
queue: new InProcessMessageQueue(),
queue: new InProcessMessageQueue(), // [!code highlight]
// ... other options
});
~~~~
@@ -79,7 +79,7 @@ import { DenoKvMessageQueue } from "@fedify/fedify/x/deno";

const kv = await Deno.openKv();
const federation = createFederation<void>({
queue: new DenoKvMessageQueue(kv),
queue: new DenoKvMessageQueue(kv), // [!code highlight]
// ... other options
});
~~~~
@@ -116,7 +116,7 @@ const federation = createFederation<void>({
// ---cut-start---
kv: null as unknown as KvStore,
// ---cut-end---
queue: new RedisMessageQueue(() => new Redis()),
queue: new RedisMessageQueue(() => new Redis()), // [!code highlight]
// ... other options
});
~~~~
@@ -179,3 +179,46 @@ custom `MessageQueue`:

However, you don't need to implement retry logic yourself, as Fedify handles
retrying failed messages automatically.


Parallel message processing
---------------------------

*This API is available since Fedify 1.0.0.*

Fedify supports parallel message processing by running multiple workers
concurrently. To enable it, wrap your `MessageQueue` with
`ParallelMessageQueue`, a decorator implementation of the `MessageQueue`
interface that processes messages from the wrapped queue concurrently, up to
a specified number of workers:

~~~~ typescript twoslash
import { createFederation, type KvStore } from "@fedify/fedify";
// ---cut-before---
import { ParallelMessageQueue } from "@fedify/fedify";
import { RedisMessageQueue } from "@fedify/redis";
import Redis from "ioredis";

const baseQueue = new RedisMessageQueue(() => new Redis());

// Pass the wrapped queue to your Federation configuration
const federation = createFederation<void>({
queue: new ParallelMessageQueue(baseQueue, 5), // [!code highlight]
// ... other options
// ---cut-start---
kv: null as unknown as KvStore,
// ---cut-end---
});
~~~~

> [!NOTE]
> The workers do not run truly in parallel, in the sense that they are not
> running in separate threads or processes. They run in the same process,
> but are scheduled to run concurrently. Hence, this is useful for I/O-bound
> tasks, but not for CPU-bound tasks, which is fine for Fedify's workloads.
>
> If your [inbox listeners](./inbox.md) are CPU-bound, you should instead
> consider running multiple nodes of your application so that each node can
> process messages in parallel through the shared message queue, as sketched
> below.
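
For example, here is a rough sketch of the multi-node setup (the Redis URL
below is hypothetical; every node runs this same configuration and consumes
messages from the shared queue):

~~~~ typescript twoslash
import { createFederation, type KvStore } from "@fedify/fedify";
import { RedisMessageQueue } from "@fedify/redis";
import Redis from "ioredis";

// Each node opens its own connection to the same Redis server, so a message
// enqueued by any node can be picked up by whichever node is free:
const federation = createFederation<void>({
  queue: new RedisMessageQueue(() => new Redis("redis://shared-redis:6379")),
  // ... other options
  // ---cut-start---
  kv: null as unknown as KvStore,
  // ---cut-end---
});
~~~~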
69 changes: 67 additions & 2 deletions src/federation/mq.test.ts
@@ -1,7 +1,11 @@
import { assertEquals, assertGreater } from "@std/assert";
import { assertEquals, assertGreater, assertGreaterOrEqual } from "@std/assert";
import { delay } from "@std/async/delay";
import { test } from "../testing/mod.ts";
import { InProcessMessageQueue } from "./mq.ts";
import {
InProcessMessageQueue,
type MessageQueue,
ParallelMessageQueue,
} from "./mq.ts";

test("InProcessMessageQueue", async (t) => {
const mq = new InProcessMessageQueue();
@@ -38,6 +42,67 @@ test("InProcessMessageQueue", async (t) => {
});
});

const queues: Record<string, () => Promise<MessageQueue>> = {
InProcessMessageQueue: () => Promise.resolve(new InProcessMessageQueue()),
};
if (
"Deno" in globalThis && "openKv" in globalThis.Deno &&
typeof globalThis.Deno.openKv === "function"
) {
const { DenoKvMessageQueue } = await import("../x/denokv.ts");
queues.DenoKvMessageQueue = async () =>
new DenoKvMessageQueue(
await globalThis.Deno.openKv(),
);
}

for (const mqName in queues) {
test(`ParallelMessageQueue [${mqName}]`, async (t) => {
const mq = await queues[mqName]();
const workers = new ParallelMessageQueue(mq, 5);

const messages: string[] = [];
workers.listen(async (message: string) => {
for (let i = 0, cnt = 5 + Math.random() * 5; i < cnt; i++) {
await delay(250);
}
messages.push(message);
});

await t.step("enqueue() [single]", async () => {
await mq.enqueue("Hello, world!");
});

await waitFor(() => messages.length > 0, 15_000);

await t.step("listen() [single]", () => {
assertEquals(messages, ["Hello, world!"]);
});

messages.pop();

await t.step("enqueue() [multiple]", async () => {
for (let i = 0; i < 20; i++) {
await mq.enqueue(`Hello, ${i}!`);
}
});

await t.step("listen() [multiple]", async () => {
await delay(10 * 250 + 500);
assertGreaterOrEqual(messages.length, 5);
await waitFor(() => messages.length >= 20, 15_000);
assertEquals(messages.length, 20);
});

await waitFor(() => messages.length >= 20, 15_000);

if (Symbol.dispose in mq) {
const dispose = mq[Symbol.dispose];
if (typeof dispose === "function") dispose.call(mq);
}
});
}

async function waitFor(
predicate: () => boolean,
timeoutMs: number,
74 changes: 69 additions & 5 deletions src/federation/mq.ts
@@ -43,9 +43,6 @@ export interface MessageQueue {
export class InProcessMessageQueue implements MessageQueue {
#handlers: ((message: any) => Promise<void> | void)[] = [];

/**
* {@inheritDoc Queue.enqueue}
*/
enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise<void> {
const delay = options?.delay == null
? 0
@@ -56,10 +53,77 @@ export class InProcessMessageQueue implements MessageQueue {
return Promise.resolve();
}

listen(handler: (message: any) => Promise<void> | void): void {
this.#handlers.push(handler);
}
}

type Uuid = ReturnType<typeof crypto.randomUUID>;

/**
* A message queue that processes messages in parallel. It takes another
* {@link MessageQueue}, and processes messages in parallel up to a certain
* number of workers.
*
 * Strictly speaking, it is more of a decorator than a queue itself.
*
 * Note that the workers do not run truly in parallel, in the sense that they
 * are not running in separate threads or processes. They run in the same
 * process, but are scheduled to run concurrently. Hence, this is useful for
 * I/O-bound tasks, but not for CPU-bound tasks, which is fine for Fedify's
 * workloads.
*
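 * @example
 * ```typescript
 * // A minimal usage sketch (the wrapped InProcessMessageQueue and the worker
 * // count of 5 are only illustrative): at most five messages are processed
 * // concurrently by this process.
 * const queue = new ParallelMessageQueue(new InProcessMessageQueue(), 5);
 * ```
 *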
* @since 1.0.0
*/
export class ParallelMessageQueue implements MessageQueue {
readonly queue: MessageQueue;
readonly workers: number;

/**
* Constructs a new {@link ParallelMessageQueue} with the given queue and
* number of workers.
* @param queue The message queue to use under the hood. Note that
* {@link ParallelMessageQueue} cannot be nested.
* @param workers The number of workers to process messages in parallel.
* @throws {TypeError} If the given queue is an instance of
* {@link ParallelMessageQueue}.
*/
constructor(queue: MessageQueue, workers: number) {
if (queue instanceof ParallelMessageQueue) {
throw new TypeError("Cannot nest ParallelMessageQueue.");
}
this.queue = queue;
this.workers = workers;
}

enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise<void> {
return this.queue.enqueue(message, options);
}

  listen(handler: (message: any) => Promise<void> | void): void {
    const workers = new Map<Uuid, Promise<Uuid>>();
    this.queue.listen(async (message) => {
      // Wait until a worker slot frees up so that at most `this.workers`
      // messages are being handled at any moment.
      while (workers.size >= this.workers) {
        const consumedId = await Promise.any(workers.values());
        workers.delete(consumedId);
      }
      // Hand the message off to a new worker; its promise resolves to its own
      // ID, which lets the loop above delete the finished entry from the map.
      const workerId = crypto.randomUUID();
      const promise = this.#work(workerId, handler, message);
      workers.set(workerId, promise);
    });
  }

async #work(
workerId: Uuid,
handler: (message: any) => Promise<void> | void,
message: any,
): Promise<Uuid> {
await this.#sleep(0);
await handler(message);
return workerId;
}

#sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
12 changes: 5 additions & 7 deletions src/x/denokv.ts
@@ -66,7 +66,7 @@ export class DenoKvStore implements KvStore {
/**
* Represents a message queue adapter that uses Deno KV store.
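 *
 * @example
 * ```typescript
 * // Illustrative sketch: since the queue implements Disposable, a `using`
 * // declaration closes the underlying Deno.Kv when the scope exits.
 * {
 *   using queue = new DenoKvMessageQueue(await Deno.openKv());
 *   // ... pass `queue` to createFederation() ...
 * }
 * ```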
*/
export class DenoKvMessageQueue implements MessageQueue {
export class DenoKvMessageQueue implements MessageQueue, Disposable {
#kv: Deno.Kv;

/**
@@ -78,9 +78,6 @@ export class DenoKvMessageQueue implements MessageQueue {
this.#kv = kv;
}

/**
* {@inheritDoc MessageQueue.enqueue}
*/
async enqueue(
// deno-lint-ignore no-explicit-any
message: any,
@@ -94,11 +91,12 @@ export class DenoKvMessageQueue implements MessageQueue {
);
}

/**
* {@inheritDoc MessageQueue.listen}
*/
// deno-lint-ignore no-explicit-any
listen(handler: (message: any) => void | Promise<void>): void {
this.#kv.listenQueue(handler);
}

[Symbol.dispose](): void {
this.#kv.close();
}
}
