From d4c7d21c60a4ebb8f0e1a5171d3128e5d10dcb6e Mon Sep 17 00:00:00 2001 From: Shun Date: Mon, 4 Dec 2023 16:49:52 +0800 Subject: [PATCH] feat(sentinel): Add support for Redis Sentinel (#29) --- options.go | 16 ++++++++++++++++ redis.go | 7 +++++++ redis_test.go | 31 +++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/options.go b/options.go index 2d773d5..1af8993 100644 --- a/options.go +++ b/options.go @@ -20,6 +20,8 @@ type options struct { channelName string channelSize int cluster bool + sentinel bool + masterName string } // WithAddr setup the addr of redis @@ -43,6 +45,20 @@ func WithCluster(enable bool) Option { } } +// WithSentinel redis sentinel +func WithSentinel(enable bool) Option { + return func(w *options) { + w.sentinel = enable + } +} + +// WithMasterName sentinel master name +func WithMasterName(masterName string) Option { + return func(w *options) { + w.masterName = masterName + } +} + // WithChannelSize redis channel size func WithChannelSize(size int) Option { return func(w *options) { diff --git a/redis.go b/redis.go index b1ce153..98738df 100644 --- a/redis.go +++ b/redis.go @@ -49,6 +49,13 @@ func NewWorker(opts ...Option) *Worker { Addrs: strings.Split(w.opts.addr, ","), Password: w.opts.password, }) + } else if w.opts.sentinel { + w.rdb = redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: w.opts.masterName, + SentinelAddrs: strings.Split(w.opts.addr, ","), + Password: w.opts.password, + DB: w.opts.db, + }) } else { options := &redis.Options{ Addr: w.opts.addr, diff --git a/redis_test.go b/redis_test.go index c67ef73..ceb2c29 100644 --- a/redis_test.go +++ b/redis_test.go @@ -130,6 +130,37 @@ func TestRedisCluster(t *testing.T) { // you will see the execute time > 1000ms } +func TestRedisSentinel(t *testing.T) { + t.Helper() + m := &mockMessage{ + Message: "foo", + } + hosts := []string{host + ":26379", host + ":26380"} + + w := NewWorker( + WithAddr(strings.Join(hosts, ",")), + WithMasterName("mymaster"), + WithChannel("testSentinel"), + WithSentinel(true), + WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error { + time.Sleep(500 * time.Millisecond) + return nil + }), + ) + q := queue.NewPool( + 5, + queue.WithWorker(w), + ) + time.Sleep(100 * time.Millisecond) + assert.NoError(t, q.Queue(m)) + assert.NoError(t, q.Queue(m)) + assert.NoError(t, q.Queue(m)) + assert.NoError(t, q.Queue(m)) + time.Sleep(1000 * time.Millisecond) + q.Release() + // you will see the execute time > 1000ms +} + func TestEnqueueJobAfterShutdown(t *testing.T) { m := mockMessage{ Message: "foo",