Skip to content

Commit

Permalink
Expose partition as a param to produceSamples
Browse files Browse the repository at this point in the history
  • Loading branch information
seizethedave committed Dec 21, 2024
1 parent c85ce3a commit 1f04656
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions pkg/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestBlockBuilder_StartWithExistingCommit(t *testing.T) {
var producedSamples []mimirpb.Sample
kafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-7 * time.Hour).Add(29 * time.Minute)
for kafkaRecTime.Before(cycleEndStartup) {
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
producedSamples = append(producedSamples, samples...)

kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestBlockBuilder_StartWithExistingCommit_PullMode(t *testing.T) {
var producedSamples []mimirpb.Sample
kafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-7 * time.Hour).Add(29 * time.Minute)
for kafkaRecTime.Before(cycleEndStartup) {
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
producedSamples = append(producedSamples, samples...)

kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2)
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit(t *testing.T) {
kafkaRecTime := time.Now().Truncate(cfg.ConsumeInterval).Add(-7 * time.Hour).Add(29 * time.Minute)
for range 3 {
kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2)
produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
}

// Set up a hook to track commits from block-builder to kafka. Those indicate the end of a cycle.
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit_PullMode(t *testing.T) {
if firstRecordTime.IsZero() {
firstRecordTime = kafkaRecTime
}
produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
}

fallback := time.Now().Add(-cfg.LookbackOnNoCommit)
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection(t *testing.T) {
kafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-5 * time.Hour).Add(29 * time.Minute)
lastKafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-cfg.ConsumeInterval)
for kafkaRecTime.Before(lastKafkaRecTime) {
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
producedSamples = append(producedSamples, samples...)

kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2)
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection_PullMode(t *testi
firstKafkaRecTime := kafkaRecTime.Add(-time.Minute)
lastKafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-cfg.ConsumeInterval)
for kafkaRecTime.Before(lastKafkaRecTime) {
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
producedSamples = append(producedSamples, samples...)

kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2)
Expand Down Expand Up @@ -601,7 +601,7 @@ func TestBlockBuilder_WithMultipleTenants(t *testing.T) {
// Producing some records for multiple tenants
for range 10 {
for _, tenant := range tenants {
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenant, kafkaRecTime)
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenant, kafkaRecTime)
producedPerTenantSamples[tenant] = append(producedPerTenantSamples[tenant], samples...)
}

Expand Down Expand Up @@ -683,7 +683,7 @@ func TestBlockBuilder_WithMultipleTenants_PullMode(t *testing.T) {
// Producing some records for multiple tenants
for range 10 {
for _, tenant := range tenants {
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenant, kafkaRecTime)
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenant, kafkaRecTime)
producedPerTenantSamples[tenant] = append(producedPerTenantSamples[tenant], samples...)
}

Expand Down Expand Up @@ -740,7 +740,7 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) {
// Simple first record with all samples in the block.
kafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-2 * cfg.ConsumeIntervalBuffer)

samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, kafkaRecTime)
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenantID, kafkaRecTime)
require.Len(t, samples, 1)
expSamplesPhase1 = append(expSamplesPhase1, samples...)
}
Expand All @@ -753,7 +753,7 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) {
inBlockTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-cfg.ConsumeIntervalBuffer)
lastSeenRecTime = kafkaRecTime

samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, inBlockTime, kafkaRecTime)
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenantID, inBlockTime, kafkaRecTime)
require.Len(t, samples, 2)
expSamplesPhase1 = append(expSamplesPhase1, samples[0])
expSamplesPhase2 = append(expSamplesPhase2, samples[1])
Expand All @@ -766,7 +766,7 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) {
kafkaRecTime := cycleEndStartup.Add(cfg.ConsumeInterval - time.Minute)
inBlockTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-cfg.ConsumeIntervalBuffer + time.Minute)

samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, inBlockTime)
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenantID, inBlockTime)
require.Len(t, samples, 1)
expSamplesPhase2 = append(expSamplesPhase2, samples[0])
}
Expand All @@ -782,7 +782,7 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) {
kafkaRecTime := lastSeenRecTime.Add(-2 * time.Minute)
inBlockTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-cfg.ConsumeIntervalBuffer + 2*time.Minute)

samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, inBlockTime)
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, tenantID, inBlockTime)
require.Len(t, samples, 1)
expSamplesPhase2 = append(expSamplesPhase2, samples[0])
}
Expand Down Expand Up @@ -836,7 +836,7 @@ func TestBlockBuilder_RetryOnTransientErrors(t *testing.T) {
var producedSamples []mimirpb.Sample
kafkaRecTime := cycleEndStartup.Truncate(cfg.ConsumeInterval).Add(-1 * time.Hour).Add(29 * time.Minute)
for kafkaRecTime.Before(cycleEndStartup) {
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
samples := produceSamples(ctx, t, kafkaClient, 0, kafkaRecTime, "1", kafkaRecTime.Add(-time.Minute))
producedSamples = append(producedSamples, samples...)

kafkaRecTime = kafkaRecTime.Add(cfg.ConsumeInterval / 2)
Expand Down Expand Up @@ -883,13 +883,13 @@ func TestBlockBuilder_RetryOnTransientErrors(t *testing.T) {
)
}

func produceSamples(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, ts time.Time, tenantID string, sampleTs ...time.Time) []mimirpb.Sample {
func produceSamples(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, partition int32, ts time.Time, tenantID string, sampleTs ...time.Time) []mimirpb.Sample {
var samples []mimirpb.Sample
for _, st := range sampleTs {
samples = append(samples, floatSample(st.UnixMilli(), 1)...)
}
val := createWriteRequest(t, tenantID, samples, nil)
produceRecords(ctx, t, kafkaClient, ts, tenantID, testTopic, 0, val)
produceRecords(ctx, t, kafkaClient, ts, tenantID, testTopic, partition, val)
return samples
}

Expand Down Expand Up @@ -1099,7 +1099,7 @@ func TestPullMode(t *testing.T) {
var expSamples []mimirpb.Sample

for i := range 5 {
expSamples = append(expSamples, produceSamples(ctx, t, kafkaClient, startTime, "1",
expSamples = append(expSamples, produceSamples(ctx, t, kafkaClient, 0, startTime, "1",
startTime.Add(time.Duration(i)*time.Hour),
)...)
}
Expand Down

0 comments on commit 1f04656

Please sign in to comment.