Skip to content

Commit

Permalink
Fix pull-mode test.
Browse files Browse the repository at this point in the history
  • Loading branch information
seizethedave committed Dec 20, 2024
1 parent d42f7bb commit 4281f8c
Showing 1 changed file with 16 additions and 30 deletions.
46 changes: 16 additions & 30 deletions pkg/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,8 @@ func TestBlockBuilder_RetryOnTransientErrors(t *testing.T) {

func produceSamples(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, ts time.Time, tenantID string, sampleTs ...time.Time) []mimirpb.Sample {
var samples []mimirpb.Sample
for _, ts := range sampleTs {
samples = append(samples, floatSample(ts.UnixMilli(), 1)...)
for _, st := range sampleTs {
samples = append(samples, floatSample(st.UnixMilli(), 1)...)
}
val := createWriteRequest(t, tenantID, samples, nil)
produceRecords(ctx, t, kafkaClient, ts, tenantID, testTopic, 0, val)
Expand Down Expand Up @@ -737,7 +737,7 @@ func TestPullMode(t *testing.T) {
ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

kafkaCluster, kafkaAddr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, testTopic)
_, kafkaAddr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, testTopic)
kafkaClient := mustKafkaClient(t, kafkaAddr)
kafkaClient.AddConsumeTopics(testTopic)

Expand All @@ -748,25 +748,14 @@ func TestPullMode(t *testing.T) {
MaxUpdateAge: 1 * time.Second,
}

startTime := time.Now().Add(-10 * time.Minute)

expSamples := produceSamples(ctx, t, kafkaClient, time.Now(), "1",
startTime.Add(1*time.Second),
startTime.Add(2*time.Second),
startTime.Add(3*time.Second),
startTime.Add(4*time.Second),
startTime.Add(5*time.Second),
)
startTime := time.Now().Add(-2 * time.Hour)
var expSamples []mimirpb.Sample

// Set up a hook to track commits from block-builder to kafka. Those indicate the end of a cycle.
kafkaCommits := atomic.NewInt32(0)
kafkaCluster.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Add(1)
return nil, nil, false
})

// Wait for the commit.
require.Eventually(t, func() bool { return kafkaCommits.Load() > 0 }, 5*time.Second, 100*time.Millisecond, "expected kafka commits")
for i := range 5 {
expSamples = append(expSamples, produceSamples(ctx, t, kafkaClient, startTime, "1",
startTime.Add(time.Duration(i)*time.Minute),
)...)
}

scheduler := &mockSchedulerClient{}
scheduler.addJob(
Expand All @@ -778,10 +767,12 @@ func TestPullMode(t *testing.T) {
Topic: testTopic,
Partition: 0,
StartOffset: 0,
EndOffset: 4,
EndOffset: 5,
CommitRecTs: startTime,
LastSeenOffset: 5,
LastBlockEndTs: startTime.Add(5 * time.Second),
LastSeenOffset: 0,
LastBlockEndTs: startTime,
CycleEndTs: startTime.Add(5 * time.Minute),
CycleEndOffset: 5,
},
)

Expand All @@ -793,14 +784,9 @@ func TestPullMode(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(ctx, bb))
})

l, lerr := bb.getLagForPartition(ctx, 0)
require.NoError(t, lerr)
println("aaaa", l.Start.Offset, l.End.Offset, l.Commit.At)

require.Eventually(t, func() bool {
runCalls, getJobCalls, completeJobCalls := scheduler.counts()
println(runCalls, getJobCalls, completeJobCalls)
return runCalls >= 1 && getJobCalls >= 1 && completeJobCalls >= 1
return runCalls > 0 && getJobCalls > 0 && completeJobCalls > 0
}, 5*time.Second, 100*time.Millisecond, "expected scheduler interaction")

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
Expand Down

0 comments on commit 4281f8c

Please sign in to comment.