Skip to content

Commit

Permalink
feat: record metrics of conversion in task
Browse files Browse the repository at this point in the history
We have collected the metrics of conversion progress. Let's show them in task.
Add the `SourceSize` and `TargetSize` in task and record them when
task finished.

Signed-off-by: Yadong Ding <[email protected]>
  • Loading branch information
Desiki-high committed Sep 18, 2023
1 parent d3616d1 commit fe881b6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 33 deletions.
38 changes: 21 additions & 17 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,39 +104,40 @@ func startScheduledGC(content *content.Content) {
}
}

func (adp *LocalAdapter) Convert(ctx context.Context, source string) error {
func (adp *LocalAdapter) Convert(ctx context.Context, source string) (*converter.Metric, error) {
target, err := adp.rule.Map(source, TagSuffix)
if err != nil {
if errors.Is(err, errdefs.ErrAlreadyConverted) {
logrus.Infof("image has been converted: %s", source)
return nil
return nil, nil
}
return errors.Wrap(err, "create target reference by rule")
return nil, errors.Wrap(err, "create target reference by rule")
}
cacheRef, err := adp.rule.Map(source, CacheTag)
if err != nil {
if errors.Is(err, errdefs.ErrSameTag) {
logrus.Infof("image was remote cache: %s", source)
return nil
return nil, nil
}
}
if err = adp.content.NewRemoteCache(cacheRef); err != nil {
return err
return nil, err
}
adp.content.GcMutex.RLock()
defer adp.content.GcMutex.RUnlock()
if _, err = adp.cvt.Convert(ctx, source, target, cacheRef); err != nil {
metric, err := adp.cvt.Convert(ctx, source, target, cacheRef)
if err != nil {
if errdefs.NeedsRetryWithoutCache(err) && cacheRef != "" {
logrus.Infof("inconsistent layer format with the cache, retry conversion without cache: %s", cacheRef)
if _, err := adp.cvt.Convert(ctx, source, target, ""); err != nil {
return err
return nil, err
}
}
adp.content.GcMutex.RUnlock()
return err
return nil, err
}
go adp.content.GC(ctx, adp.content.Threshold)
return nil
return metric, nil
}

func (adp *LocalAdapter) Dispatch(ctx context.Context, ref string, sync bool) error {
Expand All @@ -145,21 +146,24 @@ func (adp *LocalAdapter) Dispatch(ctx context.Context, ref string, sync bool) er
if sync {
// FIXME: The synchronous conversion task should also be
// executed in a limited worker queue.
return metrics.Conversion.OpWrap(func() error {
err := adp.Convert(namespaces.WithNamespace(ctx, "acceleration-service"), ref)
task.Manager.Finish(taskID, err)
return err
_, err := metrics.Conversion.OpWrap(func() (*converter.Metric, error) {
metric, err := adp.Convert(namespaces.WithNamespace(ctx, "acceleration-service"), ref)
task.Manager.Finish(taskID, metric, err)
return nil, err
}, "convert")
return err
}

adp.worker.Dispatch(func() error {
// If the ref is same, we only convert once in the same time.
_, err, _ := dispatchSingleflight.Do(ref, func() (interface{}, error) {
return nil, metrics.Conversion.OpWrap(func() error {
return adp.Convert(namespaces.WithNamespace(context.Background(), "acceleration-service"), ref)
metric, err, _ := dispatchSingleflight.Do(ref, func() (interface{}, error) {
metric, err := metrics.Conversion.OpWrap(func() (*converter.Metric, error) {
metric, err := adp.Convert(namespaces.WithNamespace(context.Background(), "acceleration-service"), ref)
return metric, err
}, "convert")
return metric, err
})
task.Manager.Finish(taskID, err)
task.Manager.Finish(taskID, metric.(*converter.Metric), err)
return err
})

Expand Down
7 changes: 4 additions & 3 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"time"

"github.com/goharbor/acceleration-service/pkg/converter"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -75,14 +76,14 @@ func NewOpWrapper(scope string, labelNames []string) *OpWrapper {
}
}

func (metrics *OpWrapper) OpWrap(op func() error, lvs ...string) error {
func (metrics *OpWrapper) OpWrap(op func() (*converter.Metric, error), lvs ...string) (*converter.Metric, error) {
start := time.Now()

err := op()
metric, err := op()
Duration(err, metrics.OpDuration, start, lvs...)
CountInc(err, metrics.OpTotal, metrics.OpErrorTotal, lvs...)

return err
return metric, err
}

func Duration(err error, metric *prometheus.HistogramVec, start time.Time, lvs ...string) {
Expand Down
33 changes: 20 additions & 13 deletions pkg/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/goharbor/acceleration-service/pkg/converter"
"github.com/google/uuid"
)

Expand All @@ -29,12 +30,14 @@ const StatusCompleted = "COMPLETED"
const StatusFailed = "FAILED"

type Task struct {
ID string `json:"id"`
Created time.Time `json:"created"`
Finished *time.Time `json:"finished"`
Source string `json:"source"`
Status string `json:"status"`
Reason string `json:"reason"`
ID string `json:"id"`
Created time.Time `json:"created"`
Finished *time.Time `json:"finished"`
Source string `json:"source"`
SourceSize uint `json:"source_size"`
TargetSize uint `json:"target_size"`
Status string `json:"status"`
Reason string `json:"reason"`
}

type manager struct {
Expand Down Expand Up @@ -66,18 +69,20 @@ func (m *manager) Create(source string) string {
id := uuid.NewString()

m.tasks[id] = &Task{
ID: id,
Created: time.Now(),
Finished: nil,
Source: source,
Status: StatusProcessing,
Reason: "",
ID: id,
Created: time.Now(),
Finished: nil,
Source: source,
SourceSize: 0,
TargetSize: 0,
Status: StatusProcessing,
Reason: "",
}

return id
}

func (m *manager) Finish(id string, err error) {
func (m *manager) Finish(id string, metric *converter.Metric, err error) {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand All @@ -88,6 +93,8 @@ func (m *manager) Finish(id string, err error) {
task.Status = StatusFailed
task.Reason = err.Error()
} else {
task.SourceSize = uint(metric.SourceImageSize)
task.TargetSize = uint(metric.TargetImageSize)
task.Status = StatusCompleted
}
now := time.Now()
Expand Down

0 comments on commit fe881b6

Please sign in to comment.