
Implement partition compaction planner #6469

Open · wants to merge 15 commits into base: master
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -285,6 +285,10 @@ compactor:
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]

# How long the shuffle sharding planner should wait before running planning code.
# CLI flag: -compactor.sharding-planner-delay
[sharding_planner_delay: <duration> | default = 10s]

# The compaction strategy to use. Supported values are: default, partitioning.
# CLI flag: -compactor.compaction-strategy
[compaction_strategy: <string> | default = "default"]
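For quick reference, a minimal sketch of how the new options could be combined in a configuration file; the values are illustrative only, and the CLI equivalents are `-compactor.compaction-strategy` and `-compactor.sharding-planner-delay`:

```yaml
compactor:
  # Illustrative values, not recommendations.
  compaction_strategy: partitioning
  sharding_planner_delay: 10s
```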
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2330,6 +2330,10 @@ sharding_ring:
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]

# How long the shuffle sharding planner should wait before running planning code.
# CLI flag: -compactor.sharding-planner-delay
[sharding_planner_delay: <duration> | default = 10s]

# The compaction strategy to use. Supported values are: default, partitioning.
# CLI flag: -compactor.compaction-strategy
[compaction_strategy: <string> | default = "default"]
11 changes: 7 additions & 4 deletions pkg/compactor/compactor.go
@@ -153,7 +153,7 @@ var (
plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner {

if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
return NewPartitionCompactionPlanner(ctx, bkt, logger)
return NewPartitionCompactionPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics)
} else {
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
}
@@ -234,9 +234,10 @@ type Config struct {
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`

// Compactors sharding.
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingStrategy string `yaml:"sharding_strategy"`
ShardingRing RingConfig `yaml:"sharding_ring"`
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingStrategy string `yaml:"sharding_strategy"`
ShardingRing RingConfig `yaml:"sharding_ring"`
ShardingPlannerDelay time.Duration `yaml:"sharding_planner_delay"`

// Compaction strategy.
CompactionStrategy string `yaml:"compaction_strategy"`
@@ -304,6 +305,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")

f.DurationVar(&cfg.ShardingPlannerDelay, "compactor.sharding-planner-delay", 10*time.Second, "How long the shuffle sharding planner should wait before running planning code.")
}

func (cfg *Config) Validate(limits validation.Limits) error {
7 changes: 7 additions & 0 deletions pkg/compactor/compactor_metrics.go
@@ -39,6 +39,7 @@ type compactorMetrics struct {
remainingPlannedCompactions *prometheus.GaugeVec
compactionErrorsCount *prometheus.CounterVec
partitionCount *prometheus.GaugeVec
compactionsNotPlanned *prometheus.CounterVec
}

const (
@@ -174,6 +175,10 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
Name: "cortex_compactor_group_partition_count",
Help: "Number of partitions for each compaction group.",
}, compactionLabels)
m.compactionsNotPlanned = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_compactor_group_compactions_not_planned",
Help: "Total number of group compaction not planned due to non-critical error (ie. group is currently visited by other compactor).",
}, compactionLabels)

return &m
}
@@ -225,6 +230,7 @@ func (m *compactorMetrics) initMetricWithCompactionLabelValues(labelValue ...str
m.compactionFailures.WithLabelValues(labelValue...)
m.verticalCompactions.WithLabelValues(labelValue...)
m.partitionCount.WithLabelValues(labelValue...)
m.compactionsNotPlanned.WithLabelValues(labelValue...)
}

func (m *compactorMetrics) deleteMetricsForDeletedTenant(userID string) {
@@ -236,4 +242,5 @@ func (m *compactorMetrics) deleteMetricsForDeletedTenant(userID string) {
m.compactionFailures.DeleteLabelValues(userID)
m.verticalCompactions.DeleteLabelValues(userID)
m.partitionCount.DeleteLabelValues(userID)
m.compactionsNotPlanned.DeleteLabelValues(userID)
}
8 changes: 8 additions & 0 deletions pkg/compactor/compactor_metrics_test.go
@@ -135,6 +135,11 @@ func TestSyncerMetrics(t *testing.T) {
cortex_compactor_group_partition_count{user="aaa"} 511060
cortex_compactor_group_partition_count{user="bbb"} 522170
cortex_compactor_group_partition_count{user="ccc"} 533280
# HELP cortex_compactor_group_compactions_not_planned Total number of group compactions not planned due to a non-critical error (i.e. the group is currently visited by another compactor).
# TYPE cortex_compactor_group_compactions_not_planned counter
cortex_compactor_group_compactions_not_planned{user="aaa"} 544390
cortex_compactor_group_compactions_not_planned{user="bbb"} 555500
cortex_compactor_group_compactions_not_planned{user="ccc"} 566610
`))
require.NoError(t, err)

@@ -191,4 +196,7 @@ func generateTestData(cm *compactorMetrics, base float64) {
cm.partitionCount.WithLabelValues("aaa").Add(46 * base)
cm.partitionCount.WithLabelValues("bbb").Add(47 * base)
cm.partitionCount.WithLabelValues("ccc").Add(48 * base)
cm.compactionsNotPlanned.WithLabelValues("aaa").Add(49 * base)
cm.compactionsNotPlanned.WithLabelValues("bbb").Add(50 * base)
cm.compactionsNotPlanned.WithLabelValues("ccc").Add(51 * base)
}
130 changes: 123 additions & 7 deletions pkg/compactor/partition_compaction_planner.go
@@ -2,30 +2,146 @@ package compactor

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

var (
plannerCompletedPartitionError = errors.New("got completed partition")
plannerVisitedPartitionError = errors.New("got partition visited by other compactor")
)

type PartitionCompactionPlanner struct {
ctx context.Context
bkt objstore.InstrumentedBucket
logger log.Logger
ctx context.Context
bkt objstore.InstrumentedBucket
logger log.Logger
ranges []int64
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
ringLifecyclerID string
userID string
plannerDelay time.Duration
partitionVisitMarkerTimeout time.Duration
partitionVisitMarkerFileUpdateInterval time.Duration
compactorMetrics *compactorMetrics
}

func NewPartitionCompactionPlanner(
ctx context.Context,
bkt objstore.InstrumentedBucket,
logger log.Logger,
ranges []int64,
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
ringLifecyclerID string,
userID string,
plannerDelay time.Duration,
partitionVisitMarkerTimeout time.Duration,
partitionVisitMarkerFileUpdateInterval time.Duration,
compactorMetrics *compactorMetrics,
) *PartitionCompactionPlanner {
return &PartitionCompactionPlanner{
ctx: ctx,
bkt: bkt,
logger: logger,
ctx: ctx,
bkt: bkt,
logger: logger,
ranges: ranges,
noCompBlocksFunc: noCompBlocksFunc,
ringLifecyclerID: ringLifecyclerID,
userID: userID,
plannerDelay: plannerDelay,
partitionVisitMarkerTimeout: partitionVisitMarkerTimeout,
partitionVisitMarkerFileUpdateInterval: partitionVisitMarkerFileUpdateInterval,
compactorMetrics: compactorMetrics,
}
}

func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) {
panic("PartitionCompactionPlanner not implemented")
cortexMetaExtensions, err := tsdb.ConvertToCortexMetaExtensions(extensions)
if err != nil {
return nil, err
}
if cortexMetaExtensions == nil {
return nil, fmt.Errorf("cortexMetaExtensions cannot be nil")
}
return p.PlanWithPartition(ctx, metasByMinTime, cortexMetaExtensions, errChan)
}

func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasByMinTime []*metadata.Meta, cortexMetaExtensions *tsdb.CortexMetaExtensions, errChan chan error) ([]*metadata.Meta, error) {
partitionInfo := cortexMetaExtensions.PartitionInfo
if partitionInfo == nil {
return nil, fmt.Errorf("partitionInfo cannot be nil")
}
partitionID := partitionInfo.PartitionID
partitionedGroupID := partitionInfo.PartitionedGroupID

// This delay prevents double compaction when two compactors
// claim the same partition in the grouper at the same time.
time.Sleep(p.plannerDelay)

visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionID)
visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker)
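// Read any existing visit marker for this partition. A missing marker is
// not an error: it simply means no compactor has claimed the partition yet.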
existingPartitionVisitMarker := &partitionVisitMarker{}
err := visitMarkerManager.ReadVisitMarker(p.ctx, existingPartitionVisitMarker)
visitMarkerExists := true
if err != nil {
if errors.Is(err, errorVisitMarkerNotFound) {
visitMarkerExists = false
} else {
return nil, fmt.Errorf("unable to get visit marker file for partition with partition ID %d, partitioned group ID %d: %s", partitionID, partitionedGroupID, err.Error())
}
}
if visitMarkerExists {
if existingPartitionVisitMarker.GetStatus() == Completed {
p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc()
level.Warn(p.logger).Log("msg", "partition is in completed status", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, existingPartitionVisitMarker.String())
return nil, plannerCompletedPartitionError
}
if !existingPartitionVisitMarker.IsPendingByCompactor(p.partitionVisitMarkerTimeout, partitionID, p.ringLifecyclerID) {
p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc()
level.Warn(p.logger).Log("msg", "partition is not visited by current compactor", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, existingPartitionVisitMarker.String())
return nil, plannerVisitedPartitionError
}
}

// Ensure all blocks fit within the largest range. This is a double check
// to ensure there's no bug in the previous block grouping, given this Plan()
// is just a pass-through.
// Modified from https://github.com/cortexproject/cortex/pull/2616/files#diff-e3051fc530c48bb276ba958dd8fadc684e546bd7964e6bc75cef9a86ef8df344R28-R63
largestRange := p.ranges[len(p.ranges)-1]
rangeStart := getRangeStart(metasByMinTime[0], largestRange)
rangeEnd := rangeStart + largestRange
noCompactMarked := p.noCompBlocksFunc()
resultMetas := make([]*metadata.Meta, 0, len(metasByMinTime))

for _, b := range metasByMinTime {
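// Skip the dummy placeholder block and any blocks marked as no-compact.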
if b.ULID == DUMMY_BLOCK_ID {
continue
}
blockID := b.ULID.String()
if _, excluded := noCompactMarked[b.ULID]; excluded {
continue
}

if b.MinTime < rangeStart || b.MaxTime > rangeEnd {
return nil, fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", blockID, b.MinTime, b.MaxTime, rangeStart, rangeEnd)
}

resultMetas = append(resultMetas, b)
}

if len(resultMetas) < 1 {
level.Info(p.logger).Log("msg", "result meta size is empty", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "size", len(resultMetas))
return nil, nil
}

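// Keep the partition visit marker heartbeating in the background at the
// configured update interval so the partition stays marked as in progress.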
go visitMarkerManager.HeartBeat(p.ctx, errChan, p.partitionVisitMarkerFileUpdateInterval, false)

return resultMetas, nil
}
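As a usage note, here is a hedged sketch (not part of this PR) of how a caller might treat the two sentinel errors declared above as a signal to skip the partition rather than as a compaction failure; the helper name `planPartition` is hypothetical:

```go
package compactor

import (
	"context"

	"github.com/pkg/errors"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// planPartition is a hypothetical wrapper: completed or already-visited
// partitions yield an empty plan instead of an error.
func planPartition(ctx context.Context, p *PartitionCompactionPlanner, metas []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) {
	plannedMetas, err := p.Plan(ctx, metas, errChan, extensions)
	if errors.Is(err, plannerCompletedPartitionError) || errors.Is(err, plannerVisitedPartitionError) {
		// Another compactor owns this partition, or it has already been
		// compacted; there is nothing for this compactor to do.
		return nil, nil
	}
	return plannedMetas, err
}
```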