Skip to content

Commit

Permalink
[mq] working branch - merge 9d011bd on top of main at 272716f
Browse files Browse the repository at this point in the history
{"baseBranch":"main","baseCommit":"272716faa23e812a822c030ab28437f9a49957fa","createdAt":"2024-12-24T15:51:44.111904Z","headSha":"9d011bd8b0a11979d5928d6faec031068317ca92","id":"f445679c-b4d8-445f-8ed4-7e25a4cb6599","priority":"200","pullRequestNumber":"32343","queuedAt":"2024-12-25T00:21:22.771277Z","status":"STATUS_QUEUED"}
  • Loading branch information
dd-mergequeue[bot] authored Dec 25, 2024
2 parents c9995cc + 9d011bd commit ab28731
Show file tree
Hide file tree
Showing 5 changed files with 793 additions and 107 deletions.
67 changes: 39 additions & 28 deletions comp/core/agenttelemetry/impl/agenttelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,15 @@ func (a *atel) aggregateMetricTags(mCfg *MetricConfig, mt dto.MetricType, ms []*

// create a key from the tags (and drop not specified in the configuration tags)
var specTags = make([]*dto.LabelPair, 0, len(origTags))
var sb strings.Builder
for _, t := range tags {
if _, ok := mCfg.aggregateTagsMap[t.GetName()]; ok {
specTags = append(specTags, t)
tagsKey += makeLabelPairKey(t)
sb.WriteString(makeLabelPairKey(t))
}
}
tagsKey = sb.String()

if mCfg.AggregateTotal {
aggregateMetric(mt, totalm, m)
}
Expand Down Expand Up @@ -255,65 +258,73 @@ func (a *atel) aggregateMetricTags(mCfg *MetricConfig, mt dto.MetricType, ms []*
return maps.Values(amMap)
}

// Using Prometheus terminology. Metrics name or in "Prom" MetricFamily is technically a Datadog metrics.
// dto.Metric are a metric values for each timeseries (tag/value combination).
func buildKeysForMetricsPreviousValues(mt dto.MetricType, metricName string, metrics []*dto.Metric) []string {
keyNames := make([]string, 0, len(metrics))
for _, m := range metrics {
var keyName string
tags := m.GetLabel()
if len(tags) == 0 {
// start with the metric name
// For "tagless" MetricFamily, len(metrics) will be 1, with single iteration and m.GetLabel()
// will be nil. Accordingly, to form a key for that metric its name alone is sufficient.
keyName = metricName
} else {
// Sort tags to stability of the key
sortedTags := cloneLabelsSorted(tags)
var builder strings.Builder

// start with the metric name plus the tags
builder.WriteString(metricName)
for _, tag := range sortedTags {
builder.WriteString(makeLabelPairKey(tag))
}
keyName = builder.String()
//If the metric has tags, len(metrics) will be equal to the number of metric's timeseries.
// Each timeseries or "m" on each iteration in this code, will contain a set of unique
// tagset (as m.GetLabel()). Accordingly, each timeseries should be represented by a unique
// and stable (reproducible) key formed by tagset key names and values.
keyName = fmt.Sprintf("%s%s:", metricName, convertLabelsToKey(tags))
}

if mt == dto.MetricType_HISTOGRAM {
// add bucket names to the key
// On each iteration for metrics without tags (only 1 iteration) or with tags (iteration per
// timeseries). If the metric is a HISTOGRAM, each timeseries bucket individually plus
// implicit "+Inf" bucket. For example, for 3 timeseries with 4-bucket histogram, we will
// track 15 values using 15 keys (3x(4+1)).
for _, bucket := range m.Histogram.GetBucket() {
keyNames = append(keyNames, fmt.Sprintf("%v:%v", keyName, bucket.GetUpperBound()))
}
} else {
keyNames = append(keyNames, keyName)
}

// Add the key for Counter, Gauge metric and HISTOGRAM's +Inf bucket
keyNames = append(keyNames, keyName)
}

return keyNames
}

// Swap current value with the previous value and deduct the previous value from the current value
func deductAndUpdatePrevValue(key string, prevPromMetricValues map[string]uint64, curValue *uint64) {
origCurValue := *curValue
if prevValue, ok := prevPromMetricValues[key]; ok {
*curValue -= prevValue
}
prevPromMetricValues[key] = origCurValue
}

func convertPromHistogramsToDatadogHistogramsValues(metrics []*dto.Metric, prevPromMetricValues map[string]uint64, keyNames []string) {
if len(metrics) > 0 {
bucketCount := len(metrics[0].Histogram.GetBucket())
var prevValue uint64

for i, m := range metrics {
// First, deduct the previous cumulative count from the current one
// 1. deduct the previous cumulative count from each explicit buckets
for j, b := range m.Histogram.GetBucket() {
key := keyNames[(i*bucketCount)+j]
curValue := b.GetCumulativeCount()

// Adjust the counter value if found
if prevValue, ok := prevPromMetricValues[key]; ok {
*b.CumulativeCount -= prevValue
}

// Upsert the cache of previous counter values
prevPromMetricValues[key] = curValue
deductAndUpdatePrevValue(keyNames[(i*(bucketCount+1))+j], prevPromMetricValues, b.CumulativeCount)
}
// 2. deduct the previous cumulative count from the implicit "+Inf" bucket
deductAndUpdatePrevValue(keyNames[((i+1)*(bucketCount+1))-1], prevPromMetricValues, m.Histogram.SampleCount)

// Then, de-cumulate next bucket value from the previous bucket values
var prevValue uint64
// 3. "De-cumulate" next explicit bucket value from the preceding bucket value
prevValue = 0
for _, b := range m.Histogram.GetBucket() {
curValue := b.GetCumulativeCount()
*b.CumulativeCount -= prevValue
prevValue = curValue
}
// 4. "De-cumulate" implicit "+Inf" bucket value from the preceding bucket value
*m.Histogram.SampleCount -= prevValue
}
}
}
Expand Down
Loading

0 comments on commit ab28731

Please sign in to comment.