diff --git a/lib/commands/includes/collectMetrics.lua b/lib/commands/includes/collectMetrics.lua new file mode 100644 index 000000000..a2a4a0cb3 --- /dev/null +++ b/lib/commands/includes/collectMetrics.lua @@ -0,0 +1,46 @@ +--[[ + Functions to collect metrics based on a current and previous count of jobs. + Granualarity is fixed at 1 minute. +]] + +-- Includes +--- @include "batches" + +local function collectMetrics(metaKey, dataPointsList, maxDataPoints, timestamp) + -- Increment current count + local count = rcall("HINCRBY", metaKey, "count", 1) - 1 + + -- Compute how many data points we need to add to the list, N. + local prevTS = rcall("HGET", metaKey, "prevTS") + + if not prevTS then + -- If prevTS is nil, set it to the current timestamp + rcall("HSET", metaKey, "prevTS", timestamp, "prevCount", 0) + return + end + + local N = math.min(math.floor((timestamp - prevTS) / 60000), tonumber(maxDataPoints)) + + if N > 0 then + local delta = count - rcall("HGET", metaKey, "prevCount") + -- If N > 1, add N-1 zeros to the list + if N > 1 then + local points = {} + points[1] = delta + for i = 2, N do points[i] = 0 end + + for from, to in batches(#points, 7000) do + rcall("LPUSH", dataPointsList, unpack(points, from, to)) + end + else + -- LPUSH delta to the list + rcall("LPUSH", dataPointsList, delta) + end + + -- LTRIM to keep list to its max size + rcall("LTRIM", dataPointsList, 0, maxDataPoints - 1) + + -- update prev count with current count + rcall("HSET", metaKey, "prevCount", count, "prevTS", timestamp) + end +end diff --git a/lib/commands/moveToFinished-9.lua b/lib/commands/moveToFinished-9.lua index 1d69f94a1..2d63cf0a4 100644 --- a/lib/commands/moveToFinished-9.lua +++ b/lib/commands/moveToFinished-9.lua @@ -43,52 +43,9 @@ local rcall = redis.call -- Includes +--- @include "includes/collectMetrics" --- @include "includes/removeLock" --- @include "includes/removeDebounceKeyIfNeeded" ---- @include "includes/batches" - ---[[ - Functions to collect metrics based on a current and previous count of jobs. - Granualarity is fixed at 1 minute. -]] -local function collectMetrics(metaKey, dataPointsList, maxDataPoints, timestamp) - -- Increment current count - local count = rcall("HINCRBY", metaKey, "count", 1) - 1 - - -- Compute how many data points we need to add to the list, N. - local prevTS = rcall("HGET", metaKey, "prevTS") - - if not prevTS then - -- If prevTS is nil, set it to the current timestamp - rcall("HSET", metaKey, "prevTS", timestamp, "prevCount", 0) - return - end - - local N = math.floor((timestamp - prevTS) / 60000) - - if N > 0 then - local delta = count - rcall("HGET", metaKey, "prevCount") - -- If N > 1, add N-1 zeros to the list - if N > 1 then - local points = {} - points[1] = delta - for i = 2, N do points[i] = 0 end - - for from, to in batches(#points, 7000) do - rcall("LPUSH", dataPointsList, unpack(points, from, to)) - end - else - -- LPUSH delta to the list - rcall("LPUSH", dataPointsList, delta) - end - - -- LTRIM to keep list to its max size - rcall("LTRIM", dataPointsList, 0, maxDataPoints - 1) - - -- update prev count with current count - rcall("HSET", metaKey, "prevCount", count, "prevTS", timestamp) - end -end if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists local errorCode = removeLock(KEYS[3], KEYS[8], ARGV[5], ARGV[1])