-
Notifications
You must be signed in to change notification settings - Fork 0
/
AmqpInterface.ts
109 lines (94 loc) · 3.12 KB
/
AmqpInterface.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import { getChannel } from "./MessageBrokerChannel.ts";
export const QUEUES = {
reconnect: "reconnect",
reconnectComplete: "reconnectComplete",
};
async function createDefaultQueues() {
for (const queue of Object.entries(QUEUES)) {
await createQueue(queue[1]);
}
}
await createDefaultQueues();
const shadowSubscribes = new Map<string, () => void>();
export async function subscribe(
queue: string,
callback: (message: any) => void,
): Promise<void> {
const channel = await getChannel();
await channel.consume(
{ queue: queue, consumerTag: queue },
async (args, _, data) => {
await channel.ack({ deliveryTag: args.deliveryTag });
if (args.consumerTag !== queue) return; // todo there must be a better way for this
if (shadowSubscribes.has(queue)) shadowSubscribes.get(queue)!();
const json = JSON.parse(new TextDecoder().decode(data));
callback(json);
},
);
console.log(`[DEBUG] Subscribed to ${queue}`);
}
export async function unsubscribe(queue: string): Promise<void> {
const channel = await getChannel();
await channel.cancel({ consumerTag: queue });
}
export async function publish(queue: string, message: any) {
console.log(`[DEBUG] Sending AMQP to ${queue}: ${JSON.stringify(message)}`);
const channel = await getChannel();
await channel.publish(
{ routingKey: queue },
{ contentType: "application/json" },
new TextEncoder().encode(JSON.stringify(message)),
);
}
// MUST be awaited BEFORE publishing or subscribing
export async function createQueue(queue: string) {
const channel = await getChannel();
await channel.declareQueue({ queue: queue }); // fixme this does not work on a channel that has already been used
console.log(`[DEBUG] Created queue ${queue}`)
}
// This will DELETE ALL messages in the queue
export async function destroyQueue(queue: string) {
const channel = await getChannel();
await channel.deleteQueue({ queue: queue });
}
export async function queueExists(queue: string) {
const apiResponse = await fetch(
`https://${Deno.env.get("AMQP_HOST")}/api/queues/${
Deno.env.get("AMQP_USER")
}/${queue}`,
{
method: "GET",
headers: {
"Authorization": `Basic ${
btoa(`${Deno.env.get("AMQP_USER")}:${Deno.env.get("AMQP_PASSWORD")}`)
}`,
},
},
);
const data = await apiResponse.json();
if (data.name === queue) return true;
else return false;
}
export async function createAndSubscribeToIdQueue(
id: string,
callback: (message: any) => void,
) {
await createQueue(id);
await subscribe(id, callback);
await new Promise((resolve) => setTimeout(resolve, 1000));
sanityCheckOrResubscribe(id, callback);
}
async function sanityCheckOrResubscribe(
id: string,
callback: (message: any) => void,
) {
let hasBeenCalled = false;
shadowSubscribes.set(id, () => hasBeenCalled = true);
await publish(id, { type: "sanityCheck" });
// Wait 500ms for the message to come back
await new Promise((resolve) => setTimeout(resolve, 500));
if (!hasBeenCalled) {
console.warn("Queue subscription sanity check has failed");
createAndSubscribeToIdQueue(id, callback);
}
}