Skip to content

Commit

Permalink
Add cardinality logs and exclude pattern capability (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Dec 27, 2022
1 parent a81d21b commit 90d0297
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 5 deletions.
9 changes: 9 additions & 0 deletions processor/signozspanmetricsprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ type Dimension struct {
Default *string `mapstructure:"default"`
}

// ExcludePattern defines the pattern to exclude from the metrics.
type ExcludePattern struct {
Name string `mapstructure:"name"`
Pattern string `mapstructure:"pattern"`
}

// Config defines the configuration options for spanmetricsprocessor.
type Config struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
Expand All @@ -62,6 +68,9 @@ type Config struct {
// https://github.com/open-telemetry/opentelemetry-collector/blob/main/model/semconv/opentelemetry.go.
Dimensions []Dimension `mapstructure:"dimensions"`

// ExcludePatterns defines the list of patterns to exclude from the metrics.
ExcludePatterns []ExcludePattern `mapstructure:"exclude_patterns"`

// DimensionsCacheSize defines the size of cache for storing Dimensions, which helps to avoid cache memory growing
// indefinitely over the lifetime of the collector.
// Optional. See defaultDimensionsCacheSize in processor.go for the default value.
Expand Down
77 changes: 77 additions & 0 deletions processor/signozspanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"net/url"
"regexp"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -102,6 +103,9 @@ type processorImp struct {
callMetricKeyToDimensions *cache.Cache[metricKey, pcommon.Map]
dbCallMetricKeyToDimensions *cache.Cache[metricKey, pcommon.Map]
externalCallMetricKeyToDimensions *cache.Cache[metricKey, pcommon.Map]

attrsCardinality map[string]map[string]struct{}
excludePatternRegex map[string]*regexp.Regexp
}

type dimension struct {
Expand Down Expand Up @@ -181,6 +185,11 @@ func newProcessor(logger *zap.Logger, config component.ProcessorConfig, nextCons
return nil, err
}

excludePatternRegex := make(map[string]*regexp.Regexp)
for _, pattern := range pConfig.ExcludePatterns {
excludePatternRegex[pattern.Name] = regexp.MustCompile(pattern.Pattern)
}

return &processorImp{
logger: logger,
config: *pConfig,
Expand All @@ -203,6 +212,8 @@ func newProcessor(logger *zap.Logger, config component.ProcessorConfig, nextCons
callMetricKeyToDimensions: callMetricKeyToDimensionsCache,
dbCallMetricKeyToDimensions: dbMetricKeyToDimensionsCache,
externalCallMetricKeyToDimensions: externalCallMetricKeyToDimensionsCache,
attrsCardinality: make(map[string]map[string]struct{}),
excludePatternRegex: excludePatternRegex,
}, nil
}

Expand Down Expand Up @@ -249,6 +260,41 @@ func validateDimensions(dimensions []Dimension, skipSanitizeLabel bool) error {
return nil
}

func (p *processorImp) shouldSkip(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map) bool {
for key, pattern := range p.excludePatternRegex {
if key == serviceNameKey && pattern.MatchString(serviceName) {
return true
}
if key == operationKey && pattern.MatchString(span.Name()) {
return true
}
if key == spanKindKey && pattern.MatchString(span.Kind().String()) {
return true
}
if key == statusCodeKey && pattern.MatchString(span.Status().Code().String()) {
return true
}

matched := false
span.Attributes().Range(func(k string, v pcommon.Value) bool {
if key == k && pattern.MatchString(v.AsString()) {
matched = true
}
return true
})
resourceAttrs.Range(func(k string, v pcommon.Value) bool {
if key == k && pattern.MatchString(v.AsString()) {
matched = true
}
return true
})
if matched {
return true
}
}
return false
}

// Start implements the component.Component interface.
func (p *processorImp) Start(ctx context.Context, host component.Host) error {
p.logger.Info("Starting signozspanmetricsprocessor")
Expand Down Expand Up @@ -317,6 +363,7 @@ func (p *processorImp) tracesToMetrics(ctx context.Context, traces ptrace.Traces
p.lock.Unlock()

if err != nil {
p.logCardinalityInfo()
return err
}

Expand Down Expand Up @@ -361,6 +408,17 @@ func (p *processorImp) buildMetrics() (pmetric.Metrics, error) {
return m, nil
}

func (p *processorImp) logCardinalityInfo() {
for k, v := range p.attrsCardinality {
values := make([]string, 0, len(v))
for key := range v {
values = append(values, key)
}
p.logger.Info("Attribute cardinality", zap.String("attribute", k), zap.Int("cardinality", len(v)))
p.logger.Debug("Attribute values", zap.Strings("values", values))
}
}

// collectLatencyMetrics collects the raw latency metrics, writing the data
// into the given instrumentation library metrics.
func (p *processorImp) collectLatencyMetrics(ilm pmetric.ScopeMetrics) error {
Expand Down Expand Up @@ -619,6 +677,11 @@ func getRemoteAddress(span ptrace.Span) (string, bool) {
}

func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.Span, resourceAttr pcommon.Map) {

if p.shouldSkip(serviceName, span, resourceAttr) {
p.logger.Debug("Skipping span", zap.String("span", span.Name()), zap.String("service", serviceName))
return
}
// Protect against end timestamps before start timestamps. Assume 0 duration.
latencyInMilliseconds := float64(0)
startTime := span.StartTimestamp()
Expand Down Expand Up @@ -806,6 +869,13 @@ func (p *processorImp) buildDimensionKVs(serviceName string, span ptrace.Span, o
v.CopyTo(dims.PutEmpty(resourcePrefix + d.name))
}
}
dims.Range(func(k string, v pcommon.Value) bool {
if _, exists := p.attrsCardinality[k]; !exists {
p.attrsCardinality[k] = make(map[string]struct{})
}
p.attrsCardinality[k][v.AsString()] = struct{}{}
return true
})
return dims
}

Expand All @@ -827,6 +897,13 @@ func (p *processorImp) buildCustomDimensionKVs(serviceName string, span ptrace.S
v.CopyTo(dims.PutEmpty(resourcePrefix + d.name))
}
}
dims.Range(func(k string, v pcommon.Value) bool {
if _, exists := p.attrsCardinality[k]; !exists {
p.attrsCardinality[k] = make(map[string]struct{})
}
p.attrsCardinality[k][v.AsString()] = struct{}{}
return true
})
return dims
}

Expand Down
56 changes: 51 additions & 5 deletions processor/signozspanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"bytes"
"context"
"fmt"
"regexp"
"strings"
"testing"
"time"

Expand All @@ -36,6 +38,7 @@ import (
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc/metadata"

"github.com/SigNoz/signoz-otel-collector/processor/signozspanmetricsprocessor/internal/cache"
Expand Down Expand Up @@ -212,7 +215,7 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
tcon := &mocks.TracesConsumer{}
tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(tc.consumeTracesErr)

p := newProcessorImp(mexp, tcon, nil, cumulative, logger)
p := newProcessorImp(mexp, tcon, nil, cumulative, logger, []ExcludePattern{})

traces := buildSampleTrace()

Expand Down Expand Up @@ -298,7 +301,7 @@ func TestProcessorConsumeTraces(t *testing.T) {
tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)

defaultNullValue := pcommon.NewValueStr("defaultNullValue")
p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, zaptest.NewLogger(t))
p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, zaptest.NewLogger(t), []ExcludePattern{})

for _, traces := range tc.traces {
// Test
Expand All @@ -320,7 +323,7 @@ func TestMetricKeyCache(t *testing.T) {
tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)

defaultNullValue := pcommon.NewValueStr("defaultNullValue")
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t))
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t), []ExcludePattern{})
traces := buildSampleTrace()

// Test
Expand All @@ -347,6 +350,40 @@ func TestMetricKeyCache(t *testing.T) {
}, 10*time.Second, time.Millisecond*100)
}

func TestExcludePatternSkips(t *testing.T) {
mexp := &mocks.MetricsExporter{}
tcon := &mocks.TracesConsumer{}

mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil)
tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)

observedZapCore, observedLogs := observer.New(zap.DebugLevel)
observedLogger := zap.New(observedZapCore)

defaultNullValue := pcommon.NewValueStr("defaultNullValue")
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, observedLogger, []ExcludePattern{
{
Name: "operation",
Pattern: "p*",
},
})

traces := buildSampleTrace()

// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.ConsumeTraces(ctx, traces)

assert.NoError(t, err)
found := false
for _, log := range observedLogs.All() {
if strings.Contains(log.Message, "Skipping span") {
found = true
}
}
assert.True(t, found)
}

func BenchmarkProcessorConsumeTraces(b *testing.B) {
// Prepare
mexp := &mocks.MetricsExporter{}
Expand All @@ -356,7 +393,7 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) {
tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)

defaultNullValue := pcommon.NewValueStr("defaultNullValue")
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(b))
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(b), []ExcludePattern{})

traces := buildSampleTrace()

Expand All @@ -367,7 +404,7 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) {
}
}

func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *pcommon.Value, temporality string, logger *zap.Logger) *processorImp {
func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *pcommon.Value, temporality string, logger *zap.Logger, excludePatterns []ExcludePattern) *processorImp {
defaultNotInSpanAttrVal := pcommon.NewValueStr("defaultNotInSpanAttrVal")
// use size 2 for LRU cache for testing purpose
metricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize)
Expand Down Expand Up @@ -413,6 +450,12 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de
if err != nil {
panic(err)
}

excludePatternRegex := make(map[string]*regexp.Regexp)
for _, pattern := range excludePatterns {
excludePatternRegex[pattern.Name] = regexp.MustCompile(pattern.Pattern)
}

return &processorImp{
logger: logger,
config: Config{AggregationTemporality: temporality},
Expand Down Expand Up @@ -455,6 +498,9 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de
callMetricKeyToDimensions: callMetricKeyToDimensions,
dbCallMetricKeyToDimensions: dbCallMetricKeyToDimensions,
externalCallMetricKeyToDimensions: externalCallMetricKeyToDimensions,

attrsCardinality: make(map[string]map[string]struct{}),
excludePatternRegex: excludePatternRegex,
}
}

Expand Down

0 comments on commit 90d0297

Please sign in to comment.