Skip to content

Commit

Permalink
Follow up #224 to clean the redis v2
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Jul 9, 2024
1 parent d55c106 commit dd55a0a
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 137 deletions.
75 changes: 12 additions & 63 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,61 +9,20 @@ import (
"github.com/sirupsen/logrus"
)

const (
DefaultPoolName = "default"

minSecondaryStorageThresholdSeconds = 60 * 60
)

type SpannerConfig struct {
Project string
Instance string
Database string
CredentialsFile string
TableName string
}

func (spanner *SpannerConfig) validate() error {
if spanner == nil {
return nil
}
if spanner.Instance == "" || spanner.Project == "" || spanner.Database == "" || spanner.TableName == "" {
return errors.New("'Instance'/'Project'/'Database'/'TableName' should NOT be empty")
}
return nil
}

type SecondaryStorage struct {
Spanner *SpannerConfig
// max number of jobs that storage pumps per batch
MaxJobPumpBatchSize int64
// range from 0 to 1, when the redis memory usage is greater than this value,
// the storage won't pump jobs to redis anymore until the memory usage is lower than this value.
//
// Default is 1, means no limit
HighRedisMemoryWatermark float64
}

func (storage *SecondaryStorage) validate() error {
if storage.HighRedisMemoryWatermark < 0 || storage.HighRedisMemoryWatermark > 1 {
return fmt.Errorf("invalid HighRedisMemoryWatermark: %f, should be between 0 and 1", storage.HighRedisMemoryWatermark)
}
return storage.Spanner.validate()
}
const DefaultPoolName = "default"

type Config struct {
Host string
Port int
AdminHost string
AdminPort int
LogLevel string
LogDir string
LogFormat string
Accounts map[string]string
EnableAccessLog bool
AdminRedis RedisConf
Pool RedisPool
SecondaryStorage *SecondaryStorage
Host string
Port int
AdminHost string
AdminPort int
LogLevel string
LogDir string
LogFormat string
Accounts map[string]string
EnableAccessLog bool
AdminRedis RedisConf
Pool RedisPool

// Default publish params
TTLSecond int
Expand Down Expand Up @@ -147,15 +106,5 @@ func MustLoad(path string) (*Config, error) {
if err != nil {
return nil, errors.New("invalid log level")
}

if conf.SecondaryStorage != nil {
if err := conf.SecondaryStorage.validate(); err != nil {
return nil, err
}
if conf.SecondaryStorage.HighRedisMemoryWatermark == 0 {
// default to 1
conf.SecondaryStorage.HighRedisMemoryWatermark = 1
}
}
return conf, nil
}
11 changes: 0 additions & 11 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,8 @@ package config

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestSecondaryStorageConfig_Validate(t *testing.T) {
secondaryStorageConfig := SecondaryStorage{}
assert.Nil(t, secondaryStorageConfig.validate())
secondaryStorageConfig.Spanner = &SpannerConfig{}
assert.NotNil(t, secondaryStorageConfig.validate())
secondaryStorageConfig.Spanner = SpannerEmulator
assert.Nil(t, secondaryStorageConfig.validate())
}

func TestRedisConfig_Validate(t *testing.T) {
conf := &RedisConf{}
if err := conf.validate(); err == nil {
Expand Down
11 changes: 0 additions & 11 deletions config/preset.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@ import (
"github.com/orlangure/gnomock/preset/redis"
)

var SpannerEmulator = &SpannerConfig{
Project: "test-project",
Instance: "test-instance",
Database: "test-db",
TableName: "lmstfy_jobs",
}

type PresetConfigForTest struct {
*Config
containers []*gnomock.Container
Expand All @@ -25,9 +18,6 @@ func CreatePresetForTest(version string, pools ...string) (*PresetConfigForTest,
AdminPort: 7778,
LogLevel: "INFO",
Pool: make(map[string]RedisConf),
SecondaryStorage: &SecondaryStorage{
Spanner: SpannerEmulator,
},
}

p := redis.Preset()
Expand All @@ -38,7 +28,6 @@ func CreatePresetForTest(version string, pools ...string) (*PresetConfigForTest,
addr := defaultContainer.DefaultAddress()
cfg.AdminRedis.Addr = addr
cfg.Pool[DefaultPoolName] = RedisConf{Addr: addr, Version: version}
cfg.Pool["test-v2"] = RedisConf{Addr: addr, Version: "v2"}

containers := []*gnomock.Container{defaultContainer}
for _, extraPool := range pools {
Expand Down
3 changes: 0 additions & 3 deletions engine/engine.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package engine

import (
"errors"
"io"
)

var ErrJobExisted = errors.New("job existed")

type Engine interface {
Publish(job Job) (jobID string, err error)
Consume(namespace string, queues []string, ttrSecond, timeoutSecond uint32) (job Job, err error)
Expand Down
28 changes: 8 additions & 20 deletions engine/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,6 @@ type Job interface {
encoding.TextMarshaler
}

type CreateJobReq struct {
Namespace string
Queue string
ID string
Body []byte
TTL uint32
Delay uint32
Tries uint16
Attributes map[string]string
}

type jobImpl struct {
namespace string
queue string
Expand Down Expand Up @@ -67,17 +56,16 @@ func NewJob(namespace, queue string, body []byte, ttl, delay uint32, tries uint1
}
}

func NewJobWithID(namespace, queue string, body []byte, ttl uint32, tries uint16, jobID string, attrs map[string]string) Job {
func NewJobWithID(namespace, queue string, body []byte, ttl uint32, tries uint16, jobID string) Job {
delay, _ := uuid.ExtractDelaySecondFromUniqueID(jobID)
return &jobImpl{
namespace: namespace,
queue: queue,
id: jobID,
body: body,
ttl: ttl,
delay: delay,
tries: tries,
attributes: attrs,
namespace: namespace,
queue: queue,
id: jobID,
body: body,
ttl: ttl,
delay: delay,
tries: tries,
}
}

Expand Down
3 changes: 1 addition & 2 deletions engine/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import "github.com/bitleak/lmstfy/config"

const (
KindRedis = "redis"
KindRedisV2 = "redis_v2"
KindMigration = "migration"
)

Expand Down Expand Up @@ -48,7 +47,7 @@ func GetEngine(pool string) Engine {
if pool == "" {
pool = config.DefaultPoolName
}
kinds := []string{KindRedis, KindRedisV2, KindMigration}
kinds := []string{KindRedis, KindMigration}
for _, kind := range kinds {
if e := GetEngineByKind(kind, pool); e != nil {
return e
Expand Down
9 changes: 3 additions & 6 deletions engine/redis/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ type Engine struct {
timer *Timer
meta *MetaManager
monitor *SizeMonitor
// number of seconds. when job's delay second is greater than pumpStorageThresh,
//it will be written to storage if enabled
storageThresh uint32
}

func NewEngine(redisName string, conn *go_redis.Client) (engine.Engine, error) {
Expand Down Expand Up @@ -180,7 +177,7 @@ func (e *Engine) consumeMulti(namespace string, queues []string, ttrSecond, time
default:
return nil, fmt.Errorf("pool: %s", err)
}
job = engine.NewJobWithID(namespace, queueName.Queue, body, ttl, tries, jobID, nil)
job = engine.NewJobWithID(namespace, queueName.Queue, body, ttl, tries, jobID)
metrics.jobElapsedMS.WithLabelValues(e.redis.Name, namespace, queueName.Queue).Observe(float64(job.ElapsedMS()))
return job, nil
}
Expand Down Expand Up @@ -217,12 +214,12 @@ func (e *Engine) Peek(namespace, queue, optionalJobID string) (job engine.Job, e
// was assigned we should return the not fond error.
if optionalJobID == "" && err == engine.ErrNotFound {
// return jobID with nil body if the job is expired
return engine.NewJobWithID(namespace, queue, nil, 0, 0, jobID, nil), nil
return engine.NewJobWithID(namespace, queue, nil, 0, 0, jobID), nil
}
if err != nil {
return nil, err
}
return engine.NewJobWithID(namespace, queue, body, ttl, tries, jobID, nil), err
return engine.NewJobWithID(namespace, queue, body, ttl, tries, jobID), err
}

func (e *Engine) Size(namespace, queue string) (size int64, err error) {
Expand Down
8 changes: 0 additions & 8 deletions server/handlers/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,3 @@ func ValidateMultiConsume(c *gin.Context) {
return
}
}

func CheckPoolExists(c *gin.Context) {
pool := c.Query("pool")
if exists := engine.ExistsPool(pool); !exists {
c.JSON(http.StatusBadRequest, auth.ErrPoolNotExist)
c.Abort()
}
}
13 changes: 0 additions & 13 deletions server/handlers/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,16 +595,3 @@ func DestroyQueue(c *gin.Context) {
}).Info("queue destroyed")
c.Status(http.StatusNoContent)
}

func parseAttributes(c *gin.Context) map[string]string {
attributes := make(map[string]string)
for key, vals := range c.Request.Header {
lowerKey := strings.ToLower(key)
if !strings.HasPrefix(lowerKey, jobAttributeHeaderPrefix) || len(vals) == 0 {
continue
}
field := strings.TrimPrefix(lowerKey, jobAttributeHeaderPrefix)
attributes[field] = vals[0]
}
return attributes
}

0 comments on commit dd55a0a

Please sign in to comment.