From 5ad459b45707f1b2ed5ed299d9578225fc9dba13 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sun, 4 Aug 2024 20:54:52 +0200 Subject: [PATCH] fix(socket): make sure cache is updated before responding --- lib/socket.ts | 47 +++++++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/lib/socket.ts b/lib/socket.ts index 3def2fd..f9edfef 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -2,7 +2,12 @@ import { RedisOptions } from "ioredis"; import { pick } from "lodash"; import { getCache, updateQueuesCache, queueKey } from "./queues-cache"; import { WebSocketClient } from "./ws-autoreconnect"; -import { FoundQueue, execRedisCommand, getRedisInfo, ping } from "./queue-factory"; +import { + FoundQueue, + execRedisCommand, + getRedisInfo, + ping, +} from "./queue-factory"; import { getQueueType, redisOptsFromUrl } from "./utils"; import { Integration } from "./interfaces/integration"; @@ -30,7 +35,7 @@ export const Socket = ( integrations?: { [key: string]: Integration; }; - queueNames?: string[] + queueNames?: string[]; } = {} ) => { const { team, nodes } = opts; @@ -53,8 +58,8 @@ export const Socket = ( ws.onopen = function open() { console.log( chalk.yellow("WebSocket:") + - chalk.blueBright(" opened connection to ") + - chalk.gray("Taskforce.sh") + chalk.blueBright(" opened connection to ") + + chalk.gray("Taskforce.sh") ); }; @@ -81,7 +86,7 @@ export const Socket = ( if (input === "authorized") { console.log( chalk.yellow("WebSocket: ") + - chalk.green("Succesfully authorized to taskforce.sh service") + chalk.green("Succesfully authorized to taskforce.sh service") ); // @@ -91,7 +96,8 @@ export const Socket = ( console.log( `${chalk.yellow("WebSocket:")} ${chalk.green( "sending connection:" - )} ${chalk.blueBright(name)} ${team ? chalk.green(" for team ") + chalk.blueBright(team) : "" + )} ${chalk.blueBright(name)} ${ + team ? chalk.green(" for team ") + chalk.blueBright(team) : "" }` ); ws.send( @@ -102,7 +108,8 @@ export const Socket = ( connection: name, team, version, - }), startTime + }), + startTime ); } else { const msg = JSON.parse(input); @@ -110,7 +117,7 @@ export const Socket = ( if (!msg.data) { console.error( chalk.red("WebSocket:") + - chalk.blueBright(" missing message data "), + chalk.blueBright(" missing message data "), msg ); return; @@ -124,13 +131,17 @@ export const Socket = ( break; case "queues": case "jobs": - const cache = getCache(); + let cache = getCache(); if (!cache) { await updateQueuesCache(redisOpts, opts); + cache = getCache(); + if (!cache) { + throw new Error("Unable to update queues"); + } } const { queue, responders } = cache[ - queueKey({ name: queueName, prefix: queuePrefix || "bull" }) + queueKey({ name: queueName, prefix: queuePrefix || "bull" }) ]; if (!queue) { @@ -138,7 +149,8 @@ export const Socket = ( JSON.stringify({ id: msg.id, err: "Queue not found", - }), startTime + }), + startTime ); } else { switch (res) { @@ -175,11 +187,12 @@ export const Socket = ( console.log( `${chalk.yellow("WebSocket:")} ${chalk.green( "sending connection:" - )} ${chalk.blueBright(name)} ${team ? chalk.green(" for team ") + chalk.blueBright(team) : "" + )} ${chalk.blueBright(name)} ${ + team ? chalk.green(" for team ") + chalk.blueBright(team) : "" }` ); - logSendingQueues(queues) + logSendingQueues(queues); respond(msg.id, startTime, { queues, @@ -220,11 +233,9 @@ export const Socket = ( console.log( `${chalk.yellow("WebSocket:")} ${chalk.blueBright( "Sending queue:" - )} ${chalk.green(name)} ${chalk.blueBright( - "type:" - )} ${chalk.green(type)} ${chalk.blueBright( - "prefix:" - )} ${chalk.green(prefix)}` + )} ${chalk.green(name)} ${chalk.blueBright("type:")} ${chalk.green( + type + )} ${chalk.blueBright("prefix:")} ${chalk.green(prefix)}` ); } }