Skip to content

Commit

Permalink
Merge pull request #192 from Desiki-high/feat/task-metric
Browse files Browse the repository at this point in the history
feat: record metrics of conversion in task
  • Loading branch information
imeoer authored Sep 18, 2023
2 parents 3cffdb2 + fe881b6 commit eeaf124
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 33 deletions.
5 changes: 5 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,16 @@ GET /api/v1/conversions
"created": "2022-04-06T06:45:11.83226503Z",
"finished": "2022-04-06T06:45:11.948393604Z",
"source": "192.168.1.1/library/nginx:latest",
"source_size": "70254592",
"target_size": "72351744",
"status": "$status",
"reason": "$reason"
}
]
```
`source_size`: uint, total size of the source image with specified platforms in bytes.

`target_size`: uint, total size of the target image with specified platforms in bytes.

`$status`: string, possible values is `PROCESSING`, `COMPLETED`, `FAILED`.

Expand Down
38 changes: 21 additions & 17 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,39 +104,40 @@ func startScheduledGC(content *content.Content) {
}
}

func (adp *LocalAdapter) Convert(ctx context.Context, source string) error {
func (adp *LocalAdapter) Convert(ctx context.Context, source string) (*converter.Metric, error) {
target, err := adp.rule.Map(source, TagSuffix)
if err != nil {
if errors.Is(err, errdefs.ErrAlreadyConverted) {
logrus.Infof("image has been converted: %s", source)
return nil
return nil, nil
}
return errors.Wrap(err, "create target reference by rule")
return nil, errors.Wrap(err, "create target reference by rule")
}
cacheRef, err := adp.rule.Map(source, CacheTag)
if err != nil {
if errors.Is(err, errdefs.ErrSameTag) {
logrus.Infof("image was remote cache: %s", source)
return nil
return nil, nil
}
}
if err = adp.content.NewRemoteCache(cacheRef); err != nil {
return err
return nil, err
}
adp.content.GcMutex.RLock()
defer adp.content.GcMutex.RUnlock()
if _, err = adp.cvt.Convert(ctx, source, target, cacheRef); err != nil {
metric, err := adp.cvt.Convert(ctx, source, target, cacheRef)
if err != nil {
if errdefs.NeedsRetryWithoutCache(err) && cacheRef != "" {
logrus.Infof("inconsistent layer format with the cache, retry conversion without cache: %s", cacheRef)
if _, err := adp.cvt.Convert(ctx, source, target, ""); err != nil {
return err
return nil, err
}
}
adp.content.GcMutex.RUnlock()
return err
return nil, err
}
go adp.content.GC(ctx, adp.content.Threshold)
return nil
return metric, nil
}

func (adp *LocalAdapter) Dispatch(ctx context.Context, ref string, sync bool) error {
Expand All @@ -145,21 +146,24 @@ func (adp *LocalAdapter) Dispatch(ctx context.Context, ref string, sync bool) er
if sync {
// FIXME: The synchronous conversion task should also be
// executed in a limited worker queue.
return metrics.Conversion.OpWrap(func() error {
err := adp.Convert(namespaces.WithNamespace(ctx, "acceleration-service"), ref)
task.Manager.Finish(taskID, err)
return err
_, err := metrics.Conversion.OpWrap(func() (*converter.Metric, error) {
metric, err := adp.Convert(namespaces.WithNamespace(ctx, "acceleration-service"), ref)
task.Manager.Finish(taskID, metric, err)
return nil, err
}, "convert")
return err
}

adp.worker.Dispatch(func() error {
// If the ref is same, we only convert once in the same time.
_, err, _ := dispatchSingleflight.Do(ref, func() (interface{}, error) {
return nil, metrics.Conversion.OpWrap(func() error {
return adp.Convert(namespaces.WithNamespace(context.Background(), "acceleration-service"), ref)
metric, err, _ := dispatchSingleflight.Do(ref, func() (interface{}, error) {
metric, err := metrics.Conversion.OpWrap(func() (*converter.Metric, error) {
metric, err := adp.Convert(namespaces.WithNamespace(context.Background(), "acceleration-service"), ref)
return metric, err
}, "convert")
return metric, err
})
task.Manager.Finish(taskID, err)
task.Manager.Finish(taskID, metric.(*converter.Metric), err)
return err
})

Expand Down
7 changes: 4 additions & 3 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"time"

"github.com/goharbor/acceleration-service/pkg/converter"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -75,14 +76,14 @@ func NewOpWrapper(scope string, labelNames []string) *OpWrapper {
}
}

func (metrics *OpWrapper) OpWrap(op func() error, lvs ...string) error {
func (metrics *OpWrapper) OpWrap(op func() (*converter.Metric, error), lvs ...string) (*converter.Metric, error) {
start := time.Now()

err := op()
metric, err := op()
Duration(err, metrics.OpDuration, start, lvs...)
CountInc(err, metrics.OpTotal, metrics.OpErrorTotal, lvs...)

return err
return metric, err
}

func Duration(err error, metric *prometheus.HistogramVec, start time.Time, lvs ...string) {
Expand Down
33 changes: 20 additions & 13 deletions pkg/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/goharbor/acceleration-service/pkg/converter"
"github.com/google/uuid"
)

Expand All @@ -29,12 +30,14 @@ const StatusCompleted = "COMPLETED"
const StatusFailed = "FAILED"

type Task struct {
ID string `json:"id"`
Created time.Time `json:"created"`
Finished *time.Time `json:"finished"`
Source string `json:"source"`
Status string `json:"status"`
Reason string `json:"reason"`
ID string `json:"id"`
Created time.Time `json:"created"`
Finished *time.Time `json:"finished"`
Source string `json:"source"`
SourceSize uint `json:"source_size"`
TargetSize uint `json:"target_size"`
Status string `json:"status"`
Reason string `json:"reason"`
}

type manager struct {
Expand Down Expand Up @@ -66,18 +69,20 @@ func (m *manager) Create(source string) string {
id := uuid.NewString()

m.tasks[id] = &Task{
ID: id,
Created: time.Now(),
Finished: nil,
Source: source,
Status: StatusProcessing,
Reason: "",
ID: id,
Created: time.Now(),
Finished: nil,
Source: source,
SourceSize: 0,
TargetSize: 0,
Status: StatusProcessing,
Reason: "",
}

return id
}

func (m *manager) Finish(id string, err error) {
func (m *manager) Finish(id string, metric *converter.Metric, err error) {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand All @@ -88,6 +93,8 @@ func (m *manager) Finish(id string, err error) {
task.Status = StatusFailed
task.Reason = err.Error()
} else {
task.SourceSize = uint(metric.SourceImageSize)
task.TargetSize = uint(metric.TargetImageSize)
task.Status = StatusCompleted
}
now := time.Now()
Expand Down

0 comments on commit eeaf124

Please sign in to comment.