diff --git a/appender.go b/appender.go index a182d2f..ce3fa73 100644 --- a/appender.go +++ b/appender.go @@ -29,7 +29,7 @@ import ( "sync/atomic" "time" - "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" "go.elastic.co/apm/module/apmzap/v2" "go.elastic.co/apm/v2" "go.opentelemetry.io/otel/attribute" @@ -88,7 +88,12 @@ type Appender struct { } // New returns a new Appender that indexes documents into Elasticsearch. -func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { +// It is only tested with v8 go-elasticsearch client. Use other clients at your own risk. +func New(client esapi.Transport, cfg Config) (*Appender, error) { + if client == nil { + return nil, errors.New("client is nil") + } + if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 { return nil, fmt.Errorf( "expected CompressionLevel in range [-1,9], got %d", @@ -142,7 +147,17 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { } available := make(chan *BulkIndexer, cfg.MaxRequests) for i := 0; i < cfg.MaxRequests; i++ { - available <- NewBulkIndexer(client, cfg.CompressionLevel, cfg.MaxDocumentRetries) + bi, err := NewBulkIndexer(BulkIndexerConfig{ + Client: client, + MaxDocumentRetries: cfg.MaxDocumentRetries, + RetryOnDocumentStatus: cfg.RetryOnDocumentStatus, + CompressionLevel: cfg.CompressionLevel, + Pipeline: cfg.Pipeline, + }) + if err != nil { + return nil, fmt.Errorf("error creating bulk indexer: %w", err) + } + available <- bi } if cfg.Logger == nil { cfg.Logger = zap.NewNop() @@ -383,6 +398,12 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { if docsFailed > 0 { atomic.AddInt64(&a.docsFailed, docsFailed) } + if resp.RetriedDocs > 0 { + a.addCount(resp.RetriedDocs, + nil, + a.metrics.docsRetried, + ) + } if docsIndexed > 0 { a.addCount(docsIndexed, &a.docsIndexed, diff --git a/appender_bench_test.go b/appender_bench_test.go index 013a73b..e1aac0c 100644 --- a/appender_bench_test.go +++ b/appender_bench_test.go @@ -33,8 +33,8 @@ import ( "go.elastic.co/fastjson" "go.uber.org/zap" - "github.com/elastic/go-docappender" - "github.com/elastic/go-docappender/docappendertest" + "github.com/elastic/go-docappender/v2" + "github.com/elastic/go-docappender/v2/docappendertest" ) func BenchmarkAppender(b *testing.B) { diff --git a/appender_integration_test.go b/appender_integration_test.go deleted file mode 100644 index e3dc0fb..0000000 --- a/appender_integration_test.go +++ /dev/null @@ -1,90 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package docappender_test - -import ( - "bytes" - "context" - "encoding/json" - "os" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/go-docappender/docappendertest" - "github.com/elastic/go-elasticsearch/v8" - "github.com/elastic/go-elasticsearch/v8/esapi" - - "github.com/elastic/go-docappender" -) - -func TestAppenderIntegration(t *testing.T) { - switch strings.ToLower(os.Getenv("INTEGRATION_TESTS")) { - case "1", "true": - default: - t.Skip("Skipping integration test, export INTEGRATION_TESTS=1 to run") - } - - config := elasticsearch.Config{} - config.Username = "admin" - config.Password = "changeme" - client, err := elasticsearch.NewClient(config) - require.NoError(t, err) - indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Second}) - require.NoError(t, err) - defer indexer.Close(context.Background()) - - index := "logs-generic-testing" - deleteIndex := func() { - resp, err := esapi.IndicesDeleteDataStreamRequest{Name: []string{index}}.Do(context.Background(), client) - require.NoError(t, err) - defer resp.Body.Close() - } - deleteIndex() - defer deleteIndex() - - const N = 100 - for i := 0; i < N; i++ { - encoded, err := json.Marshal(map[string]any{"@timestamp": time.Now().Format(docappendertest.TimestampFormat)}) - require.NoError(t, err) - err = indexer.Add(context.Background(), index, bytes.NewReader(encoded)) - require.NoError(t, err) - } - - // Closing the indexer flushes enqueued events. - err = indexer.Close(context.Background()) - require.NoError(t, err) - - // Check that docs are indexed. - resp, err := esapi.IndicesRefreshRequest{Index: []string{index}}.Do(context.Background(), client) - require.NoError(t, err) - resp.Body.Close() - - var result struct { - Count int - } - resp, err = esapi.CountRequest{Index: []string{index}}.Do(context.Background(), client) - require.NoError(t, err) - defer resp.Body.Close() - err = json.NewDecoder(resp.Body).Decode(&result) - require.NoError(t, err) - assert.Equal(t, N, result.Count) -} diff --git a/appender_test.go b/appender_test.go index 96bc16e..b4ee1c6 100644 --- a/appender_test.go +++ b/appender_test.go @@ -46,8 +46,8 @@ import ( "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" - "github.com/elastic/go-docappender" - "github.com/elastic/go-docappender/docappendertest" + "github.com/elastic/go-docappender/v2" + "github.com/elastic/go-docappender/v2/docappendertest" "github.com/elastic/go-elasticsearch/v8/esutil" ) @@ -770,6 +770,13 @@ func TestAppenderRetryDocument(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + }, + )) + var rm metricdata.ResourceMetrics + var failedCount atomic.Int32 var done atomic.Bool client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { @@ -820,6 +827,7 @@ func TestAppenderRetryDocument(t *testing.T) { done.Store(true) }) + tc.cfg.MeterProvider = sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)) indexer, err := docappender.New(client, tc.cfg) require.NoError(t, err) defer indexer.Close(context.Background()) @@ -833,6 +841,18 @@ func TestAppenderRetryDocument(t *testing.T) { return failedCount.Load() == 1 }, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail") + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + var asserted atomic.Int64 + assertCounter := docappendertest.NewAssertCounter(t, &asserted) + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch m.Name { + case "elasticsearch.events.retried": + assertCounter(m, 5, *attribute.EmptySet()) + } + }) + assert.Equal(t, int64(1), asserted.Load()) + addMinimalDoc(t, indexer, "logs-foo-testing10") addMinimalDoc(t, indexer, "logs-foo-testing11") @@ -840,6 +860,17 @@ func TestAppenderRetryDocument(t *testing.T) { return failedCount.Load() == 2 }, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail") + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + assertCounter = docappendertest.NewAssertCounter(t, &asserted) + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch m.Name { + case "elasticsearch.events.retried": + assertCounter(m, 5, *attribute.EmptySet()) + } + }) + assert.Equal(t, int64(2), asserted.Load()) + addMinimalDoc(t, indexer, "logs-foo-testing12") require.Eventually(t, func() bool { @@ -852,6 +883,70 @@ func TestAppenderRetryDocument(t *testing.T) { } } +func TestAppenderRetryDocument_RetryOnDocumentStatus(t *testing.T) { + testCases := map[string]struct { + status int + expectedDocsInRequest []int // at index i stores the number of documents in the i-th request + cfg docappender.Config + }{ + "should retry": { + status: 500, + expectedDocsInRequest: []int{1, 2, 1}, // 3rd request is triggered by indexer close + cfg: docappender.Config{ + MaxRequests: 1, + MaxDocumentRetries: 1, + FlushInterval: 100 * time.Millisecond, + RetryOnDocumentStatus: []int{429, 500}, + }, + }, + "should not retry": { + status: 500, + expectedDocsInRequest: []int{1, 1}, + cfg: docappender.Config{ + MaxRequests: 1, + MaxDocumentRetries: 1, + FlushInterval: 100 * time.Millisecond, + RetryOnDocumentStatus: []int{429}, + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + var failedCount atomic.Int32 + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + _, result := docappendertest.DecodeBulkRequest(r) + attempt := failedCount.Add(1) - 1 + require.Len(t, result.Items, tc.expectedDocsInRequest[attempt]) + for _, item := range result.Items { + itemResp := item["create"] + itemResp.Status = tc.status + item["create"] = itemResp + } + json.NewEncoder(w).Encode(result) + }) + + indexer, err := docappender.New(client, tc.cfg) + require.NoError(t, err) + defer indexer.Close(context.Background()) + + addMinimalDoc(t, indexer, "logs-foo-testing1") + + require.Eventually(t, func() bool { + return failedCount.Load() == 1 + }, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail") + + addMinimalDoc(t, indexer, "logs-foo-testing2") + + require.Eventually(t, func() bool { + return failedCount.Load() == 2 + }, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail") + + err = indexer.Close(context.Background()) + assert.NoError(t, err) + }) + } +} + func TestAppenderCloseFlushContext(t *testing.T) { srvctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1042,6 +1137,36 @@ func TestAppenderCloseBusyIndexer(t *testing.T) { IndexersActive: 0}, indexer.Stats()) } +func TestAppenderPipeline(t *testing.T) { + const expected = "my_pipeline" + var actual string + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + actual = r.URL.Query().Get("pipeline") + _, result := docappendertest.DecodeBulkRequest(r) + json.NewEncoder(w).Encode(result) + }) + indexer, err := docappender.New(client, docappender.Config{ + FlushInterval: time.Minute, + Pipeline: expected, + }) + require.NoError(t, err) + defer indexer.Close(context.Background()) + + err = indexer.Add(context.Background(), "logs-foo-testing", newJSONReader(map[string]any{ + "@timestamp": time.Unix(123, 456789111).UTC().Format(docappendertest.TimestampFormat), + "data_stream.type": "logs", + "data_stream.dataset": "foo", + "data_stream.namespace": "testing", + })) + require.NoError(t, err) + + // Closing the indexer flushes enqueued documents. + err = indexer.Close(context.Background()) + require.NoError(t, err) + + assert.Equal(t, expected, actual) +} + func TestAppenderScaling(t *testing.T) { newIndexer := func(t *testing.T, cfg docappender.Config) *docappender.Appender { t.Helper() diff --git a/bulk_indexer.go b/bulk_indexer.go index c214d34..d6ed2f0 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -20,6 +20,7 @@ package docappender import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -30,7 +31,6 @@ import ( "github.com/klauspost/compress/gzip" "go.elastic.co/fastjson" - "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" jsoniter "github.com/json-iterator/go" ) @@ -48,14 +48,38 @@ import ( // of concurrent bulk requests. This way we can ensure bulk requests have the // maximum possible size, based on configuration and throughput. +// BulkIndexerConfig holds configuration for BulkIndexer. +type BulkIndexerConfig struct { + // Client holds the Elasticsearch client. + Client esapi.Transport + + // MaxDocumentRetries holds the maximum number of document retries + MaxDocumentRetries int + + // RetryOnDocumentStatus holds the document level statuses that will trigger a document retry. + // + // If RetryOnDocumentStatus is empty or nil, the default of [429] will be used. + RetryOnDocumentStatus []int + + // CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression) + // to 9 (gzip.BestCompression). Higher values provide greater compression, at a + // greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the + // default compression level. + CompressionLevel int + + // Pipeline holds the ingest pipeline ID. + // + // If Pipeline is empty, no ingest pipeline will be specified in the Bulk request. + Pipeline string +} + type BulkIndexer struct { - client *elasticsearch.Client - maxDocumentRetry int + config BulkIndexerConfig itemsAdded int bytesFlushed int bytesUncompFlushed int jsonw fastjson.Writer - writer *countWriter + writer countWriter gzipw *gzip.Writer copyBuf []byte buf bytes.Buffer @@ -148,16 +172,34 @@ func (cw *countWriter) Write(p []byte) (int, error) { return cw.Writer.Write(p) } -func NewBulkIndexer(client *elasticsearch.Client, compressionLevel int, maxDocRetry int) *BulkIndexer { +// NewBulkIndexer returns a bulk indexer that issues bulk requests to Elasticsearch. +// It is only tested with v8 go-elasticsearch client. Use other clients at your own risk. +func NewBulkIndexer(cfg BulkIndexerConfig) (*BulkIndexer, error) { + if cfg.Client == nil { + return nil, errors.New("client is nil") + } + + if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 { + return nil, fmt.Errorf( + "expected CompressionLevel in range [-1,9], got %d", + cfg.CompressionLevel, + ) + } + b := &BulkIndexer{ - client: client, - maxDocumentRetry: maxDocRetry, - retryCounts: make(map[int]int), + config: cfg, + retryCounts: make(map[int]int), } - var writer io.Writer - if compressionLevel != gzip.NoCompression { - b.gzipw, _ = gzip.NewWriterLevel(&b.buf, compressionLevel) - writer = b.gzipw + + // use a len check instead of a nil check because document level retries + // should be disabled using MaxDocumentRetries instead. + if len(b.config.RetryOnDocumentStatus) == 0 { + b.config.RetryOnDocumentStatus = []int{http.StatusTooManyRequests} + } + + if cfg.CompressionLevel != gzip.NoCompression { + b.gzipw, _ = gzip.NewWriterLevel(&b.buf, cfg.CompressionLevel) + b.writer = b.gzipw } else { writer = &b.buf } @@ -165,10 +207,10 @@ func NewBulkIndexer(client *elasticsearch.Client, compressionLevel int, maxDocRe 0, writer, } - return b + return b, nil } -// BulkIndexer resets b, ready for a new request. +// Reset resets bulk indexer, ready for a new request. func (b *BulkIndexer) Reset() { b.bytesFlushed = 0 b.writer.count = 0 @@ -183,7 +225,7 @@ func (b *BulkIndexer) resetBuf() { } } -// Added returns the number of buffered items. +// Items returns the number of buffered items. func (b *BulkIndexer) Items() int { return b.itemsAdded } @@ -252,7 +294,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error } } - if b.maxDocumentRetry > 0 { + if b.config.MaxDocumentRetries > 0 { if cap(b.copyBuf) < b.buf.Len() { b.copyBuf = slices.Grow(b.copyBuf, b.buf.Len()-cap(b.copyBuf)) b.copyBuf = b.copyBuf[:cap(b.copyBuf)] @@ -265,6 +307,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error Body: &b.buf, Header: make(http.Header), FilterPath: []string{"items.*._index", "items.*.status", "items.*.error.type", "items.*.error.reason"}, + Pipeline: b.config.Pipeline, } if b.gzipw != nil { req.Header.Set("Content-Encoding", "gzip") @@ -272,15 +315,15 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error bytesFlushed := b.buf.Len() bytesUncompFlushed := b.writer.count - res, err := req.Do(ctx, b.client) + res, err := req.Do(ctx, b.config.Client) if err != nil { b.resetBuf() return BulkIndexerResponseStat{}, fmt.Errorf("failed to execute the request: %w", err) } defer res.Body.Close() - // Reset the buffer and gzip writer so they can be reused in case 429s - // were received. + // Reset the buffer and gzip writer so they can be reused in case + // document level retries are needed. b.resetBuf() // Record the number of flushed bytes only when err == nil. The body may @@ -300,8 +343,8 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error } // Only run the retry logic if document retries are enabled - if b.maxDocumentRetry > 0 { - buf := make([]byte, 0, 1024) + if b.config.MaxDocumentRetries > 0 { + buf := make([]byte, 0, 4096) // Eliminate previous retry counts that aren't present in the bulk // request response. @@ -337,7 +380,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error seen := 0 for _, res := range resp.FailedDocs { - if res.Status == http.StatusTooManyRequests { + if b.shouldRetryOnStatus(res.Status) { // there are two lines for each document: // - action // - document @@ -348,10 +391,10 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error startln := res.Position * 2 endln := startln + 2 - // Increment 429 count for the positions found. + // Increment retry count for the positions found. count := b.retryCounts[res.Position] + 1 // check if we are above the maxDocumentRetry setting - if count > b.maxDocumentRetry { + if count > b.config.MaxDocumentRetries { // do not retry, return the document as failed tmp = append(tmp, res) continue @@ -429,7 +472,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error resp.RetriedDocs++ b.itemsAdded++ } else { - // If it's not a 429 treat the document as failed + // If it's not a retriable error, treat the document as failed tmp = append(tmp, res) } } @@ -442,6 +485,15 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error return resp, nil } +func (b *BulkIndexer) shouldRetryOnStatus(docStatus int) bool { + for _, status := range b.config.RetryOnDocumentStatus { + if docStatus == status { + return true + } + } + return false +} + // indexnth returns the index of the nth instance of sep in s. // It returns -1 if sep is not present in s or nth is 0. func indexnth(s []byte, nth int, sep rune) int { diff --git a/config.go b/config.go index 51451d6..83ab497 100644 --- a/config.go +++ b/config.go @@ -58,6 +58,11 @@ type Config struct { // MaxDocumentRetries holds the maximum number of document retries MaxDocumentRetries int + // RetryOnDocumentStatus holds the document level statuses that will trigger a document retry. + // + // If RetryOnDocumentStatus is empty or nil, the default of [429] will be used. + RetryOnDocumentStatus []int + // FlushBytes holds the flush threshold in bytes. If Compression is enabled, // The number of documents that can be buffered will be greater. // @@ -80,6 +85,11 @@ type Config struct { // If DocumentBufferSize is zero, the default 1024 will be used. DocumentBufferSize int + // Pipeline holds the ingest pipeline ID. + // + // If Pipeline is empty, no ingest pipeline will be specified in the Bulk request. + Pipeline string + // Scaling configuration for the docappender. // // If unspecified, scaling is enabled by default. diff --git a/go.mod b/go.mod index c658722..b7d2ab4 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,21 @@ -module github.com/elastic/go-docappender +module github.com/elastic/go-docappender/v2 go 1.21 require ( - github.com/elastic/go-elasticsearch/v8 v8.13.0 + github.com/elastic/go-elasticsearch/v8 v8.13.1 github.com/json-iterator/go v1.1.12 - github.com/klauspost/compress v1.17.7 + github.com/klauspost/compress v1.17.8 github.com/stretchr/testify v1.9.0 - go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0 - go.elastic.co/apm/module/apmzap/v2 v2.5.0 - go.elastic.co/apm/v2 v2.5.0 + go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 + go.elastic.co/apm/module/apmzap/v2 v2.6.0 + go.elastic.co/apm/v2 v2.6.0 go.elastic.co/fastjson v1.3.0 - go.opentelemetry.io/otel v1.24.0 - go.opentelemetry.io/otel/metric v1.24.0 - go.opentelemetry.io/otel/sdk/metric v1.24.0 + go.opentelemetry.io/otel v1.25.0 + go.opentelemetry.io/otel/metric v1.25.0 + go.opentelemetry.io/otel/sdk/metric v1.25.0 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.6.0 + golang.org/x/sync v0.7.0 ) require ( @@ -33,11 +33,11 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect - go.elastic.co/apm/module/apmhttp/v2 v2.5.0 // indirect - go.opentelemetry.io/otel/sdk v1.24.0 // indirect - go.opentelemetry.io/otel/trace v1.24.0 // indirect + go.elastic.co/apm/module/apmhttp/v2 v2.6.0 // indirect + go.opentelemetry.io/otel/sdk v1.25.0 // indirect + go.opentelemetry.io/otel/trace v1.25.0 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/sys v0.17.0 // indirect + golang.org/x/sys v0.18.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect howett.net/plist v1.0.0 // indirect ) diff --git a/go.sum b/go.sum index b76f05b..4882589 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/elastic/elastic-transport-go/v8 v8.5.0 h1:v5membAl7lvQgBTexPRDBO/RdnlQX+FM9fUVDyXxvH0= github.com/elastic/elastic-transport-go/v8 v8.5.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= -github.com/elastic/go-elasticsearch/v8 v8.13.0 h1:YXPAWpvbYX0mWSNG9tnEpvs4h1stgMy5JUeKZECYYB8= -github.com/elastic/go-elasticsearch/v8 v8.13.0/go.mod h1:DIn7HopJs4oZC/w0WoJR13uMUxtHeq92eI5bqv5CRfI= +github.com/elastic/go-elasticsearch/v8 v8.13.1 h1:du5F8IzUUyCkzxyHdrO9AtopcG95I/qwi2WK8Kf1xlg= +github.com/elastic/go-elasticsearch/v8 v8.13.1/go.mod h1:DIn7HopJs4oZC/w0WoJR13uMUxtHeq92eI5bqv5CRfI= github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0= github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= @@ -26,8 +26,8 @@ github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8 github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= -github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -51,26 +51,26 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0 h1:0S5Vj5/L4EkXQS7YUr+1ylTuB3njTuBNzdmn3mjXAFI= -go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0/go.mod h1:zNEXwAPoThH/bAb3TWKD5Og0Zyk0OWURsEHAja1kra4= -go.elastic.co/apm/module/apmhttp/v2 v2.5.0 h1:4AWlw8giL7hRYBQiwF1/Thm0GDsbQH/Ofe4eySAnURo= -go.elastic.co/apm/module/apmhttp/v2 v2.5.0/go.mod h1:ZP7gLEzY/OAPTqNZjp8AzA06HF82zfwXEpKI2sSVTgk= -go.elastic.co/apm/module/apmzap/v2 v2.5.0 h1:COXqVte4i75XQmV+H4m4g+2JubK3Y1WRIzY/ppKa3bQ= -go.elastic.co/apm/module/apmzap/v2 v2.5.0/go.mod h1:PHKFbSROQPFZ2+X3oZyaF8lie5DhK0gtcRMpz//S54g= -go.elastic.co/apm/v2 v2.5.0 h1:UYqdu/bjcubcP9BIy5+os2ExRzw03yOQFG+sRGGhVlQ= -go.elastic.co/apm/v2 v2.5.0/go.mod h1:+CiBUdrrAGnGCL9TNx7tQz3BrfYV23L8Ljvotoc87so= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 h1:ukMcwyMaDXsS1dRK2qRYXT2AsfwaUy74TOOYCqkWJow= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0/go.mod h1:YpfiTTrqX5LB/CKBwX89oDCBAxuLJTFv40gcfxJyehM= +go.elastic.co/apm/module/apmhttp/v2 v2.6.0 h1:s8UeNFQmVBCNd4eoz7KDD9rEFhQC0HeUFXz3z9gpAmQ= +go.elastic.co/apm/module/apmhttp/v2 v2.6.0/go.mod h1:D0GLppLuI0Ddwvtl595GUxRgn6Z8L5KaDFVMv2H3GK0= +go.elastic.co/apm/module/apmzap/v2 v2.6.0 h1:R/iVORzGu3F9uM43iEVHD0nwiRo59O0bIXdayKsgayQ= +go.elastic.co/apm/module/apmzap/v2 v2.6.0/go.mod h1:B3i/8xRkqLgi6zNuV+Bp7Pt4cutaOObvrVSa7wUTAPw= +go.elastic.co/apm/v2 v2.6.0 h1:VieBMLQFtXua2YxpYxaSdYGnmmxhLT46gosI5yErJgY= +go.elastic.co/apm/v2 v2.6.0/go.mod h1:33rOXgtHwbgZcDgi6I/GtCSMZQqgxkHC0IQT3gudKvo= go.elastic.co/fastjson v1.3.0 h1:hJO3OsYIhiqiT4Fgu0ZxAECnKASbwgiS+LMW5oCopKs= go.elastic.co/fastjson v1.3.0/go.mod h1:K9vDh7O0ODsVKV2B5e2XYLY277QZaCbB3tS1SnARvko= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= -go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= -go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= -go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= -go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= -go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= -go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= -go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= +go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= +go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA= +go.opentelemetry.io/otel/metric v1.25.0/go.mod h1:rkDLUSd2lC5lq2dFNrX9LGAbINP5B7WBkC78RXCpH5s= +go.opentelemetry.io/otel/sdk v1.25.0 h1:PDryEJPC8YJZQSyLY5eqLeafHtG+X7FWnf3aXMtxbqo= +go.opentelemetry.io/otel/sdk v1.25.0/go.mod h1:oFgzCM2zdsxKzz6zwpTZYLLQsFwc+K0daArPdIhuxkw= +go.opentelemetry.io/otel/sdk/metric v1.25.0 h1:7CiHOy08LbrxMAp4vWpbiPcklunUshVpAvGBrdDRlGw= +go.opentelemetry.io/otel/sdk/metric v1.25.0/go.mod h1:LzwoKptdbBBdYfvtGCzGwk6GWMA3aUzBOwtQpR6Nz7o= +go.opentelemetry.io/otel/trace v1.25.0 h1:tqukZGLwQYRIFtSQM2u2+yfMVTgGVeqRLPUYx1Dq6RM= +go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= @@ -79,13 +79,13 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= diff --git a/integrationtest/appender_integration_test.go b/integrationtest/appender_integration_test.go new file mode 100644 index 0000000..b02b2a0 --- /dev/null +++ b/integrationtest/appender_integration_test.go @@ -0,0 +1,141 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integrationtest + +import ( + "bytes" + "context" + "encoding/json" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/go-docappender/v2" + "github.com/elastic/go-docappender/v2/docappendertest" + elasticsearch7 "github.com/elastic/go-elasticsearch/v7" + esapi7 "github.com/elastic/go-elasticsearch/v7/esapi" + elasticsearch8 "github.com/elastic/go-elasticsearch/v8" + esapi8 "github.com/elastic/go-elasticsearch/v8/esapi" +) + +const N = 100 + +func sendEvents(t *testing.T, indexer *docappender.Appender, index string) { + for i := 0; i < N; i++ { + encoded, err := json.Marshal(map[string]any{"@timestamp": time.Now().Format(docappendertest.TimestampFormat)}) + require.NoError(t, err) + err = indexer.Add(context.Background(), index, bytes.NewReader(encoded)) + require.NoError(t, err) + } + + // Closing the indexer flushes enqueued events. + err := indexer.Close(context.Background()) + require.NoError(t, err) +} + +func TestAppenderIntegrationV8(t *testing.T) { + switch strings.ToLower(os.Getenv("INTEGRATION_TESTS")) { + case "1", "true": + default: + t.Skip("Skipping integration test, export INTEGRATION_TESTS=1 to run") + } + + const index = "logs-generic-testing.v8" + + config := elasticsearch8.Config{} + config.Username = "admin" + config.Password = "changeme" + client, err := elasticsearch8.NewClient(config) + require.NoError(t, err) + indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Second}) + require.NoError(t, err) + defer indexer.Close(context.Background()) + + deleteIndex := func() { + resp, err := esapi8.IndicesDeleteDataStreamRequest{Name: []string{index}}.Do(context.Background(), client) + require.NoError(t, err) + defer resp.Body.Close() + } + deleteIndex() + defer deleteIndex() + + sendEvents(t, indexer, index) + + // Check that docs are indexed. + resp, err := esapi8.IndicesRefreshRequest{Index: []string{index}}.Do(context.Background(), client) + require.NoError(t, err) + resp.Body.Close() + + var result struct { + Count int + } + resp, err = esapi8.CountRequest{Index: []string{index}}.Do(context.Background(), client) + require.NoError(t, err) + defer resp.Body.Close() + err = json.NewDecoder(resp.Body).Decode(&result) + require.NoError(t, err) + assert.Equal(t, N, result.Count) +} + +func TestAppenderIntegrationV7(t *testing.T) { + switch strings.ToLower(os.Getenv("INTEGRATION_TESTS")) { + case "1", "true": + default: + t.Skip("Skipping integration test, export INTEGRATION_TESTS=1 to run") + } + + const index = "logs-generic-testing.v7" + + config := elasticsearch7.Config{} + config.Username = "admin" + config.Password = "changeme" + client, err := elasticsearch7.NewClient(config) + require.NoError(t, err) + indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Second}) + require.NoError(t, err) + defer indexer.Close(context.Background()) + + deleteIndex := func() { + resp, err := esapi7.IndicesDeleteDataStreamRequest{Name: []string{index}}.Do(context.Background(), client) + require.NoError(t, err) + defer resp.Body.Close() + } + deleteIndex() + defer deleteIndex() + + sendEvents(t, indexer, index) + + // Check that docs are indexed. + resp, err := esapi7.IndicesRefreshRequest{Index: []string{index}}.Do(context.Background(), client) + require.NoError(t, err) + resp.Body.Close() + + var result struct { + Count int + } + resp, err = esapi7.CountRequest{Index: []string{index}}.Do(context.Background(), client) + require.NoError(t, err) + defer resp.Body.Close() + err = json.NewDecoder(resp.Body).Decode(&result) + require.NoError(t, err) + assert.Equal(t, N, result.Count) +} diff --git a/integrationtest/go.mod b/integrationtest/go.mod new file mode 100644 index 0000000..fe1c92c --- /dev/null +++ b/integrationtest/go.mod @@ -0,0 +1,39 @@ +module integrationtest + +go 1.22.0 + +require ( + github.com/elastic/go-docappender/v2 v2.0.0 + github.com/elastic/go-elasticsearch/v7 v7.17.10 + github.com/elastic/go-elasticsearch/v8 v8.13.1 + github.com/stretchr/testify v1.9.0 +) + +require ( + github.com/armon/go-radix v1.0.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/elastic/elastic-transport-go/v8 v8.5.0 // indirect + github.com/elastic/go-sysinfo v1.7.1 // indirect + github.com/elastic/go-windows v1.0.1 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/procfs v0.7.3 // indirect + go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 // indirect + go.elastic.co/apm/module/apmhttp/v2 v2.6.0 // indirect + go.elastic.co/apm/v2 v2.6.0 // indirect + go.elastic.co/fastjson v1.3.0 // indirect + go.opentelemetry.io/otel v1.25.0 // indirect + go.opentelemetry.io/otel/metric v1.25.0 // indirect + go.opentelemetry.io/otel/sdk v1.25.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.25.0 // indirect + go.opentelemetry.io/otel/trace v1.25.0 // indirect + golang.org/x/sys v0.18.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + howett.net/plist v1.0.0 // indirect +) + +replace github.com/elastic/go-docappender/v2 => ../ diff --git a/integrationtest/go.sum b/integrationtest/go.sum new file mode 100644 index 0000000..8897b9e --- /dev/null +++ b/integrationtest/go.sum @@ -0,0 +1,82 @@ +github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= +github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/elastic-transport-go/v8 v8.5.0 h1:v5membAl7lvQgBTexPRDBO/RdnlQX+FM9fUVDyXxvH0= +github.com/elastic/elastic-transport-go/v8 v8.5.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= +github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/elastic/go-elasticsearch/v8 v8.13.1 h1:du5F8IzUUyCkzxyHdrO9AtopcG95I/qwi2WK8Kf1xlg= +github.com/elastic/go-elasticsearch/v8 v8.13.1/go.mod h1:DIn7HopJs4oZC/w0WoJR13uMUxtHeq92eI5bqv5CRfI= +github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0= +github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= +github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= +github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0= +github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4= +github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/procfs v0.0.0-20190425082905-87a4384529e0/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= +github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 h1:ukMcwyMaDXsS1dRK2qRYXT2AsfwaUy74TOOYCqkWJow= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0/go.mod h1:YpfiTTrqX5LB/CKBwX89oDCBAxuLJTFv40gcfxJyehM= +go.elastic.co/apm/module/apmhttp/v2 v2.6.0 h1:s8UeNFQmVBCNd4eoz7KDD9rEFhQC0HeUFXz3z9gpAmQ= +go.elastic.co/apm/module/apmhttp/v2 v2.6.0/go.mod h1:D0GLppLuI0Ddwvtl595GUxRgn6Z8L5KaDFVMv2H3GK0= +go.elastic.co/apm/v2 v2.6.0 h1:VieBMLQFtXua2YxpYxaSdYGnmmxhLT46gosI5yErJgY= +go.elastic.co/apm/v2 v2.6.0/go.mod h1:33rOXgtHwbgZcDgi6I/GtCSMZQqgxkHC0IQT3gudKvo= +go.elastic.co/fastjson v1.3.0 h1:hJO3OsYIhiqiT4Fgu0ZxAECnKASbwgiS+LMW5oCopKs= +go.elastic.co/fastjson v1.3.0/go.mod h1:K9vDh7O0ODsVKV2B5e2XYLY277QZaCbB3tS1SnARvko= +go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= +go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= +go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA= +go.opentelemetry.io/otel/metric v1.25.0/go.mod h1:rkDLUSd2lC5lq2dFNrX9LGAbINP5B7WBkC78RXCpH5s= +go.opentelemetry.io/otel/sdk v1.25.0 h1:PDryEJPC8YJZQSyLY5eqLeafHtG+X7FWnf3aXMtxbqo= +go.opentelemetry.io/otel/sdk v1.25.0/go.mod h1:oFgzCM2zdsxKzz6zwpTZYLLQsFwc+K0daArPdIhuxkw= +go.opentelemetry.io/otel/sdk/metric v1.25.0 h1:7CiHOy08LbrxMAp4vWpbiPcklunUshVpAvGBrdDRlGw= +go.opentelemetry.io/otel/sdk/metric v1.25.0/go.mod h1:LzwoKptdbBBdYfvtGCzGwk6GWMA3aUzBOwtQpR6Nz7o= +go.opentelemetry.io/otel/trace v1.25.0 h1:tqukZGLwQYRIFtSQM2u2+yfMVTgGVeqRLPUYx1Dq6RM= +go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa3CI79GS0ol3YnhVnKP89i0kNg= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0= +howett.net/plist v1.0.0 h1:7CrbWYbPPO/PyNy38b2EB/+gYbjCe2DXBxgtOOZbSQM= +howett.net/plist v1.0.0/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g= diff --git a/metric.go b/metric.go index 89424ae..2c7d56d 100644 --- a/metric.go +++ b/metric.go @@ -32,6 +32,7 @@ type metrics struct { docsAdded metric.Int64Counter docsActive metric.Int64UpDownCounter docsIndexed metric.Int64Counter + docsRetried metric.Int64Counter bytesTotal metric.Int64Counter bytesUncompTotal metric.Int64Counter availableBulkRequests metric.Int64UpDownCounter @@ -104,6 +105,11 @@ func newMetrics(cfg Config) (metrics, error) { description: "Number of APM Events flushed to Elasticsearch. Attributes are used to report separate counts for different outcomes - success, client failure, etc.", p: &ms.docsIndexed, }, + { + name: "elasticsearch.events.retried", + description: "The number of document retries. A single document may be retried more than once.", + p: &ms.docsRetried, + }, { name: "elasticsearch.flushed.bytes", description: "The total number of bytes written to the request body",