Beanstalkd is
Simple and fast general purpose work queue. The beauty of Beanstalkd is its absolute simplicity.
Actually you can use just 3 commands
- submit(put) job into the queue
- take(reserve) job from the queue for processing
- delete job from the queue
// On producer side
_ = try producer.put(1, 0, 120, "job data");
// On worker side
var job: Job = .{};
try job.init(allocator);
defer job.deinit();
try worker.reserve(10, &job);
// job.body().? - contains "job data"
// process job
// ...........
// job.id().? - contains job id
try worker.delete(job.id().?);
Beanstalkd is the part of main distros, you can install it using appropriate package manager.
And of course you can use Beanstalkd with Docker.
If you don't have experience using Beanstalkd
, it's a good idea to read:
Job is opaque array of bytes. Beanstalkd does not force you to use a specific data format.
After being placed in a queue, job can be in the following states:
- delayed (waiting for time-out before moving to next state)
- ready (for processing)
- reserved (processed)
- buried (failed)
'put' with delay 'delete'
----------------> [DELAYED] ---------------------------X
|
| 'kick-job' or time passes
|
'put' v 'reserve' 'delete'
-----------------> [READY] ---------> [RESERVED] ------X
| ^ |
| | | 'bury'
| | 'kick-job' v 'delete'
| `------------ [BURIED] ------X
|
| 'delete'
`--------------------------------X
Instead of the term queue Beanstalkd uses term tube, this explains the picture above.
Tube is named queue.
Tubes are created on demand whenever they are referenced.
If a tube is empty (that is, it contains no ready, delayed, or buried jobs)
and no client refers to it, it will be deleted.
If tube was not referenced, Beanstalkd creates "default" tube.
Every tube has 3 sub-queues:
- delay - contains jobs in delayed state
- ready - contains jobs in ready or reserved states
- bury (dead-letter) - contains failed jobs
Name | Description | API |
---|---|---|
use | Set current tube(queue) | use(tname: []const u8) |
put | Submit job to current tube | put(pri: u32, delay: u32, ttr: u32, job: []const u8) |
watch | Subscribe to jobs submitted to the tube | watch(tname: []const u8) |
reserve | Consume job | reserve(timeout: u32, job: *Job) |
bury | Put job to the failed("buried") state | bury(id: u32, pri: u32) |
kick-job | Put delayed or failed job to the ready state | kick_job(id: u32) |
ignore | Un-subscribe | ignore(tname: []const u8) |
delete | Remove job from the system | delete(id: u32) |
state | Get job state | state(id: u32) |
connect | Connect | connect(allocator: Allocator, addr: ?[]const u8, port: ?u16) |
disconnect | Disconnect | disconnect() |
Add beanstalkz to build.zig.zon:
zig fetch --save=beanstalkz git+https://github.com/g41797/beanstalkz
Add beanstalkz to build.zig:
const beanstalkz = b.dependency("beanstalkz", .{
.target = target,
.optimize = optimize,
});
const lib = b.addStaticLibrary(..);
lib.root_module.addImport("beanstalkz", beanstalkz.module("beanstalkz"));
const lib_unit_tests = b.addTest(...);
lib_unit_tests.root_module.addImport("beanstalkz", beanstalkz.module("beanstalkz"));
Import beanstalkz:
const beanstalkz = @import("beanstalkz");
Content of README is heavily inspired by