From 1f04656f9496169a60aab6958e48fbd929388015 Mon Sep 17 00:00:00 2001 From: David Grant Date: Fri, 20 Dec 2024 16:24:14 -0800 Subject: [PATCH] Expose partition as a param to produceSamples --- pkg/blockbuilder/blockbuilder_test.go | 32 +++++++++++++-------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/blockbuilder/blockbuilder_test.go b/pkg/blockbuilder/blockbuilder_test.go index 1e6641385e..71eb9d98e5 100644 --- a/pkg/blockbuilder/blockbuilder_test.go +++ b/pkg/blockbuilder/blockbuilder_test.go @@ -114,7 +114,7 @@ func TestBlockBuilder_StartWithExistingCommit(t *testing.T) { var producedSamples []mimirpb.Sample kafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-7 * time.Hour).Add(29 * time.Minute) for kafkaRecTime.Before(cycleEndStartup) { - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) producedSamples = append(producedSamples, samples...) kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2) @@ -196,7 +196,7 @@ func TestBlockBuilder_StartWithExistingCommit_PullMode(t *testing.T) { var producedSamples []mimirpb.Sample kafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-7 * time.Hour).Add(29 * time.Minute) for kafkaRecTime.Before(cycleEndStartup) { - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) producedSamples = append(producedSamples, samples...) kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2) @@ -286,7 +286,7 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit(t *testing.T) { kafkaRecTime := time.Now().Truncate(cfg.ConsumeInterval).Add(-7 * time.Hour).Add(29 * time.Minute) for range 3 { kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2) - produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) + produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) } // Set up a hook to track commits from block-builder to kafka. Those indicate the end of a cycle. @@ -351,7 +351,7 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit_PullMode(t *testing.T) { if firstRecordTime.IsZero() { firstRecordTime = kafkaRecTime } - produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) + produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) } fallback := time.Now().Add(-cfg.LookbackOnNoCommit) @@ -421,7 +421,7 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection(t *testing.T) { kafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-5 * time.Hour).Add(29 * time.Minute) lastKafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-cfg.ConsumeInterval) for kafkaRecTime.Before(lastKafkaRecTime) { - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) producedSamples = append(producedSamples, samples...) kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2) @@ -498,7 +498,7 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection_PullMode(t *testi firstKafkaRecTime := kafkaRecTime.Add(-time.Minute) lastKafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-cfg.ConsumeInterval) for kafkaRecTime.Before(lastKafkaRecTime) { - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) producedSamples = append(producedSamples, samples...) kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2) @@ -601,7 +601,7 @@ func TestBlockBuilder_WithMultipleTenants(t *testing.T) { // Producing some records for multiple tenants for range 10 { for _, tenant := range tenants { - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenant, kafkaRecTime) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenant, kafkaRecTime) producedPerTenantSamples[tenant] = append(producedPerTenantSamples[tenant], samples...) } @@ -683,7 +683,7 @@ func TestBlockBuilder_WithMultipleTenants_PullMode(t *testing.T) { // Producing some records for multiple tenants for range 10 { for _, tenant := range tenants { - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenant, kafkaRecTime) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenant, kafkaRecTime) producedPerTenantSamples[tenant] = append(producedPerTenantSamples[tenant], samples...) } @@ -740,7 +740,7 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) { // Simple first record with all samples in the block. kafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-2 * cfg.ConsumeIntervalBuffer) - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, kafkaRecTime) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenantID, kafkaRecTime) require.Len(t, samples, 1) expSamplesPhase1 = append(expSamplesPhase1, samples...) } @@ -753,7 +753,7 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) { inBlockTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-cfg.ConsumeIntervalBuffer) lastSeenRecTime = kafkaRecTime - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, inBlockTime, kafkaRecTime) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenantID, inBlockTime, kafkaRecTime) require.Len(t, samples, 2) expSamplesPhase1 = append(expSamplesPhase1, samples[0]) expSamplesPhase2 = append(expSamplesPhase2, samples[1]) @@ -766,7 +766,7 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) { kafkaRecTime := cycleEndStartup.Add(cfg.ConsumeInterval - time.Minute) inBlockTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-cfg.ConsumeIntervalBuffer + time.Minute) - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, inBlockTime) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenantID, inBlockTime) require.Len(t, samples, 1) expSamplesPhase2 = append(expSamplesPhase2, samples[0]) } @@ -782,7 +782,7 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) { kafkaRecTime := lastSeenRecTime.Add(-2 * time.Minute) inBlockTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-cfg.ConsumeIntervalBuffer + 2*time.Minute) - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, inBlockTime) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenantID, inBlockTime) require.Len(t, samples, 1) expSamplesPhase2 = append(expSamplesPhase2, samples[0]) } @@ -836,7 +836,7 @@ func TestBlockBuilder_RetryOnTransientErrors(t *testing.T) { var producedSamples []mimirpb.Sample kafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-1 * time.Hour).Add(29 * time.Minute) for kafkaRecTime.Before(cycleEndStartup) { - samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) + samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute)) producedSamples = append(producedSamples, samples...) kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2) @@ -883,13 +883,13 @@ func TestBlockBuilder_RetryOnTransientErrors(t *testing.T) { ) } -func produceSamples(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, ts time.Time, tenantID string, sampleTs ...time.Time) []mimirpb.Sample { +func produceSamples(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, partition int32, ts time.Time, tenantID string, sampleTs ...time.Time) []mimirpb.Sample { var samples []mimirpb.Sample for _, st := range sampleTs { samples = append(samples, floatSample(st.UnixMilli(), 1)...) } val := createWriteRequest(t, tenantID, samples, nil) - produceRecords(ctx, t, kafkaClient, ts, tenantID, testTopic, 0, val) + produceRecords(ctx, t, kafkaClient, ts, tenantID, testTopic, partition, val) return samples } @@ -1099,7 +1099,7 @@ func TestPullMode(t *testing.T) { var expSamples []mimirpb.Sample for i := range 5 { - expSamples = append(expSamples, produceSamples(ctx, t, kafkaClient, startTime, "1", + expSamples = append(expSamples, produceSamples(ctx, t, kafkaClient, 0, startTime, "1", startTime.Add(time.Duration(i)*time.Hour), )...) }