diff --git a/README.md b/README.md index 0d7b9c2..66edbae 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ cmdstalk -help # -cmd="": Command to run in worker. # -per-tube=1: Number of workers per tube. # -tubes=[default]: Comma separated list of tubes. +# -max-jobs=0: Maximum number of items to process before exitting. Zero for no limit. # Watch three specific tubes. cmdstalk -cmd="/path/to/your/worker --your=flags --here" -tubes="one,two,three" diff --git a/broker/broker.go b/broker/broker.go index d2c8cf6..7cf2fb4 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -5,6 +5,7 @@ package broker import ( + "context" "fmt" "log" "os" @@ -41,8 +42,10 @@ type Broker struct { // Tube name this broker will service. Tube string - log *log.Logger - results chan<- *JobResult + log *log.Logger + results chan<- *JobResult + jobReceived chan<- struct{} + ctx context.Context } type JobResult struct { @@ -71,13 +74,15 @@ type JobResult struct { } // New broker instance. -func New(address, tube string, slot uint64, cmd string, results chan<- *JobResult) (b Broker) { +func New(ctx context.Context, address, tube string, slot uint64, cmd string, results chan<- *JobResult, jobReceived chan<- struct{}) (b Broker) { b.Address = address b.Tube = tube b.Cmd = cmd b.log = log.New(os.Stdout, fmt.Sprintf("[%s:%d] ", tube, slot), log.LstdFlags) b.results = results + b.jobReceived = jobReceived + b.ctx = ctx return } @@ -94,6 +99,7 @@ func (b *Broker) Run(ticks chan bool) { b.log.Println("watching", b.Tube) ts := beanstalk.NewTubeSet(conn, b.Tube) + b.log.Println("starting reserve loop (waiting for job)") for { if ticks != nil { if _, ok := <-ticks; !ok { @@ -101,10 +107,20 @@ func (b *Broker) Run(ticks chan bool) { } } - b.log.Println("reserve (waiting for job)") - id, body := bs.MustReserveWithoutTimeout(ts) + if isCancelled(b.ctx) { + break + } + + id, body, err := bs.MustReserveWithTimeout(ts, 1*time.Second) + if err == bs.ErrTimeout { + // Doing this to be able to gracefully handle cancelled context. + continue + } + job := bs.NewJob(id, body, conn) + b.jobReceived <- struct{}{} + t, err := job.Timeouts() if err != nil { b.log.Panic(err) @@ -154,6 +170,15 @@ func (b *Broker) Run(ticks chan bool) { b.log.Println("broker finished") } +func isCancelled(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} + func (b *Broker) executeJob(job bs.Job, shellCmd string) (result *JobResult, err error) { result = &JobResult{JobId: job.Id, Executed: true} diff --git a/broker/broker_dispatcher.go b/broker/broker_dispatcher.go index 47e0b72..bff5dfb 100644 --- a/broker/broker_dispatcher.go +++ b/broker/broker_dispatcher.go @@ -1,7 +1,9 @@ package broker import ( + "context" "log" + "sync" "time" "github.com/kr/beanstalk" @@ -18,19 +20,27 @@ const ( // created. The `perTube` option determines how many brokers are started for // each tube. type BrokerDispatcher struct { - address string - cmd string - conn *beanstalk.Conn - perTube uint64 - tubeSet map[string]bool + address string + cmd string + conn *beanstalk.Conn + perTube uint64 + tubeSet map[string]bool + jobReceived chan<- struct{} + ctx context.Context + wg sync.WaitGroup } -func NewBrokerDispatcher(address, cmd string, perTube uint64) *BrokerDispatcher { +func NewBrokerDispatcher(parentCtx context.Context, address, cmd string, perTube, maxJobs uint64) *BrokerDispatcher { + ctx, cancel := context.WithCancel(parentCtx) + jobReceived := make(chan struct{}) + go limittedCountGenerator(maxJobs, cancel, jobReceived) return &BrokerDispatcher{ - address: address, - cmd: cmd, - perTube: perTube, - tubeSet: make(map[string]bool), + address: address, + cmd: cmd, + perTube: perTube, + tubeSet: make(map[string]bool), + jobReceived: jobReceived, + ctx: ctx, } } @@ -72,13 +82,33 @@ func (bd *BrokerDispatcher) RunAllTubes() (err error) { return } +// limittedCountGenerator creates a channel that returns a boolean channel with +// nlimit true's and false otherwise. If nlimit is 0 it the channel will always +// be containing true. +func limittedCountGenerator(nlimit uint64, cancel context.CancelFunc, eventHappened <-chan struct{}) { + ngenerated := uint64(1) + for range eventHappened { + if nlimit != 0 && ngenerated == nlimit { + log.Println("reached job limit. quitting.") + cancel() + } + ngenerated++ + } +} + func (bd *BrokerDispatcher) runBroker(tube string, slot uint64) { + bd.wg.Add(1) go func() { - b := New(bd.address, tube, slot, bd.cmd, nil) + defer bd.wg.Done() + b := New(bd.ctx, bd.address, tube, slot, bd.cmd, nil, bd.jobReceived) b.Run(nil) }() } +func (bd *BrokerDispatcher) Wait() { + bd.wg.Wait() +} + func (bd *BrokerDispatcher) watchNewTubes() (err error) { tubes, err := bd.conn.ListTubes() if err != nil { diff --git a/bs/bs.go b/bs/bs.go index 2d8835a..1fe6347 100644 --- a/bs/bs.go +++ b/bs/bs.go @@ -5,6 +5,7 @@ package bs import ( + "errors" "time" "github.com/kr/beanstalk" @@ -16,18 +17,22 @@ const ( DeadlineSoonDelay = 1 * time.Second ) +var ( + ErrTimeout = errors.New("timeout for reserving a job") +) + // reserve-with-timeout until there's a job or something panic-worthy. // Handles beanstalk.ErrTimeout by retrying immediately. // Handles beanstalk.ErrDeadline by sleeping DeadlineSoonDelay before retry. // panics for other errors. -func MustReserveWithoutTimeout(ts *beanstalk.TubeSet) (id uint64, body []byte) { - var err error +func MustReserveWithTimeout(ts *beanstalk.TubeSet, timeout time.Duration) (id uint64, body []byte, err error) { for { - id, body, err = ts.Reserve(1 * time.Hour) + id, body, err = ts.Reserve(timeout) if err == nil { return } else if err.(beanstalk.ConnError).Err == beanstalk.ErrTimeout { - continue + err = ErrTimeout + return } else if err.(beanstalk.ConnError).Err == beanstalk.ErrDeadline { time.Sleep(DeadlineSoonDelay) continue diff --git a/cli/options.go b/cli/options.go index 5d53443..a2aab8c 100644 --- a/cli/options.go +++ b/cli/options.go @@ -29,6 +29,9 @@ type Options struct { // The beanstalkd tubes to watch. Tubes TubeList + + // Maximum number of jobs to process before exitting. + MaxJobs uint64 } // TubeList is a list of beanstalkd tube names. @@ -54,6 +57,7 @@ func ParseFlags() (o Options, err error) { flag.BoolVar(&o.All, "all", false, "Listen to all tubes, instead of -tubes=...") flag.StringVar(&o.Cmd, "cmd", "", "Command to run in worker.") flag.Uint64Var(&o.PerTube, "per-tube", 1, "Number of workers per tube.") + flag.Uint64Var(&o.MaxJobs, "max-jobs", 0, "Maximum number of items to process before exitting. Zero for no limit.") flag.Var(&o.Tubes, "tubes", "Comma separated list of tubes.") flag.Parse() diff --git a/cmdstalk.go b/cmdstalk.go index 7d197e9..3ab08b2 100644 --- a/cmdstalk.go +++ b/cmdstalk.go @@ -20,6 +20,11 @@ package main import ( + "context" + "log" + "os" + "os/signal" + "github.com/99designs/cmdstalk/broker" "github.com/99designs/cmdstalk/cli" ) @@ -27,7 +32,16 @@ import ( func main() { opts := cli.MustParseFlags() - bd := broker.NewBrokerDispatcher(opts.Address, opts.Cmd, opts.PerTube) + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-c + log.Println("received interrupt. quitting.") + cancel() + }() + + bd := broker.NewBrokerDispatcher(ctx, opts.Address, opts.Cmd, opts.PerTube, opts.MaxJobs) if opts.All { bd.RunAllTubes() @@ -35,7 +49,5 @@ func main() { bd.RunTubes(opts.Tubes) } - // TODO: wire up to SIGTERM handler etc. - exitChan := make(chan bool) - <-exitChan + bd.Wait() }