Skip to content

Commit

Permalink
Let Federation.startQueue() take an AbortSignal
Browse files Browse the repository at this point in the history
  • Loading branch information
dahlia committed Sep 22, 2024
1 parent 07eeddb commit 33f0f30
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 35 deletions.
12 changes: 12 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ To be released.
the activity to be sent only once. It had added Object Integrity Proofs
to the activity for every recipient before.

- The message queue is now able to be stopped manually by providing
an `AbortSignal` object to the `Federation.startQueue()` method.

- Added the optional second parameter to `Federation.startQueue()` method,
which is a `FederationStartQueueOptions` object.
- Added `FederationStartQueueOptions` interface.
- Added the optional second parameter to `MessageQueue.listen()` method,
which is a `MessageQueueListenOptions` object.
- Added `MessageQueueListenOptions` interface.
- The return type of `MessageQueue.listen()` method became `Promise<void>`
(was `void`).

- Added `ParallelMessageQueue` class. [[#106]]

- WebFinger responses now include <http://webfinger.net/rel/avatar> links
Expand Down
27 changes: 21 additions & 6 deletions docs/manual/mq.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ Create a class that implements the `MessageQueue` interface, which includes
the `~MessageQueue.enqueue()` and `~MessageQueue.listen()` methods:

~~~~ typescript twoslash
import type { MessageQueue, MessageQueueEnqueueOptions } from "@fedify/fedify";
import type {
MessageQueue,
MessageQueueEnqueueOptions,
MessageQueueListenOptions,
} from "@fedify/fedify";

class CustomMessageQueue implements MessageQueue {
async enqueue(
Expand All @@ -147,7 +151,10 @@ class CustomMessageQueue implements MessageQueue {
// Implementation here
}

listen(handler: (message: any) => Promise<void> | void): void {
listen(
handler: (message: any) => Promise<void> | void,
options: MessageQueueListenOptions = {},
): Promise<void> {
// Implementation here
}
}
Expand All @@ -166,6 +173,10 @@ This method should start a process that listens for new messages.
When a message is received, it should call the provided `handler` function.
Ensure proper error handling to prevent the listener from crashing.

> [!NOTE]
> A `Promise` object it returns should never resolve unless the given
> `~MessageQueueListenOptions.signal` is triggered.
### Consider additional features

Here's a list of additional features you might want to implement in your
Expand Down Expand Up @@ -255,7 +266,9 @@ const federation = createFederation<void>({
// Start the message queue manually only in worker nodes.
// On non-worker nodes, the queue won't be started.
if (Deno.env.get("NODE_TYPE") === "worker") {
await federation.startQueue();
const controller = new AbortController();
Deno.addSignalListener("SIGINT", () => controller.abort());
await federation.startQueue(undefined, { signal: controller.signal });
}
~~~~

Expand All @@ -265,7 +278,7 @@ import type { KvStore } from "@fedify/fedify";
import { createFederation } from "@fedify/fedify";
import { RedisMessageQueue } from "@fedify/redis";
import Redis from "ioredis";
import { env } from "node:process";
import process from "node:process";
const federation = createFederation<void>({
queue: new RedisMessageQueue(() => new Redis()),
Expand All @@ -278,8 +291,10 @@ const federation = createFederation<void>({
// Start the message queue manually only in worker nodes.
// On non-worker nodes, the queue won't be started.
if (env.NODE_TYPE === "worker") {
await federation.startQueue();
if (process.env.NODE_TYPE === "worker") {
const controller = new AbortController();
process.on("SIGINT", () => controller.abort());
await federation.startQueue(undefined, { signal: controller.signal });
}
~~~~

Expand Down
17 changes: 16 additions & 1 deletion src/federation/federation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ import type {
} from "./callback.ts";
import type { Context, RequestContext } from "./context.ts";

/**
* Options for {@link Federation.startQueue} method.
* @since 1.0.0
*/
export interface FederationStartQueueOptions {
/**
* The signal to abort the task queue.
*/
signal?: AbortSignal;
}

/**
* An object that registers federation-related business logic and dispatches
* requests to the appropriate handlers.
Expand All @@ -33,8 +44,12 @@ export interface Federation<TContextData> {
* This method is useful when you set the `manuallyStartQueue` option to
* `true` in the {@link createFederation} function.
* @param contextData The context data to pass to the context.
* @param options Additional options for starting the queue.
*/
startQueue(contextData: TContextData): Promise<void>;
startQueue(
contextData: TContextData,
options?: FederationStartQueueOptions,
): Promise<void>;

/**
* Create a new context.
Expand Down
19 changes: 14 additions & 5 deletions src/federation/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import type {
CollectionCallbackSetters,
Federation,
FederationFetchOptions,
FederationStartQueueOptions,
InboxListenerSetters,
ObjectCallbackSetters,
} from "./federation.ts";
Expand Down Expand Up @@ -365,12 +366,18 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
createExponentialBackoffPolicy();
}

#startQueue(ctxData: TContextData) {
async #startQueue(
ctxData: TContextData,
signal?: AbortSignal,
): Promise<void> {
if (this.queue != null && !this.queueStarted) {
const logger = getLogger(["fedify", "federation", "queue"]);
logger.debug("Starting a task queue.");
this.queue?.listen((msg) => this.#listenQueue(ctxData, msg));
this.queueStarted = true;
await this.queue?.listen(
(msg) => this.#listenQueue(ctxData, msg),
{ signal },
);
}
}

Expand Down Expand Up @@ -582,9 +589,11 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
);
}

startQueue(contextData: TContextData): Promise<void> {
this.#startQueue(contextData);
return Promise.resolve();
startQueue(
contextData: TContextData,
options: FederationStartQueueOptions = {},
): Promise<void> {
return this.#startQueue(contextData, options.signal);
}

createContext(baseUrl: URL, contextData: TContextData): Context<TContextData>;
Expand Down
19 changes: 14 additions & 5 deletions src/federation/mq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ test("InProcessMessageQueue", async (t) => {
const mq = new InProcessMessageQueue();

const messages: string[] = [];
mq.listen((message: string) => {
const controller = new AbortController();
const listening = mq.listen((message: string) => {
messages.push(message);
});
}, controller);

await t.step("enqueue()", async () => {
await mq.enqueue("Hello, world!");
Expand All @@ -32,6 +33,7 @@ test("InProcessMessageQueue", async (t) => {
"Delayed message",
{ delay: Temporal.Duration.from({ seconds: 3 }) },
);
assertEquals(messages, ["Hello, world!"]);
});

await waitFor(() => messages.length > 1, 15_000);
Expand All @@ -40,6 +42,9 @@ test("InProcessMessageQueue", async (t) => {
assertEquals(messages, ["Hello, world!", "Delayed message"]);
assertGreater(Date.now() - started, 3_000);
});

controller.abort();
await listening;
});

const queues: Record<string, () => Promise<MessageQueue>> = {
Expand All @@ -58,7 +63,7 @@ if (
new DenoKvMessageQueue(
// @ts-ignore: Works on Deno
// dnt-shim-ignore
await globalThis.Deno.openKv(),
await globalThis.Deno.openKv(":memory:"),
);
}

Expand All @@ -68,12 +73,13 @@ for (const mqName in queues) {
const workers = new ParallelMessageQueue(mq, 5);

const messages: string[] = [];
workers.listen(async (message: string) => {
const controller = new AbortController();
const listening = workers.listen(async (message: string) => {
for (let i = 0, cnt = 5 + Math.random() * 5; i < cnt; i++) {
await delay(250);
}
messages.push(message);
});
}, controller);

await t.step("enqueue() [single]", async () => {
await mq.enqueue("Hello, world!");
Expand Down Expand Up @@ -102,6 +108,9 @@ for (const mqName in queues) {

await waitFor(() => messages.length >= 20, 15_000);

controller.abort();
await listening;

if (Symbol.dispose in mq) {
const dispose = mq[Symbol.dispose];
if (typeof dispose === "function") dispose.call(mq);
Expand Down
108 changes: 97 additions & 11 deletions src/federation/mq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ export interface MessageQueueEnqueueOptions {
delay?: Temporal.Duration;
}

/**
* Additional options for listening to a message queue.
*
* @since 1.0.0
*/
export interface MessageQueueListenOptions {
/**
* The signal to abort listening to the message queue.
*/
signal?: AbortSignal;
}

/**
* An abstract interface for a message queue.
*
Expand All @@ -30,31 +42,102 @@ export interface MessageQueue {
/**
* Listens for messages in the queue.
* @param handler The handler for messages in the queue.
* @param options Additional options for listening to the message queue.
* @returns A promise that resolves when the listening is done. It never
* rejects, and is resolved when the signal is aborted. If no
* signal is provided, it never resolves.
*/
listen(
handler: (message: any) => Promise<void> | void,
options?: MessageQueueListenOptions,
): Promise<void>;
}

/**
* Additional options for {@link InProcessMessageQueue}.
* @since 1.0.0
*/
export interface InProcessMessageQueueOptions {
/**
* The interval to poll for messages in the queue. 5 seconds by default.
* @default `{ seconds: 5 }`
*/
listen(handler: (message: any) => Promise<void> | void): void;
pollInterval?: Temporal.Duration | Temporal.DurationLike;
}

/**
* A message queue that processes messages in the same process.
* Do not use this in production as it does not persist messages.
* Do not use this in production as it does neither persist messages nor
* distribute them across multiple processes.
*
* @since 0.5.0
*/
export class InProcessMessageQueue implements MessageQueue {
#handlers: ((message: any) => Promise<void> | void)[] = [];
#messages: any[];
#monitors: Record<ReturnType<typeof crypto.randomUUID>, () => void>;
#pollIntervalMs: number;

/**
* Constructs a new {@link InProcessMessageQueue} with the given options.
* @param options Additional options for the in-process message queue.
*/
constructor(options: InProcessMessageQueueOptions = {}) {
this.#messages = [];
this.#monitors = {};
this.#pollIntervalMs = Temporal.Duration.from(
options.pollInterval ?? { seconds: 5 },
).total("millisecond");
}

enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise<void> {
const delay = options?.delay == null
? 0
: Math.max(options.delay.total("millisecond"), 0);
setTimeout(() => {
for (const handler of this.#handlers) handler(message);
}, delay);
if (delay > 0) {
setTimeout(
() => this.enqueue(message, { ...options, delay: undefined }),
delay,
);
return Promise.resolve();
}
this.#messages.push(message);
for (const monitorId in this.#monitors) {
this.#monitors[monitorId as ReturnType<typeof crypto.randomUUID>]();
}
return Promise.resolve();
}

listen(handler: (message: any) => Promise<void> | void): void {
this.#handlers.push(handler);
async listen(
handler: (message: any) => Promise<void> | void,
options: MessageQueueListenOptions = {},
): Promise<void> {
const signal = options.signal;
while (signal == null || !signal.aborted) {
while (this.#messages.length > 0) {
const message = this.#messages.shift();
await handler(message);
}
await this.#wait(this.#pollIntervalMs, signal);
}
}

#wait(ms: number, signal?: AbortSignal): Promise<void> {
let timer: ReturnType<typeof setTimeout> | null = null;
return Promise.any([
new Promise<void>((resolve) => {
signal?.addEventListener("abort", () => {
if (timer != null) clearTimeout(timer);
resolve();
}, { once: true });
const monitorId = crypto.randomUUID();
this.#monitors[monitorId] = () => {
delete this.#monitors[monitorId];
if (timer != null) clearTimeout(timer);
resolve();
};
}),
new Promise<void>((resolve) => timer = setTimeout(resolve, ms)),
]);
}
}

Expand Down Expand Up @@ -100,17 +183,20 @@ export class ParallelMessageQueue implements MessageQueue {
return this.queue.enqueue(message, options);
}

listen(handler: (message: any) => Promise<void> | void): void {
listen(
handler: (message: any) => Promise<void> | void,
options: MessageQueueListenOptions = {},
): Promise<void> {
const workers = new Map<Uuid, Promise<Uuid>>();
this.queue.listen(async (message) => {
return this.queue.listen(async (message) => {
while (workers.size >= this.workers) {
const consumedId = await Promise.any(workers.values());
workers.delete(consumedId);
}
const workerId = crypto.randomUUID();
const promise = this.#work(workerId, handler, message);
workers.set(workerId, promise);
});
}, options);
}

async #work(
Expand Down
Loading

0 comments on commit 33f0f30

Please sign in to comment.