Skip to content

Commit

Permalink
feat: persistent task worker queues
Browse files Browse the repository at this point in the history
We create a new boltdb for manager to persistent task worker queues in boltdb.
Manager will load the task queue from the database and sync new tasks with database.

Signed-off-by: Yadong Ding <[email protected]>
  • Loading branch information
Desiki-high committed Sep 18, 2023
1 parent eeaf124 commit f46d93e
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 18 deletions.
10 changes: 8 additions & 2 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) {
return nil, err
}

if err := task.Manager.Init(cfg.Provider.WorkDir); err != nil {
return nil, errors.Wrap(err, "task manager init")
}

worker, err := NewWorker(cfg.Converter.Worker)
if err != nil {
return nil, errors.Wrap(err, "create worker")
Expand Down Expand Up @@ -141,8 +145,10 @@ func (adp *LocalAdapter) Convert(ctx context.Context, source string) (*converter
}

func (adp *LocalAdapter) Dispatch(ctx context.Context, ref string, sync bool) error {
taskID := task.Manager.Create(ref)

taskID, err := task.Manager.Create(ref)
if err != nil {
return err
}
if sync {
// FIXME: The synchronous conversion task should also be
// executed in a limited worker queue.
Expand Down
114 changes: 98 additions & 16 deletions pkg/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,40 @@
package task

import (
"encoding/json"
"path/filepath"
"sort"
"sync"
"time"

"github.com/goharbor/acceleration-service/pkg/converter"
"github.com/google/uuid"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)

var bucketObjectTasks = []byte("tasks")

const taskMaximumKeepPeriod = time.Hour * 24

const StatusProcessing = "PROCESSING"
const StatusCompleted = "COMPLETED"
const StatusFailed = "FAILED"

type Task struct {
ID string `json:"id"`
Created time.Time `json:"created"`
Finished *time.Time `json:"finished"`
Source string `json:"source"`
SourceSize uint `json:"source_size"`
TargetSize uint `json:"target_size"`
Status string `json:"status"`
Reason string `json:"reason"`
ID string `json:"id"`
Created time.Time `json:"created"`
Finished time.Time `json:"finished"`
Source string `json:"source"`
SourceSize uint `json:"source_size"`
TargetSize uint `json:"target_size"`
Status string `json:"status"`
Reason string `json:"reason"`
}

type manager struct {
mutex sync.Mutex
db *bolt.DB
tasks map[string]*Task
}

Expand All @@ -62,27 +69,96 @@ func (t *Task) IsExpired() bool {
return false
}

func (m *manager) Create(source string) string {
// Init manager supported by boltdb.
func (m *manager) Init(workDir string) error {
bdb, err := bolt.Open(filepath.Join(workDir, "task.db"), 0655, nil)
if err != nil {
return errors.Wrap(err, "create task database")
}
m.db = bdb
return m.initDatabase()
}

// initDatabase loads tasks from the database into memory.
func (m *manager) initDatabase() error {
return m.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte("tasks"))
if bucket == nil {
return nil
}

return bucket.ForEach(func(k, v []byte) error {
var task Task
if err := json.Unmarshal(v, &task); err != nil {
return err
}
if task.Status == StatusProcessing {
return m.deleteBucket(task.ID)
}
m.tasks[task.ID] = &task
return nil
})
})
}

// updateBucket updates task in bucket and creates a new bucket if it doesn't already exist.
func (m *manager) updateBucket(task *Task) error {
return m.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(bucketObjectTasks)
if err != nil {
return err
}

taskJSON, err := json.Marshal(task)
if err != nil {
return err
}

if err := bucket.Put([]byte(task.ID), taskJSON); err != nil {
return err
}

return nil
})
}

// deleteBucket deletes a task in bucket
func (m *manager) deleteBucket(taskID string) error {
return m.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketObjectTasks)
if bucket == nil {
return nil
}
return bucket.Delete([]byte(taskID))
})
}

// Create new task
func (m *manager) Create(source string) (string, error) {
m.mutex.Lock()
defer m.mutex.Unlock()

id := uuid.NewString()

m.tasks[id] = &Task{
task := &Task{
ID: id,
Created: time.Now(),
Finished: nil,
Source: source,
SourceSize: 0,
TargetSize: 0,
Status: StatusProcessing,
Reason: "",
}

return id
m.tasks[id] = task
if err := m.updateBucket(task); err != nil {
return "", err
}
m.tasks[id] = task
return id, nil
}

func (m *manager) Finish(id string, metric *converter.Metric, err error) {
// Finish a task
func (m *manager) Finish(id string, metric *converter.Metric, err error) error {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand All @@ -97,16 +173,22 @@ func (m *manager) Finish(id string, metric *converter.Metric, err error) {
task.TargetSize = uint(metric.TargetImageSize)
task.Status = StatusCompleted
}
now := time.Now()
task.Finished = &now
task.Finished = time.Now()
}
if err := m.updateBucket(task); err != nil {
return err
}

// Evict expired tasks.
for id, task := range m.tasks {
if task.IsExpired() {
delete(m.tasks, id)
if err := m.deleteBucket(id); err != nil {
return err
}
}
}
return nil
}

func (m *manager) List() []*Task {
Expand Down

0 comments on commit f46d93e

Please sign in to comment.