Here is some brief documentation of how the lua scripts work and what they
expect. Each invocation begins with a number, which describes how many of the
provided values are considered KEYS
, as they refer in some sense to a Redis
key. The remaining values are considered ARGV
. This is a distinction that
Redis makes internally, and should be considered a magic number.
All times are specified as UTC timestamps. I imagine this is something that might become somewhat contentious. Lua scripts are not allowed access to the system clock. They actually have a pretty good reason for that, but it also means that each client must provide their own times. This has the side effect of requiring that all clients have relatively synchronized clocks.
id
-- the id of the job, a hexadecimal uuiddata
-- a JSON blob representing the user data associated with a jobqueue
-- the name of a queueworker
-- a unique string identifying a process on a host
Cancel a job from taking place. It will be deleted from the system, and any attempts to renew a heartbeat will fail, and any attempts to complete it will fail. If you try to get the data on the object, you will get nothing.
Complete(0, jid, worker, queue, now, data, ['next', n, [('delay', d) | ('depends', '["jid1","jid2",...]')])
Complete a job and optionally put it in another queue, either scheduled or to be considered waiting immediately. Job dependencies can also be injected at this time. Returns: The updated state, or False on error
Add or remove dependencies a job has. If 'on' is provided, the provided jids are added as dependencies. If 'off' and 'all' are provided, then all the current dependencies are removed. If 'off' is provided and the next argument is not 'all', then those jids are removed as dependencies.
If a job is not already in the 'depends' state, then this call will return false. Otherwise, it will return true
Mark the particular job as failed, with the provided type, and a more specific
message. By type
, we mean some phrase that might be one of several
categorical modes of failure. The message
is something more job-specific,
like perhaps a traceback.
This method should not be used to note that a job has been dropped or has failed in a transient way. This method should be used to note that a job has something really wrong with it that must be remedied.
The motivation behind the type
is so that similar errors can be grouped
together. Optionally, updated data can be provided for the job. A job in any
state can be marked as failed. If it has been given to a worker as a job, then
its subsequent requests to heartbeat or complete that job will fail. Failed
jobs are kept until they are canceled or completed. Returns the id of the
failed job if successful, or False
on failure.
If no type is provided, this returns a JSON blob of the counts of the various
types of failures known. If a type is provided, it will report up to limit
from start
of the jobs affected by that issue. Returns a JSON blob.
# If no type, then...
{
'type1': 1,
'type2': 5,
...
}
# If a type is provided, then...
{
'total': 100,
'jobs': [
{
# All the normal keys for a job
'jid': ...,
'data': ...
# The message for this particular instance
'message': ...,
'type': ...,
}, ...
]
}
Get the data associated with a job. Returns: JSON blob describing the job.
Get the current configuration value for that option, or if option is omitted, then get all the configuration values. Returns: The value of the option
Renew the heartbeat, if possible, and optionally update the job's user data.
Returns: a JSON blob with False
if the job was not renewed, or the
updated expiration time
Return all the job ids currently considered to be in the provided state in a particular queue. The response is a list of job ids:
[
jid1,
jid2,
...
]
Similar to the Pop
command, except that it merely peeks at the next items
in the queue.
Passing in the queue from which to pull items, the current time, when the locks for these returned items should expire, and the number of items to be popped off.
Accepts a jid, and a new priority for the job. If the job doesn't exist, then return false. Otherwise, return the updated priority. If the job is waiting, then the change will be reflected in the order in which it's popped
Put(1, queue, jid, klass, data, now, delay, [priority, p], [tags, t], [retries, r], [depends, '[...]'])
Either create a new job in the provided queue with the provided attributes,
or move that job into that queue. If the job is being serviced by a worker,
subsequent attempts by that worker to either heartbeat
or complete
the
job should fail and return false
.
The priority
argument should be negative to be run sooner rather than
later, and positive if it's less important. The tags
argument should be
a JSON array of the tags associated with the instance and the delay
argument should be in how many seconds the instance should be considered
actionable. The retries
argument describes the maximum number of retries
that should be permitted for a job before it is considered automatically
failed.
The depends
argument is an optional JSON array of job ids on which this
job depends. See the section on dependency for more.
Returns: The id of the put job, or raises an error on failure
Return all the queues we know about, with how many jobs are scheduled, waiting, and running in that queue. If a queue name is provided, then only the appropriate response hash should be returned. The response is JSON:
[
{
'name': 'testing',
'stalled': 2,
'waiting': 5,
'running': 5,
'scheduled': 10
}, {
...
}
]
This script accepts jid, queue, worker and delay for retrying a job. This is
similar in functionality to put
, except that this counts against the retries
a job has for a stage.
If the worker is not the worker with a lock on the job, then it returns false. If the job is not actually running, then it returns false. Otherwise, it returns the number of retries remaining. If the allowed retries have been exhausted, then it is automatically failed, and a negative number is returned.
Set the configuration value for the provided option. If value
is omitted,
then it will remove that configuration option. Returns: nothing
Return the current statistics for a given queue on a given date. The results are returned are a JSON blob:
{
'failed': 3,
'retries': 5,
'wait' : {
'total' : ...,
'mean' : ...,
'variance' : ...,
'histogram': [
...
]
}, 'run': {
'total' : ...,
'mean' : ...,
'variance' : ...,
'histogram': [
...
]
}
}
The histogram's data points are at the second resolution for the first minute,
the minute resolution for the first hour, the 15-minute resolution for the
first day, the hour resolution for the first 3 days, and then at the day
resolution from there on out. The histogram
key is a list of those values.
Accepts a jid, 'add' or 'remove', and then a list of tags to either add or remove from the job. Alternatively, 'get', a tag to get jobs associated with that tag, and offset and count.
If 'add' or 'remove', the response is a list of the jobs current tags, or False if the job doesn't exist. If 'get', the response is of the form:
{
total: ...,
jobs: [
jid,
...
]
}
If no arguments are provided, it returns details of all currently-tracked jobs. If the first argument is 'track', then it will start tracking the job associated with that id, and 'untrack' stops tracking it. In this context, tracking is nothing more than saving the job to a list of jobs that are considered special. Returns JSON:
{
'jobs': [
{
'jid': ...,
# All the other details you'd get from 'get'
}, {
...
}
], 'expired': [
# These are all the jids that are completed and whose data expired
'deadbeef',
...,
...,
]
}
Provide data about all the workers, or if a specific worker is provided, then which jobs that worker is responsible for. If no worker is provided, expect a response of the form:
[
# This is sorted by the recency of activity from that worker
{
'name' : 'hostname1-pid1',
'jobs' : 20,
'stalled': 0
}, {
...
}
]
If a worker id is provided, then expect a response of the form:
{
'jobs': [
jid1,
jid2,
...
], 'stalled': [
jid1,
...
]
}
Move the first count
jobs from failure group
to queue
. This is
significantly faster than moving each of the jobs individually, so when moving
lots of failed jobs, use this.
Unimplemented This is something I may implement at some point to serve as a method for checking the consistency of qless.
A worker is given an exclusive lock on a piece of work when it is given that piece of work. That lock may be renewed periodically so long as it's before the provided 'heartbeat' timestamp. Likewise, it may be completed.
If a worker attempts to heartbeat a job, it may optionally provide an updated
JSON blob to describe the job. If the job has been given to another worker,
the heartbeat should return false
and the worker should yield.
When a node attempts to heartbeat, the lua script should check to see if the node attempting to renew the lock is the same node that currently owns the lock. If so, then the lock's expiration should be pushed back accordingly, and the updated expiration returned. If not, it only has to return false.
Qless also collects statistics for job wait time (time popped - time put), and job completion time (time completed - time popped). By 'statistics', I mean average, variange, count and a histogram. Stats for the number of failures and retries for a given queue are also available.
Stats are grouped by day. In the case of job wait time, its stats are aggregated on the day when the job was popped. In the case of completion time, they are grouped by the day it was completed.
Jobs can be tracked, which just means that they are accessible and displayable.
This can be useful if you just want to keep tabs on the progress of jobs
through the pipeline. All the currently-tracked jobs are stored in a sorted
set, ql:tracked
.
Failures are stored in such a way that we can quickly summarize the number of
failures of a given type, but also which items have succumb to that type of
failure. With that in mind, there is a Redis set, ql:failures
whose members
are the names of the various failure lists. Each type of failure then has its
own list of instance ids that encountered such a failure. For example, we
might have:
ql:failures
=============
upload error
widget failure
ql:f:upload error
==================
deadbeef
...
We'll keep a sorted set of workers sorted by the last time they had any
activity. We'll store this set at ql:workers
.
In addition to this list, we'll keep a set of the jids that a worker currently
has locks for at ql:w:<worker>:jobs
. This should be sorted by the time when
we last saw a heartbeat (or pop) for that worker from that job.
TBD We will likely store data about each worker. Perhaps this, too, can be kept by day.
We should delete data about completed jobs periodically. We should prune both
by the policies for the maximum number of retained completed jobs, and by the
maximum age for retained jobs. To accomplish this, we'll use a sorted list to
keep track of which items should be expired. This list should be stored in the
key ql:completed
The configuration should go in the key ql:config
, and here are some of the
configuration options that qless
is meant to support:
heartbeat
(60) -- The default heartbeat in seconds for queuesstats-history
(30) -- The number of days to store summary statshistogram-history
(7) -- The number of days to store histogram datajobs-history-count
(50k) -- How many jobs to keep data for after they're completedjobs-history
(7 * 24 * 60 * 60) -- How many seconds to keep jobs after they're completedheartbeat-<queue name>
-- The heartbeat interval (in seconds) for a particular queuemax-worker-age
-- How long before workers are considered disappeared
This section stands to speak to the internal structure and naming conventions.
Each job is stored primarily in a key ql:j:<jid>
, a Redis hash, which
contains most of the keys that describe the job. A set (possibly empty)
of jids on which this job depends is stored in ql:j:<jid>-dependencies
.
A set (also possibly empty) of jids that rely on the completion of this
job is stored in ql:j:<jid>-dependents
. For example, ql:j:<jid>
:
{
# This is the same id as identifies it in the key. It should be
# a hex value of a uuid
'jid' : 'deadbeef...',
# This is a 'type' identifier. Clients may choose to ignore it,
# or use it as a language-specific identifier for determining
# what code to run. For instance, it might be 'foo.bar.FooJob'
'type' : '...',
# This is the priority of the job -- lower means more priority.
# The default is 0
'priority' : 0,
# This is the user data associated with the job. (JSON blob)
'data' : '{"hello": "how are you"}',
# A JSON array of tags associated with this job
'tags' : '["testing", "experimental"]',
# The worker ID of the worker that owns it. Currently the worker
# id is <hostname>-<pid>
'worker' : 'ec2-...-4925',
# This is the time when it must next check in
'expires' : 1352375209,
# The current state of the job: 'waiting', 'pending', 'complete'
'state' : 'waiting',
# The queue that it's associated with. 'null' if complete
'queue' : 'example',
# The maximum number of retries this job is allowed per queue
'retries' : 3,
# The number of retries remaining
'remaining' : 3,
# The jids that depend on this job's completion
'dependents' : [...],
# The jids that this job is dependent upon
'dependencies': [...],
# A list of all the stages that this node has gone through, and
# when it was put in that queue, given to a worker, which worker,
# and when it was completed. (JSON blob)
'history' : [
{
'q' : 'test1',
'put' : 1352075209,
'popped': 1352075300,
'done' : 1352076000,
'worker': 'some-hostname-pid'
}, {
...
}
]
}
A queue is a priority queue and consists of three parts:
ql:q:<name>-scheduled
-- sorted set of all scheduled job idsql:q:<name>-work
-- sorted set (by priority) of all jobs waitingql:q:<name>-locks
-- sorted set of job locks and expirationsql:q:<name>-depends
-- sorted set of jobs in a queue, but waiting on other jobs
When looking for a unit of work, the client should first choose from the
next expired lock. If none are expired, then we should next make sure that
any jobs that should now be considered eligible (the scheduled time is in
the past) are then inserted into the work queue. A sorted set of all the
known queues is maintained at ql:queues
. Currently we're keeping it
sorted based on the time when we first saw the queue, but that's a little
bit at odd with only keeping queues around while they're being used.
When a job is completed, it removes itself as a dependency of all the jobs that depend on it. If it was the last job that a job depended on, it is then inserted into the queue's work.
Stats are grouped by day and queue. The day portion of the stats key is an integer timestamp of midnight for that day:
<day> = time - (time % (24 * 60 * 60))
Stats are stored under two hashes: ql:s:wait:<day>:<queue>
and
ql:s:run:<day>:<queue>
respectively. Each has the keys:
total
-- The total number of data points containedmean
-- The current mean valuevk
-- Not the actual variance, but a number that can be used to both numerically stable-ly find the variance, and compute it in a streaming fashions1
,s2
, ..., -- second-resolution histogram counts for the first minutem1
,m2
, ..., -- minute-resolution for the first hourh1
,h2
, ..., -- hour-resolution for the first dayd1
,d2
, ..., -- day-resolution for the rest
This is also another hash, ql:s:stats:<day>:<queue>
with keys:
failures
-- This is how many failures there have been. If a job is run twice and fails repeatedly, this is incremented twice.failed
-- This is how many are currently failedretries
-- This is how many jobs we've had to retry
All jobs store a JSON array of the tags that are associated with it. In
addition, the keys ql:t:<tag>
store a sorted set of all the jobs associated
with that particular tag. The score of each jid in that tag is the time when
that tag was added to that job. When jobs are tagged a second time with an
existing tag, then it's a no-op.
There are a few nuanced aspects of implementing bindings for your particular language that are worth bringing up.
Jobs with identical priorities are popped in the order they were inserted. The caveat is that it's only true to the precision of the timestamps your bindings provide. For example, if you provide timestamps to the second granularity, then jobs with the same priority inserted in the same second can be popped in any order. Timestamps at the thousandths of a second granularity will maintain this property better. While for most applications it's likely not important, it is something to be aware of when writing language bindings.
It's intended to be a common usecase that bindings provide a worker script or binary that runs several worker subprocesses. These should run with their working directory as a sandbox.
Resque holds the philosphy that each worker consists of two processes: a master process that grabs jobs, and an actual worker, which is a fork of the master process and actually does the work associated with the job. The major advantage of this as far as we can see it is that it's a good strategy for sandboxing any havoc that might occur when processing the job.
However, apparently Ruby doesn't handle copy-on-write very well, and so there is a lot of overhead in not only system calls to fork a process, but also to load modules into memory. Other languages, however, may or may not suffer from this problem, but it's important to be aware of.
Another project in the vein of Resque (in fact, it uses the same job structure) is sidekiq, which uses threads in a master process to do work. The performance boost is substantial, not to mention the memory footprint. Most of this performance appears to be gained from the memory profile and not constantly allocating memory for every job.
Ultimately, the choice is yours, but we thought it bore mentioning.
Workers are allowed (and encouraged) to pop off of more than one queue. But
then we get into the problem of what order they should be polled. Workers
should support two modes of popping: ordered and round-robin. Consider queues
A
, B
, and C
with job counts:
A: 5
B: 2
C: 3
In an ordered verion, the order in which the queues are specified has
significance in the order in which jobs are popped. For example, if our queued
were ordered C, B, A
in the worker, we'd pop jobs off:
C, C, C, B, B, A, A, A, A, A
In the round-robin implementation, a worker pops off a job from each queue as it progress through all queues:
C, B, A, C, B, A, C, A, A, A
These aren't meant to be stringent, but just to keep myself sane so that when moving between different chunks of code that it's all formatted similarly, and the same variable names have the same meaning.
- Parameter sanitization should be performed as early as possible. This
includes making use of
assert
anderror
based on the number and type of arguments. - Job ids should be referred to as
jid
, both internally and in the clients. - Failure types should be described with
group
. I'm not terribly thrilled with the term, but I thought it was better than 'kind.' After spending some time with a Thesaurus, I didn't find anything that appealed to me more - Job types should be described as
klass
(nod to Resque), because both 'type' and 'class' are commonly used in languages.