Skip to content

Commit

Permalink
Let Federation.startQueue() take an AbortSignal
Browse files Browse the repository at this point in the history
  • Loading branch information
dahlia committed Sep 22, 2024
1 parent 07eeddb commit 8793b61
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 70 deletions.
12 changes: 12 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ To be released.
the activity to be sent only once. It had added Object Integrity Proofs
to the activity for every recipient before.

- The message queue is now able to be stopped manually by providing
an `AbortSignal` object to the `Federation.startQueue()` method.

- Added the optional second parameter to `Federation.startQueue()` method,
which is a `FederationStartQueueOptions` object.
- Added `FederationStartQueueOptions` interface.
- Added the optional second parameter to `MessageQueue.listen()` method,
which is a `MessageQueueListenOptions` object.
- Added `MessageQueueListenOptions` interface.
- The return type of `MessageQueue.listen()` method became `Promise<void>`
(was `void`).

- Added `ParallelMessageQueue` class. [[#106]]

- WebFinger responses now include <http://webfinger.net/rel/avatar> links
Expand Down
27 changes: 21 additions & 6 deletions docs/manual/mq.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ Create a class that implements the `MessageQueue` interface, which includes
the `~MessageQueue.enqueue()` and `~MessageQueue.listen()` methods:

~~~~ typescript twoslash
import type { MessageQueue, MessageQueueEnqueueOptions } from "@fedify/fedify";
import type {
MessageQueue,
MessageQueueEnqueueOptions,
MessageQueueListenOptions,
} from "@fedify/fedify";

class CustomMessageQueue implements MessageQueue {
async enqueue(
Expand All @@ -147,7 +151,10 @@ class CustomMessageQueue implements MessageQueue {
// Implementation here
}

listen(handler: (message: any) => Promise<void> | void): void {
async listen(
handler: (message: any) => Promise<void> | void,
options: MessageQueueListenOptions = {},
): Promise<void> {
// Implementation here
}
}
Expand All @@ -166,6 +173,10 @@ This method should start a process that listens for new messages.
When a message is received, it should call the provided `handler` function.
Ensure proper error handling to prevent the listener from crashing.

> [!NOTE]
> A `Promise` object it returns should never resolve unless the given
> `~MessageQueueListenOptions.signal` is triggered.
### Consider additional features

Here's a list of additional features you might want to implement in your
Expand Down Expand Up @@ -255,7 +266,9 @@ const federation = createFederation<void>({
// Start the message queue manually only in worker nodes.
// On non-worker nodes, the queue won't be started.
if (Deno.env.get("NODE_TYPE") === "worker") {
await federation.startQueue();
const controller = new AbortController();
Deno.addSignalListener("SIGINT", () => controller.abort());
await federation.startQueue(undefined, { signal: controller.signal });
}
~~~~

Expand All @@ -265,7 +278,7 @@ import type { KvStore } from "@fedify/fedify";
import { createFederation } from "@fedify/fedify";
import { RedisMessageQueue } from "@fedify/redis";
import Redis from "ioredis";
import { env } from "node:process";
import process from "node:process";
const federation = createFederation<void>({
queue: new RedisMessageQueue(() => new Redis()),
Expand All @@ -278,8 +291,10 @@ const federation = createFederation<void>({
// Start the message queue manually only in worker nodes.
// On non-worker nodes, the queue won't be started.
if (env.NODE_TYPE === "worker") {
await federation.startQueue();
if (process.env.NODE_TYPE === "worker") {
const controller = new AbortController();
process.on("SIGINT", () => controller.abort());
await federation.startQueue(undefined, { signal: controller.signal });
}
~~~~

Expand Down
4 changes: 2 additions & 2 deletions docs/package.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"devDependencies": {
"@deno/kv": "^0.8.2",
"@fedify/fedify": "1.0.0-dev.408",
"@fedify/redis": "^0.1.1",
"@fedify/fedify": "1.0.0-dev.409",
"@fedify/redis": "0.2.0-dev.10",
"@hono/node-server": "^1.12.2",
"@js-temporal/polyfill": "^0.4.4",
"@logtape/logtape": "^0.5.1",
Expand Down
44 changes: 11 additions & 33 deletions docs/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 16 additions & 1 deletion src/federation/federation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ import type {
} from "./callback.ts";
import type { Context, RequestContext } from "./context.ts";

/**
* Options for {@link Federation.startQueue} method.
* @since 1.0.0
*/
export interface FederationStartQueueOptions {
/**
* The signal to abort the task queue.
*/
signal?: AbortSignal;
}

/**
* An object that registers federation-related business logic and dispatches
* requests to the appropriate handlers.
Expand All @@ -33,8 +44,12 @@ export interface Federation<TContextData> {
* This method is useful when you set the `manuallyStartQueue` option to
* `true` in the {@link createFederation} function.
* @param contextData The context data to pass to the context.
* @param options Additional options for starting the queue.
*/
startQueue(contextData: TContextData): Promise<void>;
startQueue(
contextData: TContextData,
options?: FederationStartQueueOptions,
): Promise<void>;

/**
* Create a new context.
Expand Down
19 changes: 14 additions & 5 deletions src/federation/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import type {
CollectionCallbackSetters,
Federation,
FederationFetchOptions,
FederationStartQueueOptions,
InboxListenerSetters,
ObjectCallbackSetters,
} from "./federation.ts";
Expand Down Expand Up @@ -365,12 +366,18 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
createExponentialBackoffPolicy();
}

#startQueue(ctxData: TContextData) {
async #startQueue(
ctxData: TContextData,
signal?: AbortSignal,
): Promise<void> {
if (this.queue != null && !this.queueStarted) {
const logger = getLogger(["fedify", "federation", "queue"]);
logger.debug("Starting a task queue.");
this.queue?.listen((msg) => this.#listenQueue(ctxData, msg));
this.queueStarted = true;
await this.queue?.listen(
(msg) => this.#listenQueue(ctxData, msg),
{ signal },
);
}
}

Expand Down Expand Up @@ -582,9 +589,11 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
);
}

startQueue(contextData: TContextData): Promise<void> {
this.#startQueue(contextData);
return Promise.resolve();
startQueue(
contextData: TContextData,
options: FederationStartQueueOptions = {},
): Promise<void> {
return this.#startQueue(contextData, options.signal);
}

createContext(baseUrl: URL, contextData: TContextData): Context<TContextData>;
Expand Down
19 changes: 14 additions & 5 deletions src/federation/mq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ test("InProcessMessageQueue", async (t) => {
const mq = new InProcessMessageQueue();

const messages: string[] = [];
mq.listen((message: string) => {
const controller = new AbortController();
const listening = mq.listen((message: string) => {
messages.push(message);
});
}, controller);

await t.step("enqueue()", async () => {
await mq.enqueue("Hello, world!");
Expand All @@ -32,6 +33,7 @@ test("InProcessMessageQueue", async (t) => {
"Delayed message",
{ delay: Temporal.Duration.from({ seconds: 3 }) },
);
assertEquals(messages, ["Hello, world!"]);
});

await waitFor(() => messages.length > 1, 15_000);
Expand All @@ -40,6 +42,9 @@ test("InProcessMessageQueue", async (t) => {
assertEquals(messages, ["Hello, world!", "Delayed message"]);
assertGreater(Date.now() - started, 3_000);
});

controller.abort();
await listening;
});

const queues: Record<string, () => Promise<MessageQueue>> = {
Expand All @@ -58,7 +63,7 @@ if (
new DenoKvMessageQueue(
// @ts-ignore: Works on Deno
// dnt-shim-ignore
await globalThis.Deno.openKv(),
await globalThis.Deno.openKv(":memory:"),
);
}

Expand All @@ -68,12 +73,13 @@ for (const mqName in queues) {
const workers = new ParallelMessageQueue(mq, 5);

const messages: string[] = [];
workers.listen(async (message: string) => {
const controller = new AbortController();
const listening = workers.listen(async (message: string) => {
for (let i = 0, cnt = 5 + Math.random() * 5; i < cnt; i++) {
await delay(250);
}
messages.push(message);
});
}, controller);

await t.step("enqueue() [single]", async () => {
await mq.enqueue("Hello, world!");
Expand Down Expand Up @@ -102,6 +108,9 @@ for (const mqName in queues) {

await waitFor(() => messages.length >= 20, 15_000);

controller.abort();
await listening;

if (Symbol.dispose in mq) {
const dispose = mq[Symbol.dispose];
if (typeof dispose === "function") dispose.call(mq);
Expand Down
Loading

0 comments on commit 8793b61

Please sign in to comment.