Skip to content

Commit

Permalink
Fix return wrong TTL when pumping job from the secondary storage (#218)
Browse files Browse the repository at this point in the history
Currently, we put now() + TTL as the expired time when storing jobs in the database,
but didn't remove the now() after pumping jobs. This PR also removes
the unused function `BatchGetJobs` which is similar to GetReadyJobs.
  • Loading branch information
git-hulk authored Sep 27, 2023
1 parent 6122a4b commit dccad8c
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 181 deletions.
7 changes: 7 additions & 0 deletions storage/persistence/model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ type DBJob struct {
CreatedTime int64 `spanner:"created_time" json:"created_time"`
}

func (j *DBJob) TTL(now int64) int64 {
if j.ExpiredTime == 0 {
return 0
}
return j.ExpiredTime - now
}

type DBJobReq struct {
PoolName string
Namespace string
Expand Down
55 changes: 11 additions & 44 deletions storage/persistence/spanner/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package spanner
import (
"context"
"fmt"
"io/ioutil"
"os"
"strings"
"time"
"testing"

"cloud.google.com/go/spanner"
database "cloud.google.com/go/spanner/admin/database/apiv1"
Expand All @@ -15,14 +15,6 @@ import (
"google.golang.org/grpc/codes"

"github.com/bitleak/lmstfy/config"
"github.com/bitleak/lmstfy/engine"
"github.com/bitleak/lmstfy/storage/persistence/model"
)

var (
poolName = "default"
jobIDs = []string{"1", "2", "3"}
ctx = context.Background()
)

func CreateInstance(ctx context.Context, cfg *config.SpannerConfig) error {
Expand Down Expand Up @@ -67,7 +59,7 @@ func CreateDatabase(ctx context.Context, cfg *config.SpannerConfig) error {
return nil
}

ddlBytes, err := ioutil.ReadFile("../../../scripts/schemas/spanner/ddls.sql")
ddlBytes, err := os.ReadFile("../../../scripts/schemas/spanner/ddls.sql")
if err != nil {
return fmt.Errorf("read ddls file: %w", err)
}
Expand All @@ -90,40 +82,15 @@ func CreateDatabase(ctx context.Context, cfg *config.SpannerConfig) error {
return err
}

func createTestJobsData() []engine.Job {
jobs := make([]engine.Job, 0)
j1 := engine.NewJob("n1", "q1", []byte("hello_j1"), 120, 30, 1, "1")
j2 := engine.NewJob("n1", "q2", []byte("hello_j2"), 120, 60, 1, "2")
j3 := engine.NewJob("n1", "q1", []byte("hello_j3"), 120, 90, 1, "3")
jobs = append(jobs, j1, j2, j3)
return jobs
}

func createTestReqData() []*model.DBJobReq {
req := make([]*model.DBJobReq, 0)
r1 := &model.DBJobReq{
PoolName: poolName,
Namespace: "n1",
Queue: "q1",
ReadyTime: 0,
Count: 10,
func TestMain(m *testing.M) {
if os.Getenv("SPANNER_EMULATOR_HOST") == "" {
panic("SPANNER_EMULATOR_HOST is not set")
}
r2 := &model.DBJobReq{
PoolName: poolName,
Namespace: "n1",
Queue: "q2",
ReadyTime: 0,
Count: 10,
if err := CreateInstance(context.Background(), config.SpannerEmulator); err != nil {
panic("Create instance: " + err.Error())
}
req = append(req, r1, r2)
return req
}

func createTestReqData2() *model.DBJobReq {
req := &model.DBJobReq{
PoolName: poolName,
ReadyTime: time.Now().Unix() + 80,
Count: 10,
if err := CreateDatabase(context.Background(), config.SpannerEmulator); err != nil {
panic("Create database: " + err.Error())
}
return req
os.Exit(m.Run())
}
59 changes: 14 additions & 45 deletions storage/persistence/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ func (s *Spanner) BatchAddJobs(ctx context.Context, poolName string, jobs []engi
now := time.Now().Unix()
dbJobs := make([]*model.DBJob, 0)
for _, job := range jobs {
expiredTime := int64(0)
if job.TTL() > 0 {
expiredTime = now + int64(job.TTL())
}
j := &model.DBJob{
PoolName: poolName,
JobID: job.ID(),
Namespace: job.Namespace(),
Queue: job.Queue(),
Body: job.Body(),
ExpiredTime: now + int64(job.TTL()),
ExpiredTime: expiredTime,
ReadyTime: now + int64(job.Delay()),
Tries: int64(job.Tries()),
CreatedTime: now,
Expand All @@ -82,42 +86,6 @@ func (s *Spanner) BatchAddJobs(ctx context.Context, poolName string, jobs []engi
return err
}

// BatchGetJobs pumps data that are due before certain due time
func (s *Spanner) BatchGetJobs(ctx context.Context, req []*model.DBJobReq) (jobs []engine.Job, err error) {
txn := s.cli.ReadOnlyTransaction()
now := time.Now().Unix()
defer txn.Close()

for _, r := range req {
iter := txn.Query(ctx, spanner.Statement{
SQL: "SELECT pool_name, job_id, namespace, queue, body, ready_time, expired_time, created_time, tries " +
"FROM lmstfy_jobs WHERE pool_name = @poolname and namespace = @namespace and queue = @queue and ready_time >= @readytime LIMIT @limit",
Params: map[string]interface{}{
"poolname": r.PoolName,
"namespace": r.Namespace,
"queue": r.Queue,
"readytime": r.ReadyTime,
"limit": r.Count,
},
})
err = iter.Do(func(row *spanner.Row) error {
elem := &model.DBJob{}
if err = row.ToStruct(elem); err != nil {
return err
}
j := engine.NewJob(elem.Namespace, elem.Queue, elem.Body, uint32(elem.ExpiredTime),
uint32(elem.ReadyTime-now), uint16(elem.Tries), elem.JobID)
jobs = append(jobs, j)
return nil
})

if err != nil {
return nil, err
}
}
return jobs, nil
}

// GetQueueSize returns the size of data in storage which are due before certain due time
func (s *Spanner) GetQueueSize(ctx context.Context, req []*model.DBJobReq) (count map[string]int64, err error) {
txn := s.cli.ReadOnlyTransaction()
Expand Down Expand Up @@ -180,13 +148,14 @@ func (s *Spanner) GetReadyJobs(ctx context.Context, req *model.DBJobReq) (jobs [
"limit": req.Count,
},
})

err = iter.Do(func(row *spanner.Row) error {
elem := &model.DBJob{}
if err = row.ToStruct(elem); err != nil {
dbJob := &model.DBJob{}
if err = row.ToStruct(dbJob); err != nil {
return err
}
j := engine.NewJob(elem.Namespace, elem.Queue, elem.Body, uint32(elem.ExpiredTime),
uint32(elem.ReadyTime-now), uint16(elem.Tries), elem.JobID)
j := engine.NewJob(dbJob.Namespace, dbJob.Queue, dbJob.Body, uint32(dbJob.TTL(now)),
uint32(dbJob.ReadyTime-now), uint16(dbJob.Tries), dbJob.JobID)
jobs = append(jobs, j)
return nil
})
Expand All @@ -211,12 +180,12 @@ func (s *Spanner) BatchGetJobsByID(ctx context.Context, IDs []string) (jobs []en
},
})
err = iter.Do(func(row *spanner.Row) error {
elem := &model.DBJob{}
if err = row.ToStruct(elem); err != nil {
dbJob := &model.DBJob{}
if err = row.ToStruct(dbJob); err != nil {
return err
}
j := engine.NewJob(elem.Namespace, elem.Queue, elem.Body, uint32(elem.ExpiredTime),
uint32(elem.ReadyTime-now), uint16(elem.Tries), elem.JobID)
j := engine.NewJob(dbJob.Namespace, dbJob.Queue, dbJob.Body, uint32(dbJob.TTL(now)),
uint32(dbJob.ReadyTime-now), uint16(dbJob.Tries), dbJob.JobID)
jobs = append(jobs, j)
return nil
})
Expand Down
173 changes: 83 additions & 90 deletions storage/persistence/spanner/spanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,115 +3,108 @@ package spanner
import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/bitleak/lmstfy/config"
"github.com/bitleak/lmstfy/engine"
"github.com/bitleak/lmstfy/storage/persistence/model"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
dummyCtx = context.TODO()
const (
poolName = "test_pool"
namespace = "test_ns"
)

func init() {
if os.Getenv("SPANNER_EMULATOR_HOST") == "" {
panic(fmt.Sprintf("failed to find $SPANNER_EMULATOR_HOST value"))
}
err := CreateInstance(dummyCtx, config.SpannerEmulator)
if err != nil {
panic(fmt.Sprintf("create instance error: %v", err))
func TestSpanner_Basic(t *testing.T) {
ctx := context.Background()
mgr, err := NewSpanner(config.SpannerEmulator)
require.NoError(t, err)

jobCnt := int64(10)
jobIDs := make([]string, jobCnt)
createJobs := make([]engine.Job, jobCnt)
for i := int64(0); i < jobCnt; i++ {
queue := "q1"
if i%2 == 0 {
queue = "q2"
}
createJobs[i] = engine.NewJob(namespace, queue, []byte("hello"), 10, 4, 3, "")
jobIDs[i] = createJobs[i].ID()
}
err = CreateDatabase(dummyCtx, config.SpannerEmulator)
if err != nil {
panic(fmt.Sprintf("create db error: %v", err))
require.NoError(t, mgr.BatchAddJobs(ctx, poolName, createJobs))

validateJob := func(t *testing.T, job engine.Job) {
assert.NotEmpty(t, job.ID())
assert.EqualValues(t, job.Namespace(), namespace)
assert.EqualValues(t, job.Tries(), 3)
assert.GreaterOrEqual(t, job.Delay(), uint32(1))
assert.LessOrEqual(t, job.Delay(), uint32(4))
assert.GreaterOrEqual(t, job.TTL(), uint32(1))
assert.LessOrEqual(t, job.TTL(), uint32(10))
}
}

func TestCreateSpannerClient(t *testing.T) {
_, err := createSpannerClient(config.SpannerEmulator)
assert.Nil(t, err)
}
t.Run("Batch Get Jobs By ID", func(t *testing.T) {
jobs, err := mgr.BatchGetJobsByID(ctx, jobIDs)
assert.Nil(t, err)
assert.EqualValues(t, len(jobIDs), len(jobs))
for _, job := range jobs {
validateJob(t, job)
}
})

func TestSpanner_BatchAddDelJobs(t *testing.T) {
mgr, err := NewSpanner(config.SpannerEmulator)
if err != nil {
panic(fmt.Sprintf("Failed to create spanner client with error: %s", err))
}
jobs := createTestJobsData()
err = mgr.BatchAddJobs(ctx, poolName, jobs)
if err != nil {
panic(fmt.Sprintf("Failed to add jobs with error: %s", err))
}
t.Logf("add jobs success %v rows", len(jobs))
t.Run("Get Ready Jobs", func(t *testing.T) {
readyJobs, err := mgr.GetReadyJobs(ctx, &model.DBJobReq{
PoolName: poolName,
ReadyTime: time.Now().Unix() + 10,
Count: jobCnt,
})
require.NoError(t, err)
require.EqualValues(t, jobCnt, len(readyJobs))
for _, job := range readyJobs {
validateJob(t, job)
}
})

count, err := mgr.DelJobs(ctx, jobIDs)
if err != nil {
panic(fmt.Sprintf("failed to delete job: %v", err))
}
t.Logf("del jobs success %v rows", count)
}
t.Run("Get Queue Size", func(t *testing.T) {
queueSizes, err := mgr.GetQueueSize(ctx, []*model.DBJobReq{
{PoolName: poolName, Namespace: namespace, Queue: "q1", ReadyTime: time.Now().Unix() - 10, Count: jobCnt},
{PoolName: poolName, Namespace: namespace, Queue: "q2", ReadyTime: time.Now().Unix() - 10, Count: jobCnt},
})
require.NoError(t, err)
assert.EqualValues(t, jobCnt/2, queueSizes[fmt.Sprintf("%s/%s", namespace, "q1")])
assert.EqualValues(t, jobCnt/2, queueSizes[fmt.Sprintf("%s/%s", namespace, "q2")])
})

func TestSpanner_BatchGetJobs(t *testing.T) {
mgr, err := NewSpanner(config.SpannerEmulator)
if err != nil {
panic(fmt.Sprintf("Failed to create spanner client with error: %s", err))
}
jobs := createTestJobsData()
mgr.BatchAddJobs(ctx, poolName, jobs)
req := createTestReqData()
jobs, err = mgr.BatchGetJobs(ctx, req)
if err != nil {
panic(fmt.Sprintf("BatchGetJobs failed with error: %s", err))
}
assert.EqualValues(t, 3, len(jobs))
mgr.DelJobs(ctx, jobIDs)
t.Run("Del Jobs", func(t *testing.T) {
count, err := mgr.DelJobs(context.Background(), jobIDs)
require.NoError(t, err)
require.EqualValues(t, jobCnt, count)
})
}

func TestSpanner_GetQueueSize(t *testing.T) {
func TestSpanner_NoExpiredJob(t *testing.T) {
ctx := context.Background()
mgr, err := NewSpanner(config.SpannerEmulator)
if err != nil {
panic(fmt.Sprintf("Failed to create spanner client with error: %s", err))
}
jobs := createTestJobsData()
mgr.BatchAddJobs(ctx, poolName, jobs)
req := createTestReqData()
count, err := mgr.GetQueueSize(ctx, req)
if err != nil || len(count) == 0 {
panic(fmt.Sprintf("BatchGetJobs failed with error: %s", err))
}
key1, key2 := fmt.Sprintf("%s/%s", "n1", "q1"), fmt.Sprintf("%s/%s", "n1", "q2")
assert.EqualValues(t, 2, count[key1])
assert.EqualValues(t, 1, count[key2])
mgr.DelJobs(ctx, jobIDs)
}
require.NoError(t, err)

func TestSpanner_GetReadyJobs(t *testing.T) {
mgr, err := NewSpanner(config.SpannerEmulator)
if err != nil {
panic(fmt.Sprintf("Failed to create spanner client with error: %s", err))
}
jobs := createTestJobsData()
mgr.BatchAddJobs(ctx, poolName, jobs)
req := createTestReqData2()
jobs, err = mgr.GetReadyJobs(ctx, req)
if err != nil {
panic(fmt.Sprintf("GetReadyJobs failed with error: %s", err))
jobCnt := int64(10)
jobIDs := make([]string, jobCnt)
createJobs := make([]engine.Job, jobCnt)
for i := int64(0); i < jobCnt; i++ {
queue := "q3"
createJobs[i] = engine.NewJob(namespace, queue, []byte("hello"), 0, 4, 3, "")
jobIDs[i] = createJobs[i].ID()
}
assert.EqualValues(t, 2, len(jobs))
mgr.DelJobs(ctx, jobIDs)
}
require.NoError(t, mgr.BatchAddJobs(ctx, poolName, createJobs))

func TestSpanner_BatchGetJobsByID(t *testing.T) {
mgr, err := NewSpanner(config.SpannerEmulator)
if err != nil {
panic(fmt.Sprintf("Failed to create spanner client with error: %s", err))
}
jobs := createTestJobsData()
mgr.BatchAddJobs(ctx, poolName, jobs)
IDs := []string{"1", "2", "3"}
jobs, err = mgr.BatchGetJobsByID(ctx, IDs)
jobs, err := mgr.BatchGetJobsByID(ctx, jobIDs)
assert.Nil(t, err)
assert.EqualValues(t, 3, len(jobs))
mgr.DelJobs(ctx, jobIDs)
assert.EqualValues(t, len(jobIDs), len(jobs))
for _, job := range jobs {
assert.EqualValues(t, job.TTL(), 0)
}
}
Loading

0 comments on commit dccad8c

Please sign in to comment.