From 7d54164af76095d98d830009c14d9cb70658ff33 Mon Sep 17 00:00:00 2001
From: kruskall <99559985+kruskall@users.noreply.github.com>
Date: Mon, 6 May 2024 19:11:17 +0200
Subject: [PATCH] fix: update elastic.events.queued with the correct number of
 queued events (#150)

* fix: update elastic.events.queued with the correct number of queued events

Because of document retries, the number of queued events after a flush
request can be >0 if the response returned 429.

To fix the issue, we only remove (failed+indexed) from the queued events
metric after a flush request.

* test: add document retry metric test

* Update appender.go

* Update appender.go
---
 appender.go      |   4 +-
 appender_test.go | 160 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 163 insertions(+), 1 deletion(-)

diff --git a/appender.go b/appender.go
index 4f2d076..3300890 100644
--- a/appender.go
+++ b/appender.go
@@ -303,7 +303,6 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
 	if n == 0 {
 		return nil
 	}
-	defer a.addUpDownCount(-int64(n), &a.docsActive, a.metrics.docsActive)
 	defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests)
 
 	logger := a.config.Logger
@@ -341,6 +340,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
 		a.addCount(int64(flushed), &a.bytesUncompressedTotal, a.metrics.bytesUncompressedTotal)
 	}
 	if err != nil {
+		a.addUpDownCount(-int64(n), &a.docsActive, a.metrics.docsActive)
 		atomic.AddInt64(&a.docsFailed, int64(n))
 		logger.Error("bulk indexing request failed", zap.Error(err))
 		if a.tracingEnabled() {
@@ -373,6 +373,8 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
 		failedCount = make(map[BulkIndexerResponseItem]int, len(resp.FailedDocs))
 	}
 	docsFailed = int64(len(resp.FailedDocs))
+	totalFlushed := docsFailed + docsIndexed
+	a.addUpDownCount(-totalFlushed, &a.docsActive, a.metrics.docsActive)
 	for _, info := range resp.FailedDocs {
 		if info.Status >= 400 && info.Status < 500 {
 			if info.Status == http.StatusTooManyRequests {
diff --git a/appender_test.go b/appender_test.go
index aecd26c..e7fcfc6 100644
--- a/appender_test.go
+++ b/appender_test.go
@@ -205,6 +205,166 @@ loop:
 	assert.Equal(t, 4, processedAsserted)
 }
 
+func TestAppenderRetry(t *testing.T) {
+	var bytesTotal int64
+	var bytesUncompressed int64
+	var first atomic.Bool
+	client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
+		bytesTotal += r.ContentLength
+		_, result, stat := docappendertest.DecodeBulkRequestWithStats(r)
+		bytesUncompressed += stat.UncompressedBytes
+		if first.CompareAndSwap(false, true) {
+			result.HasErrors = true
+			// Respond with an error for the first three items: an internal server
+			// error, "too many requests" (which will be retried), and "unauthorized".
+			// The non-retried failures will be recorded in indexing stats.
+			for i := range result.Items {
+				if i > 2 {
+					break
+				}
+				status := http.StatusInternalServerError
+				switch i {
+				case 1:
+					status = http.StatusTooManyRequests
+				case 2:
+					status = http.StatusUnauthorized
+				}
+				for action, item := range result.Items[i] {
+					item.Status = status
+					result.Items[i][action] = item
+				}
+			}
+		}
+		json.NewEncoder(w).Encode(result)
+	})
+
+	rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
+		func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
+			return metricdata.DeltaTemporality
+		},
+	))
+
+	indexerAttrs := attribute.NewSet(
+		attribute.String("a", "b"), attribute.String("c", "d"),
+	)
+
+	indexer, err := docappender.New(client, docappender.Config{
+		FlushInterval:      time.Minute,
+		FlushBytes:         750, // this is enough to flush after 9 documents
+		MaxRequests:        1,   // to ensure the test is stable
+		MaxDocumentRetries: 1,   // to test the document retry logic
+		MeterProvider:      sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)),
+		MetricAttributes:   indexerAttrs,
+	})
+
+	require.NoError(t, err)
+	defer indexer.Close(context.Background())
+
+	const N = 10
+	for i := 0; i < N; i++ {
+		addMinimalDoc(t, indexer, "logs-foo-testing")
+	}
+
+	timeout := time.After(2 * time.Second)
+loop:
+	for {
+		select {
+		case <-time.After(10 * time.Millisecond):
+			// Because the internal channel is buffered to increase performance,
+			// the indexer may not take documents right away; loop until a bulk
+			// request has been recorded.
+			if indexer.Stats().BulkRequests == 1 {
+				break loop
+			}
+		case <-timeout:
+			t.Fatalf("timed out waiting for the active bulk indexer to send one bulk request")
+		}
+	}
+
+	stats := indexer.Stats()
+	var rm metricdata.ResourceMetrics
+	assert.NoError(t, rdr.Collect(context.Background(), &rm))
+	var asserted atomic.Int64
+	assertCounter := docappendertest.NewAssertCounter(t, &asserted)
+
+	var processedAsserted int
+	assertProcessedCounter := func(metric metricdata.Metrics, attrs attribute.Set) {
+		asserted.Add(1)
+		counter := metric.Data.(metricdata.Sum[int64])
+		for _, dp := range counter.DataPoints {
+			metricdatatest.AssertHasAttributes(t, dp, attrs.ToSlice()...)
+			status, exist := dp.Attributes.Value(attribute.Key("status"))
+			assert.True(t, exist)
+			switch status.AsString() {
+			case "Success":
+				processedAsserted++
+				assert.Equal(t, stats.Indexed, dp.Value)
+			case "FailedClient":
+				processedAsserted++
+				assert.Equal(t, stats.FailedClient, dp.Value)
+			case "FailedServer":
+				processedAsserted++
+				assert.Equal(t, stats.FailedServer, dp.Value)
+			case "TooMany":
+				processedAsserted++
+				assert.Equal(t, stats.TooManyRequests, dp.Value)
+			default:
+				assert.FailNow(t, "Unexpected metric with status: "+status.AsString())
+			}
+		}
+	}
+	// Check the set of metric names, then check each counter or histogram.
+	unexpectedMetrics := []string{}
+	docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
+		switch m.Name {
+		case "elasticsearch.events.count":
+			assertCounter(m, stats.Added, indexerAttrs)
+		case "elasticsearch.events.queued":
+			assertCounter(m, stats.Active, indexerAttrs)
+		case "elasticsearch.bulk_requests.count":
+			assertCounter(m, stats.BulkRequests, indexerAttrs)
+		case "elasticsearch.events.processed":
+			assertProcessedCounter(m, indexerAttrs)
+		case "elasticsearch.events.retried":
+			assertCounter(m, 1, indexerAttrs)
+		case "elasticsearch.bulk_requests.available":
+			assertCounter(m, stats.AvailableBulkRequests, indexerAttrs)
+		case "elasticsearch.flushed.bytes":
+			assertCounter(m, stats.BytesTotal, indexerAttrs)
+		case "elasticsearch.flushed.uncompressed.bytes":
+			assertCounter(m, stats.BytesUncompressedTotal, indexerAttrs)
+		case "elasticsearch.buffer.latency", "elasticsearch.flushed.latency":
+			// These metric names are expected, but no assertions are done here:
+			// they are histograms and are checked elsewhere.
+		default:
+			unexpectedMetrics = append(unexpectedMetrics, m.Name)
+		}
+	})
+
+	assert.Empty(t, unexpectedMetrics)
+	assert.Equal(t, int64(8), asserted.Load())
+	assert.Equal(t, 3, processedAsserted)
+
+	// Closing the indexer flushes enqueued documents.
+	err = indexer.Close(context.Background())
+	require.NoError(t, err)
+	stats = indexer.Stats()
+	failed := int64(2)
+	assert.Equal(t, docappender.Stats{
+		Added:                  N,
+		Active:                 0,
+		BulkRequests:           2,
+		Failed:                 failed,
+		FailedClient:           1,
+		FailedServer:           1,
+		Indexed:                N - failed,
+		TooManyRequests:        0,
+		AvailableBulkRequests:  1,
+		BytesTotal:             bytesTotal,
+		BytesUncompressedTotal: bytesUncompressed,
+	}, stats)
+}
+
 func TestAppenderAvailableAppenders(t *testing.T) {
 	unblockRequests := make(chan struct{})
 	receivedFlush := make(chan struct{})
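For context, the accounting rule the patch enforces can be summarized outside the diff: before the change, the deferred addUpDownCount call decremented the queued-events gauge by the full request size n up front, which also subtracted documents that were only scheduled for retry after a 429; after the change, only indexed and permanently failed documents leave the gauge. The following is a minimal, standalone Go sketch of that rule; the names flushOutcome and applyToQueued are illustrative and not part of the docappender API.

```go
package main

import "fmt"

// flushOutcome summarizes one bulk flush. The type and field names are
// hypothetical; they are not the library's actual types.
type flushOutcome struct {
	indexed int64 // documents accepted by Elasticsearch
	failed  int64 // documents that failed permanently (client or server error)
	retried int64 // documents rejected with 429 and kept in the queue for retry
}

// applyToQueued mirrors the accounting change in the patch: the queued-events
// gauge is decremented by indexed+failed only, so retried documents remain
// counted as queued until a later flush resolves them.
func applyToQueued(queued int64, o flushOutcome) int64 {
	return queued - (o.indexed + o.failed)
}

func main() {
	// Example: 10 documents flushed; 7 indexed, 2 failed, 1 hit 429 and is retried.
	queued := applyToQueued(10, flushOutcome{indexed: 7, failed: 2, retried: 1})
	fmt.Println(queued) // 1: the retried document is still queued
}
```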