From 197df14a3231f0270edf266e785c5a336df5787b Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Fri, 6 Sep 2024 14:00:44 +0900 Subject: [PATCH] query: change queued jobs to a map Changing queued jobs to a map allows for multiple job queuing. However the queued jobs is still limited to a single job for now. --- query/workmanager.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/query/workmanager.go b/query/workmanager.go index 88e91e02..9d6317fb 100644 --- a/query/workmanager.go +++ b/query/workmanager.go @@ -77,7 +77,7 @@ type PeerRanking interface { // TODO(halseth): support more than one active job at a time. type activeWorker struct { w Worker - activeJob *queryJob + activeJobs map[uint64]*queryJob onExit chan struct{} } @@ -220,7 +220,7 @@ Loop: for p, r := range workers { // Only one active job at a time is currently // supported. - if r.activeJob != nil { + if len(r.activeJobs) >= 1 { continue } @@ -242,7 +242,7 @@ Loop: log.Tracef("Sent job %v to worker %v", next.Index(), p) heap.Pop(work) - r.activeJob = next + r.activeJobs[next.Index()] = next // Go back to start of loop, to check // if there are more jobs to @@ -278,9 +278,9 @@ Loop: // remove it from our set of active workers. onExit := make(chan struct{}) workers[peer.Addr()] = &activeWorker{ - w: r, - activeJob: nil, - onExit: onExit, + w: r, + activeJobs: make(map[uint64]*queryJob), + onExit: onExit, } w.cfg.Ranking.AddPeer(peer.Addr()) @@ -302,7 +302,7 @@ Loop: // Delete the job from the worker's active job, such // that the slot gets opened for more work. r := workers[result.peer.Addr()] - r.activeJob = nil + delete(r.activeJobs, result.job.Index()) // Get the index of this query's batch, and delete it // from the map of current queries, since we don't have