Skip to content

Commit

Permalink
Merge branch 'main' into metrics-uncompressed-bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
kyungeunni committed Apr 23, 2024
2 parents e0ff384 + 71966cb commit d5cb393
Show file tree
Hide file tree
Showing 12 changed files with 548 additions and 162 deletions.
27 changes: 24 additions & 3 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"sync/atomic"
"time"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"go.elastic.co/apm/module/apmzap/v2"
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -88,7 +88,12 @@ type Appender struct {
}

// New returns a new Appender that indexes documents into Elasticsearch.
func New(client *elasticsearch.Client, cfg Config) (*Appender, error) {
// It is only tested with v8 go-elasticsearch client. Use other clients at your own risk.
func New(client esapi.Transport, cfg Config) (*Appender, error) {
if client == nil {
return nil, errors.New("client is nil")
}

if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 {
return nil, fmt.Errorf(
"expected CompressionLevel in range [-1,9], got %d",
Expand Down Expand Up @@ -142,7 +147,17 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) {
}
available := make(chan *BulkIndexer, cfg.MaxRequests)
for i := 0; i < cfg.MaxRequests; i++ {
available <- NewBulkIndexer(client, cfg.CompressionLevel, cfg.MaxDocumentRetries)
bi, err := NewBulkIndexer(BulkIndexerConfig{
Client: client,
MaxDocumentRetries: cfg.MaxDocumentRetries,
RetryOnDocumentStatus: cfg.RetryOnDocumentStatus,
CompressionLevel: cfg.CompressionLevel,
Pipeline: cfg.Pipeline,
})
if err != nil {
return nil, fmt.Errorf("error creating bulk indexer: %w", err)
}
available <- bi
}
if cfg.Logger == nil {
cfg.Logger = zap.NewNop()
Expand Down Expand Up @@ -383,6 +398,12 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
if docsFailed > 0 {
atomic.AddInt64(&a.docsFailed, docsFailed)
}
if resp.RetriedDocs > 0 {
a.addCount(resp.RetriedDocs,
nil,
a.metrics.docsRetried,
)
}
if docsIndexed > 0 {
a.addCount(docsIndexed,
&a.docsIndexed,
Expand Down
4 changes: 2 additions & 2 deletions appender_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"go.elastic.co/fastjson"
"go.uber.org/zap"

"github.com/elastic/go-docappender"
"github.com/elastic/go-docappender/docappendertest"
"github.com/elastic/go-docappender/v2"
"github.com/elastic/go-docappender/v2/docappendertest"
)

func BenchmarkAppender(b *testing.B) {
Expand Down
90 changes: 0 additions & 90 deletions appender_integration_test.go

This file was deleted.

129 changes: 127 additions & 2 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ import (
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/elastic/go-docappender"
"github.com/elastic/go-docappender/docappendertest"
"github.com/elastic/go-docappender/v2"
"github.com/elastic/go-docappender/v2/docappendertest"
"github.com/elastic/go-elasticsearch/v8/esutil"
)

Expand Down Expand Up @@ -770,6 +770,13 @@ func TestAppenderRetryDocument(t *testing.T) {
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
},
))
var rm metricdata.ResourceMetrics

var failedCount atomic.Int32
var done atomic.Bool
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -820,6 +827,7 @@ func TestAppenderRetryDocument(t *testing.T) {
done.Store(true)
})

tc.cfg.MeterProvider = sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr))
indexer, err := docappender.New(client, tc.cfg)
require.NoError(t, err)
defer indexer.Close(context.Background())
Expand All @@ -833,13 +841,36 @@ func TestAppenderRetryDocument(t *testing.T) {
return failedCount.Load() == 1
}, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail")

assert.NoError(t, rdr.Collect(context.Background(), &rm))

var asserted atomic.Int64
assertCounter := docappendertest.NewAssertCounter(t, &asserted)
docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
switch m.Name {
case "elasticsearch.events.retried":
assertCounter(m, 5, *attribute.EmptySet())
}
})
assert.Equal(t, int64(1), asserted.Load())

addMinimalDoc(t, indexer, "logs-foo-testing10")
addMinimalDoc(t, indexer, "logs-foo-testing11")

require.Eventually(t, func() bool {
return failedCount.Load() == 2
}, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail")

assert.NoError(t, rdr.Collect(context.Background(), &rm))

assertCounter = docappendertest.NewAssertCounter(t, &asserted)
docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
switch m.Name {
case "elasticsearch.events.retried":
assertCounter(m, 5, *attribute.EmptySet())
}
})
assert.Equal(t, int64(2), asserted.Load())

addMinimalDoc(t, indexer, "logs-foo-testing12")

require.Eventually(t, func() bool {
Expand All @@ -852,6 +883,70 @@ func TestAppenderRetryDocument(t *testing.T) {
}
}

// TestAppenderRetryDocument_RetryOnDocumentStatus verifies that failed
// documents are retried only when the per-document response status is
// listed in Config.RetryOnDocumentStatus.
func TestAppenderRetryDocument_RetryOnDocumentStatus(t *testing.T) {
	testCases := map[string]struct {
		// status is the per-document status the mock server returns.
		status int
		// expectedDocsInRequest[i] is the number of documents expected
		// in the i-th bulk request received by the mock server.
		expectedDocsInRequest []int
		cfg                   docappender.Config
	}{
		"should retry": {
			status: 500,
			// The 3rd request is triggered by indexer close, which
			// flushes the still-pending retried documents.
			expectedDocsInRequest: []int{1, 2, 1},
			cfg: docappender.Config{
				MaxRequests:           1,
				MaxDocumentRetries:    1,
				FlushInterval:         100 * time.Millisecond,
				RetryOnDocumentStatus: []int{429, 500},
			},
		},
		"should not retry": {
			status:                500,
			expectedDocsInRequest: []int{1, 1},
			cfg: docappender.Config{
				MaxRequests:           1,
				MaxDocumentRetries:    1,
				FlushInterval:         100 * time.Millisecond,
				RetryOnDocumentStatus: []int{429}, // 500 is not retryable here
			},
		},
	}
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			// requestCount counts every bulk request received; the mock
			// fails each document in each request with tc.status.
			var requestCount atomic.Int32
			client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
				_, result := docappendertest.DecodeBulkRequest(r)
				attempt := requestCount.Add(1) - 1
				// Guard the index so an unexpected extra request fails the
				// test with a message instead of panicking the handler.
				require.Less(t, int(attempt), len(tc.expectedDocsInRequest), "received more bulk requests than expected")
				require.Len(t, result.Items, tc.expectedDocsInRequest[attempt])
				for _, item := range result.Items {
					itemResp := item["create"]
					itemResp.Status = tc.status
					item["create"] = itemResp
				}
				json.NewEncoder(w).Encode(result)
			})

			indexer, err := docappender.New(client, tc.cfg)
			require.NoError(t, err)
			defer indexer.Close(context.Background())

			addMinimalDoc(t, indexer, "logs-foo-testing1")

			require.Eventually(t, func() bool {
				return requestCount.Load() == 1
			}, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail")

			addMinimalDoc(t, indexer, "logs-foo-testing2")

			require.Eventually(t, func() bool {
				return requestCount.Load() == 2
			}, 2*time.Second, 50*time.Millisecond, "timed out waiting for second flush request to fail")

			// Close flushes any retried documents, producing the final
			// request expected by the "should retry" case.
			err = indexer.Close(context.Background())
			assert.NoError(t, err)
		})
	}
}

func TestAppenderCloseFlushContext(t *testing.T) {
srvctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -1042,6 +1137,36 @@ func TestAppenderCloseBusyIndexer(t *testing.T) {
IndexersActive: 0}, indexer.Stats())
}

// TestAppenderPipeline checks that the configured ingest pipeline name is
// propagated to the _bulk request as the "pipeline" query parameter.
func TestAppenderPipeline(t *testing.T) {
	const wantPipeline = "my_pipeline"
	var gotPipeline string
	client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
		gotPipeline = r.URL.Query().Get("pipeline")
		_, result := docappendertest.DecodeBulkRequest(r)
		json.NewEncoder(w).Encode(result)
	})
	indexer, err := docappender.New(client, docappender.Config{
		FlushInterval: time.Minute,
		Pipeline:      wantPipeline,
	})
	require.NoError(t, err)
	defer indexer.Close(context.Background())

	doc := map[string]any{
		"@timestamp":            time.Unix(123, 456789111).UTC().Format(docappendertest.TimestampFormat),
		"data_stream.type":      "logs",
		"data_stream.dataset":   "foo",
		"data_stream.namespace": "testing",
	}
	require.NoError(t, indexer.Add(context.Background(), "logs-foo-testing", newJSONReader(doc)))

	// Closing the indexer flushes enqueued documents.
	require.NoError(t, indexer.Close(context.Background()))

	assert.Equal(t, wantPipeline, gotPipeline)
}

func TestAppenderScaling(t *testing.T) {
newIndexer := func(t *testing.T, cfg docappender.Config) *docappender.Appender {
t.Helper()
Expand Down
Loading

0 comments on commit d5cb393

Please sign in to comment.