From dc1410c659279f6bce1213794af44128fed311a1 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 19 Dec 2024 09:37:16 -0800 Subject: [PATCH] Block-builder-scheduler: RPC service and client module (#10089) - Add proto stubs for the block-builder-scheduler gRPC service, and registers block-builder-scheduler as a server of this service at startup. The two RPC methods (AssignJob and UpdateJob) call into methods already implemented in previous PRs. - Make the job queue generic so that the spec payload can be the type defined in the gRPC stubs. - Add a client module (blockbuilder/schedulerpb/client.go) that encapsulates the communication protocol expected by block-builder-scheduler (where workers are expected to constantly send and re-send job updates). This type exposes a GetJob / CompleteJob interface to block-builder. - Improvements to logging in block-builder-scheduler after testing the Assign/Update flow under docker-compose. dev env changes: - Adds mimir-block-builder and mimir-block-builder-scheduler containers to the mimir-ingest-storage docker-compose dev environment. --- .../docker-compose.jsonnet | 20 +- .../mimir-ingest-storage/docker-compose.yml | 60 + pkg/api/api.go | 5 + pkg/blockbuilder/scheduler/jobs.go | 98 +- pkg/blockbuilder/scheduler/jobs_test.go | 47 +- pkg/blockbuilder/scheduler/scheduler.go | 108 +- pkg/blockbuilder/scheduler/scheduler_test.go | 72 +- pkg/blockbuilder/schedulerpb/client.go | 202 ++ pkg/blockbuilder/schedulerpb/client_test.go | 172 ++ pkg/blockbuilder/schedulerpb/scheduler.pb.go | 2148 +++++++++++++++++ pkg/blockbuilder/schedulerpb/scheduler.proto | 52 + pkg/mimir/modules.go | 1 + 12 files changed, 2858 insertions(+), 127 deletions(-) create mode 100644 pkg/blockbuilder/schedulerpb/client.go create mode 100644 pkg/blockbuilder/schedulerpb/client_test.go create mode 100644 pkg/blockbuilder/schedulerpb/scheduler.pb.go create mode 100644 pkg/blockbuilder/schedulerpb/scheduler.proto diff --git a/development/mimir-ingest-storage/docker-compose.jsonnet b/development/mimir-ingest-storage/docker-compose.jsonnet index 544dbdc729e..59b5336916a 100644 --- a/development/mimir-ingest-storage/docker-compose.jsonnet +++ b/development/mimir-ingest-storage/docker-compose.jsonnet @@ -13,6 +13,8 @@ std.manifestYamlDoc({ self.kafka_2 + self.kafka_3 + self.jaeger + + self.blockbuilder + + self.blockbuilderscheduler + {}, write:: { @@ -88,6 +90,22 @@ std.manifestYamlDoc({ }), }, + blockbuilder:: { + 'mimir-block-builder-1': mimirService({ + name: 'mimir-block-builder-1', + target: 'block-builder', + publishedHttpPort: 8008, + }), + }, + + blockbuilderscheduler:: { + 'mimir-block-builder-scheduler-1': mimirService({ + name: 'mimir-block-builder-scheduler-1', + target: 'block-builder-scheduler', + publishedHttpPort: 8019, + }), + }, + nginx:: { nginx: { hostname: 'nginx', @@ -156,8 +174,6 @@ std.manifestYamlDoc({ }, }, }, - - kafka_2:: { kafka_2: { image: 'confluentinc/cp-kafka:latest', diff --git a/development/mimir-ingest-storage/docker-compose.yml b/development/mimir-ingest-storage/docker-compose.yml index 39bf2e18692..ccbf581b115 100644 --- a/development/mimir-ingest-storage/docker-compose.yml +++ b/development/mimir-ingest-storage/docker-compose.yml @@ -162,6 +162,66 @@ "volumes": - "./config:/mimir/config" - "./activity:/activity" + "mimir-block-builder-1": + "build": + "context": "." 
+ "dockerfile": "dev.dockerfile" + "command": + - "sh" + - "-c" + - "exec ./mimir -config.file=./config/mimir.yaml -target=block-builder -activity-tracker.filepath=/activity/mimir-block-builder-1" + "depends_on": + "kafka_1": + "condition": "service_healthy" + "kafka_2": + "condition": "service_healthy" + "minio": + "condition": "service_started" + "environment": + - "JAEGER_AGENT_HOST=jaeger" + - "JAEGER_AGENT_PORT=6831" + - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000" + - "JAEGER_SAMPLER_PARAM=1" + - "JAEGER_SAMPLER_TYPE=const" + - "JAEGER_TAGS=app=block-builder" + "hostname": "mimir-block-builder-1" + "image": "mimir" + "ports": + - "8008:8080" + - "11008:11008" + "volumes": + - "./config:/mimir/config" + - "./activity:/activity" + "mimir-block-builder-scheduler-1": + "build": + "context": "." + "dockerfile": "dev.dockerfile" + "command": + - "sh" + - "-c" + - "exec ./mimir -config.file=./config/mimir.yaml -target=block-builder-scheduler -activity-tracker.filepath=/activity/mimir-block-builder-scheduler-1" + "depends_on": + "kafka_1": + "condition": "service_healthy" + "kafka_2": + "condition": "service_healthy" + "minio": + "condition": "service_started" + "environment": + - "JAEGER_AGENT_HOST=jaeger" + - "JAEGER_AGENT_PORT=6831" + - "JAEGER_REPORTER_MAX_QUEUE_SIZE=1000" + - "JAEGER_SAMPLER_PARAM=1" + - "JAEGER_SAMPLER_TYPE=const" + - "JAEGER_TAGS=app=block-builder-scheduler" + "hostname": "mimir-block-builder-scheduler-1" + "image": "mimir" + "ports": + - "8019:8080" + - "11019:11019" + "volumes": + - "./config:/mimir/config" + - "./activity:/activity" "mimir-read-1": "build": "context": "." diff --git a/pkg/api/api.go b/pkg/api/api.go index e2f6da5735c..fd8cdd261f4 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/mimir/pkg/alertmanager" "github.com/grafana/mimir/pkg/alertmanager/alertmanagerpb" + bbschedulerpb "github.com/grafana/mimir/pkg/blockbuilder/schedulerpb" "github.com/grafana/mimir/pkg/compactor" "github.com/grafana/mimir/pkg/distributor" "github.com/grafana/mimir/pkg/distributor/distributorpb" @@ -499,3 +500,7 @@ func (a *API) RegisterMemberlistKV(pathPrefix string, kvs *memberlist.KVInitServ }) a.RegisterRoute("/memberlist", memberlistStatusHandler(pathPrefix, kvs), false, true, "GET") } + +func (a *API) RegisterBlockBuilderScheduler(s bbschedulerpb.BlockBuilderSchedulerServer) { + bbschedulerpb.RegisterBlockBuilderSchedulerServer(a.server.GRPC, s) +} diff --git a/pkg/blockbuilder/scheduler/jobs.go b/pkg/blockbuilder/scheduler/jobs.go index a227f0268ad..506e4e88d0a 100644 --- a/pkg/blockbuilder/scheduler/jobs.go +++ b/pkg/blockbuilder/scheduler/jobs.go @@ -9,6 +9,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" ) var ( @@ -18,39 +19,48 @@ var ( errBadEpoch = errors.New("bad epoch") ) -type jobQueue struct { +type jobQueue[T any] struct { leaseExpiry time.Duration logger log.Logger mu sync.Mutex epoch int64 - jobs map[string]*job - unassigned jobHeap + jobs map[string]*job[T] + unassigned jobHeap[*job[T]] } -func newJobQueue(leaseExpiry time.Duration, logger log.Logger) *jobQueue { - return &jobQueue{ +func newJobQueue[T any](leaseExpiry time.Duration, logger log.Logger, lessFunc func(T, T) bool) *jobQueue[T] { + return &jobQueue[T]{ leaseExpiry: leaseExpiry, logger: logger, - jobs: make(map[string]*job), + jobs: make(map[string]*job[T]), + + unassigned: jobHeap[*job[T]]{ + h: make([]*job[T], 0), + less: func(a, b *job[T]) bool { + // Call into the provided lessFunc to compare the job specs. 
+ return lessFunc(a.spec, b.spec) + }, + }, } } // assign assigns the highest-priority unassigned job to the given worker. -func (s *jobQueue) assign(workerID string) (jobKey, jobSpec, error) { +func (s *jobQueue[T]) assign(workerID string) (jobKey, T, error) { + var empty T if workerID == "" { - return jobKey{}, jobSpec{}, errors.New("workerID cannot be empty") + return jobKey{}, empty, errors.New("workerID cannot be empty") } s.mu.Lock() defer s.mu.Unlock() if s.unassigned.Len() == 0 { - return jobKey{}, jobSpec{}, errNoJobAvailable + return jobKey{}, empty, errNoJobAvailable } - j := heap.Pop(&s.unassigned).(*job) + j := heap.Pop(&s.unassigned).(*job[T]) j.key.epoch = s.epoch s.epoch++ j.assignee = workerID @@ -61,7 +71,7 @@ func (s *jobQueue) assign(workerID string) (jobKey, jobSpec, error) { // importJob imports a job with the given ID and spec into the jobQueue. This is // meant to be used during recovery, when we're reconstructing the jobQueue from // worker updates. -func (s *jobQueue) importJob(key jobKey, workerID string, spec jobSpec) error { +func (s *jobQueue[T]) importJob(key jobKey, workerID string, spec T) error { if key.id == "" { return errors.New("jobID cannot be empty") } @@ -91,7 +101,7 @@ func (s *jobQueue) importJob(key jobKey, workerID string, spec jobSpec) error { j.spec = spec } } else { - s.jobs[key.id] = &job{ + s.jobs[key.id] = &job[T]{ key: key, assignee: workerID, leaseExpiry: time.Now().Add(s.leaseExpiry), @@ -103,20 +113,21 @@ func (s *jobQueue) importJob(key jobKey, workerID string, spec jobSpec) error { } // addOrUpdate adds a new job or updates an existing job with the given spec. -func (s *jobQueue) addOrUpdate(id string, spec jobSpec) { +func (s *jobQueue[T]) addOrUpdate(id string, spec T) { s.mu.Lock() defer s.mu.Unlock() if j, ok := s.jobs[id]; ok { // We can only update an unassigned job. if j.assignee == "" { + level.Info(s.logger).Log("msg", "updated job", "job_id", id) j.spec = spec } return } // Otherwise, add a new job. - j := &job{ + j := &job[T]{ key: jobKey{ id: id, epoch: 0, @@ -128,11 +139,13 @@ func (s *jobQueue) addOrUpdate(id string, spec jobSpec) { } s.jobs[id] = j heap.Push(&s.unassigned, j) + + level.Info(s.logger).Log("msg", "created job", "job_id", id) } // renewLease renews the lease of the job with the given ID for the given // worker. -func (s *jobQueue) renewLease(key jobKey, workerID string) error { +func (s *jobQueue[T]) renewLease(key jobKey, workerID string) error { if key.id == "" { return errors.New("jobID cannot be empty") } @@ -155,12 +168,15 @@ func (s *jobQueue) renewLease(key jobKey, workerID string) error { } j.leaseExpiry = time.Now().Add(s.leaseExpiry) + + level.Info(s.logger).Log("msg", "renewed lease", "job_id", key.id, "epoch", key.epoch, "worker_id", workerID) + return nil } // completeJob completes the job with the given ID for the given worker, // removing it from the jobQueue. -func (s *jobQueue) completeJob(key jobKey, workerID string) error { +func (s *jobQueue[T]) completeJob(key jobKey, workerID string) error { if key.id == "" { return errors.New("jobID cannot be empty") } @@ -183,12 +199,14 @@ func (s *jobQueue) completeJob(key jobKey, workerID string) error { } delete(s.jobs, key.id) + + level.Info(s.logger).Log("msg", "removed completed job from queue", "job_id", key.id, "epoch", key.epoch, "worker_id", workerID) return nil } // clearExpiredLeases unassigns jobs whose leases have expired, making them // eligible for reassignment. 
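// For example, with a lease expiry of 3 minutes (a hypothetical value), a job
// assigned at time T is pushed back onto the unassigned heap at T+3m unless
// the worker renews the lease (via renewLease) before then; each such expiry
// also increments the job's failCount.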
-func (s *jobQueue) clearExpiredLeases() { +func (s *jobQueue[T]) clearExpiredLeases() { now := time.Now() s.mu.Lock() @@ -199,19 +217,21 @@ func (s *jobQueue) clearExpiredLeases() { j.assignee = "" j.failCount++ heap.Push(&s.unassigned, j) + + level.Debug(s.logger).Log("msg", "unassigned expired lease", "job_id", j.key.id, "epoch", j.key.epoch, "assignee", j.assignee) } } } -type job struct { +type job[T any] struct { key jobKey assignee string leaseExpiry time.Time failCount int - // job payload details. We can make this generic later for reuse. - spec jobSpec + // spec contains the job payload details disseminated to the worker. + spec T } type jobKey struct { @@ -221,38 +241,26 @@ type jobKey struct { epoch int64 } -type jobSpec struct { - topic string - partition int32 - startOffset int64 - endOffset int64 - commitRecTs time.Time - lastSeenOffset int64 - lastBlockEndTs time.Time +type jobHeap[T any] struct { + h []T + less func(T, T) bool } -func (a *jobSpec) less(b *jobSpec) bool { - return a.commitRecTs.Before(b.commitRecTs) -} - -type jobHeap []*job - -// Implement the heap.Interface for jobHeap. -func (h jobHeap) Len() int { return len(h) } -func (h jobHeap) Less(i, j int) bool { return h[i].spec.less(&h[j].spec) } -func (h jobHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +// Implement the heap.Interface for jobHeap's `h` field. +func (h jobHeap[T]) Len() int { return len(h.h) } +func (h jobHeap[T]) Less(i, j int) bool { return h.less(h.h[i], h.h[j]) } +func (h jobHeap[T]) Swap(i, j int) { h.h[i], h.h[j] = h.h[j], h.h[i] } -func (h *jobHeap) Push(x interface{}) { - *h = append(*h, x.(*job)) +func (h *jobHeap[T]) Push(x any) { + h.h = append(h.h, x.(T)) } -func (h *jobHeap) Pop() interface{} { - old := *h +func (h *jobHeap[T]) Pop() any { + old := h.h n := len(old) x := old[n-1] - old[n-1] = nil - *h = old[0 : n-1] + h.h = old[0 : n-1] return x } -var _ heap.Interface = (*jobHeap)(nil) +var _ heap.Interface = (*jobHeap[int])(nil) diff --git a/pkg/blockbuilder/scheduler/jobs_test.go b/pkg/blockbuilder/scheduler/jobs_test.go index dae0327c471..ecb52230a35 100644 --- a/pkg/blockbuilder/scheduler/jobs_test.go +++ b/pkg/blockbuilder/scheduler/jobs_test.go @@ -14,15 +14,24 @@ import ( "github.com/grafana/mimir/pkg/util/test" ) +type testSpec struct { + topic string + commitRecTs time.Time +} + +func testSpecLess(a, b testSpec) bool { + return a.commitRecTs.Before(b.commitRecTs) +} + func TestAssign(t *testing.T) { - s := newJobQueue(988*time.Hour, test.NewTestingLogger(t)) + s := newJobQueue(988*time.Hour, test.NewTestingLogger(t), testSpecLess) j0, j0spec, err := s.assign("w0") require.Empty(t, j0.id) require.Zero(t, j0spec) require.ErrorIs(t, err, errNoJobAvailable) - s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()}) + s.addOrUpdate("job1", testSpec{topic: "hello", commitRecTs: time.Now()}) j1, j1spec, err := s.assign("w0") require.NotEmpty(t, j1.id) require.NotZero(t, j1spec) @@ -34,7 +43,7 @@ func TestAssign(t *testing.T) { require.Zero(t, j2spec) require.ErrorIs(t, err, errNoJobAvailable) - s.addOrUpdate("job2", jobSpec{topic: "hello2", commitRecTs: time.Now()}) + s.addOrUpdate("job2", testSpec{topic: "hello2", commitRecTs: time.Now()}) j3, j3spec, err := s.assign("w0") require.NotZero(t, j3.id) require.NotZero(t, j3spec) @@ -43,14 +52,14 @@ func TestAssign(t *testing.T) { } func TestAssignComplete(t *testing.T) { - s := newJobQueue(988*time.Hour, test.NewTestingLogger(t)) + s := newJobQueue(988*time.Hour, test.NewTestingLogger(t), testSpecLess) { err := 
s.completeJob(jobKey{"rando job", 965}, "w0") require.ErrorIs(t, err, errJobNotFound) } - s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()}) + s.addOrUpdate("job1", testSpec{topic: "hello", commitRecTs: time.Now()}) jk, jspec, err := s.assign("w0") require.NotZero(t, jk) require.NotZero(t, jspec) @@ -87,8 +96,8 @@ func TestAssignComplete(t *testing.T) { } func TestLease(t *testing.T) { - s := newJobQueue(988*time.Hour, test.NewTestingLogger(t)) - s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()}) + s := newJobQueue(988*time.Hour, test.NewTestingLogger(t), testSpecLess) + s.addOrUpdate("job1", testSpec{topic: "hello", commitRecTs: time.Now()}) jk, jspec, err := s.assign("w0") require.NotZero(t, jk.id) require.NotZero(t, jspec) @@ -128,8 +137,8 @@ func TestLease(t *testing.T) { // TestImportJob tests the importJob method - the method that is called to learn // about jobs in-flight from a previous scheduler instance. func TestImportJob(t *testing.T) { - s := newJobQueue(988*time.Hour, test.NewTestingLogger(t)) - spec := jobSpec{commitRecTs: time.Now().Add(-1 * time.Hour)} + s := newJobQueue(988*time.Hour, test.NewTestingLogger(t), testSpecLess) + spec := testSpec{commitRecTs: time.Now().Add(-1 * time.Hour)} require.NoError(t, s.importJob(jobKey{"job1", 122}, "w0", spec)) require.NoError(t, s.importJob(jobKey{"job1", 123}, "w2", spec)) require.ErrorIs(t, errBadEpoch, s.importJob(jobKey{"job1", 122}, "w0", spec)) @@ -146,15 +155,15 @@ func TestImportJob(t *testing.T) { func TestMinHeap(t *testing.T) { n := 517 - jobs := make([]*job, n) + jobs := make([]*job[testSpec], n) order := make([]int, n) for i := 0; i < n; i++ { - jobs[i] = &job{ + jobs[i] = &job[testSpec]{ key: jobKey{ id: fmt.Sprintf("job%d", i), epoch: 0, }, - spec: jobSpec{topic: "hello", commitRecTs: time.Unix(int64(i), 0)}, + spec: testSpec{topic: "hello", commitRecTs: time.Unix(int64(i), 0)}, } order[i] = i } @@ -163,17 +172,23 @@ func TestMinHeap(t *testing.T) { r := rand.New(rand.NewSource(9900)) r.Shuffle(len(order), func(i, j int) { order[i], order[j] = order[j], order[i] }) - h := jobHeap{} + h := jobHeap[*job[testSpec]]{ + less: func(a, b *job[testSpec]) bool { + return testSpecLess(a.spec, b.spec) + }, + } + for _, j := range order { heap.Push(&h, jobs[j]) } - require.Len(t, h, n) + require.Equal(t, n, h.Len()) for i := 0; i < len(jobs); i++ { - p := heap.Pop(&h).(*job) + p := heap.Pop(&h).(*job[testSpec]) + println(i) require.Equal(t, jobs[i], p, "pop order should be in increasing commitRecTs") } - require.Empty(t, h) + require.Zero(t, h.Len()) } diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 971752a9c7d..f3ee93dd72a 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -19,6 +19,7 @@ import ( "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/blockbuilder" + "github.com/grafana/mimir/pkg/blockbuilder/schedulerpb" "github.com/grafana/mimir/pkg/storage/ingest" ) @@ -26,13 +27,15 @@ type BlockBuilderScheduler struct { services.Service adminClient *kadm.Client - jobs *jobQueue + jobs *jobQueue[schedulerpb.JobSpec] cfg Config logger log.Logger register prometheus.Registerer metrics schedulerMetrics - mu sync.Mutex + mu sync.Mutex + // committed is our local notion of the committed offsets. + // It is learned from Kafka at startup, but only updated by the completion of jobs. 
committed kadm.Offsets observations obsMap observationComplete bool @@ -114,7 +117,10 @@ func (s *BlockBuilderScheduler) running(ctx context.Context) error { for { select { case <-updateTick.C: + // These tasks are not prerequisites to updating the schedule, but + // we do them here rather than creating a ton of update tickers. s.jobs.clearExpiredLeases() + s.updateSchedule(ctx) case <-ctx.Done(): return nil @@ -130,7 +136,7 @@ func (s *BlockBuilderScheduler) completeObservationMode() { return } - s.jobs = newJobQueue(s.cfg.JobLeaseExpiry, s.logger) + s.jobs = newJobQueue(s.cfg.JobLeaseExpiry, s.logger, specLessThan) s.finalizeObservations() s.observations = nil s.observationComplete = true @@ -178,14 +184,14 @@ func (s *BlockBuilderScheduler) updateSchedule(ctx context.Context) { // The job is uniquely identified by {topic, partition, consumption start offset}. jobID := fmt.Sprintf("%s/%d/%d", o.Topic, o.Partition, l.Commit.At) partState := blockbuilder.PartitionStateFromLag(s.logger, l, 0) - s.jobs.addOrUpdate(jobID, jobSpec{ - topic: o.Topic, - partition: o.Partition, - startOffset: l.Commit.At, - endOffset: l.End.Offset, - commitRecTs: partState.CommitRecordTimestamp, - lastSeenOffset: partState.LastSeenOffset, - lastBlockEndTs: partState.LastBlockEnd, + s.jobs.addOrUpdate(jobID, schedulerpb.JobSpec{ + Topic: o.Topic, + Partition: o.Partition, + StartOffset: l.Commit.At, + EndOffset: l.End.Offset, + CommitRecTs: partState.CommitRecordTimestamp, + LastSeenOffset: partState.LastSeenOffset, + LastBlockEndTs: partState.LastBlockEnd, }) } } @@ -224,39 +230,68 @@ func commitOffsetsFromLag(lag kadm.GroupLag) kadm.Offsets { return offsets } +// AssignJob returns an assigned job for the given workerID. +func (s *BlockBuilderScheduler) AssignJob(_ context.Context, req *schedulerpb.AssignJobRequest) (*schedulerpb.AssignJobResponse, error) { + key, spec, err := s.assignJob(req.WorkerId) + if err != nil { + return nil, err + } + + return &schedulerpb.AssignJobResponse{ + Key: &schedulerpb.JobKey{ + Id: key.id, + Epoch: key.epoch, + }, + Spec: &spec, + }, err +} + // assignJob returns an assigned job for the given workerID. -// (This is a temporary method for unit tests until we have RPCs.) -func (s *BlockBuilderScheduler) assignJob(workerID string) (jobKey, jobSpec, error) { +func (s *BlockBuilderScheduler) assignJob(workerID string) (jobKey, schedulerpb.JobSpec, error) { s.mu.Lock() doneObserving := s.observationComplete s.mu.Unlock() if !doneObserving { - return jobKey{}, jobSpec{}, status.Error(codes.Unavailable, "observation period not complete") + var empty schedulerpb.JobSpec + return jobKey{}, empty, status.Error(codes.Unavailable, "observation period not complete") } return s.jobs.assign(workerID) } -// updateJob takes a job update from the client and records it, if necessary. -// (This is a temporary method for unit tests until we have RPCs.) -func (s *BlockBuilderScheduler) updateJob(key jobKey, workerID string, complete bool, j jobSpec) error { +// UpdateJob takes a job update from the client and records it, if necessary. 
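// For illustration, a worker renewing its lease on an in-progress job sends a
// request shaped like the one below (the ID, epoch, and worker name are
// hypothetical); the same request is re-sent with Complete set to true once
// the job has been built.
//
//	&schedulerpb.UpdateJobRequest{
//		Key:      &schedulerpb.JobKey{Id: "ingest/64/1000", Epoch: 10},
//		WorkerId: "worker-0",
//		Spec:     &spec,
//		Complete: false,
//	}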
+func (s *BlockBuilderScheduler) UpdateJob(_ context.Context, req *schedulerpb.UpdateJobRequest) (*schedulerpb.UpdateJobResponse, error) { + k := jobKey{ + id: req.Key.Id, + epoch: req.Key.Epoch, + } + if err := s.updateJob(k, req.WorkerId, req.Complete, *req.Spec); err != nil { + return nil, err + } + return &schedulerpb.UpdateJobResponse{}, nil +} + +func (s *BlockBuilderScheduler) updateJob(key jobKey, workerID string, complete bool, j schedulerpb.JobSpec) error { + logger := log.With(s.logger, "job_id", key.id, "epoch", key.epoch, "worker", workerID) + s.mu.Lock() defer s.mu.Unlock() if !s.observationComplete { + // We're still in observation mode. Record the observation. if err := s.updateObservation(key, workerID, complete, j); err != nil { return fmt.Errorf("observe update: %w", err) } - s.logger.Log("msg", "recovered job", "key", key, "worker", workerID) + logger.Log("msg", "recovered job") return nil } - if c, ok := s.committed.Lookup(s.cfg.Kafka.Topic, j.partition); ok { - if j.startOffset <= c.At { + if c, ok := s.committed.Lookup(s.cfg.Kafka.Topic, j.Partition); ok { + if j.StartOffset <= c.At { // Update of a completed/committed job. Ignore. - s.logger.Log("msg", "ignored historical job", "key", key, "worker", workerID) + level.Debug(logger).Log("msg", "ignored historical job") return nil } } @@ -271,18 +306,18 @@ func (s *BlockBuilderScheduler) updateJob(key jobKey, workerID string, complete // TODO: Push forward the local notion of the committed offset. - s.logger.Log("msg", "completed job", "key", key, "worker", workerID) + logger.Log("msg", "completed job") } else { // It's an in-progress job whose lease we need to renew. if err := s.jobs.renewLease(key, workerID); err != nil { return fmt.Errorf("renew lease: %w", err) } - s.logger.Log("msg", "renewed lease", "key", key, "worker", workerID) + logger.Log("msg", "renewed lease") } return nil } -func (s *BlockBuilderScheduler) updateObservation(key jobKey, workerID string, complete bool, j jobSpec) error { +func (s *BlockBuilderScheduler) updateObservation(key jobKey, workerID string, complete bool, j schedulerpb.JobSpec) error { rj, ok := s.observations[key.id] if !ok { s.observations[key.id] = &observation{ @@ -313,18 +348,18 @@ func (s *BlockBuilderScheduler) finalizeObservations() { for _, rj := range s.observations { if rj.complete { // Completed. - if o, ok := s.committed.Lookup(rj.spec.topic, rj.spec.partition); ok { - if rj.spec.endOffset > o.At { + if o, ok := s.committed.Lookup(rj.spec.Topic, rj.spec.Partition); ok { + if rj.spec.EndOffset > o.At { // Completed jobs can push forward the offsets we've learned from Kafka. - o.At = rj.spec.endOffset + o.At = rj.spec.EndOffset o.Metadata = "{}" // TODO: take the new meta from the completion message. - s.committed[rj.spec.topic][rj.spec.partition] = o + s.committed[rj.spec.Topic][rj.spec.Partition] = o } } else { s.committed.Add(kadm.Offset{ - Topic: rj.spec.topic, - Partition: rj.spec.partition, - At: rj.spec.endOffset, + Topic: rj.spec.Topic, + Partition: rj.spec.Partition, + At: rj.spec.EndOffset, Metadata: "{}", // TODO: take the new meta from the completion message. }) } @@ -332,7 +367,7 @@ func (s *BlockBuilderScheduler) finalizeObservations() { // An in-progress job. // These don't affect offsets (yet), they just get added to the job queue. 
if err := s.jobs.importJob(rj.key, rj.workerID, rj.spec); err != nil { - level.Warn(s.logger).Log("msg", "failed to import job", "key", rj.key, "worker", rj.workerID, "err", err) + level.Warn(s.logger).Log("msg", "failed to import job", "job_id", rj.key.id, "epoch", rj.key.epoch, "worker", rj.workerID, "err", err) } } } @@ -342,7 +377,14 @@ type obsMap map[string]*observation type observation struct { key jobKey - spec jobSpec + spec schedulerpb.JobSpec workerID string complete bool } + +var _ schedulerpb.BlockBuilderSchedulerServer = (*BlockBuilderScheduler)(nil) + +// specLessThan determines whether spec a should come before b in job scheduling. +func specLessThan(a, b schedulerpb.JobSpec) bool { + return a.CommitRecTs.Before(b.CommitRecTs) +} diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index cecd6d0c378..e94c1113cda 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -13,10 +13,12 @@ import ( "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" + "github.com/grafana/mimir/pkg/blockbuilder/schedulerpb" "github.com/grafana/mimir/pkg/storage/ingest" "github.com/grafana/mimir/pkg/util/test" "github.com/grafana/mimir/pkg/util/testkafka" @@ -66,40 +68,40 @@ func TestStartup(t *testing.T) { now := time.Now() // Some jobs that ostensibly exist, but scheduler doesn't know about. - j1 := job{ + j1 := job[schedulerpb.JobSpec]{ key: jobKey{ id: "ingest/64/1000", epoch: 10, }, - spec: jobSpec{ - topic: "ingest", - partition: 64, - startOffset: 1000, - commitRecTs: now.Add(-1 * time.Hour), + spec: schedulerpb.JobSpec{ + Topic: "ingest", + Partition: 64, + StartOffset: 1000, + CommitRecTs: now.Add(-1 * time.Hour), }, } - j2 := job{ + j2 := job[schedulerpb.JobSpec]{ key: jobKey{ id: "ingest/65/256", epoch: 11, }, - spec: jobSpec{ - topic: "ingest", - partition: 65, - startOffset: 256, - commitRecTs: now.Add(-2 * time.Hour), + spec: schedulerpb.JobSpec{ + Topic: "ingest", + Partition: 65, + StartOffset: 256, + CommitRecTs: now.Add(-2 * time.Hour), }, } - j3 := job{ + j3 := job[schedulerpb.JobSpec]{ key: jobKey{ id: "ingest/66/57", epoch: 12, }, - spec: jobSpec{ - topic: "ingest", - partition: 66, - startOffset: 57, - commitRecTs: now.Add(-3 * time.Hour), + spec: schedulerpb.JobSpec{ + Topic: "ingest", + Partition: 66, + StartOffset: 57, + CommitRecTs: now.Add(-3 * time.Hour), }, } @@ -140,12 +142,12 @@ func TestStartup(t *testing.T) { } // And we can resume normal operation: - sched.jobs.addOrUpdate("ingest/65/256", jobSpec{ - topic: "ingest", - partition: 65, - startOffset: 256, - endOffset: 9111, - commitRecTs: now.Add(-1 * time.Hour), + sched.jobs.addOrUpdate("ingest/65/256", schedulerpb.JobSpec{ + Topic: "ingest", + Partition: 65, + StartOffset: 256, + EndOffset: 9111, + CommitRecTs: now.Add(-1 * time.Hour), }) a1key, a1spec, err := sched.assignJob("w0") @@ -189,7 +191,7 @@ func TestObservations(t *testing.T) { } { - nq := newJobQueue(988*time.Hour, test.NewTestingLogger(t)) + nq := newJobQueue(988*time.Hour, test.NewTestingLogger(t), specLessThan) sched.jobs = nq sched.finalizeObservations() require.Len(t, nq.jobs, 0, "No observations, no jobs") @@ -197,7 +199,7 @@ func TestObservations(t *testing.T) { type observation struct { key jobKey - spec jobSpec + spec 
schedulerpb.JobSpec workerID string complete bool expectErr error @@ -211,11 +213,11 @@ func TestObservations(t *testing.T) { mkJob := func(isComplete bool, worker string, partition int32, id string, epoch int64, commitRecTs time.Time, endOffset int64, expectErr error) { clientData = append(clientData, observation{ key: jobKey{id: id, epoch: epoch}, - spec: jobSpec{ - topic: "ingest", - partition: partition, - commitRecTs: commitRecTs, - endOffset: endOffset, + spec: schedulerpb.JobSpec{ + Topic: "ingest", + Partition: partition, + CommitRecTs: commitRecTs, + EndOffset: endOffset, }, workerID: worker, complete: isComplete, @@ -342,3 +344,11 @@ func TestMonitor(t *testing.T) { cortex_blockbuilder_scheduler_partition_end_offset{partition="3"} 3 `), "cortex_blockbuilder_scheduler_partition_end_offset")) } + +func TestLessThan(t *testing.T) { + now := time.Now() + oneHourAgo := now.Add(-1 * time.Hour) + assert.True(t, specLessThan(schedulerpb.JobSpec{CommitRecTs: oneHourAgo}, schedulerpb.JobSpec{CommitRecTs: now})) + assert.False(t, specLessThan(schedulerpb.JobSpec{CommitRecTs: now}, schedulerpb.JobSpec{CommitRecTs: oneHourAgo})) + assert.False(t, specLessThan(schedulerpb.JobSpec{CommitRecTs: now}, schedulerpb.JobSpec{CommitRecTs: now})) +} diff --git a/pkg/blockbuilder/schedulerpb/client.go b/pkg/blockbuilder/schedulerpb/client.go new file mode 100644 index 00000000000..6d36492e412 --- /dev/null +++ b/pkg/blockbuilder/schedulerpb/client.go @@ -0,0 +1,202 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package schedulerpb + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" +) + +// SchedulerClient is a client for the scheduler service. +// It encapsulates the communication style expected by the scheduler service: +// - AssignJob is polled repeatedly until a job is available. +// - UpdateJob is called periodically to update the status of all known jobs. +// +// SchedulerClient maintains a history of locally-known jobs that are expired +// some time after completion. +type SchedulerClient interface { + Run(context.Context) + GetJob(context.Context) (JobKey, JobSpec, error) + CompleteJob(JobKey) error +} + +type schedulerClient struct { + workerID string + updateInterval time.Duration + maxUpdateAge time.Duration + scheduler BlockBuilderSchedulerClient + logger log.Logger + + mu sync.Mutex + jobs map[JobKey]*job +} + +type job struct { + spec JobSpec + complete bool + // The time, if non-zero, when this job entry will become eligible for purging. + forgetTime time.Time +} + +// NewSchedulerClient creates a new SchedulerClient around the given scheduler +// service client. The client will inform the scheduler service about jobs once +// per updateInterval, and will forget about jobs that have been complete for at +// least maxUpdateAge. Thus maxUpdateAge should be at least twice +// updateInterval. 
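// A minimal usage sketch for a block-builder worker (illustrative only: the
// dialed gRPC connection `conn`, the context, and the interval values are
// assumptions, not part of this API):
//
//	cli, err := NewSchedulerClient("worker-0",
//		NewBlockBuilderSchedulerClient(conn), logger,
//		30*time.Second /* updateInterval */, 2*time.Minute /* maxUpdateAge */)
//	if err != nil {
//		return err
//	}
//	go cli.Run(ctx) // periodically re-sends updates for all known jobs
//
//	key, spec, err := cli.GetJob(ctx) // blocks until the scheduler assigns a job
//	// ... consume spec.StartOffset through spec.EndOffset and build blocks ...
//	err = cli.CompleteJob(key) // Run keeps reporting completion until the job is forgotten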
+func NewSchedulerClient(workerID string, scheduler BlockBuilderSchedulerClient, logger log.Logger, + updateInterval time.Duration, maxUpdateAge time.Duration) (SchedulerClient, error) { + + if updateInterval <= 0 { + return nil, fmt.Errorf("updateInterval must be positive") + } + if maxUpdateAge <= 0 { + return nil, fmt.Errorf("maxUpdateAge must be positive") + } + if maxUpdateAge < updateInterval*2 { + return nil, fmt.Errorf("maxUpdateAge must be at least twice the updateInterval") + } + + return &schedulerClient{ + workerID: workerID, + updateInterval: updateInterval, + maxUpdateAge: maxUpdateAge, + scheduler: scheduler, + logger: logger, + + jobs: make(map[JobKey]*job), + }, nil +} + +// Run periodically sends updates to the scheduler service and performs cleanup of old jobs. +// Run will block and run until the given context is canceled. +func (s *schedulerClient) Run(ctx context.Context) { + ticker := time.NewTicker(s.updateInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.sendUpdates(ctx) + s.forgetOldJobs() + + case <-ctx.Done(): + return + } + } +} + +func (s *schedulerClient) sendUpdates(ctx context.Context) { + for key, j := range s.snapshot() { + _, err := s.scheduler.UpdateJob(ctx, &UpdateJobRequest{ + Key: &key, + WorkerId: s.workerID, + Spec: &j.spec, + Complete: j.complete, + }) + if err != nil { + level.Error(s.logger).Log("msg", "failed to update job", "job_id", key.Id, "epoch", key.Epoch, "err", err) + } + } +} + +// snapshot returns a snapshot of the current jobs map. +func (s *schedulerClient) snapshot() map[JobKey]*job { + s.mu.Lock() + defer s.mu.Unlock() + + jobs := make(map[JobKey]*job, len(s.jobs)) + for k, v := range s.jobs { + jobs[k] = &job{ + spec: v.spec, + complete: v.complete, + forgetTime: v.forgetTime, + } + } + return jobs +} + +func (s *schedulerClient) forgetOldJobs() { + s.mu.Lock() + defer s.mu.Unlock() + + now := time.Now() + + for key, j := range s.jobs { + if !j.forgetTime.IsZero() && now.After(j.forgetTime) { + level.Info(s.logger).Log("msg", "forgetting old job", "job_id", key.Id, "epoch", key.Epoch) + delete(s.jobs, key) + } + } +} + +// GetJob returns the job assigned to the worker with the given ID. +// It will block until a job is available. +func (s *schedulerClient) GetJob(ctx context.Context) (JobKey, JobSpec, error) { + boff := backoff.New(ctx, backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + MaxRetries: 0, // retry as long as the context is valid + }) + var lastErr error + for boff.Ongoing() { + callCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + response, err := s.scheduler.AssignJob(callCtx, &AssignJobRequest{ + WorkerId: s.workerID, + }) + cancel() + if err != nil { + lastErr = err + boff.Wait() + continue + } + + // If we get here, we have a newly assigned job. Track it and return it. + key := *response.GetKey() + spec := *response.GetSpec() + level.Info(s.logger).Log("msg", "assigned job", "job_id", key.Id, "epoch", key.Epoch) + + s.mu.Lock() + if _, ok := s.jobs[key]; ok { + // This should never happen; we'd like to see a log if it does. 
+ level.Warn(s.logger).Log("msg", "job already assigned", "job_id", key.Id, "epoch", key.Epoch) + } + + s.jobs[key] = &job{ + spec: spec, + complete: false, + forgetTime: time.Time{}, + } + s.mu.Unlock() + + return key, spec, nil + } + + return JobKey{}, JobSpec{}, lastErr +} + +func (s *schedulerClient) CompleteJob(jobKey JobKey) error { + level.Info(s.logger).Log("msg", "marking job as completed", "job_id", jobKey.Id, "epoch", jobKey.Epoch) + + s.mu.Lock() + defer s.mu.Unlock() + + j, ok := s.jobs[jobKey] + if !ok { + return fmt.Errorf("job %s (%d) not found", jobKey.GetId(), jobKey.GetEpoch()) + } + if j.complete { + return nil + } + + // Set it as complete and also set a time when it'll become eligible for forgetting. + j.complete = true + j.forgetTime = time.Now().Add(s.maxUpdateAge) + return nil +} + +var _ SchedulerClient = (*schedulerClient)(nil) diff --git a/pkg/blockbuilder/schedulerpb/client_test.go b/pkg/blockbuilder/schedulerpb/client_test.go new file mode 100644 index 00000000000..276926d642f --- /dev/null +++ b/pkg/blockbuilder/schedulerpb/client_test.go @@ -0,0 +1,172 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package schedulerpb + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + grpc "google.golang.org/grpc" + + "github.com/grafana/mimir/pkg/util/test" +) + +func TestGetJob(t *testing.T) { + sched := &mockSchedulerClient{} + sched.jobs = []mockJob{ + {key: JobKey{Id: "foo/983/585", Epoch: 4523}, spec: JobSpec{Topic: "foo", Partition: 983, StartOffset: 585}}, + } + + cli, cliErr := NewSchedulerClient("worker1", sched, test.NewTestingLogger(t), 5*time.Minute, 20*time.Minute) + require.NoError(t, cliErr) + ctx := context.Background() + key, spec, err := cli.GetJob(ctx) + + require.NoError(t, err) + require.Equal(t, sched.jobs[0].key, key) + require.Equal(t, sched.jobs[0].spec, spec) + require.Equal(t, 1, sched.assignCalls) +} + +func TestSendUpdates(t *testing.T) { + sched := &mockSchedulerClient{} + sched.jobs = []mockJob{ + {key: JobKey{Id: "foo/983/585", Epoch: 4523}, spec: JobSpec{Topic: "foo", Partition: 983, StartOffset: 585}}, + {key: JobKey{Id: "foo/4/92842", Epoch: 4524}, spec: JobSpec{Topic: "foo", Partition: 4, StartOffset: 92842}}, + } + + clii, cliErr := NewSchedulerClient("worker1", sched, test.NewTestingLogger(t), 5*time.Minute, 100*time.Hour) + require.NoError(t, cliErr) + cli := clii.(*schedulerClient) + ctx := context.Background() + k, _, err := cli.GetJob(ctx) + require.NoError(t, err) + require.Empty(t, sched.updates) + + cli.sendUpdates(ctx) + require.EqualValues(t, []*UpdateJobRequest{ + {Key: &sched.jobs[0].key, WorkerId: "worker1", Spec: &sched.jobs[0].spec, Complete: false}, + }, sched.updates) + + cli.sendUpdates(ctx) + require.EqualValues(t, []*UpdateJobRequest{ + {Key: &sched.jobs[0].key, WorkerId: "worker1", Spec: &sched.jobs[0].spec, Complete: false}, + {Key: &sched.jobs[0].key, WorkerId: "worker1", Spec: &sched.jobs[0].spec, Complete: false}, + }, sched.updates) + + require.NoError(t, cli.CompleteJob(k)) + _, _, err2 := cli.GetJob(ctx) + require.NoError(t, err2) + + // Now we have one complete job and one incomplete. We should be sending updates for both. 
+ cli.sendUpdates(ctx) + require.EqualValues(t, []*UpdateJobRequest{ + {Key: &sched.jobs[0].key, WorkerId: "worker1", Spec: &sched.jobs[0].spec, Complete: false}, + {Key: &sched.jobs[0].key, WorkerId: "worker1", Spec: &sched.jobs[0].spec, Complete: false}, + {Key: &sched.jobs[0].key, WorkerId: "worker1", Spec: &sched.jobs[0].spec, Complete: true}, + {Key: &sched.jobs[1].key, WorkerId: "worker1", Spec: &sched.jobs[1].spec, Complete: false}, + }, sched.updates) + + cli.sendUpdates(ctx) + require.EqualValues(t, []*UpdateJobRequest{ + {Key: &sched.jobs[0].key, WorkerId: "worker1", Spec: &sched.jobs[0].spec, Complete: false}, + {Key: &sched.jobs[0].key, WorkerId: "worker1", Spec: &sched.jobs[0].spec, Complete: false}, + {Key: &sched.jobs[0].key, WorkerId: "worker1", Spec: &sched.jobs[0].spec, Complete: true}, + {Key: &sched.jobs[1].key, WorkerId: "worker1", Spec: &sched.jobs[1].spec, Complete: false}, + {Key: &sched.jobs[0].key, WorkerId: "worker1", Spec: &sched.jobs[0].spec, Complete: true}, + {Key: &sched.jobs[1].key, WorkerId: "worker1", Spec: &sched.jobs[1].spec, Complete: false}, + }, sched.updates) +} + +// a mutator for tests. +func (c *schedulerClient) completeJobWithForgetTime(k JobKey, forgetTime time.Time) error { + if err := c.CompleteJob(k); err != nil { + return err + } + c.jobs[k].forgetTime = forgetTime + return nil +} + +func TestForget(t *testing.T) { + sched := &mockSchedulerClient{} + sched.jobs = []mockJob{ + {key: JobKey{Id: "foo/983/585", Epoch: 4523}, spec: JobSpec{Topic: "foo", Partition: 983, StartOffset: 585}}, + {key: JobKey{Id: "foo/4/92842", Epoch: 4524}, spec: JobSpec{Topic: "foo", Partition: 4, StartOffset: 92842}}, + } + + clii, cliErr := NewSchedulerClient("worker1", sched, test.NewTestingLogger(t), 5*time.Minute, 20*time.Minute) + require.NoError(t, cliErr) + ctx := context.Background() + cli := clii.(*schedulerClient) + k, _, err := cli.GetJob(ctx) + require.NoError(t, err) + require.Len(t, cli.jobs, 1) + + k2, _, err2 := cli.GetJob(ctx) + require.NoError(t, err2) + require.Len(t, cli.jobs, 2) + + // Forgetting with no eligible jobs should do nothing. + require.NoError(t, cli.completeJobWithForgetTime(k, time.Now().Add(1_000_000*time.Hour))) + cli.forgetOldJobs() + require.Len(t, cli.jobs, 2, "job count should be unchanged with no eligible jobs for purging") + + // Forgetting with some eligible jobs. + require.NoError(t, cli.completeJobWithForgetTime(k, time.Now().Add(-1*time.Minute))) + cli.forgetOldJobs() + require.Len(t, cli.jobs, 1, "job should have been purged") + + // Forget the other... + require.NoError(t, cli.completeJobWithForgetTime(k2, time.Now().Add(-1*time.Minute))) + cli.forgetOldJobs() + require.Len(t, cli.jobs, 0, "job should have been purged") + + // And make sure we can handle an empty map. 
+ cli.forgetOldJobs() + require.Len(t, cli.jobs, 0, "forgetting an empty map should do nothing") +} + +type mockSchedulerClient struct { + mu sync.Mutex + jobs []mockJob + currentJob int + assignCalls int + updates []*UpdateJobRequest +} + +type mockJob struct { + key JobKey + spec JobSpec +} + +func (m *mockSchedulerClient) AssignJob(_ context.Context, _ *AssignJobRequest, _ ...grpc.CallOption) (*AssignJobResponse, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.assignCalls++ + + if m.currentJob >= len(m.jobs) { + return nil, errors.New("no jobs available") + } + + j := m.jobs[m.currentJob] + m.currentJob++ + + return &AssignJobResponse{ + Key: &j.key, + Spec: &j.spec, + }, nil +} + +func (m *mockSchedulerClient) UpdateJob(_ context.Context, r *UpdateJobRequest, _ ...grpc.CallOption) (*UpdateJobResponse, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.updates = append(m.updates, r) + + return &UpdateJobResponse{}, nil +} + +var _ BlockBuilderSchedulerClient = (*mockSchedulerClient)(nil) diff --git a/pkg/blockbuilder/schedulerpb/scheduler.pb.go b/pkg/blockbuilder/schedulerpb/scheduler.pb.go new file mode 100644 index 00000000000..c613fb27bae --- /dev/null +++ b/pkg/blockbuilder/schedulerpb/scheduler.pb.go @@ -0,0 +1,2148 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: scheduler.proto + +package schedulerpb + +import ( + context "context" + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + _ "google.golang.org/protobuf/types/known/timestamppb" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" + time "time" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf +var _ = time.Kitchen + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type JobKey struct { + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Epoch int64 `protobuf:"varint,2,opt,name=epoch,proto3" json:"epoch,omitempty"` +} + +func (m *JobKey) Reset() { *m = JobKey{} } +func (*JobKey) ProtoMessage() {} +func (*JobKey) Descriptor() ([]byte, []int) { + return fileDescriptor_2b3fc28395a6d9c5, []int{0} +} +func (m *JobKey) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *JobKey) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_JobKey.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *JobKey) XXX_Merge(src proto.Message) { + xxx_messageInfo_JobKey.Merge(m, src) +} +func (m *JobKey) XXX_Size() int { + return m.Size() +} +func (m *JobKey) XXX_DiscardUnknown() { + xxx_messageInfo_JobKey.DiscardUnknown(m) +} + +var xxx_messageInfo_JobKey proto.InternalMessageInfo + +func (m *JobKey) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *JobKey) GetEpoch() int64 { + if m != nil { + return m.Epoch + } + return 0 +} + +type JobSpec struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition int32 `protobuf:"varint,2,opt,name=partition,proto3" json:"partition,omitempty"` + StartOffset int64 `protobuf:"varint,3,opt,name=start_offset,json=startOffset,proto3" json:"start_offset,omitempty"` + EndOffset int64 `protobuf:"varint,4,opt,name=end_offset,json=endOffset,proto3" json:"end_offset,omitempty"` + CommitRecTs time.Time `protobuf:"bytes,5,opt,name=commitRecTs,proto3,stdtime" json:"commitRecTs"` + LastSeenOffset int64 `protobuf:"varint,6,opt,name=lastSeenOffset,proto3" json:"lastSeenOffset,omitempty"` + LastBlockEndTs time.Time `protobuf:"bytes,7,opt,name=lastBlockEndTs,proto3,stdtime" json:"lastBlockEndTs"` +} + +func (m *JobSpec) Reset() { *m = JobSpec{} } +func (*JobSpec) ProtoMessage() {} +func (*JobSpec) Descriptor() ([]byte, []int) { + return fileDescriptor_2b3fc28395a6d9c5, []int{1} +} +func (m *JobSpec) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *JobSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_JobSpec.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *JobSpec) XXX_Merge(src proto.Message) { + xxx_messageInfo_JobSpec.Merge(m, src) +} +func (m *JobSpec) XXX_Size() int { + return m.Size() +} +func (m *JobSpec) XXX_DiscardUnknown() { + xxx_messageInfo_JobSpec.DiscardUnknown(m) +} + +var xxx_messageInfo_JobSpec proto.InternalMessageInfo + +func (m *JobSpec) GetTopic() string { + if m != nil { + return m.Topic + } + return "" +} + +func (m *JobSpec) GetPartition() int32 { + if m != nil { + return m.Partition + } + return 0 +} + +func (m *JobSpec) GetStartOffset() int64 { + if m != nil { + return m.StartOffset + } + return 0 +} + +func (m *JobSpec) GetEndOffset() int64 { + if m != nil { + return m.EndOffset + } + return 0 +} + +func (m *JobSpec) GetCommitRecTs() time.Time { + if m != nil { + return m.CommitRecTs + } + return time.Time{} +} + +func (m *JobSpec) GetLastSeenOffset() int64 { + if m != nil { + return m.LastSeenOffset + } + return 0 +} + +func (m *JobSpec) GetLastBlockEndTs() time.Time { + 
if m != nil { + return m.LastBlockEndTs + } + return time.Time{} +} + +type AssignJobRequest struct { + WorkerId string `protobuf:"bytes,1,opt,name=worker_id,json=workerId,proto3" json:"worker_id,omitempty"` +} + +func (m *AssignJobRequest) Reset() { *m = AssignJobRequest{} } +func (*AssignJobRequest) ProtoMessage() {} +func (*AssignJobRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_2b3fc28395a6d9c5, []int{2} +} +func (m *AssignJobRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *AssignJobRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_AssignJobRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *AssignJobRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_AssignJobRequest.Merge(m, src) +} +func (m *AssignJobRequest) XXX_Size() int { + return m.Size() +} +func (m *AssignJobRequest) XXX_DiscardUnknown() { + xxx_messageInfo_AssignJobRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_AssignJobRequest proto.InternalMessageInfo + +func (m *AssignJobRequest) GetWorkerId() string { + if m != nil { + return m.WorkerId + } + return "" +} + +type AssignJobResponse struct { + Key *JobKey `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Spec *JobSpec `protobuf:"bytes,2,opt,name=spec,proto3" json:"spec,omitempty"` +} + +func (m *AssignJobResponse) Reset() { *m = AssignJobResponse{} } +func (*AssignJobResponse) ProtoMessage() {} +func (*AssignJobResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_2b3fc28395a6d9c5, []int{3} +} +func (m *AssignJobResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *AssignJobResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_AssignJobResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *AssignJobResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_AssignJobResponse.Merge(m, src) +} +func (m *AssignJobResponse) XXX_Size() int { + return m.Size() +} +func (m *AssignJobResponse) XXX_DiscardUnknown() { + xxx_messageInfo_AssignJobResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_AssignJobResponse proto.InternalMessageInfo + +func (m *AssignJobResponse) GetKey() *JobKey { + if m != nil { + return m.Key + } + return nil +} + +func (m *AssignJobResponse) GetSpec() *JobSpec { + if m != nil { + return m.Spec + } + return nil +} + +type UpdateJobRequest struct { + Key *JobKey `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + WorkerId string `protobuf:"bytes,2,opt,name=worker_id,json=workerId,proto3" json:"worker_id,omitempty"` + Spec *JobSpec `protobuf:"bytes,3,opt,name=spec,proto3" json:"spec,omitempty"` + Complete bool `protobuf:"varint,4,opt,name=complete,proto3" json:"complete,omitempty"` +} + +func (m *UpdateJobRequest) Reset() { *m = UpdateJobRequest{} } +func (*UpdateJobRequest) ProtoMessage() {} +func (*UpdateJobRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_2b3fc28395a6d9c5, []int{4} +} +func (m *UpdateJobRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *UpdateJobRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_UpdateJobRequest.Marshal(b, m, deterministic) + } 
else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *UpdateJobRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_UpdateJobRequest.Merge(m, src) +} +func (m *UpdateJobRequest) XXX_Size() int { + return m.Size() +} +func (m *UpdateJobRequest) XXX_DiscardUnknown() { + xxx_messageInfo_UpdateJobRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_UpdateJobRequest proto.InternalMessageInfo + +func (m *UpdateJobRequest) GetKey() *JobKey { + if m != nil { + return m.Key + } + return nil +} + +func (m *UpdateJobRequest) GetWorkerId() string { + if m != nil { + return m.WorkerId + } + return "" +} + +func (m *UpdateJobRequest) GetSpec() *JobSpec { + if m != nil { + return m.Spec + } + return nil +} + +func (m *UpdateJobRequest) GetComplete() bool { + if m != nil { + return m.Complete + } + return false +} + +type UpdateJobResponse struct { +} + +func (m *UpdateJobResponse) Reset() { *m = UpdateJobResponse{} } +func (*UpdateJobResponse) ProtoMessage() {} +func (*UpdateJobResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_2b3fc28395a6d9c5, []int{5} +} +func (m *UpdateJobResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *UpdateJobResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_UpdateJobResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *UpdateJobResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_UpdateJobResponse.Merge(m, src) +} +func (m *UpdateJobResponse) XXX_Size() int { + return m.Size() +} +func (m *UpdateJobResponse) XXX_DiscardUnknown() { + xxx_messageInfo_UpdateJobResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_UpdateJobResponse proto.InternalMessageInfo + +func init() { + proto.RegisterType((*JobKey)(nil), "schedulerpb.JobKey") + proto.RegisterType((*JobSpec)(nil), "schedulerpb.JobSpec") + proto.RegisterType((*AssignJobRequest)(nil), "schedulerpb.AssignJobRequest") + proto.RegisterType((*AssignJobResponse)(nil), "schedulerpb.AssignJobResponse") + proto.RegisterType((*UpdateJobRequest)(nil), "schedulerpb.UpdateJobRequest") + proto.RegisterType((*UpdateJobResponse)(nil), "schedulerpb.UpdateJobResponse") +} + +func init() { proto.RegisterFile("scheduler.proto", fileDescriptor_2b3fc28395a6d9c5) } + +var fileDescriptor_2b3fc28395a6d9c5 = []byte{ + // 530 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xc1, 0x6e, 0xd3, 0x40, + 0x10, 0xf5, 0x26, 0x4d, 0x1a, 0x8f, 0x51, 0x69, 0xb7, 0x41, 0xb2, 0x02, 0xdd, 0x04, 0x4b, 0xa0, + 0x9c, 0x1c, 0x29, 0xf0, 0x03, 0x44, 0x02, 0x89, 0x80, 0x84, 0xe4, 0x84, 0x73, 0x15, 0xdb, 0x93, + 0xd4, 0x8a, 0xe3, 0x35, 0xde, 0x8d, 0x50, 0x6f, 0x7c, 0x42, 0x3f, 0x80, 0x0f, 0xe0, 0xc6, 0x17, + 0x70, 0xef, 0x31, 0xc7, 0x9e, 0x80, 0x38, 0x17, 0x8e, 0xfd, 0x04, 0x94, 0x75, 0x62, 0x12, 0x23, + 0x04, 0xdc, 0x76, 0xde, 0xbc, 0x99, 0x79, 0xf3, 0x66, 0xe1, 0xae, 0xf0, 0x2e, 0xd0, 0x9f, 0x87, + 0x98, 0xd8, 0x71, 0xc2, 0x25, 0xa7, 0x46, 0x0e, 0xc4, 0x6e, 0xa3, 0x3e, 0xe1, 0x13, 0xae, 0xf0, + 0xce, 0xfa, 0x95, 0x51, 0x1a, 0xcd, 0x09, 0xe7, 0x93, 0x10, 0x3b, 0x2a, 0x72, 0xe7, 0xe3, 0x8e, + 0x0c, 0x66, 0x28, 0xe4, 0x68, 0x16, 0x67, 0x04, 0xcb, 0x86, 0x6a, 0x9f, 0xbb, 0xaf, 0xf0, 0x92, + 0x1e, 0x41, 0x29, 0xf0, 0x4d, 0xd2, 0x22, 0x6d, 0xdd, 0x29, 0x05, 0x3e, 0xad, 0x43, 0x05, 0x63, + 
0xee, 0x5d, 0x98, 0xa5, 0x16, 0x69, 0x97, 0x9d, 0x2c, 0xb0, 0xbe, 0x94, 0xe0, 0xb0, 0xcf, 0xdd, + 0x41, 0x8c, 0xde, 0x9a, 0x21, 0x79, 0x1c, 0x78, 0x9b, 0xa2, 0x2c, 0xa0, 0x0f, 0x40, 0x8f, 0x47, + 0x89, 0x0c, 0x64, 0xc0, 0x23, 0x55, 0x5b, 0x71, 0x7e, 0x01, 0xf4, 0x21, 0xdc, 0x11, 0x72, 0x94, + 0xc8, 0x73, 0x3e, 0x1e, 0x0b, 0x94, 0x66, 0x59, 0x35, 0x37, 0x14, 0xf6, 0x46, 0x41, 0xf4, 0x0c, + 0x00, 0x23, 0x7f, 0x4b, 0x38, 0x50, 0x04, 0x1d, 0x23, 0x7f, 0x93, 0x7e, 0x01, 0x86, 0xc7, 0x67, + 0xb3, 0x40, 0x3a, 0xe8, 0x0d, 0x85, 0x59, 0x69, 0x91, 0xb6, 0xd1, 0x6d, 0xd8, 0xd9, 0xa2, 0xf6, + 0x76, 0x51, 0x7b, 0xb8, 0x5d, 0xb4, 0x57, 0xbb, 0xfe, 0xda, 0xd4, 0xae, 0xbe, 0x35, 0x89, 0xb3, + 0x5b, 0x48, 0x1f, 0xc3, 0x51, 0x38, 0x12, 0x72, 0x80, 0x18, 0x65, 0x9d, 0xcd, 0xaa, 0x1a, 0x55, + 0x40, 0xe9, 0xeb, 0x8c, 0xd7, 0x0b, 0xb9, 0x37, 0x7d, 0x1e, 0xf9, 0x43, 0x61, 0x1e, 0xfe, 0xc7, + 0xc8, 0x42, 0xad, 0xd5, 0x81, 0xe3, 0x67, 0x42, 0x04, 0x93, 0xa8, 0xcf, 0x5d, 0x07, 0xdf, 0xcd, + 0x51, 0x48, 0x7a, 0x1f, 0xf4, 0xf7, 0x3c, 0x99, 0x62, 0x72, 0x9e, 0x1f, 0xa0, 0x96, 0x01, 0x2f, + 0x7d, 0xcb, 0x87, 0x93, 0x9d, 0x02, 0x11, 0xf3, 0x48, 0x20, 0x7d, 0x04, 0xe5, 0x29, 0x5e, 0x2a, + 0xae, 0xd1, 0x3d, 0xb5, 0x77, 0xfe, 0x81, 0x9d, 0x5d, 0xd3, 0x59, 0xe7, 0x69, 0x1b, 0x0e, 0x44, + 0x8c, 0x9e, 0xba, 0x82, 0xd1, 0xad, 0x17, 0x79, 0xeb, 0x23, 0x3a, 0x8a, 0x61, 0x7d, 0x24, 0x70, + 0xfc, 0x36, 0xf6, 0x47, 0x12, 0x77, 0x74, 0xfd, 0xe3, 0x94, 0x3d, 0xf9, 0xa5, 0x7d, 0xf9, 0xb9, + 0x84, 0xf2, 0xdf, 0x24, 0xd0, 0x06, 0xd4, 0x3c, 0x3e, 0x8b, 0x43, 0x94, 0xa8, 0x8e, 0x5e, 0x73, + 0xf2, 0xd8, 0x3a, 0x85, 0x93, 0x1d, 0x75, 0x99, 0x09, 0xdd, 0xcf, 0x04, 0xee, 0x29, 0x67, 0x7b, + 0xf3, 0x20, 0xf4, 0x31, 0x19, 0x6c, 0x5b, 0xd3, 0x3e, 0xe8, 0xb9, 0x67, 0xf4, 0x6c, 0x6f, 0x66, + 0xd1, 0xfc, 0x06, 0xfb, 0x53, 0x7a, 0x63, 0x75, 0x1f, 0xf4, 0x7c, 0x74, 0xa1, 0x57, 0xd1, 0xb0, + 0x42, 0xaf, 0xdf, 0x14, 0xf7, 0x9e, 0x2e, 0x96, 0x4c, 0xbb, 0x59, 0x32, 0xed, 0x76, 0xc9, 0xc8, + 0x87, 0x94, 0x91, 0x4f, 0x29, 0x23, 0xd7, 0x29, 0x23, 0x8b, 0x94, 0x91, 0xef, 0x29, 0x23, 0x3f, + 0x52, 0xa6, 0xdd, 0xa6, 0x8c, 0x5c, 0xad, 0x98, 0xb6, 0x58, 0x31, 0xed, 0x66, 0xc5, 0x34, 0xb7, + 0xaa, 0x3e, 0xd8, 0x93, 0x9f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x89, 0x7b, 0x1b, 0x9d, 0x00, 0x04, + 0x00, 0x00, +} + +func (this *JobKey) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*JobKey) + if !ok { + that2, ok := that.(JobKey) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Id != that1.Id { + return false + } + if this.Epoch != that1.Epoch { + return false + } + return true +} +func (this *JobSpec) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*JobSpec) + if !ok { + that2, ok := that.(JobSpec) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Topic != that1.Topic { + return false + } + if this.Partition != that1.Partition { + return false + } + if this.StartOffset != that1.StartOffset { + return false + } + if this.EndOffset != that1.EndOffset { + return false + } + if !this.CommitRecTs.Equal(that1.CommitRecTs) { + return false + } + if this.LastSeenOffset != that1.LastSeenOffset { + return false + } + if !this.LastBlockEndTs.Equal(that1.LastBlockEndTs) { + return false + } + return true +} +func (this *AssignJobRequest) Equal(that interface{}) bool { + if 
that == nil { + return this == nil + } + + that1, ok := that.(*AssignJobRequest) + if !ok { + that2, ok := that.(AssignJobRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.WorkerId != that1.WorkerId { + return false + } + return true +} +func (this *AssignJobResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*AssignJobResponse) + if !ok { + that2, ok := that.(AssignJobResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Key.Equal(that1.Key) { + return false + } + if !this.Spec.Equal(that1.Spec) { + return false + } + return true +} +func (this *UpdateJobRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*UpdateJobRequest) + if !ok { + that2, ok := that.(UpdateJobRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Key.Equal(that1.Key) { + return false + } + if this.WorkerId != that1.WorkerId { + return false + } + if !this.Spec.Equal(that1.Spec) { + return false + } + if this.Complete != that1.Complete { + return false + } + return true +} +func (this *UpdateJobResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*UpdateJobResponse) + if !ok { + that2, ok := that.(UpdateJobResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *JobKey) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&schedulerpb.JobKey{") + s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n") + s = append(s, "Epoch: "+fmt.Sprintf("%#v", this.Epoch)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *JobSpec) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 11) + s = append(s, "&schedulerpb.JobSpec{") + s = append(s, "Topic: "+fmt.Sprintf("%#v", this.Topic)+",\n") + s = append(s, "Partition: "+fmt.Sprintf("%#v", this.Partition)+",\n") + s = append(s, "StartOffset: "+fmt.Sprintf("%#v", this.StartOffset)+",\n") + s = append(s, "EndOffset: "+fmt.Sprintf("%#v", this.EndOffset)+",\n") + s = append(s, "CommitRecTs: "+fmt.Sprintf("%#v", this.CommitRecTs)+",\n") + s = append(s, "LastSeenOffset: "+fmt.Sprintf("%#v", this.LastSeenOffset)+",\n") + s = append(s, "LastBlockEndTs: "+fmt.Sprintf("%#v", this.LastBlockEndTs)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *AssignJobRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&schedulerpb.AssignJobRequest{") + s = append(s, "WorkerId: "+fmt.Sprintf("%#v", this.WorkerId)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *AssignJobResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&schedulerpb.AssignJobResponse{") + if this.Key != nil { + s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") + } + if this.Spec != nil { + s = append(s, "Spec: "+fmt.Sprintf("%#v", this.Spec)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *UpdateJobRequest) 
GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&schedulerpb.UpdateJobRequest{") + if this.Key != nil { + s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") + } + s = append(s, "WorkerId: "+fmt.Sprintf("%#v", this.WorkerId)+",\n") + if this.Spec != nil { + s = append(s, "Spec: "+fmt.Sprintf("%#v", this.Spec)+",\n") + } + s = append(s, "Complete: "+fmt.Sprintf("%#v", this.Complete)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *UpdateJobResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&schedulerpb.UpdateJobResponse{") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringScheduler(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// BlockBuilderSchedulerClient is the client API for BlockBuilderScheduler service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type BlockBuilderSchedulerClient interface { + AssignJob(ctx context.Context, in *AssignJobRequest, opts ...grpc.CallOption) (*AssignJobResponse, error) + UpdateJob(ctx context.Context, in *UpdateJobRequest, opts ...grpc.CallOption) (*UpdateJobResponse, error) +} + +type blockBuilderSchedulerClient struct { + cc *grpc.ClientConn +} + +func NewBlockBuilderSchedulerClient(cc *grpc.ClientConn) BlockBuilderSchedulerClient { + return &blockBuilderSchedulerClient{cc} +} + +func (c *blockBuilderSchedulerClient) AssignJob(ctx context.Context, in *AssignJobRequest, opts ...grpc.CallOption) (*AssignJobResponse, error) { + out := new(AssignJobResponse) + err := c.cc.Invoke(ctx, "/schedulerpb.BlockBuilderScheduler/AssignJob", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *blockBuilderSchedulerClient) UpdateJob(ctx context.Context, in *UpdateJobRequest, opts ...grpc.CallOption) (*UpdateJobResponse, error) { + out := new(UpdateJobResponse) + err := c.cc.Invoke(ctx, "/schedulerpb.BlockBuilderScheduler/UpdateJob", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// BlockBuilderSchedulerServer is the server API for BlockBuilderScheduler service. +type BlockBuilderSchedulerServer interface { + AssignJob(context.Context, *AssignJobRequest) (*AssignJobResponse, error) + UpdateJob(context.Context, *UpdateJobRequest) (*UpdateJobResponse, error) +} + +// UnimplementedBlockBuilderSchedulerServer can be embedded to have forward compatible implementations. 
+type UnimplementedBlockBuilderSchedulerServer struct { +} + +func (*UnimplementedBlockBuilderSchedulerServer) AssignJob(ctx context.Context, req *AssignJobRequest) (*AssignJobResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method AssignJob not implemented") +} +func (*UnimplementedBlockBuilderSchedulerServer) UpdateJob(ctx context.Context, req *UpdateJobRequest) (*UpdateJobResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateJob not implemented") +} + +func RegisterBlockBuilderSchedulerServer(s *grpc.Server, srv BlockBuilderSchedulerServer) { + s.RegisterService(&_BlockBuilderScheduler_serviceDesc, srv) +} + +func _BlockBuilderScheduler_AssignJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AssignJobRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BlockBuilderSchedulerServer).AssignJob(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/schedulerpb.BlockBuilderScheduler/AssignJob", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BlockBuilderSchedulerServer).AssignJob(ctx, req.(*AssignJobRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _BlockBuilderScheduler_UpdateJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UpdateJobRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BlockBuilderSchedulerServer).UpdateJob(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/schedulerpb.BlockBuilderScheduler/UpdateJob", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BlockBuilderSchedulerServer).UpdateJob(ctx, req.(*UpdateJobRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _BlockBuilderScheduler_serviceDesc = grpc.ServiceDesc{ + ServiceName: "schedulerpb.BlockBuilderScheduler", + HandlerType: (*BlockBuilderSchedulerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "AssignJob", + Handler: _BlockBuilderScheduler_AssignJob_Handler, + }, + { + MethodName: "UpdateJob", + Handler: _BlockBuilderScheduler_UpdateJob_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "scheduler.proto", +} + +func (m *JobKey) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *JobKey) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *JobKey) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Epoch != 0 { + i = encodeVarintScheduler(dAtA, i, uint64(m.Epoch)) + i-- + dAtA[i] = 0x10 + } + if len(m.Id) > 0 { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = encodeVarintScheduler(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *JobSpec) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *JobSpec) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return 
m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *JobSpec) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + n1, err1 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.LastBlockEndTs, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.LastBlockEndTs):]) + if err1 != nil { + return 0, err1 + } + i -= n1 + i = encodeVarintScheduler(dAtA, i, uint64(n1)) + i-- + dAtA[i] = 0x3a + if m.LastSeenOffset != 0 { + i = encodeVarintScheduler(dAtA, i, uint64(m.LastSeenOffset)) + i-- + dAtA[i] = 0x30 + } + n2, err2 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.CommitRecTs, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.CommitRecTs):]) + if err2 != nil { + return 0, err2 + } + i -= n2 + i = encodeVarintScheduler(dAtA, i, uint64(n2)) + i-- + dAtA[i] = 0x2a + if m.EndOffset != 0 { + i = encodeVarintScheduler(dAtA, i, uint64(m.EndOffset)) + i-- + dAtA[i] = 0x20 + } + if m.StartOffset != 0 { + i = encodeVarintScheduler(dAtA, i, uint64(m.StartOffset)) + i-- + dAtA[i] = 0x18 + } + if m.Partition != 0 { + i = encodeVarintScheduler(dAtA, i, uint64(m.Partition)) + i-- + dAtA[i] = 0x10 + } + if len(m.Topic) > 0 { + i -= len(m.Topic) + copy(dAtA[i:], m.Topic) + i = encodeVarintScheduler(dAtA, i, uint64(len(m.Topic))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *AssignJobRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AssignJobRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *AssignJobRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.WorkerId) > 0 { + i -= len(m.WorkerId) + copy(dAtA[i:], m.WorkerId) + i = encodeVarintScheduler(dAtA, i, uint64(len(m.WorkerId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *AssignJobResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AssignJobResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *AssignJobResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Spec != nil { + { + size, err := m.Spec.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintScheduler(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.Key != nil { + { + size, err := m.Key.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintScheduler(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *UpdateJobRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *UpdateJobRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *UpdateJobRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Complete { + i-- + if m.Complete { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x20 + 
} + if m.Spec != nil { + { + size, err := m.Spec.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintScheduler(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if len(m.WorkerId) > 0 { + i -= len(m.WorkerId) + copy(dAtA[i:], m.WorkerId) + i = encodeVarintScheduler(dAtA, i, uint64(len(m.WorkerId))) + i-- + dAtA[i] = 0x12 + } + if m.Key != nil { + { + size, err := m.Key.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintScheduler(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *UpdateJobResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *UpdateJobResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *UpdateJobResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func encodeVarintScheduler(dAtA []byte, offset int, v uint64) int { + offset -= sovScheduler(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *JobKey) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + sovScheduler(uint64(l)) + } + if m.Epoch != 0 { + n += 1 + sovScheduler(uint64(m.Epoch)) + } + return n +} + +func (m *JobSpec) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Topic) + if l > 0 { + n += 1 + l + sovScheduler(uint64(l)) + } + if m.Partition != 0 { + n += 1 + sovScheduler(uint64(m.Partition)) + } + if m.StartOffset != 0 { + n += 1 + sovScheduler(uint64(m.StartOffset)) + } + if m.EndOffset != 0 { + n += 1 + sovScheduler(uint64(m.EndOffset)) + } + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.CommitRecTs) + n += 1 + l + sovScheduler(uint64(l)) + if m.LastSeenOffset != 0 { + n += 1 + sovScheduler(uint64(m.LastSeenOffset)) + } + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.LastBlockEndTs) + n += 1 + l + sovScheduler(uint64(l)) + return n +} + +func (m *AssignJobRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.WorkerId) + if l > 0 { + n += 1 + l + sovScheduler(uint64(l)) + } + return n +} + +func (m *AssignJobResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Key != nil { + l = m.Key.Size() + n += 1 + l + sovScheduler(uint64(l)) + } + if m.Spec != nil { + l = m.Spec.Size() + n += 1 + l + sovScheduler(uint64(l)) + } + return n +} + +func (m *UpdateJobRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Key != nil { + l = m.Key.Size() + n += 1 + l + sovScheduler(uint64(l)) + } + l = len(m.WorkerId) + if l > 0 { + n += 1 + l + sovScheduler(uint64(l)) + } + if m.Spec != nil { + l = m.Spec.Size() + n += 1 + l + sovScheduler(uint64(l)) + } + if m.Complete { + n += 2 + } + return n +} + +func (m *UpdateJobResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func sovScheduler(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozScheduler(x uint64) (n int) { + return sovScheduler(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *JobKey) String() string { + if this == nil { + return "nil" + } + 
s := strings.Join([]string{`&JobKey{`, + `Id:` + fmt.Sprintf("%v", this.Id) + `,`, + `Epoch:` + fmt.Sprintf("%v", this.Epoch) + `,`, + `}`, + }, "") + return s +} +func (this *JobSpec) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&JobSpec{`, + `Topic:` + fmt.Sprintf("%v", this.Topic) + `,`, + `Partition:` + fmt.Sprintf("%v", this.Partition) + `,`, + `StartOffset:` + fmt.Sprintf("%v", this.StartOffset) + `,`, + `EndOffset:` + fmt.Sprintf("%v", this.EndOffset) + `,`, + `CommitRecTs:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.CommitRecTs), "Timestamp", "timestamppb.Timestamp", 1), `&`, ``, 1) + `,`, + `LastSeenOffset:` + fmt.Sprintf("%v", this.LastSeenOffset) + `,`, + `LastBlockEndTs:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.LastBlockEndTs), "Timestamp", "timestamppb.Timestamp", 1), `&`, ``, 1) + `,`, + `}`, + }, "") + return s +} +func (this *AssignJobRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&AssignJobRequest{`, + `WorkerId:` + fmt.Sprintf("%v", this.WorkerId) + `,`, + `}`, + }, "") + return s +} +func (this *AssignJobResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&AssignJobResponse{`, + `Key:` + strings.Replace(this.Key.String(), "JobKey", "JobKey", 1) + `,`, + `Spec:` + strings.Replace(this.Spec.String(), "JobSpec", "JobSpec", 1) + `,`, + `}`, + }, "") + return s +} +func (this *UpdateJobRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&UpdateJobRequest{`, + `Key:` + strings.Replace(this.Key.String(), "JobKey", "JobKey", 1) + `,`, + `WorkerId:` + fmt.Sprintf("%v", this.WorkerId) + `,`, + `Spec:` + strings.Replace(this.Spec.String(), "JobSpec", "JobSpec", 1) + `,`, + `Complete:` + fmt.Sprintf("%v", this.Complete) + `,`, + `}`, + }, "") + return s +} +func (this *UpdateJobResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&UpdateJobResponse{`, + `}`, + }, "") + return s +} +func valueToStringScheduler(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *JobKey) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: JobKey: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: JobKey: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id 
= string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Epoch", wireType) + } + m.Epoch = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Epoch |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipScheduler(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *JobSpec) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: JobSpec: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: JobSpec: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Topic = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Partition", wireType) + } + m.Partition = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Partition |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartOffset", wireType) + } + m.StartOffset = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartOffset |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EndOffset", wireType) + } + m.EndOffset = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EndOffset |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CommitRecTs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { 
+ return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.CommitRecTs, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LastSeenOffset", wireType) + } + m.LastSeenOffset = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.LastSeenOffset |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LastBlockEndTs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.LastBlockEndTs, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipScheduler(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *AssignJobRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AssignJobRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AssignJobRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field WorkerId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.WorkerId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipScheduler(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthScheduler + } + if 
(iNdEx + skippy) < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *AssignJobResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AssignJobResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AssignJobResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Key == nil { + m.Key = &JobKey{} + } + if err := m.Key.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Spec", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Spec == nil { + m.Spec = &JobSpec{} + } + if err := m.Spec.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipScheduler(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *UpdateJobRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: UpdateJobRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: UpdateJobRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var msglen int + 
for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Key == nil { + m.Key = &JobKey{} + } + if err := m.Key.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field WorkerId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.WorkerId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Spec", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthScheduler + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthScheduler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Spec == nil { + m.Spec = &JobSpec{} + } + if err := m.Spec.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Complete", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Complete = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipScheduler(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *UpdateJobResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowScheduler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: UpdateJobResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: UpdateJobResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipScheduler(dAtA[iNdEx:]) + if err 
!= nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthScheduler + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipScheduler(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowScheduler + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowScheduler + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowScheduler + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthScheduler + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthScheduler + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowScheduler + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipScheduler(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthScheduler + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthScheduler = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowScheduler = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/blockbuilder/schedulerpb/scheduler.proto b/pkg/blockbuilder/schedulerpb/scheduler.proto new file mode 100644 index 00000000000..3d0f3a3d465 --- /dev/null +++ b/pkg/blockbuilder/schedulerpb/scheduler.proto @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/frontend/v1/frontendv1pb/frontend.proto +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Cortex Authors. 
+ +syntax = "proto3"; + +package schedulerpb; + +import "gogoproto/gogo.proto"; +import "google/protobuf/timestamp.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +message JobKey { + string id = 1; + int64 epoch = 2; +} + +message JobSpec { + string topic = 1; + int32 partition = 2; + int64 start_offset = 3; + int64 end_offset = 4; + google.protobuf.Timestamp commitRecTs = 5 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true]; + int64 lastSeenOffset = 6; + google.protobuf.Timestamp lastBlockEndTs = 7 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true]; +} + +service BlockBuilderScheduler { + rpc AssignJob(AssignJobRequest) returns (AssignJobResponse); + rpc UpdateJob(UpdateJobRequest) returns (UpdateJobResponse); +} + +message AssignJobRequest { + string worker_id = 1; +} + +message AssignJobResponse { + JobKey key = 1; + JobSpec spec = 2; +} + +message UpdateJobRequest { + JobKey key = 1; + string worker_id = 2; + JobSpec spec = 3; + bool complete = 4; +} + +message UpdateJobResponse {} diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 697501af98f..39868511b5c 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -1105,6 +1105,7 @@ func (t *Mimir) initBlockBuilderScheduler() (services.Service, error) { return nil, errors.Wrap(err, "block-builder-scheduler init") } t.BlockBuilderScheduler = s + t.API.RegisterBlockBuilderScheduler(s) return s, nil }
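Illustrative note (not part of the patch): the proto above defines a pull-style protocol in which a worker identifies itself by worker_id, receives a JobKey/JobSpec pair from AssignJob, and reports back through UpdateJob, where the Complete flag appears to distinguish an in-progress update from final completion. The sketch below shows how a worker could drive those two RPCs through the generated client stubs. It is a minimal sketch only: the scheduler address, port, and worker identity are placeholders, and the "consume and build blocks" step is elided; the client construction and request/response field names come from the generated code in this patch.

    // schedulerpb_worker_sketch.go -- hypothetical standalone example, not part of this change.
    package main

    import (
        "context"
        "log"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"

        "github.com/grafana/mimir/pkg/blockbuilder/schedulerpb"
    )

    func main() {
        // Placeholder address: point this at the block-builder-scheduler's gRPC endpoint.
        conn, err := grpc.Dial("localhost:9095", grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        client := schedulerpb.NewBlockBuilderSchedulerClient(conn)
        ctx := context.Background()
        const workerID = "example-worker-1" // hypothetical worker identity

        // Ask the scheduler for a job assignment.
        assigned, err := client.AssignJob(ctx, &schedulerpb.AssignJobRequest{WorkerId: workerID})
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("assigned job %s (epoch %d): topic %s partition %d offsets [%d, %d)",
            assigned.Key.Id, assigned.Key.Epoch,
            assigned.Spec.Topic, assigned.Spec.Partition,
            assigned.Spec.StartOffset, assigned.Spec.EndOffset)

        // ... consume the assigned offset range and build blocks here (elided) ...
        time.Sleep(time.Second)

        // Report the job as complete, echoing the key and spec back to the scheduler.
        _, err = client.UpdateJob(ctx, &schedulerpb.UpdateJobRequest{
            Key:      assigned.Key,
            WorkerId: workerID,
            Spec:     assigned.Spec,
            Complete: true,
        })
        if err != nil {
            log.Fatal(err)
        }
    }

On the server side, no separate listener is needed: as the modules.go hunk shows, the scheduler module hands itself to the API module at init, which registers the BlockBuilderScheduler service on Mimir's shared gRPC server via RegisterBlockBuilderSchedulerServer.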