Skip to content

Commit

Permalink
feat: optimize preheat log (#1827)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Nov 11, 2022
1 parent 3ea1dea commit d72c255
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 45 deletions.
10 changes: 8 additions & 2 deletions internal/dflog/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,15 @@ func WithHostnameAndIP(hostname, ip string) *SugaredLoggerOnWith {
}
}

func WithTaskAndJobID(taskID, jobID string) *SugaredLoggerOnWith {
func WithGroupAndJobID(taskID, jobID string) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"taskID", taskID, "jobID", jobID},
withArgs: []any{"groupID", taskID, "jobID", jobID},
}
}

// WithGroupAndTaskID returns a logger that tags every entry with the
// machinery group ID and the individual task ID.
func WithGroupAndTaskID(groupID, taskID string) *SugaredLoggerOnWith {
	args := []any{"groupID", groupID, "taskID", taskID}
	return &SugaredLoggerOnWith{withArgs: args}
}

Expand Down
38 changes: 21 additions & 17 deletions internal/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
machineryv1log "github.com/RichardKnop/machinery/v1/log"
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/go-redis/redis/v8"

logger "d7y.io/dragonfly/v2/internal/dflog"
)

type Config struct {
Expand Down Expand Up @@ -112,43 +114,45 @@ type GroupJobState struct {
JobStates []*machineryv1tasks.TaskState
}

func (t *Job) GetGroupJobState(groupUUID string) (*GroupJobState, error) {
jobStates, err := t.Server.GetBackend().GroupTaskStates(groupUUID, 0)
func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) {
taskStates, err := t.Server.GetBackend().GroupTaskStates(groupID, 0)
if err != nil {
return nil, err
}

if len(jobStates) == 0 {
return nil, errors.New("empty group job")
if len(taskStates) == 0 {
return nil, errors.New("empty group")
}

for _, jobState := range jobStates {
if jobState.IsFailure() {
for _, taskState := range taskStates {
if taskState.IsFailure() {
logger.WithGroupAndTaskID(groupID, taskState.TaskUUID).Errorf("task is failed: %#v", taskState)
return &GroupJobState{
GroupUUID: groupUUID,
GroupUUID: groupID,
State: machineryv1tasks.StateFailure,
CreatedAt: jobState.CreatedAt,
JobStates: jobStates,
CreatedAt: taskState.CreatedAt,
JobStates: taskStates,
}, nil
}
}

for _, jobState := range jobStates {
if !jobState.IsSuccess() {
for _, taskState := range taskStates {
if !taskState.IsSuccess() {
logger.WithGroupAndTaskID(groupID, taskState.TaskUUID).Infof("task is not succeeded: %#v", taskState)
return &GroupJobState{
GroupUUID: groupUUID,
GroupUUID: groupID,
State: machineryv1tasks.StatePending,
CreatedAt: jobState.CreatedAt,
JobStates: jobStates,
CreatedAt: taskState.CreatedAt,
JobStates: taskStates,
}, nil
}
}

return &GroupJobState{
GroupUUID: groupUUID,
GroupUUID: groupID,
State: machineryv1tasks.StateSuccess,
CreatedAt: jobStates[0].CreatedAt,
JobStates: jobStates,
CreatedAt: taskStates[0].CreatedAt,
JobStates: taskStates,
}, nil
}

Expand Down
20 changes: 11 additions & 9 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/manifest/schema2"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -126,16 +127,11 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Schedule
return nil, errors.New("unknow preheat type")
}

logger.Infof("preheat %s queues: %v, files: %#v", json.URL, queues, files)
return p.createGroupJob(ctx, files, queues)
}

func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
signatures := []*machineryv1tasks.Signature{}
var urls []string
for i := range files {
urls = append(urls, files[i].URL)
}
var signatures []*machineryv1tasks.Signature
for _, queue := range queues {
for _, file := range files {
args, err := internaljob.MarshalRequest(file)
Expand All @@ -145,24 +141,30 @@ func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.Prehea
}

signatures = append(signatures, &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: internaljob.PreheatJob,
RoutingKey: queue.String(),
Args: args,
})
}
}
group, err := machineryv1tasks.NewGroup(signatures...)

group, err := machineryv1tasks.NewGroup(signatures...)
if err != nil {
return nil, err
}

var tasks []machineryv1tasks.Signature
for _, signature := range signatures {
tasks = append(tasks, *signature)
}

logger.Infof("create preheat group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
if _, err := p.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
logger.Error("create preheat group job failed", err)
logger.Errorf("create preheat group %s failed", group.GroupUUID, err)
return nil, err
}

logger.Infof("create preheat group job successfully, group uuid: %s, urls: %s", group.GroupUUID, urls)
return &internaljob.GroupJobState{
GroupUUID: group.GroupUUID,
State: machineryv1tasks.StatePending,
Expand Down
29 changes: 12 additions & 17 deletions manager/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,51 +98,46 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
return &job, nil
}

func (s *service) pollingJob(ctx context.Context, id uint, taskID string) {
func (s *service) pollingJob(ctx context.Context, id uint, groupID string) {
var (
job model.Job
log = logger.WithTaskAndJobID(taskID, fmt.Sprint(id))
log = logger.WithGroupAndJobID(groupID, fmt.Sprint(id))
)
if _, _, err := retry.Run(ctx, 5, 10, 120, func() (any, bool, error) {
groupJob, err := s.job.GetGroupJobState(taskID)
if _, _, err := retry.Run(ctx, 5, 10, 480, func() (any, bool, error) {
groupJob, err := s.job.GetGroupJobState(groupID)
if err != nil {
log.Errorf("polling job failed: %s", err.Error())
log.Errorf("polling group failed: %s", err.Error())
return nil, false, err
}

result, err := structure.StructToMap(groupJob)
if err != nil {
log.Errorf("polling job failed: %s", err.Error())
log.Errorf("polling group failed: %s", err.Error())
return nil, false, err
}

if err := s.db.WithContext(ctx).First(&job, id).Updates(model.Job{
State: groupJob.State,
Result: result,
}).Error; err != nil {
log.Errorf("polling job failed: %s", err.Error())
log.Errorf("polling group failed: %s", err.Error())
return nil, true, err
}

switch job.State {
case machineryv1tasks.StateSuccess:
log.Info("polling job success")
log.Info("polling group succeeded")
return nil, true, nil
case machineryv1tasks.StateFailure:
var jobStates []machineryv1tasks.TaskState
for _, jobState := range groupJob.JobStates {
jobStates = append(jobStates, *jobState)
}

log.Errorf("polling job failed: %#v", jobStates)
log.Error("polling group failed")
return nil, true, nil
default:
msg := fmt.Sprintf("polling job state is %s", job.State)
log.Info(msg)
return nil, false, errors.New(msg)
}
}); err != nil {
log.Errorf("polling job failed: %s", err.Error())
log.Errorf("polling group failed: %s", err.Error())
}

// Polling timeout and failed.
Expand All @@ -151,9 +146,9 @@ func (s *service) pollingJob(ctx context.Context, id uint, taskID string) {
if err := s.db.WithContext(ctx).First(&job, id).Updates(model.Job{
State: machineryv1tasks.StateFailure,
}).Error; err != nil {
log.Errorf("polling job failed: %s", err.Error())
log.Errorf("polling group failed: %s", err.Error())
}
log.Error("polling job timeout")
log.Error("polling group timeout")
}
}

Expand Down

0 comments on commit d72c255

Please sign in to comment.