From f0ea41d200b3c88db04497573749570260174b24 Mon Sep 17 00:00:00 2001 From: Yadong Ding Date: Mon, 25 Sep 2023 17:08:16 +0800 Subject: [PATCH] feat: persistent task worker queues We create a new boltdb for manager to persistent task worker queues in boltdb. Manager will load the task queue from the database and sync new tasks with database. Signed-off-by: Yadong Ding --- pkg/adapter/adapter.go | 10 +++- pkg/task/manager.go | 114 +++++++++++++++++++++++++++++++++++------ 2 files changed, 106 insertions(+), 18 deletions(-) diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go index 7dd02268..596f362a 100644 --- a/pkg/adapter/adapter.go +++ b/pkg/adapter/adapter.go @@ -77,6 +77,10 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { return nil, err } + if err := task.Manager.Init(cfg.Provider.WorkDir); err != nil { + return nil, errors.Wrap(err, "task manager init") + } + worker, err := NewWorker(cfg.Converter.Worker) if err != nil { return nil, errors.Wrap(err, "create worker") @@ -140,8 +144,10 @@ func (adp *LocalAdapter) Convert(ctx context.Context, source string) (*converter } func (adp *LocalAdapter) Dispatch(ctx context.Context, ref string, sync bool) error { - taskID := task.Manager.Create(ref) - + taskID, err := task.Manager.Create(ref) + if err != nil { + return err + } if sync { // FIXME: The synchronous conversion task should also be // executed in a limited worker queue. diff --git a/pkg/task/manager.go b/pkg/task/manager.go index b12b26ad..13b93e38 100644 --- a/pkg/task/manager.go +++ b/pkg/task/manager.go @@ -15,14 +15,20 @@ package task import ( + "encoding/json" + "path/filepath" "sort" "sync" "time" "github.com/goharbor/acceleration-service/pkg/converter" "github.com/google/uuid" + "github.com/pkg/errors" + bolt "go.etcd.io/bbolt" ) +var bucketObjectTasks = []byte("tasks") + const taskMaximumKeepPeriod = time.Hour * 24 const StatusProcessing = "PROCESSING" @@ -30,18 +36,19 @@ const StatusCompleted = "COMPLETED" const StatusFailed = "FAILED" type Task struct { - ID string `json:"id"` - Created time.Time `json:"created"` - Finished *time.Time `json:"finished"` - Source string `json:"source"` - SourceSize uint `json:"source_size"` - TargetSize uint `json:"target_size"` - Status string `json:"status"` - Reason string `json:"reason"` + ID string `json:"id"` + Created time.Time `json:"created"` + Finished time.Time `json:"finished"` + Source string `json:"source"` + SourceSize uint `json:"source_size"` + TargetSize uint `json:"target_size"` + Status string `json:"status"` + Reason string `json:"reason"` } type manager struct { mutex sync.Mutex + db *bolt.DB tasks map[string]*Task } @@ -62,27 +69,96 @@ func (t *Task) IsExpired() bool { return false } -func (m *manager) Create(source string) string { +// Init manager supported by boltdb. +func (m *manager) Init(workDir string) error { + bdb, err := bolt.Open(filepath.Join(workDir, "task.db"), 0655, nil) + if err != nil { + return errors.Wrap(err, "create task database") + } + m.db = bdb + return m.initDatabase() +} + +// initDatabase loads tasks from the database into memory. +func (m *manager) initDatabase() error { + return m.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte("tasks")) + if bucket == nil { + return nil + } + + return bucket.ForEach(func(k, v []byte) error { + var task Task + if err := json.Unmarshal(v, &task); err != nil { + return err + } + if task.Status == StatusProcessing { + return bucket.Delete([]byte(task.ID)) + } + m.tasks[task.ID] = &task + return nil + }) + }) +} + +// updateBucket updates task in bucket and creates a new bucket if it doesn't already exist. +func (m *manager) updateBucket(task *Task) error { + return m.db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(bucketObjectTasks) + if err != nil { + return err + } + + taskJSON, err := json.Marshal(task) + if err != nil { + return err + } + + if err := bucket.Put([]byte(task.ID), taskJSON); err != nil { + return err + } + + return nil + }) +} + +// deleteBucket deletes a task in bucket +func (m *manager) deleteBucket(taskID string) error { + return m.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketObjectTasks) + if bucket == nil { + return nil + } + return bucket.Delete([]byte(taskID)) + }) +} + +// Create new task +func (m *manager) Create(source string) (string, error) { m.mutex.Lock() defer m.mutex.Unlock() id := uuid.NewString() - m.tasks[id] = &Task{ + task := &Task{ ID: id, Created: time.Now(), - Finished: nil, Source: source, SourceSize: 0, TargetSize: 0, Status: StatusProcessing, Reason: "", } - - return id + m.tasks[id] = task + if err := m.updateBucket(task); err != nil { + return "", err + } + m.tasks[id] = task + return id, nil } -func (m *manager) Finish(id string, metric *converter.Metric, err error) { +// Finish a task +func (m *manager) Finish(id string, metric *converter.Metric, err error) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -99,16 +175,22 @@ func (m *manager) Finish(id string, metric *converter.Metric, err error) { task.SourceSize = uint(metric.SourceImageSize) task.TargetSize = uint(metric.TargetImageSize) } - now := time.Now() - task.Finished = &now + task.Finished = time.Now() + } + if err := m.updateBucket(task); err != nil { + return err } // Evict expired tasks. for id, task := range m.tasks { if task.IsExpired() { delete(m.tasks, id) + if err := m.deleteBucket(id); err != nil { + return err + } } } + return nil } func (m *manager) List() []*Task {