Skip to content

Zig client for Beanstalkd - simple and fast general purpose work queue

License

Notifications You must be signed in to change notification settings

g41797/beanstalkz

Repository files navigation

Zig client for Beanstalkd

CI

Beanstalkd is

        Simple and fast general purpose work queue.
    The beauty of Beanstalkd is its absolute simplicity.

Actually you can use just 3 commands

  • submit(put) job into the queue
  • take(reserve) job from the queue for processing
  • delete job from the queue
    // On producer side
    _ = try producer.put(1, 0, 120, "job data");
    
    // On worker side
    var job: Job = .{};
    try job.init(allocator);
    defer job.deinit();
    
    try worker.reserve(10, &job);
        
    // job.body().? - contains "job data"
    // process job
    // ...........
    
    // job.id().?   - contains job id
    try worker.delete(job.id().?);

Beanstalkd is the part of main distros, you can install it using appropriate package manager.

And of course you can use Beanstalkd with Docker.

If you don't have experience using Beanstalkd, it's a good idea to read:

Job

Job is opaque array of bytes. Beanstalkd does not force you to use a specific data format.

After being placed in a queue, job can be in the following states:

  • delayed (waiting for time-out before moving to next state)
  • ready (for processing)
  • reserved (processed)
  • buried (failed)

Job lifecycle supported by beanstalkz

  'put' with delay                                'delete'             
  ----------------> [DELAYED] ---------------------------X
                        |     
                        | 'kick-job' or time passes
                        |              
  'put'                 v     'reserve'           'delete'
  -----------------> [READY] ---------> [RESERVED] ------X
                       |  ^                 |  
                       |  |                 | 'bury'
                       |  |   'kick-job'    v     'delete'
                       |   `------------ [BURIED]  ------X    
                       |                  
                       |                          'delete'
                        `--------------------------------X

Tube

Instead of the term queue Beanstalkd uses term tube, this explains the picture above.

Tube is named queue.

Tubes are created on demand whenever they are referenced.

If a tube is empty (that is, it contains no ready, delayed, or buried jobs)

and no client refers to it, it will be deleted.

If tube was not referenced, Beanstalkd creates "default" tube.

Every tube has 3 sub-queues:

  • delay - contains jobs in delayed state
  • ready - contains jobs in ready or reserved states
  • bury (dead-letter) - contains failed jobs

Supported commands

Name Description API
use Set current tube(queue) use(tname: []const u8)
put Submit job to current tube put(pri: u32, delay: u32, ttr: u32, job: []const u8)
watch Subscribe to jobs submitted to the tube watch(tname: []const u8)
reserve Consume job reserve(timeout: u32, job: *Job)
bury Put job to the failed("buried") state bury(id: u32, pri: u32)
kick-job Put delayed or failed job to the ready state kick_job(id: u32)
ignore Un-subscribe ignore(tname: []const u8)
delete Remove job from the system delete(id: u32)
state Get job state state(id: u32)
connect Connect connect(allocator: Allocator, addr: ?[]const u8, port: ?u16)
disconnect Disconnect disconnect()

Installation

Add beanstalkz to build.zig.zon:

zig fetch --save=beanstalkz git+https://github.com/g41797/beanstalkz

Add beanstalkz to build.zig:

    const beanstalkz = b.dependency("beanstalkz", .{
        .target = target,
        .optimize = optimize,
    });

    const lib = b.addStaticLibrary(..);
    lib.root_module.addImport("beanstalkz", beanstalkz.module("beanstalkz"));

    const lib_unit_tests = b.addTest(...);
    lib_unit_tests.root_module.addImport("beanstalkz", beanstalkz.module("beanstalkz"));

Import beanstalkz:

const beanstalkz = @import("beanstalkz");

Credits

Content of README is heavily inspired by

License

MIT