From 90d02971ed39a2abfbd2a2c368be54bcda9a7e43 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 27 Dec 2022 23:00:56 +0530 Subject: [PATCH] Add cardinality logs and exclude pattern capability (#49) --- .../signozspanmetricsprocessor/config.go | 9 +++ .../signozspanmetricsprocessor/processor.go | 77 +++++++++++++++++++ .../processor_test.go | 56 ++++++++++++-- 3 files changed, 137 insertions(+), 5 deletions(-) diff --git a/processor/signozspanmetricsprocessor/config.go b/processor/signozspanmetricsprocessor/config.go index 01180724..a82b1ff4 100644 --- a/processor/signozspanmetricsprocessor/config.go +++ b/processor/signozspanmetricsprocessor/config.go @@ -42,6 +42,12 @@ type Dimension struct { Default *string `mapstructure:"default"` } +// ExcludePattern defines the pattern to exclude from the metrics. +type ExcludePattern struct { + Name string `mapstructure:"name"` + Pattern string `mapstructure:"pattern"` +} + // Config defines the configuration options for spanmetricsprocessor. type Config struct { config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct @@ -62,6 +68,9 @@ type Config struct { // https://github.com/open-telemetry/opentelemetry-collector/blob/main/model/semconv/opentelemetry.go. Dimensions []Dimension `mapstructure:"dimensions"` + // ExcludePatterns defines the list of patterns to exclude from the metrics. + ExcludePatterns []ExcludePattern `mapstructure:"exclude_patterns"` + // DimensionsCacheSize defines the size of cache for storing Dimensions, which helps to avoid cache memory growing // indefinitely over the lifetime of the collector. // Optional. See defaultDimensionsCacheSize in processor.go for the default value. diff --git a/processor/signozspanmetricsprocessor/processor.go b/processor/signozspanmetricsprocessor/processor.go index 34a68129..85795bc6 100644 --- a/processor/signozspanmetricsprocessor/processor.go +++ b/processor/signozspanmetricsprocessor/processor.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "net/url" + "regexp" "sort" "strings" "sync" @@ -102,6 +103,9 @@ type processorImp struct { callMetricKeyToDimensions *cache.Cache[metricKey, pcommon.Map] dbCallMetricKeyToDimensions *cache.Cache[metricKey, pcommon.Map] externalCallMetricKeyToDimensions *cache.Cache[metricKey, pcommon.Map] + + attrsCardinality map[string]map[string]struct{} + excludePatternRegex map[string]*regexp.Regexp } type dimension struct { @@ -181,6 +185,11 @@ func newProcessor(logger *zap.Logger, config component.ProcessorConfig, nextCons return nil, err } + excludePatternRegex := make(map[string]*regexp.Regexp) + for _, pattern := range pConfig.ExcludePatterns { + excludePatternRegex[pattern.Name] = regexp.MustCompile(pattern.Pattern) + } + return &processorImp{ logger: logger, config: *pConfig, @@ -203,6 +212,8 @@ func newProcessor(logger *zap.Logger, config component.ProcessorConfig, nextCons callMetricKeyToDimensions: callMetricKeyToDimensionsCache, dbCallMetricKeyToDimensions: dbMetricKeyToDimensionsCache, externalCallMetricKeyToDimensions: externalCallMetricKeyToDimensionsCache, + attrsCardinality: make(map[string]map[string]struct{}), + excludePatternRegex: excludePatternRegex, }, nil } @@ -249,6 +260,41 @@ func validateDimensions(dimensions []Dimension, skipSanitizeLabel bool) error { return nil } +func (p *processorImp) shouldSkip(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map) bool { + for key, pattern := range p.excludePatternRegex { + if key == serviceNameKey && pattern.MatchString(serviceName) { + return true + } + if key == operationKey && pattern.MatchString(span.Name()) { + return true + } + if key == spanKindKey && pattern.MatchString(span.Kind().String()) { + return true + } + if key == statusCodeKey && pattern.MatchString(span.Status().Code().String()) { + return true + } + + matched := false + span.Attributes().Range(func(k string, v pcommon.Value) bool { + if key == k && pattern.MatchString(v.AsString()) { + matched = true + } + return true + }) + resourceAttrs.Range(func(k string, v pcommon.Value) bool { + if key == k && pattern.MatchString(v.AsString()) { + matched = true + } + return true + }) + if matched { + return true + } + } + return false +} + // Start implements the component.Component interface. func (p *processorImp) Start(ctx context.Context, host component.Host) error { p.logger.Info("Starting signozspanmetricsprocessor") @@ -317,6 +363,7 @@ func (p *processorImp) tracesToMetrics(ctx context.Context, traces ptrace.Traces p.lock.Unlock() if err != nil { + p.logCardinalityInfo() return err } @@ -361,6 +408,17 @@ func (p *processorImp) buildMetrics() (pmetric.Metrics, error) { return m, nil } +func (p *processorImp) logCardinalityInfo() { + for k, v := range p.attrsCardinality { + values := make([]string, 0, len(v)) + for key := range v { + values = append(values, key) + } + p.logger.Info("Attribute cardinality", zap.String("attribute", k), zap.Int("cardinality", len(v))) + p.logger.Debug("Attribute values", zap.Strings("values", values)) + } +} + // collectLatencyMetrics collects the raw latency metrics, writing the data // into the given instrumentation library metrics. func (p *processorImp) collectLatencyMetrics(ilm pmetric.ScopeMetrics) error { @@ -619,6 +677,11 @@ func getRemoteAddress(span ptrace.Span) (string, bool) { } func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.Span, resourceAttr pcommon.Map) { + + if p.shouldSkip(serviceName, span, resourceAttr) { + p.logger.Debug("Skipping span", zap.String("span", span.Name()), zap.String("service", serviceName)) + return + } // Protect against end timestamps before start timestamps. Assume 0 duration. latencyInMilliseconds := float64(0) startTime := span.StartTimestamp() @@ -806,6 +869,13 @@ func (p *processorImp) buildDimensionKVs(serviceName string, span ptrace.Span, o v.CopyTo(dims.PutEmpty(resourcePrefix + d.name)) } } + dims.Range(func(k string, v pcommon.Value) bool { + if _, exists := p.attrsCardinality[k]; !exists { + p.attrsCardinality[k] = make(map[string]struct{}) + } + p.attrsCardinality[k][v.AsString()] = struct{}{} + return true + }) return dims } @@ -827,6 +897,13 @@ func (p *processorImp) buildCustomDimensionKVs(serviceName string, span ptrace.S v.CopyTo(dims.PutEmpty(resourcePrefix + d.name)) } } + dims.Range(func(k string, v pcommon.Value) bool { + if _, exists := p.attrsCardinality[k]; !exists { + p.attrsCardinality[k] = make(map[string]struct{}) + } + p.attrsCardinality[k][v.AsString()] = struct{}{} + return true + }) return dims } diff --git a/processor/signozspanmetricsprocessor/processor_test.go b/processor/signozspanmetricsprocessor/processor_test.go index 9f899b02..00938044 100644 --- a/processor/signozspanmetricsprocessor/processor_test.go +++ b/processor/signozspanmetricsprocessor/processor_test.go @@ -18,6 +18,8 @@ import ( "bytes" "context" "fmt" + "regexp" + "strings" "testing" "time" @@ -36,6 +38,7 @@ import ( conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "go.uber.org/zap" "go.uber.org/zap/zaptest" + "go.uber.org/zap/zaptest/observer" "google.golang.org/grpc/metadata" "github.com/SigNoz/signoz-otel-collector/processor/signozspanmetricsprocessor/internal/cache" @@ -212,7 +215,7 @@ func TestProcessorConsumeTracesErrors(t *testing.T) { tcon := &mocks.TracesConsumer{} tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(tc.consumeTracesErr) - p := newProcessorImp(mexp, tcon, nil, cumulative, logger) + p := newProcessorImp(mexp, tcon, nil, cumulative, logger, []ExcludePattern{}) traces := buildSampleTrace() @@ -298,7 +301,7 @@ func TestProcessorConsumeTraces(t *testing.T) { tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) defaultNullValue := pcommon.NewValueStr("defaultNullValue") - p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, zaptest.NewLogger(t)) + p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, zaptest.NewLogger(t), []ExcludePattern{}) for _, traces := range tc.traces { // Test @@ -320,7 +323,7 @@ func TestMetricKeyCache(t *testing.T) { tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) defaultNullValue := pcommon.NewValueStr("defaultNullValue") - p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t)) + p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t), []ExcludePattern{}) traces := buildSampleTrace() // Test @@ -347,6 +350,40 @@ func TestMetricKeyCache(t *testing.T) { }, 10*time.Second, time.Millisecond*100) } +func TestExcludePatternSkips(t *testing.T) { + mexp := &mocks.MetricsExporter{} + tcon := &mocks.TracesConsumer{} + + mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil) + tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) + + observedZapCore, observedLogs := observer.New(zap.DebugLevel) + observedLogger := zap.New(observedZapCore) + + defaultNullValue := pcommon.NewValueStr("defaultNullValue") + p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, observedLogger, []ExcludePattern{ + { + Name: "operation", + Pattern: "p*", + }, + }) + + traces := buildSampleTrace() + + // Test + ctx := metadata.NewIncomingContext(context.Background(), nil) + err := p.ConsumeTraces(ctx, traces) + + assert.NoError(t, err) + found := false + for _, log := range observedLogs.All() { + if strings.Contains(log.Message, "Skipping span") { + found = true + } + } + assert.True(t, found) +} + func BenchmarkProcessorConsumeTraces(b *testing.B) { // Prepare mexp := &mocks.MetricsExporter{} @@ -356,7 +393,7 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) { tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) defaultNullValue := pcommon.NewValueStr("defaultNullValue") - p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(b)) + p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(b), []ExcludePattern{}) traces := buildSampleTrace() @@ -367,7 +404,7 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) { } } -func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *pcommon.Value, temporality string, logger *zap.Logger) *processorImp { +func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *pcommon.Value, temporality string, logger *zap.Logger, excludePatterns []ExcludePattern) *processorImp { defaultNotInSpanAttrVal := pcommon.NewValueStr("defaultNotInSpanAttrVal") // use size 2 for LRU cache for testing purpose metricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize) @@ -413,6 +450,12 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de if err != nil { panic(err) } + + excludePatternRegex := make(map[string]*regexp.Regexp) + for _, pattern := range excludePatterns { + excludePatternRegex[pattern.Name] = regexp.MustCompile(pattern.Pattern) + } + return &processorImp{ logger: logger, config: Config{AggregationTemporality: temporality}, @@ -455,6 +498,9 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de callMetricKeyToDimensions: callMetricKeyToDimensions, dbCallMetricKeyToDimensions: dbCallMetricKeyToDimensions, externalCallMetricKeyToDimensions: externalCallMetricKeyToDimensions, + + attrsCardinality: make(map[string]map[string]struct{}), + excludePatternRegex: excludePatternRegex, } }