diff --git a/config/config.go b/config/config.go index 828bd2b..8500d22 100644 --- a/config/config.go +++ b/config/config.go @@ -9,61 +9,20 @@ import ( "github.com/sirupsen/logrus" ) -const ( - DefaultPoolName = "default" - - minSecondaryStorageThresholdSeconds = 60 * 60 -) - -type SpannerConfig struct { - Project string - Instance string - Database string - CredentialsFile string - TableName string -} - -func (spanner *SpannerConfig) validate() error { - if spanner == nil { - return nil - } - if spanner.Instance == "" || spanner.Project == "" || spanner.Database == "" || spanner.TableName == "" { - return errors.New("'Instance'/'Project'/'Database'/'TableName' should NOT be empty") - } - return nil -} - -type SecondaryStorage struct { - Spanner *SpannerConfig - // max number of jobs that storage pumps per batch - MaxJobPumpBatchSize int64 - // range from 0 to 1, when the redis memory usage is greater than this value, - // the storage won't pump jobs to redis anymore until the memory usage is lower than this value. - // - // Default is 1, means no limit - HighRedisMemoryWatermark float64 -} - -func (storage *SecondaryStorage) validate() error { - if storage.HighRedisMemoryWatermark < 0 || storage.HighRedisMemoryWatermark > 1 { - return fmt.Errorf("invalid HighRedisMemoryWatermark: %f, should be between 0 and 1", storage.HighRedisMemoryWatermark) - } - return storage.Spanner.validate() -} +const DefaultPoolName = "default" type Config struct { - Host string - Port int - AdminHost string - AdminPort int - LogLevel string - LogDir string - LogFormat string - Accounts map[string]string - EnableAccessLog bool - AdminRedis RedisConf - Pool RedisPool - SecondaryStorage *SecondaryStorage + Host string + Port int + AdminHost string + AdminPort int + LogLevel string + LogDir string + LogFormat string + Accounts map[string]string + EnableAccessLog bool + AdminRedis RedisConf + Pool RedisPool // Default publish params TTLSecond int @@ -147,15 +106,5 @@ func MustLoad(path string) (*Config, error) { if err != nil { return nil, errors.New("invalid log level") } - - if conf.SecondaryStorage != nil { - if err := conf.SecondaryStorage.validate(); err != nil { - return nil, err - } - if conf.SecondaryStorage.HighRedisMemoryWatermark == 0 { - // default to 1 - conf.SecondaryStorage.HighRedisMemoryWatermark = 1 - } - } return conf, nil } diff --git a/config/config_test.go b/config/config_test.go index e1cb661..019b5e5 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -2,19 +2,8 @@ package config import ( "testing" - - "github.com/stretchr/testify/assert" ) -func TestSecondaryStorageConfig_Validate(t *testing.T) { - secondaryStorageConfig := SecondaryStorage{} - assert.Nil(t, secondaryStorageConfig.validate()) - secondaryStorageConfig.Spanner = &SpannerConfig{} - assert.NotNil(t, secondaryStorageConfig.validate()) - secondaryStorageConfig.Spanner = SpannerEmulator - assert.Nil(t, secondaryStorageConfig.validate()) -} - func TestRedisConfig_Validate(t *testing.T) { conf := &RedisConf{} if err := conf.validate(); err == nil { diff --git a/config/preset.go b/config/preset.go index c4504f3..4523bf0 100644 --- a/config/preset.go +++ b/config/preset.go @@ -5,13 +5,6 @@ import ( "github.com/orlangure/gnomock/preset/redis" ) -var SpannerEmulator = &SpannerConfig{ - Project: "test-project", - Instance: "test-instance", - Database: "test-db", - TableName: "lmstfy_jobs", -} - type PresetConfigForTest struct { *Config containers []*gnomock.Container @@ -25,9 +18,6 @@ func CreatePresetForTest(version string, pools ...string) (*PresetConfigForTest, AdminPort: 7778, LogLevel: "INFO", Pool: make(map[string]RedisConf), - SecondaryStorage: &SecondaryStorage{ - Spanner: SpannerEmulator, - }, } p := redis.Preset() @@ -38,7 +28,6 @@ func CreatePresetForTest(version string, pools ...string) (*PresetConfigForTest, addr := defaultContainer.DefaultAddress() cfg.AdminRedis.Addr = addr cfg.Pool[DefaultPoolName] = RedisConf{Addr: addr, Version: version} - cfg.Pool["test-v2"] = RedisConf{Addr: addr, Version: "v2"} containers := []*gnomock.Container{defaultContainer} for _, extraPool := range pools { diff --git a/engine/engine.go b/engine/engine.go index 7e62a05..0613b04 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -1,12 +1,9 @@ package engine import ( - "errors" "io" ) -var ErrJobExisted = errors.New("job existed") - type Engine interface { Publish(job Job) (jobID string, err error) Consume(namespace string, queues []string, ttrSecond, timeoutSecond uint32) (job Job, err error) diff --git a/engine/job.go b/engine/job.go index d88dd01..0a11f00 100644 --- a/engine/job.go +++ b/engine/job.go @@ -25,17 +25,6 @@ type Job interface { encoding.TextMarshaler } -type CreateJobReq struct { - Namespace string - Queue string - ID string - Body []byte - TTL uint32 - Delay uint32 - Tries uint16 - Attributes map[string]string -} - type jobImpl struct { namespace string queue string @@ -67,17 +56,16 @@ func NewJob(namespace, queue string, body []byte, ttl, delay uint32, tries uint1 } } -func NewJobWithID(namespace, queue string, body []byte, ttl uint32, tries uint16, jobID string, attrs map[string]string) Job { +func NewJobWithID(namespace, queue string, body []byte, ttl uint32, tries uint16, jobID string) Job { delay, _ := uuid.ExtractDelaySecondFromUniqueID(jobID) return &jobImpl{ - namespace: namespace, - queue: queue, - id: jobID, - body: body, - ttl: ttl, - delay: delay, - tries: tries, - attributes: attrs, + namespace: namespace, + queue: queue, + id: jobID, + body: body, + ttl: ttl, + delay: delay, + tries: tries, } } diff --git a/engine/pool.go b/engine/pool.go index ab8868e..6d2edd4 100644 --- a/engine/pool.go +++ b/engine/pool.go @@ -4,7 +4,6 @@ import "github.com/bitleak/lmstfy/config" const ( KindRedis = "redis" - KindRedisV2 = "redis_v2" KindMigration = "migration" ) @@ -48,7 +47,7 @@ func GetEngine(pool string) Engine { if pool == "" { pool = config.DefaultPoolName } - kinds := []string{KindRedis, KindRedisV2, KindMigration} + kinds := []string{KindRedis, KindMigration} for _, kind := range kinds { if e := GetEngineByKind(kind, pool); e != nil { return e diff --git a/engine/redis/engine.go b/engine/redis/engine.go index 9599954..9f9b47e 100644 --- a/engine/redis/engine.go +++ b/engine/redis/engine.go @@ -28,9 +28,6 @@ type Engine struct { timer *Timer meta *MetaManager monitor *SizeMonitor - // number of seconds. when job's delay second is greater than pumpStorageThresh, - //it will be written to storage if enabled - storageThresh uint32 } func NewEngine(redisName string, conn *go_redis.Client) (engine.Engine, error) { @@ -180,7 +177,7 @@ func (e *Engine) consumeMulti(namespace string, queues []string, ttrSecond, time default: return nil, fmt.Errorf("pool: %s", err) } - job = engine.NewJobWithID(namespace, queueName.Queue, body, ttl, tries, jobID, nil) + job = engine.NewJobWithID(namespace, queueName.Queue, body, ttl, tries, jobID) metrics.jobElapsedMS.WithLabelValues(e.redis.Name, namespace, queueName.Queue).Observe(float64(job.ElapsedMS())) return job, nil } @@ -217,12 +214,12 @@ func (e *Engine) Peek(namespace, queue, optionalJobID string) (job engine.Job, e // was assigned we should return the not fond error. if optionalJobID == "" && err == engine.ErrNotFound { // return jobID with nil body if the job is expired - return engine.NewJobWithID(namespace, queue, nil, 0, 0, jobID, nil), nil + return engine.NewJobWithID(namespace, queue, nil, 0, 0, jobID), nil } if err != nil { return nil, err } - return engine.NewJobWithID(namespace, queue, body, ttl, tries, jobID, nil), err + return engine.NewJobWithID(namespace, queue, body, ttl, tries, jobID), err } func (e *Engine) Size(namespace, queue string) (size int64, err error) { diff --git a/server/handlers/middleware.go b/server/handlers/middleware.go index 7946fe1..00f088a 100644 --- a/server/handlers/middleware.go +++ b/server/handlers/middleware.go @@ -102,11 +102,3 @@ func ValidateMultiConsume(c *gin.Context) { return } } - -func CheckPoolExists(c *gin.Context) { - pool := c.Query("pool") - if exists := engine.ExistsPool(pool); !exists { - c.JSON(http.StatusBadRequest, auth.ErrPoolNotExist) - c.Abort() - } -} diff --git a/server/handlers/queue.go b/server/handlers/queue.go index 7604b65..0e3e381 100644 --- a/server/handlers/queue.go +++ b/server/handlers/queue.go @@ -595,16 +595,3 @@ func DestroyQueue(c *gin.Context) { }).Info("queue destroyed") c.Status(http.StatusNoContent) } - -func parseAttributes(c *gin.Context) map[string]string { - attributes := make(map[string]string) - for key, vals := range c.Request.Header { - lowerKey := strings.ToLower(key) - if !strings.HasPrefix(lowerKey, jobAttributeHeaderPrefix) || len(vals) == 0 { - continue - } - field := strings.TrimPrefix(lowerKey, jobAttributeHeaderPrefix) - attributes[field] = vals[0] - } - return attributes -}