Skip to content

Commit

Permalink
Implement partition compaction grouper (#6172)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexqyle authored Dec 31, 2024
1 parent d41ad75 commit 4953086
Show file tree
Hide file tree
Showing 17 changed files with 4,715 additions and 54 deletions.
18 changes: 9 additions & 9 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,18 +286,18 @@ compactor:
[wait_active_instance_timeout: <duration> | default = 10m]

# The compaction strategy to use. Supported values are: default, partitioning.
# CLI flag: -compactor.compaction-mode
[compaction_mode: <string> | default = "default"]
# CLI flag: -compactor.compaction-strategy
[compaction_strategy: <string> | default = "default"]

# How long block visit marker file should be considered as expired and able to
# be picked up by compactor again.
# CLI flag: -compactor.block-visit-marker-timeout
[block_visit_marker_timeout: <duration> | default = 5m]
# How long compaction visit marker file should be considered as expired and
# able to be picked up by compactor again.
# CLI flag: -compactor.compaction-visit-marker-timeout
[compaction_visit_marker_timeout: <duration> | default = 10m]

# How frequently block visit marker file should be updated duration
# How frequently compaction visit marker file should be updated during
# compaction.
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]
# CLI flag: -compactor.compaction-visit-marker-file-update-interval
[compaction_visit_marker_file_update_interval: <duration> | default = 1m]

# How long cleaner visit marker file should be considered as expired and able
# to be picked up by cleaner again. The value should be smaller than
Expand Down
27 changes: 18 additions & 9 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2331,17 +2331,18 @@ sharding_ring:
[wait_active_instance_timeout: <duration> | default = 10m]
# The compaction strategy to use. Supported values are: default, partitioning.
# CLI flag: -compactor.compaction-mode
[compaction_mode: <string> | default = "default"]
# CLI flag: -compactor.compaction-strategy
[compaction_strategy: <string> | default = "default"]
# How long block visit marker file should be considered as expired and able to
# be picked up by compactor again.
# CLI flag: -compactor.block-visit-marker-timeout
[block_visit_marker_timeout: <duration> | default = 5m]
# How long compaction visit marker file should be considered as expired and able
# to be picked up by compactor again.
# CLI flag: -compactor.compaction-visit-marker-timeout
[compaction_visit_marker_timeout: <duration> | default = 10m]
# How frequently block visit marker file should be updated duration compaction.
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]
# How frequently compaction visit marker file should be updated during
# compaction.
# CLI flag: -compactor.compaction-visit-marker-file-update-interval
[compaction_visit_marker_file_update_interval: <duration> | default = 1m]
# How long cleaner visit marker file should be considered as expired and able to
# be picked up by cleaner again. The value should be smaller than
Expand Down Expand Up @@ -3592,6 +3593,14 @@ query_rejection:
# CLI flag: -compactor.tenant-shard-size
[compactor_tenant_shard_size: <int> | default = 0]

# Index size limit in bytes for each compaction partition. 0 means no limit
# CLI flag: -compactor.partition-index-size-bytes
[compactor_partition_index_size_bytes: <int> | default = 68719476736]

# Time series count limit for each compaction partition. 0 means no limit
# CLI flag: -compactor.partition-series-count
[compactor_partition_series_count: <int> | default = 0]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
Expand Down
73 changes: 54 additions & 19 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var (
errInvalidCompactionStrategy = errors.New("invalid compaction strategy")
errInvalidCompactionStrategyPartitioning = errors.New("compaction strategy partitioning can only be enabled when shuffle sharding is enabled")

DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper {
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter, _ int) compact.Grouper {
return compact.NewDefaultGrouperWithMetrics(
logger,
bkt,
Expand All @@ -79,9 +79,31 @@ var (
cfg.BlocksFetchConcurrency)
}

ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper {
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ingestionReplicationFactor int) compact.Grouper {
if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
return NewPartitionCompactionGrouper(ctx, logger, bkt)
return NewPartitionCompactionGrouper(
ctx,
logger,
bkt,
cfg.AcceptMalformedIndex,
true, // Enable vertical compaction
blocksMarkedForNoCompaction,
syncerMetrics,
compactorMetrics,
metadata.NoneFunc,
cfg,
ring,
ringLifecycle.Addr,
ringLifecycle.ID,
limits,
userID,
cfg.BlockFilesConcurrency,
cfg.BlocksFetchConcurrency,
cfg.CompactionConcurrency,
true,
cfg.CompactionVisitMarkerTimeout,
noCompactionMarkFilter.NoCompactMarkedBlocks,
ingestionReplicationFactor)
} else {
return NewShuffleShardingGrouper(
ctx,
Expand All @@ -102,7 +124,7 @@ var (
cfg.BlockFilesConcurrency,
cfg.BlocksFetchConcurrency,
cfg.CompactionConcurrency,
cfg.BlockVisitMarkerTimeout,
cfg.CompactionVisitMarkerTimeout,
blockVisitMarkerReadFailed,
blockVisitMarkerWriteFailed,
noCompactionMarkFilter.NoCompactMarkedBlocks)
Expand Down Expand Up @@ -133,7 +155,7 @@ var (
if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
return NewPartitionCompactionPlanner(ctx, bkt, logger)
} else {
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
}
}
return compactor, plannerFactory, nil
Expand All @@ -156,6 +178,7 @@ type BlocksGrouperFactory func(
limit Limits,
userID string,
noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
ingestionReplicationFactor int,
) compact.Grouper

// BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
Expand All @@ -182,6 +205,8 @@ type PlannerFactory func(
// Limits defines limits used by the Compactor.
// Every method takes a tenant ID, so each value is looked up per tenant.
type Limits interface {
	// CompactorTenantShardSize returns the compactor shard size for the given tenant.
	CompactorTenantShardSize(userID string) int
	// CompactorPartitionIndexSizeBytes returns the partition index size limit, in
	// bytes, for the given tenant.
	// NOTE(review): presumably backs -compactor.partition-index-size-bytes
	// (0 means no limit) — confirm against the implementation.
	CompactorPartitionIndexSizeBytes(userID string) int64
	// CompactorPartitionSeriesCount returns the partition time series count limit
	// for the given tenant.
	// NOTE(review): presumably backs -compactor.partition-series-count
	// (0 means no limit) — confirm against the implementation.
	CompactorPartitionSeriesCount(userID string) int64
}

// Config holds the Compactor config.
Expand Down Expand Up @@ -213,8 +238,8 @@ type Config struct {
ShardingStrategy string `yaml:"sharding_strategy"`
ShardingRing RingConfig `yaml:"sharding_ring"`

// Compaction mode.
CompactionStrategy string `yaml:"compaction_mode"`
// Compaction strategy.
CompactionStrategy string `yaml:"compaction_strategy"`

// No need to add options to customize the retry backoff,
// given the defaults should be fine, but allow to override
Expand All @@ -226,9 +251,9 @@ type Config struct {
BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"`
BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`

// Block visit marker file config
BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"`
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`
// Compaction visit marker file config
CompactionVisitMarkerTimeout time.Duration `yaml:"compaction_visit_marker_timeout"`
CompactionVisitMarkerFileUpdateInterval time.Duration `yaml:"compaction_visit_marker_file_update_interval"`

// Cleaner visit marker file config
CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"`
Expand Down Expand Up @@ -258,7 +283,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.")
f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-mode", util.CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(supportedCompactionStrategies, ", ")))
f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-strategy", util.CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(supportedCompactionStrategies, ", ")))
f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
"If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+
"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
Expand All @@ -271,8 +296,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")

f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.")
f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.")
f.DurationVar(&cfg.CompactionVisitMarkerTimeout, "compactor.compaction-visit-marker-timeout", 10*time.Minute, "How long compaction visit marker file should be considered as expired and able to be picked up by compactor again.")
f.DurationVar(&cfg.CompactionVisitMarkerFileUpdateInterval, "compactor.compaction-visit-marker-file-update-interval", 1*time.Minute, "How frequently compaction visit marker file should be updated duration compaction.")

f.DurationVar(&cfg.CleanerVisitMarkerTimeout, "compactor.cleaner-visit-marker-timeout", 10*time.Minute, "How long cleaner visit marker file should be considered as expired and able to be picked up by cleaner again. The value should be smaller than -compactor.cleanup-interval")
f.DurationVar(&cfg.CleanerVisitMarkerFileUpdateInterval, "compactor.cleaner-visit-marker-file-update-interval", 5*time.Minute, "How frequently cleaner visit marker file should be updated when cleaning user.")
Expand Down Expand Up @@ -305,7 +330,7 @@ func (cfg *Config) Validate(limits validation.Limits) error {
}
}

// Make sure a valid compaction mode is being used
// Make sure a valid compaction strategy is being used
if !util.StringsContain(supportedCompactionStrategies, cfg.CompactionStrategy) {
return errInvalidCompactionStrategy
}
Expand Down Expand Up @@ -379,10 +404,13 @@ type Compactor struct {

// Thanos compactor metrics per user
compactorMetrics *compactorMetrics

// Replication factor of ingester ring
ingestionReplicationFactor int
}

// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error) {
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, ingestionReplicationFactor int) (*Compactor, error) {
bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
return bucket.NewClient(ctx, storageCfg.Bucket, nil, "compactor", logger, registerer)
}
Expand All @@ -405,7 +433,11 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi
}
}

cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits)
if ingestionReplicationFactor <= 0 {
ingestionReplicationFactor = 1
}

cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits, ingestionReplicationFactor)
if err != nil {
return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
}
Expand All @@ -422,6 +454,7 @@ func newCompactor(
blocksGrouperFactory BlocksGrouperFactory,
blocksCompactorFactory BlocksCompactorFactory,
limits *validation.Overrides,
ingestionReplicationFactor int,
) (*Compactor, error) {
var compactorMetrics *compactorMetrics
if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
Expand Down Expand Up @@ -496,8 +529,9 @@ func newCompactor(
Name: "cortex_compactor_block_visit_marker_write_failed",
Help: "Number of block visit marker file failed to be written.",
}),
limits: limits,
compactorMetrics: compactorMetrics,
limits: limits,
compactorMetrics: compactorMetrics,
ingestionReplicationFactor: ingestionReplicationFactor,
}

if len(compactorCfg.EnabledTenants) > 0 {
Expand Down Expand Up @@ -761,6 +795,7 @@ func (c *Compactor) compactUsers(ctx context.Context) {
continue
} else if markedForDeletion {
c.CompactionRunSkippedTenants.Inc()
c.compactorMetrics.deleteMetricsForDeletedTenant(userID)
level.Debug(c.logger).Log("msg", "skipping user because it is marked for deletion", "user", userID)
continue
}
Expand Down Expand Up @@ -929,7 +964,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
compactor, err := compact.NewBucketCompactor(
ulogger,
syncer,
c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, c.BlocksMarkedForNoCompaction, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, syncerMetrics, c.compactorMetrics, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter),
c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, c.BlocksMarkedForNoCompaction, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, syncerMetrics, c.compactorMetrics, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter, c.ingestionReplicationFactor),
c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics),
c.blocksCompactor,
c.compactDirForUser(userID),
Expand Down
30 changes: 30 additions & 0 deletions pkg/compactor/compactor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type compactorMetrics struct {
verticalCompactions *prometheus.CounterVec
remainingPlannedCompactions *prometheus.GaugeVec
compactionErrorsCount *prometheus.CounterVec
partitionCount *prometheus.GaugeVec
}

const (
Expand Down Expand Up @@ -169,6 +170,10 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
Name: "cortex_compactor_compaction_error_total",
Help: "Total number of errors from compactions.",
}, append(commonLabels, compactionErrorTypesLabelName))
m.partitionCount = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_compactor_group_partition_count",
Help: "Number of partitions for each compaction group.",
}, compactionLabels)

return &m
}
Expand Down Expand Up @@ -207,3 +212,28 @@ func (m *compactorMetrics) getCommonLabelValues(userID string) []string {
}
return labelValues
}

// initMetricWithCompactionLabelValues touches every per-compaction metric
// vector with the given label values so that the corresponding child series
// exist before the first real observation.
//
// It is a no-op unless the metrics were built with the full compaction label
// set, i.e. the common labels followed by the compaction labels.
func (m *compactorMetrics) initMetricWithCompactionLabelValues(labelValue ...string) {
	fullLabelCount := len(commonLabels) + len(compactionLabels)
	if len(m.compactionLabels) == fullLabelCount {
		// WithLabelValues is called purely for its side effect of creating the
		// child metric when it does not exist yet; the result is discarded.
		m.compactions.WithLabelValues(labelValue...)
		m.compactionPlanned.WithLabelValues(labelValue...)
		m.compactionRunsStarted.WithLabelValues(labelValue...)
		m.compactionRunsCompleted.WithLabelValues(labelValue...)
		m.compactionFailures.WithLabelValues(labelValue...)
		m.verticalCompactions.WithLabelValues(labelValue...)
		m.partitionCount.WithLabelValues(labelValue...)
	}
}

// deleteMetricsForDeletedTenant drops every per-tenant series owned by userID
// from the compactor metric vectors, so a tenant marked for deletion stops
// being exported.
//
// Series are matched on the "user" label only. DeleteLabelValues needs one
// value per variable label, so passing just userID never matches a vector that
// carries extra labels next to "user" (the compaction vectors may, depending on
// how the metrics were constructed) and the cleanup would silently do nothing.
// DeletePartialMatch removes every series whose "user" label equals userID,
// whatever other labels the vector has.
func (m *compactorMetrics) deleteMetricsForDeletedTenant(userID string) {
	// An untyped map literal is assignable to prometheus.Labels
	// (defined as map[string]string).
	tenantMatch := map[string]string{"user": userID}

	m.syncerBlocksMarkedForDeletion.DeletePartialMatch(tenantMatch)
	m.compactions.DeletePartialMatch(tenantMatch)
	m.compactionPlanned.DeletePartialMatch(tenantMatch)
	m.compactionRunsStarted.DeletePartialMatch(tenantMatch)
	m.compactionRunsCompleted.DeletePartialMatch(tenantMatch)
	m.compactionFailures.DeletePartialMatch(tenantMatch)
	m.verticalCompactions.DeletePartialMatch(tenantMatch)
	m.partitionCount.DeletePartialMatch(tenantMatch)
}
8 changes: 8 additions & 0 deletions pkg/compactor/compactor_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func TestSyncerMetrics(t *testing.T) {
cortex_compactor_compaction_error_total{type="unauthorized",user="aaa"} 477730
cortex_compactor_compaction_error_total{type="unauthorized",user="bbb"} 488840
cortex_compactor_compaction_error_total{type="unauthorized",user="ccc"} 499950
# HELP cortex_compactor_group_partition_count Number of partitions for each compaction group.
# TYPE cortex_compactor_group_partition_count gauge
cortex_compactor_group_partition_count{user="aaa"} 511060
cortex_compactor_group_partition_count{user="bbb"} 522170
cortex_compactor_group_partition_count{user="ccc"} 533280
`))
require.NoError(t, err)

Expand Down Expand Up @@ -183,4 +188,7 @@ func generateTestData(cm *compactorMetrics, base float64) {
cm.compactionErrorsCount.WithLabelValues("aaa", unauthorizedError).Add(43 * base)
cm.compactionErrorsCount.WithLabelValues("bbb", unauthorizedError).Add(44 * base)
cm.compactionErrorsCount.WithLabelValues("ccc", unauthorizedError).Add(45 * base)
cm.partitionCount.WithLabelValues("aaa").Add(46 * base)
cm.partitionCount.WithLabelValues("bbb").Add(47 * base)
cm.partitionCount.WithLabelValues("ccc").Add(48 * base)
}
Loading

0 comments on commit 4953086

Please sign in to comment.