fix: update elastic.events.queued with the correct number of queued events (elastic#150)

* fix: update elastic.events.queued with the correct number of queued events

Because of document retries, the number of queued events after a flush
request can be greater than zero if some documents returned a 429.
To fix the issue, we now remove only (failed+indexed) from the queued
events metric after a flush request.
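For illustration, a minimal runnable sketch of the accounting this fix moves to, with hypothetical, simplified names (the real logic lives in Appender.flush and the library's metrics helpers):

// Minimal sketch of the fixed queued-events accounting (hypothetical names).
package main

import "fmt"

type queueStats struct {
	queued  int64 // what elastic.events.queued reports
	indexed int64
	failed  int64
}

// onFlushResponse decrements the queued gauge only by the documents that
// actually left the queue (indexed or permanently failed). Documents that
// returned 429 and will be retried remain counted as queued.
func (s *queueStats) onFlushResponse(indexed, failed, retried int64) {
	s.indexed += indexed
	s.failed += failed
	s.queued -= indexed + failed // not the full batch size
	_ = retried                  // retried documents stay in s.queued
}

func main() {
	s := queueStats{queued: 10}
	// A flush of 10 documents: 7 indexed, 2 failed permanently, 1 got a 429.
	s.onFlushResponse(7, 2, 1)
	fmt.Println(s.queued) // 1: the retried document is still queued
}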

* test: add document retry metric test

* Update appender.go

* Update appender.go
kruskall authored May 6, 2024
1 parent 582f708 commit 7d54164
Showing 2 changed files with 163 additions and 1 deletion.
4 changes: 3 additions & 1 deletion appender.go
@@ -303,7 +303,6 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
 	if n == 0 {
 		return nil
 	}
-	defer a.addUpDownCount(-int64(n), &a.docsActive, a.metrics.docsActive)
 	defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests)
 
 	logger := a.config.Logger
@@ -341,6 +340,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
 		a.addCount(int64(flushed), &a.bytesUncompressedTotal, a.metrics.bytesUncompressedTotal)
 	}
 	if err != nil {
+		a.addUpDownCount(-int64(n), &a.docsActive, a.metrics.docsActive)
 		atomic.AddInt64(&a.docsFailed, int64(n))
 		logger.Error("bulk indexing request failed", zap.Error(err))
 		if a.tracingEnabled() {
@@ -373,6 +373,8 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
 		failedCount = make(map[BulkIndexerResponseItem]int, len(resp.FailedDocs))
 	}
 	docsFailed = int64(len(resp.FailedDocs))
+	totalFlushed := docsFailed + docsIndexed
+	a.addUpDownCount(-totalFlushed, &a.docsActive, a.metrics.docsActive)
 	for _, info := range resp.FailedDocs {
 		if info.Status >= 400 && info.Status < 500 {
 			if info.Status == http.StatusTooManyRequests {
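To make the new accounting concrete, a worked example with hypothetical numbers: for a flush of n = 5 documents where 3 are indexed, 1 fails with a non-retriable status, and 1 returns 429, the success path now decrements elastic.events.queued by totalFlushed = 1 + 3 = 4, leaving the retried document counted as queued; previously the deferred decrement removed the full n = 5. On a request-level error no documents are retried, so the error path still subtracts the full n.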
160 changes: 160 additions & 0 deletions appender_test.go
@@ -205,6 +205,166 @@ loop:
 	assert.Equal(t, 4, processedAsserted)
 }
 
+func TestAppenderRetry(t *testing.T) {
+	var bytesTotal int64
+	var bytesUncompressed int64
+	var first atomic.Bool
+	client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
+		bytesTotal += r.ContentLength
+		_, result, stat := docappendertest.DecodeBulkRequestWithStats(r)
+		bytesUncompressed += stat.UncompressedBytes
+		if first.CompareAndSwap(false, true) {
+			result.HasErrors = true
+			// Respond with an error for the first three items: a server error,
+			// a "too many requests" (which will be retried), and a client error.
+			// The non-retried failures are recorded in the indexing stats.
+			for i := range result.Items {
+				if i > 2 {
+					break
+				}
+				status := http.StatusInternalServerError
+				switch i {
+				case 1:
+					status = http.StatusTooManyRequests
+				case 2:
+					status = http.StatusUnauthorized
+				}
+				for action, item := range result.Items[i] {
+					item.Status = status
+					result.Items[i][action] = item
+				}
+			}
+		}
+		json.NewEncoder(w).Encode(result)
+	})
+
+	rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
+		func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
+			return metricdata.DeltaTemporality
+		},
+	))
+
+	indexerAttrs := attribute.NewSet(
+		attribute.String("a", "b"), attribute.String("c", "d"),
+	)
+
+	indexer, err := docappender.New(client, docappender.Config{
+		FlushInterval:      time.Minute,
+		FlushBytes:         750, // this is enough to flush after 9 documents
+		MaxRequests:        1,   // to ensure the test is stable
+		MaxDocumentRetries: 1,   // to test the document retry logic
+		MeterProvider:      sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)),
+		MetricAttributes:   indexerAttrs,
+	})
+
+	require.NoError(t, err)
+	defer indexer.Close(context.Background())
+
+	const N = 10
+	for i := 0; i < N; i++ {
+		addMinimalDoc(t, indexer, "logs-foo-testing")
+	}
+
+	timeout := time.After(2 * time.Second)
+loop:
+	for {
+		select {
+		case <-time.After(10 * time.Millisecond):
+			// Because the internal channel is buffered to increase performance,
+			// the indexer may not pick up documents right away; loop until one
+			// bulk request has been recorded.
+			if indexer.Stats().BulkRequests == 1 {
+				break loop
+			}
+		case <-timeout:
+			t.Fatalf("timed out waiting for the active bulk indexer to send one bulk request")
+		}
+	}
+
+	stats := indexer.Stats()
+	var rm metricdata.ResourceMetrics
+	assert.NoError(t, rdr.Collect(context.Background(), &rm))
+	var asserted atomic.Int64
+	assertCounter := docappendertest.NewAssertCounter(t, &asserted)
+
+	var processedAsserted int
+	assertProcessedCounter := func(metric metricdata.Metrics, attrs attribute.Set) {
+		asserted.Add(1)
+		counter := metric.Data.(metricdata.Sum[int64])
+		for _, dp := range counter.DataPoints {
+			metricdatatest.AssertHasAttributes(t, dp, attrs.ToSlice()...)
+			status, exist := dp.Attributes.Value(attribute.Key("status"))
+			assert.True(t, exist)
+			switch status.AsString() {
+			case "Success":
+				processedAsserted++
+				assert.Equal(t, stats.Indexed, dp.Value)
+			case "FailedClient":
+				processedAsserted++
+				assert.Equal(t, stats.FailedClient, dp.Value)
+			case "FailedServer":
+				processedAsserted++
+				assert.Equal(t, stats.FailedServer, dp.Value)
+			case "TooMany":
+				processedAsserted++
+				assert.Equal(t, stats.TooManyRequests, dp.Value)
+			default:
+				assert.FailNow(t, "Unexpected metric with status: "+status.AsString())
+			}
+		}
+	}
+	// Check the set of metric names, then assert on each counter; histograms
+	// are only checked for presence here.
+	unexpectedMetrics := []string{}
+	docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
+		switch m.Name {
+		case "elasticsearch.events.count":
+			assertCounter(m, stats.Added, indexerAttrs)
+		case "elasticsearch.events.queued":
+			assertCounter(m, stats.Active, indexerAttrs)
+		case "elasticsearch.bulk_requests.count":
+			assertCounter(m, stats.BulkRequests, indexerAttrs)
+		case "elasticsearch.events.processed":
+			assertProcessedCounter(m, indexerAttrs)
+		case "elasticsearch.events.retried":
+			assertCounter(m, 1, indexerAttrs)
+		case "elasticsearch.bulk_requests.available":
+			assertCounter(m, stats.AvailableBulkRequests, indexerAttrs)
+		case "elasticsearch.flushed.bytes":
+			assertCounter(m, stats.BytesTotal, indexerAttrs)
+		case "elasticsearch.flushed.uncompressed.bytes":
+			assertCounter(m, stats.BytesUncompressedTotal, indexerAttrs)
+		case "elasticsearch.buffer.latency", "elasticsearch.flushed.latency":
+			// Expected metric names, but no assertions here: these are
+			// histograms and are checked elsewhere.
+		default:
+			unexpectedMetrics = append(unexpectedMetrics, m.Name)
+		}
+	})
+
+	assert.Empty(t, unexpectedMetrics)
+	assert.Equal(t, int64(8), asserted.Load())
+	assert.Equal(t, 3, processedAsserted)
+
+	// Closing the indexer flushes enqueued documents.
+	err = indexer.Close(context.Background())
+	require.NoError(t, err)
+	stats = indexer.Stats()
+	failed := int64(2)
+	assert.Equal(t, docappender.Stats{
+		Added:                  N,
+		Active:                 0,
+		BulkRequests:           2,
+		Failed:                 failed,
+		FailedClient:           1,
+		FailedServer:           1,
+		Indexed:                N - failed,
+		TooManyRequests:        0,
+		AvailableBulkRequests:  1,
+		BytesTotal:             bytesTotal,
+		BytesUncompressedTotal: bytesUncompressed,
+	}, stats)
+}
 
 func TestAppenderAvailableAppenders(t *testing.T) {
 	unblockRequests := make(chan struct{})
 	receivedFlush := make(chan struct{})
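Reading the asserted stats: the first bulk request returns errors for three documents, of which the 401 and 500 are recorded as FailedClient and FailedServer, while the 429 document is retried (hence elasticsearch.events.retried equals 1 and TooManyRequests stays 0). Closing the indexer flushes the retried document in a second bulk request, so BulkRequests is 2, Indexed is N - 2, and Active drops to 0 once every document has either been indexed or permanently failed.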
