Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid duplicated Cached promises when invalidating expanded postings cache #6452

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions pkg/storage/tsdb/expanded_postings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"flag"
"slices"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -166,7 +165,7 @@ func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context
}

func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) {
var seed string
var seed int
cache := c.blocksCache

// If is a head block, lets add the seed on the cache key so we can
Expand Down Expand Up @@ -208,8 +207,8 @@ func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd
return nil, 0, err
}

key := cacheKey(seed, blockID, ms...)
promise, loaded := cache.getPromiseForKey(key, fetch)
key := cacheKey(blockID, ms...)
promise, loaded := cache.getPromiseForKey(seed, key, fetch)
if loaded {
c.metrics.CacheHits.WithLabelValues(cache.name).Inc()
}
Expand All @@ -231,11 +230,11 @@ func (c *blocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.
}
}

func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) int {
return c.seedByHash.getSeed(c.userId, metricName)
}

func cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string {
func cacheKey(blockID ulid.ULID, ms ...*labels.Matcher) string {
slices.SortFunc(ms, func(i, j *labels.Matcher) int {
if i.Type != j.Type {
return int(i.Type - j.Type)
Expand All @@ -254,14 +253,12 @@ func cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string {
sepLen = 1
)

size := len(seed) + len(blockID.String()) + 2*sepLen
size := len(blockID.String()) + sepLen
for _, m := range ms {
size += len(m.Name) + len(m.Value) + typeLen + sepLen
}
sb := strings.Builder{}
sb.Grow(size)
sb.WriteString(seed)
sb.WriteByte('|')
sb.WriteString(blockID.String())
sb.WriteByte('|')
for _, m := range ms {
Expand Down Expand Up @@ -300,13 +297,13 @@ func newSeedByHash(size int) *seedByHash {
}
}

func (s *seedByHash) getSeed(userId string, v string) string {
func (s *seedByHash) getSeed(userId string, v string) int {
h := memHashString(userId, v)
i := h % uint64(len(s.seedByHash))
l := i % uint64(len(s.strippedLock))
s.strippedLock[l].RLock()
defer s.strippedLock[l].RUnlock()
return strconv.Itoa(s.seedByHash[i])
return s.seedByHash[i]
}

func (s *seedByHash) incrementSeed(userId string, v string) {
Expand Down Expand Up @@ -360,9 +357,10 @@ func (c *fifoCache[V]) expire() {
}
}

func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
func (c *fifoCache[V]) getPromiseForKey(seed int, k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
r := &cacheEntryPromise[V]{
done: make(chan struct{}),
seed: seed,
}
defer close(r.done)

Expand All @@ -385,15 +383,39 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
// If the promise is already in the cache, lets wait it to fetch the data.
<-loaded.(*cacheEntryPromise[V]).done

// If is cached but is expired, lets try to replace the cache value.
if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
c.metrics.CacheEvicts.WithLabelValues(c.name, "expired").Inc()
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
loaded = r
r.ts = c.timeNow()
ok = false
var reason string
invalidated, expired := false, false

switch {
// If the seed from the cached promise is not equal with the incoming sample, it means that the cache key was invalidated
case loaded.(*cacheEntryPromise[V]).seed != seed:
invalidated = true
reason = "invalidated"
case loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()):
expired = true
reason = "expired"
}

if invalidated || expired {
c.metrics.CacheEvicts.WithLabelValues(c.name, reason).Inc()

// If the cache is invalid of expired, lets try to replace its value
if c.cachedValues.CompareAndSwap(k, loaded, r) {
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
loaded = r
r.ts = c.timeNow()
ok = false
} else if invalidated {
// If we cannot perform the swap, it indicates that another goroutine is attempting to set the cache key concurrently.
// In this scenario, fetch the key if it was invalidated, as we cannot be certain whether the other goroutine holds
// the most up-to-date value. Loading from the cache in this state may result in returning a stale value.
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
r.ts = c.timeNow()
loaded = r
}
}
}

Expand Down Expand Up @@ -459,6 +481,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
type cacheEntryPromise[V any] struct {
ts time.Time
sizeBytes int64
seed int

done chan struct{}
v V
Expand Down
36 changes: 22 additions & 14 deletions pkg/storage/tsdb/expanded_postings_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

func TestCacheKey(t *testing.T) {
blockID := ulid.MustNew(1, nil)
seed := "seed123"
matchers := []*labels.Matcher{
{
Type: labels.MatchEqual,
Expand All @@ -41,8 +40,8 @@ func TestCacheKey(t *testing.T) {
Value: "value_4",
},
}
r := cacheKey(seed, blockID, matchers...)
require.Equal(t, "seed123|00000000010000000000000000|name_1=value_1|name_2!=value_2|name_3=~value_4|name_5!~value_4|", r)
r := cacheKey(blockID, matchers...)
require.Equal(t, "00000000010000000000000000|name_1=value_1|name_2!=value_2|name_3=~value_4|name_5!~value_4|", r)
}

func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
Expand All @@ -67,7 +66,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
cache.getPromiseForKey("key1", fetchFunc)
cache.getPromiseForKey(1, "key1", fetchFunc)
}()
}

Expand All @@ -82,7 +81,7 @@ func TestFifoCacheDisabled(t *testing.T) {
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
timeNow := time.Now
cache := newFifoCache[int](cfg, "test", m, timeNow)
old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
old, loaded := cache.getPromiseForKey(1, "key1", func() (int, int64, error) {
return 1, 0, nil
})
require.False(t, loaded)
Expand All @@ -91,7 +90,6 @@ func TestFifoCacheDisabled(t *testing.T) {
}

func TestFifoCacheExpire(t *testing.T) {

keySize := 20
numberOfKeys := 100

Expand Down Expand Up @@ -128,17 +126,24 @@ func TestFifoCacheExpire(t *testing.T) {

for i := 0; i < numberOfKeys; i++ {
key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) {
p, loaded := cache.getPromiseForKey(1, key, func() (int, int64, error) {
return 1, 8, nil
})
require.False(t, loaded)
require.Equal(t, 1, p.v)
require.True(t, cache.contains(key))
p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) {
return 1, 0, nil
p, loaded = cache.getPromiseForKey(1, key, func() (int, int64, error) {
return 1, 8, nil
})
require.True(t, loaded)
require.Equal(t, 1, p.v)

// Changing seed and make sure the key is reloaded
p, loaded = cache.getPromiseForKey(2, key, func() (int, int64, error) {
return 2, 8, nil
})
require.False(t, loaded)
require.Equal(t, 2, p.v)
}

totalCacheSize := 0
Expand All @@ -156,8 +161,9 @@ func TestFifoCacheExpire(t *testing.T) {
err := testutil.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(`
# HELP cortex_ingester_expanded_postings_cache_evicts Total number of evictions in the cache, excluding items that got evicted due to TTL.
# TYPE cortex_ingester_expanded_postings_cache_evicts counter
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="invalidated"} %v
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="full"} %v
`, numberOfKeys-c.expectedFinalItems)), "cortex_ingester_expanded_postings_cache_evicts")
`, numberOfKeys, numberOfKeys-c.expectedFinalItems)), "cortex_ingester_expanded_postings_cache_evicts")
require.NoError(t, err)

}
Expand All @@ -170,7 +176,7 @@ func TestFifoCacheExpire(t *testing.T) {
for i := 0; i < numberOfKeys; i++ {
key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
originalSize := cache.cachedBytes
p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) {
p, loaded := cache.getPromiseForKey(2, key, func() (int, int64, error) {
return 2, 18, nil
})
require.False(t, loaded)
Expand All @@ -183,15 +189,16 @@ func TestFifoCacheExpire(t *testing.T) {
err := testutil.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(`
# HELP cortex_ingester_expanded_postings_cache_evicts Total number of evictions in the cache, excluding items that got evicted due to TTL.
# TYPE cortex_ingester_expanded_postings_cache_evicts counter
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="invalidated"} %v
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="expired"} %v
`, numberOfKeys)), "cortex_ingester_expanded_postings_cache_evicts")
`, numberOfKeys, numberOfKeys)), "cortex_ingester_expanded_postings_cache_evicts")
require.NoError(t, err)

cache.timeNow = func() time.Time {
return timeNow().Add(5 * c.cfg.Ttl)
}

cache.getPromiseForKey("newKwy", func() (int, int64, error) {
cache.getPromiseForKey(1, "newKwy", func() (int, int64, error) {
return 2, 18, nil
})

Expand All @@ -200,7 +207,8 @@ func TestFifoCacheExpire(t *testing.T) {
# HELP cortex_ingester_expanded_postings_cache_evicts Total number of evictions in the cache, excluding items that got evicted due to TTL.
# TYPE cortex_ingester_expanded_postings_cache_evicts counter
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="expired"} %v
`, numberOfKeys*2)), "cortex_ingester_expanded_postings_cache_evicts")
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="invalidated"} %v
`, numberOfKeys*2, numberOfKeys)), "cortex_ingester_expanded_postings_cache_evicts")
require.NoError(t, err)
}
})
Expand Down
Loading