Skip to content

Commit

Permalink
query: change queued jobs to a map
Browse files Browse the repository at this point in the history
Changing queued jobs to a map allows for multiple job queuing. However
the queued jobs is still limited to a single job for now.
  • Loading branch information
kcalvinalvin committed Sep 6, 2024
1 parent 81d6cd2 commit 197df14
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions query/workmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type PeerRanking interface {
// TODO(halseth): support more than one active job at a time.
type activeWorker struct {
w Worker
activeJob *queryJob
activeJobs map[uint64]*queryJob
onExit chan struct{}
}

Expand Down Expand Up @@ -220,7 +220,7 @@ Loop:
for p, r := range workers {
// Only one active job at a time is currently
// supported.
if r.activeJob != nil {
if len(r.activeJobs) >= 1 {
continue
}

Expand All @@ -242,7 +242,7 @@ Loop:
log.Tracef("Sent job %v to worker %v",
next.Index(), p)
heap.Pop(work)
r.activeJob = next
r.activeJobs[next.Index()] = next

// Go back to start of loop, to check
// if there are more jobs to
Expand Down Expand Up @@ -278,9 +278,9 @@ Loop:
// remove it from our set of active workers.
onExit := make(chan struct{})
workers[peer.Addr()] = &activeWorker{
w: r,
activeJob: nil,
onExit: onExit,
w: r,
activeJobs: make(map[uint64]*queryJob),
onExit: onExit,
}

w.cfg.Ranking.AddPeer(peer.Addr())
Expand All @@ -302,7 +302,7 @@ Loop:
// Delete the job from the worker's active job, such
// that the slot gets opened for more work.
r := workers[result.peer.Addr()]
r.activeJob = nil
delete(r.activeJobs, result.job.Index())

// Get the index of this query's batch, and delete it
// from the map of current queries, since we don't have
Expand Down

0 comments on commit 197df14

Please sign in to comment.