Skip to content

Commit

Permalink
fix(socket): make sure cache is updated before responding
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Aug 4, 2024
1 parent b55074c commit 5ad459b
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import { RedisOptions } from "ioredis";
import { pick } from "lodash";
import { getCache, updateQueuesCache, queueKey } from "./queues-cache";
import { WebSocketClient } from "./ws-autoreconnect";
import { FoundQueue, execRedisCommand, getRedisInfo, ping } from "./queue-factory";
import {
FoundQueue,
execRedisCommand,
getRedisInfo,
ping,
} from "./queue-factory";
import { getQueueType, redisOptsFromUrl } from "./utils";
import { Integration } from "./interfaces/integration";

Expand Down Expand Up @@ -30,7 +35,7 @@ export const Socket = (
integrations?: {
[key: string]: Integration;
};
queueNames?: string[]
queueNames?: string[];
} = {}
) => {
const { team, nodes } = opts;
Expand All @@ -53,8 +58,8 @@ export const Socket = (
ws.onopen = function open() {
console.log(
chalk.yellow("WebSocket:") +
chalk.blueBright(" opened connection to ") +
chalk.gray("Taskforce.sh")
chalk.blueBright(" opened connection to ") +
chalk.gray("Taskforce.sh")
);
};

Expand All @@ -81,7 +86,7 @@ export const Socket = (
if (input === "authorized") {
console.log(
chalk.yellow("WebSocket: ") +
chalk.green("Succesfully authorized to taskforce.sh service")
chalk.green("Succesfully authorized to taskforce.sh service")
);

//
Expand All @@ -91,7 +96,8 @@ export const Socket = (
console.log(
`${chalk.yellow("WebSocket:")} ${chalk.green(
"sending connection:"
)} ${chalk.blueBright(name)} ${team ? chalk.green(" for team ") + chalk.blueBright(team) : ""
)} ${chalk.blueBright(name)} ${
team ? chalk.green(" for team ") + chalk.blueBright(team) : ""
}`
);
ws.send(
Expand All @@ -102,15 +108,16 @@ export const Socket = (
connection: name,
team,
version,
}), startTime
}),
startTime
);
} else {
const msg = JSON.parse(input);

if (!msg.data) {
console.error(
chalk.red("WebSocket:") +
chalk.blueBright(" missing message data "),
chalk.blueBright(" missing message data "),
msg
);
return;
Expand All @@ -124,21 +131,26 @@ export const Socket = (
break;
case "queues":
case "jobs":
const cache = getCache();
let cache = getCache();
if (!cache) {
await updateQueuesCache(redisOpts, opts);
cache = getCache();
if (!cache) {
throw new Error("Unable to update queues");
}
}
const { queue, responders } =
cache[
queueKey({ name: queueName, prefix: queuePrefix || "bull" })
queueKey({ name: queueName, prefix: queuePrefix || "bull" })
];

if (!queue) {
ws.send(
JSON.stringify({
id: msg.id,
err: "Queue not found",
}), startTime
}),
startTime
);
} else {
switch (res) {
Expand Down Expand Up @@ -175,11 +187,12 @@ export const Socket = (
console.log(
`${chalk.yellow("WebSocket:")} ${chalk.green(
"sending connection:"
)} ${chalk.blueBright(name)} ${team ? chalk.green(" for team ") + chalk.blueBright(team) : ""
)} ${chalk.blueBright(name)} ${
team ? chalk.green(" for team ") + chalk.blueBright(team) : ""
}`
);

logSendingQueues(queues)
logSendingQueues(queues);

respond(msg.id, startTime, {
queues,
Expand Down Expand Up @@ -220,11 +233,9 @@ export const Socket = (
console.log(
`${chalk.yellow("WebSocket:")} ${chalk.blueBright(
"Sending queue:"
)} ${chalk.green(name)} ${chalk.blueBright(
"type:"
)} ${chalk.green(type)} ${chalk.blueBright(
"prefix:"
)} ${chalk.green(prefix)}`
)} ${chalk.green(name)} ${chalk.blueBright("type:")} ${chalk.green(
type
)} ${chalk.blueBright("prefix:")} ${chalk.green(prefix)}`
);
}
}
Expand Down

0 comments on commit 5ad459b

Please sign in to comment.