Block-builder-scheduler: RPC service and client module (#10089)
- Add proto stubs for the block-builder-scheduler gRPC service, and register block-builder-scheduler as a server of this service at startup. The two RPC methods (AssignJob and UpdateJob) call into methods already implemented in previous PRs.
- Make the job queue generic so that the spec payload can be the type defined in the gRPC stubs.
- Add a client module (blockbuilder/schedulerpb/client.go) that encapsulates the communication protocol expected by block-builder-scheduler (workers are expected to constantly send and re-send job updates). This type exposes a GetJob / CompleteJob interface to block-builder; see the sketch after this list.
- Improve logging in block-builder-scheduler after testing the Assign/Update flow under docker-compose.

Dev env changes:
- Add mimir-block-builder and mimir-block-builder-scheduler containers to the mimir-ingest-storage docker-compose dev environment.
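
As a rough illustration of the worker-side flow described above, here is a minimal Go sketch of a block-builder loop driving a GetJob / CompleteJob style client. The interface, the JobSpec fields, the package name, and the consume callback are illustrative assumptions, not the committed schedulerpb API; only the GetJob / CompleteJob names come from the description above.

package blockbuilder // hypothetical package for this sketch

import "context"

// JobClient mirrors the GetJob / CompleteJob surface described above; the
// names and signatures here are assumptions, not the committed schedulerpb API.
type JobClient interface {
	GetJob(ctx context.Context) (key string, spec JobSpec, err error)
	CompleteJob(key string) error
}

// JobSpec stands in for the gRPC-defined spec payload; fields are illustrative.
type JobSpec struct {
	Topic       string
	Partition   int32
	StartOffset int64
	EndOffset   int64
}

// runWorker drives the get -> consume -> complete cycle of a block-builder worker.
func runWorker(ctx context.Context, client JobClient, consume func(context.Context, JobSpec) error) error {
	for ctx.Err() == nil {
		key, spec, err := client.GetJob(ctx) // blocks until the scheduler assigns a job
		if err != nil {
			return err
		}
		if err := consume(ctx, spec); err != nil {
			// Stop sending updates on failure; the scheduler's lease expiry
			// makes the job eligible for reassignment to another worker.
			continue
		}
		// The real client keeps re-sending updates until the scheduler
		// acknowledges completion, per the protocol described above.
		if err := client.CompleteJob(key); err != nil {
			return err
		}
	}
	return ctx.Err()
}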
seizethedave authored Dec 19, 2024
1 parent aa375f8 commit dc1410c
Showing 12 changed files with 2,858 additions and 127 deletions.
20 changes: 18 additions & 2 deletions development/mimir-ingest-storage/docker-compose.jsonnet
@@ -13,6 +13,8 @@ std.manifestYamlDoc({
self.kafka_2 +
self.kafka_3 +
self.jaeger +
self.blockbuilder +
self.blockbuilderscheduler +
{},

write:: {
@@ -88,6 +90,22 @@ std.manifestYamlDoc({
}),
},

blockbuilder:: {
'mimir-block-builder-1': mimirService({
name: 'mimir-block-builder-1',
target: 'block-builder',
publishedHttpPort: 8008,
}),
},

blockbuilderscheduler:: {
'mimir-block-builder-scheduler-1': mimirService({
name: 'mimir-block-builder-scheduler-1',
target: 'block-builder-scheduler',
publishedHttpPort: 8019,
}),
},

nginx:: {
nginx: {
hostname: 'nginx',
@@ -156,8 +174,6 @@ std.manifestYamlDoc({
},
},
},


kafka_2:: {
kafka_2: {
image: 'confluentinc/cp-kafka:latest',
60 changes: 60 additions & 0 deletions development/mimir-ingest-storage/docker-compose.yml
@@ -162,6 +162,66 @@
"volumes":
- "./config:/mimir/config"
- "./activity:/activity"
"mimir-block-builder-1":
"build":
"context": "."
"dockerfile": "dev.dockerfile"
"command":
- "sh"
- "-c"
- "exec ./mimir -config.file=./config/mimir.yaml -target=block-builder -activity-tracker.filepath=/activity/mimir-block-builder-1"
"depends_on":
"kafka_1":
"condition": "service_healthy"
"kafka_2":
"condition": "service_healthy"
"minio":
"condition": "service_started"
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=block-builder"
"hostname": "mimir-block-builder-1"
"image": "mimir"
"ports":
- "8008:8080"
- "11008:11008"
"volumes":
- "./config:/mimir/config"
- "./activity:/activity"
"mimir-block-builder-scheduler-1":
"build":
"context": "."
"dockerfile": "dev.dockerfile"
"command":
- "sh"
- "-c"
- "exec ./mimir -config.file=./config/mimir.yaml -target=block-builder-scheduler -activity-tracker.filepath=/activity/mimir-block-builder-scheduler-1"
"depends_on":
"kafka_1":
"condition": "service_healthy"
"kafka_2":
"condition": "service_healthy"
"minio":
"condition": "service_started"
"environment":
- "JAEGER_AGENT_HOST=jaeger"
- "JAEGER_AGENT_PORT=6831"
- "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000"
- "JAEGER_SAMPLER_PARAM=1"
- "JAEGER_SAMPLER_TYPE=const"
- "JAEGER_TAGS=app=block-builder-scheduler"
"hostname": "mimir-block-builder-scheduler-1"
"image": "mimir"
"ports":
- "8019:8080"
- "11019:11019"
"volumes":
- "./config:/mimir/config"
- "./activity:/activity"
"mimir-read-1":
"build":
"context": "."
5 changes: 5 additions & 0 deletions pkg/api/api.go
@@ -23,6 +23,7 @@ import (

"github.com/grafana/mimir/pkg/alertmanager"
"github.com/grafana/mimir/pkg/alertmanager/alertmanagerpb"
bbschedulerpb "github.com/grafana/mimir/pkg/blockbuilder/schedulerpb"
"github.com/grafana/mimir/pkg/compactor"
"github.com/grafana/mimir/pkg/distributor"
"github.com/grafana/mimir/pkg/distributor/distributorpb"
@@ -499,3 +500,7 @@ func (a *API) RegisterMemberlistKV(pathPrefix string, kvs *memberlist.KVInitServ
})
a.RegisterRoute("/memberlist", memberlistStatusHandler(pathPrefix, kvs), false, true, "GET")
}

func (a *API) RegisterBlockBuilderScheduler(s bbschedulerpb.BlockBuilderSchedulerServer) {
bbschedulerpb.RegisterBlockBuilderSchedulerServer(a.server.GRPC, s)
}
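
The new RegisterBlockBuilderScheduler helper simply attaches the generated service to Mimir's shared gRPC server. For intuition, the standalone sketch below registers a scheduler implementation on a plain gRPC server using the same generated functions; the listen address and the newSchedulerImpl constructor are hypothetical, while RegisterBlockBuilderSchedulerServer and BlockBuilderSchedulerServer come from the stubs used in this diff.

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"

	bbschedulerpb "github.com/grafana/mimir/pkg/blockbuilder/schedulerpb"
)

func main() {
	lis, err := net.Listen("tcp", ":9095") // illustrative address
	if err != nil {
		log.Fatal(err)
	}
	srv := grpc.NewServer()
	// schedulerImpl must implement bbschedulerpb.BlockBuilderSchedulerServer,
	// i.e. the AssignJob and UpdateJob RPCs mentioned in the commit message.
	var impl bbschedulerpb.BlockBuilderSchedulerServer = newSchedulerImpl() // hypothetical constructor
	bbschedulerpb.RegisterBlockBuilderSchedulerServer(srv, impl)
	log.Fatal(srv.Serve(lis))
}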
98 changes: 53 additions & 45 deletions pkg/blockbuilder/scheduler/jobs.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

var (
@@ -18,39 +19,48 @@
errBadEpoch = errors.New("bad epoch")
)

type jobQueue struct {
type jobQueue[T any] struct {
leaseExpiry time.Duration
logger log.Logger

mu sync.Mutex
epoch int64
jobs map[string]*job
unassigned jobHeap
jobs map[string]*job[T]
unassigned jobHeap[*job[T]]
}

func newJobQueue(leaseExpiry time.Duration, logger log.Logger) *jobQueue {
return &jobQueue{
func newJobQueue[T any](leaseExpiry time.Duration, logger log.Logger, lessFunc func(T, T) bool) *jobQueue[T] {
return &jobQueue[T]{
leaseExpiry: leaseExpiry,
logger: logger,

jobs: make(map[string]*job),
jobs: make(map[string]*job[T]),

unassigned: jobHeap[*job[T]]{
h: make([]*job[T], 0),
less: func(a, b *job[T]) bool {
// Call into the provided lessFunc to compare the job specs.
return lessFunc(a.spec, b.spec)
},
},
}
}

// assign assigns the highest-priority unassigned job to the given worker.
func (s *jobQueue) assign(workerID string) (jobKey, jobSpec, error) {
func (s *jobQueue[T]) assign(workerID string) (jobKey, T, error) {
var empty T
if workerID == "" {
return jobKey{}, jobSpec{}, errors.New("workerID cannot be empty")
return jobKey{}, empty, errors.New("workerID cannot be empty")
}

s.mu.Lock()
defer s.mu.Unlock()

if s.unassigned.Len() == 0 {
return jobKey{}, jobSpec{}, errNoJobAvailable
return jobKey{}, empty, errNoJobAvailable
}

j := heap.Pop(&s.unassigned).(*job)
j := heap.Pop(&s.unassigned).(*job[T])
j.key.epoch = s.epoch
s.epoch++
j.assignee = workerID
@@ -61,7 +71,7 @@ func (s *jobQueue) assign(workerID string) (jobKey, jobSpec, error) {
// importJob imports a job with the given ID and spec into the jobQueue. This is
// meant to be used during recovery, when we're reconstructing the jobQueue from
// worker updates.
func (s *jobQueue) importJob(key jobKey, workerID string, spec jobSpec) error {
func (s *jobQueue[T]) importJob(key jobKey, workerID string, spec T) error {
if key.id == "" {
return errors.New("jobID cannot be empty")
}
@@ -91,7 +101,7 @@ func (s *jobQueue) importJob(key jobKey, workerID string, spec jobSpec) error {
j.spec = spec
}
} else {
s.jobs[key.id] = &job{
s.jobs[key.id] = &job[T]{
key: key,
assignee: workerID,
leaseExpiry: time.Now().Add(s.leaseExpiry),
@@ -103,20 +113,21 @@ }
}

// addOrUpdate adds a new job or updates an existing job with the given spec.
func (s *jobQueue) addOrUpdate(id string, spec jobSpec) {
func (s *jobQueue[T]) addOrUpdate(id string, spec T) {
s.mu.Lock()
defer s.mu.Unlock()

if j, ok := s.jobs[id]; ok {
// We can only update an unassigned job.
if j.assignee == "" {
level.Info(s.logger).Log("msg", "updated job", "job_id", id)
j.spec = spec
}
return
}

// Otherwise, add a new job.
j := &job{
j := &job[T]{
key: jobKey{
id: id,
epoch: 0,
@@ -128,11 +139,13 @@ func (s *jobQueue) addOrUpdate(id string, spec jobSpec) {
}
s.jobs[id] = j
heap.Push(&s.unassigned, j)

level.Info(s.logger).Log("msg", "created job", "job_id", id)
}

// renewLease renews the lease of the job with the given ID for the given
// worker.
func (s *jobQueue) renewLease(key jobKey, workerID string) error {
func (s *jobQueue[T]) renewLease(key jobKey, workerID string) error {
if key.id == "" {
return errors.New("jobID cannot be empty")
}
@@ -155,12 +168,15 @@ }
}

j.leaseExpiry = time.Now().Add(s.leaseExpiry)

level.Info(s.logger).Log("msg", "renewed lease", "job_id", key.id, "epoch", key.epoch, "worker_id", workerID)

return nil
}

// completeJob completes the job with the given ID for the given worker,
// removing it from the jobQueue.
func (s *jobQueue) completeJob(key jobKey, workerID string) error {
func (s *jobQueue[T]) completeJob(key jobKey, workerID string) error {
if key.id == "" {
return errors.New("jobID cannot be empty")
}
@@ -183,12 +199,14 @@ }
}

delete(s.jobs, key.id)

level.Info(s.logger).Log("msg", "removed completed job from queue", "job_id", key.id, "epoch", key.epoch, "worker_id", workerID)
return nil
}

// clearExpiredLeases unassigns jobs whose leases have expired, making them
// eligible for reassignment.
func (s *jobQueue) clearExpiredLeases() {
func (s *jobQueue[T]) clearExpiredLeases() {
now := time.Now()

s.mu.Lock()
@@ -199,19 +217,21 @@ func (s *jobQueue) clearExpiredLeases() {
j.assignee = ""
j.failCount++
heap.Push(&s.unassigned, j)

level.Debug(s.logger).Log("msg", "unassigned expired lease", "job_id", j.key.id, "epoch", j.key.epoch, "assignee", j.assignee)
}
}
}

type job struct {
type job[T any] struct {
key jobKey

assignee string
leaseExpiry time.Time
failCount int

// job payload details. We can make this generic later for reuse.
spec jobSpec
// spec contains the job payload details disseminated to the worker.
spec T
}

type jobKey struct {
@@ -221,38 +241,26 @@ type jobKey struct {
epoch int64
}

type jobSpec struct {
topic string
partition int32
startOffset int64
endOffset int64
commitRecTs time.Time
lastSeenOffset int64
lastBlockEndTs time.Time
type jobHeap[T any] struct {
h []T
less func(T, T) bool
}

func (a *jobSpec) less(b *jobSpec) bool {
return a.commitRecTs.Before(b.commitRecTs)
}

type jobHeap []*job

// Implement the heap.Interface for jobHeap.
func (h jobHeap) Len() int { return len(h) }
func (h jobHeap) Less(i, j int) bool { return h[i].spec.less(&h[j].spec) }
func (h jobHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// Implement the heap.Interface for jobHeap's `h` field.
func (h jobHeap[T]) Len() int { return len(h.h) }
func (h jobHeap[T]) Less(i, j int) bool { return h.less(h.h[i], h.h[j]) }
func (h jobHeap[T]) Swap(i, j int) { h.h[i], h.h[j] = h.h[j], h.h[i] }

func (h *jobHeap) Push(x interface{}) {
*h = append(*h, x.(*job))
func (h *jobHeap[T]) Push(x any) {
h.h = append(h.h, x.(T))
}

func (h *jobHeap) Pop() interface{} {
old := *h
func (h *jobHeap[T]) Pop() any {
old := h.h
n := len(old)
x := old[n-1]
old[n-1] = nil
*h = old[0 : n-1]
h.h = old[0 : n-1]
return x
}

var _ heap.Interface = (*jobHeap)(nil)
var _ heap.Interface = (*jobHeap[int])(nil)
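
To show the now-generic queue in use, here is a minimal sketch assumed to live inside the scheduler package (so it can reach the unexported types and reuse its time and go-kit log imports). The kafkaJobSpec type, the ordering function, the worker ID, and the job ID format are illustrative stand-ins; in the real wiring the spec type T comes from the schedulerpb stubs.

// Illustrative spec type; the real T is the gRPC-generated job spec.
type kafkaJobSpec struct {
	topic       string
	partition   int32
	startOffset int64
	commitRecTs time.Time
}

func exampleQueueUsage(logger log.Logger) error {
	// Jobs with the oldest commit-record timestamp are scheduled first.
	q := newJobQueue(5*time.Minute, logger, func(a, b kafkaJobSpec) bool {
		return a.commitRecTs.Before(b.commitRecTs)
	})

	q.addOrUpdate("ingest/64/1000", kafkaJobSpec{
		topic: "ingest", partition: 64, startOffset: 1000, commitRecTs: time.Now(),
	})

	// A worker asks for the highest-priority unassigned job.
	key, spec, err := q.assign("block-builder-1")
	if err != nil {
		return err
	}
	_ = spec // the worker would consume spec.topic/spec.partition from spec.startOffset

	// While working, the worker renews its lease so the job isn't reassigned,
	// then reports completion, which removes the job from the queue.
	if err := q.renewLease(key, "block-builder-1"); err != nil {
		return err
	}
	return q.completeJob(key, "block-builder-1")
}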