forked from seomoz/qless-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fail.lua
108 lines (92 loc) · 3.77 KB
/
fail.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
-- Fail(0, jid, worker, group, message, now, [data])
-- -------------------------------------------------
-- Mark the particular job as failed, with the provided group, and a more
-- specific message. By `group`, we mean some phrase that might be one of
-- several categorical modes of failure. The `message` is something more
-- job-specific, like perhaps a traceback.
--
-- This method should __not__ be used to note that a job has been dropped or
-- has failed in a transient way. This method __should__ be used to note that
-- a job has something really wrong with it that must be remedied.
--
-- The motivation behind the `group` is so that similar errors can be grouped
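-- together. Optionally, updated data can be provided for the job. Only a job
-- that is currently in the `running` state can be marked as failed; for a job
-- in any other state (or a job that does not exist) nothing is changed and
-- `False` is returned. Once a job given to a worker has been failed, that
-- worker's subsequent requests to heartbeat or complete the job will fail.
-- Failed jobs are kept until they are canceled or completed.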
--
-- __Returns__ the id of the failed job if successful, or `False` on failure.
--
-- Args:
-- 1) jid
-- 2) worker
-- 3) group
-- 4) message
-- 5) the current time
-- 6) [data]
-- This script takes no KEYS; every argument arrives through ARGV.
if #KEYS > 0 then error('Fail(): No Keys should be provided') end

local jid     = assert(ARGV[1]          , 'Fail(): Arg "jid" missing')
local worker  = assert(ARGV[2]          , 'Fail(): Arg "worker" missing')
local group   = assert(ARGV[3]          , 'Fail(): Arg "group" missing')
local message = assert(ARGV[4]          , 'Fail(): Arg "message" missing')
local now     = assert(tonumber(ARGV[5]), 'Fail(): Arg "now" missing or malformed: ' .. (ARGV[5] or 'nil'))
local data    = ARGV[6]

-- The bin is midnight of the provided day
-- 24 * 60 * 60 = 86400
local bin = now - (now % 86400)

-- Decode the optional replacement data up front, so that malformed JSON
-- raises an error before any key has been modified.
if data then
    data = cjson.decode(data)
end

local job_key   = 'ql:j:' .. jid
local failed_at = math.floor(now)

-- First things first, we should get the history
local history, queue, state = unpack(redis.call('hmget', job_key, 'history', 'queue', 'state'))

-- Only a job that is currently running can be failed. Any other state
-- (including a missing job, for which hmget yields no `state`) is rejected
-- here without touching anything.
if state ~= 'running' then
    return false
end

-- Tracked jobs get a notification on the 'failed' channel
if redis.call('zscore', 'ql:tracked', jid) ~= false then
    redis.call('publish', 'failed', jid)
end

-- Remove this job from the jobs that the worker that was running it has
redis.call('zrem', 'ql:w:' .. worker .. ':jobs', jid)

-- Record the failure time on the history entry that belongs to this worker.
history = cjson.decode(history or '[]')
if #history > 0 then
    -- Walk backwards so that the first match is this worker's most recent
    -- entry, and stop there: older entries for the same worker describe
    -- earlier runs and must keep their own timestamps.
    for i = #history, 1, -1 do
        if history[i]['worker'] == worker then
            history[i]['failed'] = failed_at
            break
        end
    end
else
    -- No history was stored at all; start one with just this failure
    history = {
        {
            worker = worker,
            failed = failed_at
        }
    }
end

-- Increment the number of failures for that queue for the
-- given day.
local stats_key = 'ql:s:stats:' .. bin .. ':' .. queue
redis.call('hincrby', stats_key, 'failures', 1)
redis.call('hincrby', stats_key, 'failed'  , 1)

-- Now remove the instance from the schedule, and work queues for the queue it's in
local queue_prefix = 'ql:q:' .. queue
redis.call('zrem', queue_prefix .. '-work', jid)
redis.call('zrem', queue_prefix .. '-locks', jid)
redis.call('zrem', queue_prefix .. '-scheduled', jid)

-- The data is only written at this point, after the state check above has
-- established that the job exists and was running.
if data then
    redis.call('hset', job_key, 'data', cjson.encode(data))
end

-- Flip the job to 'failed', clear its worker and expiration, and store the
-- updated history together with a description of this failure.
redis.call('hmset', job_key, 'state', 'failed', 'worker', '',
    'expires', '', 'history', cjson.encode(history), 'failure', cjson.encode({
        ['group']   = group,
        ['message'] = message,
        ['when']    = failed_at,
        ['worker']  = worker
    }))

-- Add this group of failure to the list of failures
redis.call('sadd', 'ql:failures', group)
-- And add this particular instance to the failed groups
redis.call('lpush', 'ql:f:' .. group, jid)

-- Here is where we'd increment stats about the particular stage
-- and possibly the workers

return jid