Skip to content

Commit

Permalink
rename TrackerForUser to Tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Dec 19, 2024
1 parent 1f39282 commit 9b4337d
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 48 deletions.
6 changes: 3 additions & 3 deletions pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *Manager) EnabledForUser(userID string) bool {
return len(m.limits.CostAttributionLabels(userID)) > 0
}

func (m *Manager) TrackerForUser(userID string) *Tracker {
func (m *Manager) Tracker(userID string) *Tracker {
if !m.EnabledForUser(userID) {
return nil
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) error {
}

invalidKeys := m.inactiveObservationsForUser(userID, deadline)
cat := m.TrackerForUser(userID)
cat := m.Tracker(userID)
for _, key := range invalidKeys {
cat.cleanupTrackerAttribution(key)
}
Expand All @@ -133,7 +133,7 @@ func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) error {
}

func (m *Manager) inactiveObservationsForUser(userID string, deadline int64) []string {
cat := m.TrackerForUser(userID)
cat := m.Tracker(userID)
newTrackedLabels := m.limits.CostAttributionLabels(userID)
sort.Slice(newTrackedLabels, func(i, j int) bool {
return newTrackedLabels[i] < newTrackedLabels[j]
Expand Down
30 changes: 15 additions & 15 deletions pkg/costattribution/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ func Test_CreateDeleteTracker(t *testing.T) {
manager := newTestManager()

t.Run("Tracker existence and attributes", func(t *testing.T) {
user1Tracker := manager.TrackerForUser("user1")
user1Tracker := manager.Tracker("user1")
assert.NotNil(t, user1Tracker)
assert.True(t, user1Tracker.CompareCALabels([]string{"team"}))
assert.Equal(t, 5, user1Tracker.MaxCardinality())

assert.Nil(t, manager.TrackerForUser("user2"))
assert.Nil(t, manager.Tracker("user2"))

user3Tracker := manager.TrackerForUser("user3")
user3Tracker := manager.Tracker("user3")
assert.NotNil(t, user3Tracker)
assert.True(t, user3Tracker.CompareCALabels([]string{"department", "service"}))
assert.Equal(t, 2, user3Tracker.MaxCardinality())
})

t.Run("Metrics tracking", func(t *testing.T) {
manager.TrackerForUser("user1").IncrementDiscardedSamples(labels.FromStrings("team", "bar"), 1, "invalid-metrics-name", time.Unix(6, 0))
manager.TrackerForUser("user1").IncrementDiscardedSamples(labels.FromStrings("team", "foo"), 1, "invalid-metrics-name", time.Unix(12, 0))
manager.TrackerForUser("user3").IncrementReceivedSamples(labels.FromStrings("department", "foo", "service", "dodo"), 1, time.Unix(20, 0))
manager.Tracker("user1").IncrementDiscardedSamples(labels.FromStrings("team", "bar"), 1, "invalid-metrics-name", time.Unix(6, 0))
manager.Tracker("user1").IncrementDiscardedSamples(labels.FromStrings("team", "foo"), 1, "invalid-metrics-name", time.Unix(12, 0))
manager.Tracker("user3").IncrementReceivedSamples(labels.FromStrings("department", "foo", "service", "dodo"), 1, time.Unix(20, 0))

expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
Expand Down Expand Up @@ -124,9 +124,9 @@ func Test_CreateDeleteTracker(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(12, 0).Unix()))
assert.Equal(t, 1, len(manager.trackersByUserID))
assert.True(t, manager.TrackerForUser("user3").CompareCALabels([]string{"feature", "team"}))
assert.True(t, manager.Tracker("user3").CompareCALabels([]string{"feature", "team"}))

manager.TrackerForUser("user3").IncrementDiscardedSamples(labels.FromStrings("team", "foo"), 1, "invalid-metrics-name", time.Unix(13, 0))
manager.Tracker("user3").IncrementDiscardedSamples(labels.FromStrings("team", "foo"), 1, "invalid-metrics-name", time.Unix(13, 0))
expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
# TYPE cortex_discarded_attributed_samples_total counter
Expand All @@ -136,9 +136,9 @@ func Test_CreateDeleteTracker(t *testing.T) {
})

t.Run("Overflow metrics on cardinality limit", func(t *testing.T) {
manager.TrackerForUser("user3").IncrementReceivedSamples(labels.FromStrings("team", "bar", "feature", "bar"), 1, time.Unix(15, 0))
manager.TrackerForUser("user3").IncrementReceivedSamples(labels.FromStrings("team", "baz", "feature", "baz"), 1, time.Unix(16, 0))
manager.TrackerForUser("user3").IncrementReceivedSamples(labels.FromStrings("team", "foo", "feature", "foo"), 1, time.Unix(17, 0))
manager.Tracker("user3").IncrementReceivedSamples(labels.FromStrings("team", "bar", "feature", "bar"), 1, time.Unix(15, 0))
manager.Tracker("user3").IncrementReceivedSamples(labels.FromStrings("team", "baz", "feature", "baz"), 1, time.Unix(16, 0))
manager.Tracker("user3").IncrementReceivedSamples(labels.FromStrings("team", "foo", "feature", "foo"), 1, time.Unix(17, 0))
expectedMetrics := `
# HELP cortex_received_attributed_samples_total The total number of samples that were received per attribution.
# TYPE cortex_received_attributed_samples_total counter
Expand All @@ -151,9 +151,9 @@ func Test_CreateDeleteTracker(t *testing.T) {
func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
manager := newTestManager()

manager.TrackerForUser("user1").IncrementReceivedSamples(labels.FromStrings("team", "foo"), 1, time.Unix(1, 0))
manager.TrackerForUser("user1").IncrementDiscardedSamples(labels.FromStrings("team", "foo"), 1, "invalid-metrics-name", time.Unix(1, 0))
manager.TrackerForUser("user3").IncrementDiscardedSamples(labels.FromStrings("department", "foo", "service", "bar"), 1, "out-of-window", time.Unix(10, 0))
manager.Tracker("user1").IncrementReceivedSamples(labels.FromStrings("team", "foo"), 1, time.Unix(1, 0))
manager.Tracker("user1").IncrementDiscardedSamples(labels.FromStrings("team", "foo"), 1, "invalid-metrics-name", time.Unix(1, 0))
manager.Tracker("user3").IncrementDiscardedSamples(labels.FromStrings("department", "foo", "service", "bar"), 1, "out-of-window", time.Unix(10, 0))

t.Run("Purge before inactive timeout", func(t *testing.T) {
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(0, 0).Unix()))
Expand All @@ -175,7 +175,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {

// User3's tracker should remain since it's active, user1's tracker should be removed
assert.Equal(t, 1, len(manager.trackersByUserID), "Expected one active tracker after purging")
assert.Nil(t, manager.TrackerForUser("user1"), "Expected user1 tracker to be purged")
assert.Nil(t, manager.Tracker("user1"), "Expected user1 tracker to be purged")

expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
Expand Down
14 changes: 7 additions & 7 deletions pkg/costattribution/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ import (
)

func Test_GetCALabels(t *testing.T) {
cat := newTestManager().TrackerForUser("user1")
cat := newTestManager().Tracker("user1")
assert.True(t, cat.CompareCALabels([]string{"team"}), "Expected cost attribution labels mismatch")
}

func Test_GetMaxCardinality(t *testing.T) {
cat := newTestManager().TrackerForUser("user1")
cat := newTestManager().Tracker("user1")
assert.Equal(t, 5, cat.MaxCardinality(), "Expected max cardinality mismatch")
}

func Test_CreateCleanupTracker(t *testing.T) {
tManager := newTestManager()
cat := tManager.TrackerForUser("user4")
cat := tManager.Tracker("user4")

reg := prometheus.NewRegistry()
err := reg.Register(tManager)
Expand Down Expand Up @@ -81,7 +81,7 @@ func Test_CreateCleanupTracker(t *testing.T) {
}

func Test_UpdateCounters(t *testing.T) {
cat := newTestManager().TrackerForUser("user3")
cat := newTestManager().Tracker("user3")
lbls1 := labels.FromStrings("department", "foo", "service", "bar")
lbls2 := labels.FromStrings("department", "bar", "service", "baz")
lbls3 := labels.FromStrings("department", "baz", "service", "foo")
Expand All @@ -103,7 +103,7 @@ func Test_UpdateCounters(t *testing.T) {

func Test_GetInactiveObservations(t *testing.T) {
// Setup the test environment: create a tracker for user1 with a "team" label and max cardinality of 5.
cat := newTestManager().TrackerForUser("user1")
cat := newTestManager().Tracker("user1")

// Create two observations with different last update timestamps.
observations := []labels.Labels{
Expand Down Expand Up @@ -136,14 +136,14 @@ func Test_GetInactiveObservations(t *testing.T) {

func Test_UpdateMaxCardinality(t *testing.T) {
// user1 original max cardinality is 5
cat := newTestManager().TrackerForUser("user1")
cat := newTestManager().Tracker("user1")
cat.UpdateMaxCardinality(2)
assert.Equal(t, 2, cat.MaxCardinality(), "Expected max cardinality update to 2")
}

func Test_Concurrency(t *testing.T) {
m := newTestManager()
cat := m.TrackerForUser("user1")
cat := m.Tracker("user1")

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
Expand Down
12 changes: 6 additions & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimese
return nil
}

cat := d.costAttributionMgr.TrackerForUser(userID)
cat := d.costAttributionMgr.Tracker(userID)
if len(ts.Samples) == 1 {
return validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, ts.Samples[0], cat)
}
Expand Down Expand Up @@ -792,7 +792,7 @@ func (d *Distributor) validateHistograms(now model.Time, ts *mimirpb.PreallocTim
return nil
}

cat := d.costAttributionMgr.TrackerForUser(userID)
cat := d.costAttributionMgr.Tracker(userID)
if len(ts.Histograms) == 1 {
updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[0], cat)
if err != nil {
Expand Down Expand Up @@ -879,7 +879,7 @@ func (d *Distributor) validateExemplars(ts *mimirpb.PreallocTimeseries, userID s
// The returned error may retain the series labels.
// It uses the passed nowt time to observe the delay of sample timestamps.
func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeseries, userID, group string, skipLabelValidation, skipLabelCountValidation bool, minExemplarTS, maxExemplarTS int64) (bool, error) {
cat := d.costAttributionMgr.TrackerForUser(userID)
cat := d.costAttributionMgr.Tracker(userID)
if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelValidation, skipLabelCountValidation, cat, nowt); err != nil {
return true, err
}
Expand Down Expand Up @@ -988,7 +988,7 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {

if errors.As(err, &tooManyClustersError{}) {
d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples))
d.costAttributionMgr.TrackerForUser(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), reasonTooManyHAClusters, now)
d.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), reasonTooManyHAClusters, now)
}

return err
Expand Down Expand Up @@ -1247,7 +1247,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
totalN := validatedSamples + validatedExemplars + validatedMetadata
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
if len(req.Timeseries) > 0 {
d.costAttributionMgr.TrackerForUser(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(validatedSamples), reasonRateLimited, now)
d.costAttributionMgr.Tracker(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(validatedSamples), reasonRateLimited, now)
}
d.discardedSamplesRateLimited.WithLabelValues(userID, group).Add(float64(validatedSamples))
d.discardedExemplarsRateLimited.WithLabelValues(userID).Add(float64(validatedExemplars))
Expand Down Expand Up @@ -1832,7 +1832,7 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st
for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
d.costAttributionMgr.TrackerForUser(userID).IncrementReceivedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(receivedSamples), mtime.Now())
d.costAttributionMgr.Tracker(userID).IncrementReceivedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(receivedSamples), mtime.Now())
}
receivedMetadata = len(req.Metadata)

Expand Down
Loading

0 comments on commit 9b4337d

Please sign in to comment.