From ebf86dfb03380f98429c2f1f3312650749e6c2da Mon Sep 17 00:00:00 2001
From: Jens Rantil
Date: Fri, 29 Jun 2018 19:04:19 +0200
Subject: [PATCH 1/2] feat: graceful shutdown

...on interrupt signal.
---
 broker/broker.go            | 27 ++++++++++++++++++++++++---
 broker/broker_dispatcher.go | 15 +++++++++++++--
 bs/bs.go                    | 13 +++++++++----
 cmdstalk.go                 | 20 ++++++++++++++++----
 4 files changed, 62 insertions(+), 13 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index d2c8cf6..5064b6e 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -5,6 +5,7 @@
 package broker
 
 import (
+	"context"
 	"fmt"
 	"log"
 	"os"
@@ -43,6 +44,7 @@ type Broker struct {
 
 	log     *log.Logger
 	results chan<- *JobResult
+	ctx     context.Context
 }
 
 type JobResult struct {
@@ -71,13 +73,14 @@ type JobResult struct {
 }
 
 // New broker instance.
-func New(address, tube string, slot uint64, cmd string, results chan<- *JobResult) (b Broker) {
+func New(ctx context.Context, address, tube string, slot uint64, cmd string, results chan<- *JobResult) (b Broker) {
 	b.Address = address
 	b.Tube = tube
 	b.Cmd = cmd
 
 	b.log = log.New(os.Stdout, fmt.Sprintf("[%s:%d] ", tube, slot), log.LstdFlags)
 	b.results = results
+	b.ctx = ctx
 
 	return
 }
@@ -94,6 +97,7 @@ func (b *Broker) Run(ticks chan bool) {
 	b.log.Println("watching", b.Tube)
 	ts := beanstalk.NewTubeSet(conn, b.Tube)
 
+	b.log.Println("starting reserve loop (waiting for job)")
 	for {
 		if ticks != nil {
 			if _, ok := <-ticks; !ok {
@@ -101,8 +105,16 @@ func (b *Broker) Run(ticks chan bool) {
 			}
 		}
 
-		b.log.Println("reserve (waiting for job)")
-		id, body := bs.MustReserveWithoutTimeout(ts)
+		if isCancelled(b.ctx) {
+			break
+		}
+
+		id, body, err := bs.MustReserveWithTimeout(ts, 1*time.Second)
+		if err == bs.ErrTimeout {
+			// The short timeout lets the loop notice a cancelled context promptly.
+			continue
+		}
+
 		job := bs.NewJob(id, body, conn)
 
 		t, err := job.Timeouts()
@@ -154,6 +166,15 @@ func (b *Broker) Run(ticks chan bool) {
 	b.log.Println("broker finished")
 }
 
+func isCancelled(ctx context.Context) bool {
+	select {
+	case <-ctx.Done():
+		return true
+	default:
+		return false
+	}
+}
+
 func (b *Broker) executeJob(job bs.Job, shellCmd string) (result *JobResult, err error) {
 	result = &JobResult{JobId: job.Id, Executed: true}
 
diff --git a/broker/broker_dispatcher.go b/broker/broker_dispatcher.go
index 47e0b72..75398c8 100644
--- a/broker/broker_dispatcher.go
+++ b/broker/broker_dispatcher.go
@@ -1,7 +1,9 @@
 package broker
 
 import (
+	"context"
 	"log"
+	"sync"
 	"time"
 
 	"github.com/kr/beanstalk"
@@ -23,14 +25,17 @@ type BrokerDispatcher struct {
 	conn    *beanstalk.Conn
 	perTube uint64
 	tubeSet map[string]bool
+	ctx     context.Context
+	wg      sync.WaitGroup
 }
 
-func NewBrokerDispatcher(address, cmd string, perTube uint64) *BrokerDispatcher {
+func NewBrokerDispatcher(ctx context.Context, address, cmd string, perTube uint64) *BrokerDispatcher {
 	return &BrokerDispatcher{
 		address: address,
 		cmd:     cmd,
 		perTube: perTube,
 		tubeSet: make(map[string]bool),
+		ctx:     ctx,
 	}
 }
 
@@ -73,12 +78,18 @@ func (bd *BrokerDispatcher) RunAllTubes() (err error) {
 }
 
 func (bd *BrokerDispatcher) runBroker(tube string, slot uint64) {
+	bd.wg.Add(1)
 	go func() {
-		b := New(bd.address, tube, slot, bd.cmd, nil)
+		defer bd.wg.Done()
+		b := New(bd.ctx, bd.address, tube, slot, bd.cmd, nil)
 		b.Run(nil)
 	}()
 }
 
+func (bd *BrokerDispatcher) Wait() {
+	bd.wg.Wait()
+}
+
 func (bd *BrokerDispatcher) watchNewTubes() (err error) {
 	tubes, err := bd.conn.ListTubes()
 	if err != nil {
diff --git a/bs/bs.go b/bs/bs.go
index 2d8835a..1fe6347 100644
--- a/bs/bs.go
+++ b/bs/bs.go
@@ -5,6 +5,7 @@
 package bs
 
 import (
+	"errors"
 	"time"
 
 	"github.com/kr/beanstalk"
@@ -16,18 +17,22 @@ const (
 	DeadlineSoonDelay = 1 * time.Second
 )
 
+var (
+	ErrTimeout = errors.New("timeout for reserving a job")
+)
+
 // reserve-with-timeout until there's a job or something panic-worthy.
 // Handles beanstalk.ErrTimeout by retrying immediately.
 // Handles beanstalk.ErrDeadline by sleeping DeadlineSoonDelay before retry.
 // panics for other errors.
-func MustReserveWithoutTimeout(ts *beanstalk.TubeSet) (id uint64, body []byte) {
-	var err error
+func MustReserveWithTimeout(ts *beanstalk.TubeSet, timeout time.Duration) (id uint64, body []byte, err error) {
 	for {
-		id, body, err = ts.Reserve(1 * time.Hour)
+		id, body, err = ts.Reserve(timeout)
 		if err == nil {
 			return
 		} else if err.(beanstalk.ConnError).Err == beanstalk.ErrTimeout {
-			continue
+			err = ErrTimeout
+			return
 		} else if err.(beanstalk.ConnError).Err == beanstalk.ErrDeadline {
 			time.Sleep(DeadlineSoonDelay)
 			continue
diff --git a/cmdstalk.go b/cmdstalk.go
index 7d197e9..f3f6cf7 100644
--- a/cmdstalk.go
+++ b/cmdstalk.go
@@ -20,6 +20,11 @@
 package main
 
 import (
+	"context"
+	"log"
+	"os"
+	"os/signal"
+
 	"github.com/99designs/cmdstalk/broker"
 	"github.com/99designs/cmdstalk/cli"
 )
@@ -27,7 +32,16 @@ import (
 func main() {
 	opts := cli.MustParseFlags()
 
-	bd := broker.NewBrokerDispatcher(opts.Address, opts.Cmd, opts.PerTube)
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt)
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		<-c
+		log.Println("received interrupt. quitting.")
+		cancel()
+	}()
+
+	bd := broker.NewBrokerDispatcher(ctx, opts.Address, opts.Cmd, opts.PerTube)
 
 	if opts.All {
 		bd.RunAllTubes()
@@ -35,7 +49,5 @@ func main() {
 		bd.RunTubes(opts.Tubes)
 	}
 
-	// TODO: wire up to SIGTERM handler etc.
-	exitChan := make(chan bool)
-	<-exitChan
+	bd.Wait()
 }

From 2468bf0000374d4604b8c428b683f459b86fe64b Mon Sep 17 00:00:00 2001
From: Jens Rantil
Date: Fri, 29 Jun 2018 19:06:26 +0200
Subject: [PATCH 2/2] feat: support job count limit

Use case: I have 100k items I'd like to process when my system is under
low load. By submitting 100 `batch`[1] jobs that each run

    cmdstalk -cmd=my-script.sh -max-jobs 1000

I know processing will halt as soon as the load average rises above 0.8
(or whatever threshold `batch` is configured to use), because `batch`
stops starting new jobs at that point.

[1] https://www.centos.org/docs/5/html/Deployment_Guide-en-US/s1-autotasks-at-batch.html
---
 README.md                   |  1 +
 broker/broker.go            | 12 ++++++----
 broker/broker_dispatcher.go | 47 ++++++++++++++++++++++++++-----------
 cli/options.go              |  4 ++++
 cmdstalk.go                 |  2 +-
 5 files changed, 47 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index 0d7b9c2..66edbae 100644
--- a/README.md
+++ b/README.md
@@ -44,6 +44,7 @@ cmdstalk -help
 # -cmd="": Command to run in worker.
 # -per-tube=1: Number of workers per tube.
 # -tubes=[default]: Comma separated list of tubes.
+# -max-jobs=0: Maximum number of jobs to process before exiting. Zero for no limit.
 
 # Watch three specific tubes.
 cmdstalk -cmd="/path/to/your/worker --your=flags --here" -tubes="one,two,three"
diff --git a/broker/broker.go b/broker/broker.go
index 5064b6e..7cf2fb4 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -42,9 +42,10 @@ type Broker struct {
 	// Tube name this broker will service.
 	Tube string
 
-	log     *log.Logger
-	results chan<- *JobResult
-	ctx     context.Context
+	log         *log.Logger
+	results     chan<- *JobResult
+	jobReceived chan<- struct{}
+	ctx         context.Context
 }
 
 type JobResult struct {
@@ -73,13 +74,14 @@ type JobResult struct {
 }
 
 // New broker instance.
-func New(ctx context.Context, address, tube string, slot uint64, cmd string, results chan<- *JobResult) (b Broker) {
+func New(ctx context.Context, address, tube string, slot uint64, cmd string, results chan<- *JobResult, jobReceived chan<- struct{}) (b Broker) {
 	b.Address = address
 	b.Tube = tube
 	b.Cmd = cmd
 
 	b.log = log.New(os.Stdout, fmt.Sprintf("[%s:%d] ", tube, slot), log.LstdFlags)
 	b.results = results
+	b.jobReceived = jobReceived
 	b.ctx = ctx
 
 	return
@@ -117,6 +119,8 @@ func (b *Broker) Run(ticks chan bool) {
 
 		job := bs.NewJob(id, body, conn)
 
+		b.jobReceived <- struct{}{}
+
 		t, err := job.Timeouts()
 		if err != nil {
 			b.log.Panic(err)
diff --git a/broker/broker_dispatcher.go b/broker/broker_dispatcher.go
index 75398c8..bff5dfb 100644
--- a/broker/broker_dispatcher.go
+++ b/broker/broker_dispatcher.go
@@ -20,22 +20,27 @@ const (
 // created. The `perTube` option determines how many brokers are started for
 // each tube.
 type BrokerDispatcher struct {
-	address string
-	cmd     string
-	conn    *beanstalk.Conn
-	perTube uint64
-	tubeSet map[string]bool
-	ctx     context.Context
-	wg      sync.WaitGroup
+	address     string
+	cmd         string
+	conn        *beanstalk.Conn
+	perTube     uint64
+	tubeSet     map[string]bool
+	jobReceived chan<- struct{}
+	ctx         context.Context
+	wg          sync.WaitGroup
 }
 
-func NewBrokerDispatcher(ctx context.Context, address, cmd string, perTube uint64) *BrokerDispatcher {
+func NewBrokerDispatcher(parentCtx context.Context, address, cmd string, perTube, maxJobs uint64) *BrokerDispatcher {
+	ctx, cancel := context.WithCancel(parentCtx)
+	jobReceived := make(chan struct{})
+	go limitedCountGenerator(maxJobs, cancel, jobReceived)
 	return &BrokerDispatcher{
-		address: address,
-		cmd:     cmd,
-		perTube: perTube,
-		tubeSet: make(map[string]bool),
-		ctx:     ctx,
+		address:     address,
+		cmd:         cmd,
+		perTube:     perTube,
+		tubeSet:     make(map[string]bool),
+		jobReceived: jobReceived,
+		ctx:         ctx,
 	}
 }
 
@@ -77,11 +82,25 @@ func (bd *BrokerDispatcher) RunAllTubes() (err error) {
 	return
 }
 
+// limitedCountGenerator cancels the dispatcher's context after nlimit events
+// have been received on eventHappened. If nlimit is 0 there is no limit and
+// the context is never cancelled from here.
+func limitedCountGenerator(nlimit uint64, cancel context.CancelFunc, eventHappened <-chan struct{}) {
+	ngenerated := uint64(1)
+	for range eventHappened {
+		if nlimit != 0 && ngenerated == nlimit {
+			log.Println("reached job limit. quitting.")
+			cancel()
+		}
+		ngenerated++
+	}
+}
+
 func (bd *BrokerDispatcher) runBroker(tube string, slot uint64) {
 	bd.wg.Add(1)
 	go func() {
 		defer bd.wg.Done()
-		b := New(bd.ctx, bd.address, tube, slot, bd.cmd, nil)
+		b := New(bd.ctx, bd.address, tube, slot, bd.cmd, nil, bd.jobReceived)
 		b.Run(nil)
 	}()
 }
diff --git a/cli/options.go b/cli/options.go
index 5d53443..a2aab8c 100644
--- a/cli/options.go
+++ b/cli/options.go
@@ -29,6 +29,9 @@ type Options struct {
 
 	// The beanstalkd tubes to watch.
 	Tubes TubeList
+
+	// Maximum number of jobs to process before exiting.
+	MaxJobs uint64
 }
 
 // TubeList is a list of beanstalkd tube names.
@@ -54,6 +57,7 @@ func ParseFlags() (o Options, err error) {
 	flag.BoolVar(&o.All, "all", false, "Listen to all tubes, instead of -tubes=...")
 	flag.StringVar(&o.Cmd, "cmd", "", "Command to run in worker.")
 	flag.Uint64Var(&o.PerTube, "per-tube", 1, "Number of workers per tube.")
+	flag.Uint64Var(&o.MaxJobs, "max-jobs", 0, "Maximum number of jobs to process before exiting. Zero for no limit.")
 	flag.Var(&o.Tubes, "tubes", "Comma separated list of tubes.")
 	flag.Parse()
 
diff --git a/cmdstalk.go b/cmdstalk.go
index f3f6cf7..3ab08b2 100644
--- a/cmdstalk.go
+++ b/cmdstalk.go
@@ -41,7 +41,7 @@ func main() {
 		cancel()
 	}()
 
-	bd := broker.NewBrokerDispatcher(ctx, opts.Address, opts.Cmd, opts.PerTube)
+	bd := broker.NewBrokerDispatcher(ctx, opts.Address, opts.Cmd, opts.PerTube, opts.MaxJobs)
 
 	if opts.All {
 		bd.RunAllTubes()
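
Illustration (not part of the patches above): taken together, the two changes share one
cancellation path — the context is cancelled either by an interrupt signal or, when
-max-jobs is set, after a fixed number of jobs, and every broker goroutine notices the
cancellation within its one-second reserve timeout before Wait() returns. The standalone
sketch below condenses that flow under assumed names: worker, jobReceived and maxJobs are
placeholders, and a one-second timer stands in for the beanstalkd reserve call.

// Sketch only; not cmdstalk code. A timer replaces the beanstalkd reserve call.
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"time"
)

// worker loops until the context is cancelled, checking at least once per
// second, mirroring the 1s reserve-with-timeout in broker.Run.
func worker(ctx context.Context, id int, jobReceived chan<- struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Printf("worker %d: context cancelled, exiting", id)
			return
		case <-time.After(1 * time.Second): // pretend a job was reserved and handled
			jobReceived <- struct{}{}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Cancel on interrupt. The channel is buffered so a signal delivered
	// before the goroutine is ready to receive is not dropped.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		log.Println("received interrupt. quitting.")
		cancel()
	}()

	// Cancel after maxJobs jobs have been seen; the same idea as
	// limitedCountGenerator in the second patch. Zero would mean no limit.
	const maxJobs = 5
	jobReceived := make(chan struct{})
	go func() {
		seen := uint64(0)
		for range jobReceived {
			seen++
			if maxJobs != 0 && seen >= maxJobs {
				log.Println("reached job limit. quitting.")
				cancel()
			}
		}
	}()

	// Start the workers and wait for all of them to observe the cancellation.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go worker(ctx, i, jobReceived, &wg)
	}
	wg.Wait()
	log.Println("all workers finished")
}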