Skip to content

Commit

Permalink
test: fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kyungeunni committed May 14, 2024
1 parent 4a785dd commit b696c9a
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 55 deletions.
4 changes: 2 additions & 2 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,12 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
// Record the BulkIndexer uncompressed bytes written to the buffer
// as the bytesUncompressedTotal metric after the request has been flushed.
if flushed := bulkIndexer.BytesUncompressedFlushed(); flushed > 0 {
a.addCount(resp.FlushedUncompressedSucceeded,
a.addCount(resp.FlushedUncompressedBytesOK,
&a.bytesUncompressedTotal,
a.metrics.bytesUncompressedTotal,
metric.WithAttributes(attribute.String("outcome", "success")),
)
a.addCount(int64(flushed)-resp.FlushedUncompressedSucceeded,
a.addCount(int64(flushed)-resp.FlushedUncompressedBytesOK,
&a.bytesUncompressedTotal,
a.metrics.bytesUncompressedTotal,
metric.WithAttributes(attribute.String("outcome", "failure")),
Expand Down
25 changes: 24 additions & 1 deletion appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,29 @@ loop:
}
}
}
var flushedAsserted int
var fb int64
assertUncompressedFlushed := func(metric metricdata.Metrics, attrs attribute.Set) {
asserted.Add(1)
counter := metric.Data.(metricdata.Sum[int64])
for _, dp := range counter.DataPoints {
metricdatatest.AssertHasAttributes(t, dp, attrs.ToSlice()...)
status, exist := dp.Attributes.Value(attribute.Key("outcome"))
assert.True(t, exist)
switch status.AsString() {
case "success":
flushedAsserted++
fb += dp.Value
case "failure":
flushedAsserted++
fb += dp.Value
default:
assert.FailNow(t, "Unexpected metric with outcome: "+status.AsString())
}
}
assert.Equal(t, stats.BytesUncompressedTotal, bytesTotal)
assert.Equal(t, stats.BytesUncompressedTotal, fb)
}
// check the set of names and then check the counter or histogram
unexpectedMetrics := []string{}
docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
Expand All @@ -191,7 +214,7 @@ loop:
case "elasticsearch.flushed.bytes":
assertCounter(m, stats.BytesTotal, indexerAttrs)
case "elasticsearch.flushed.uncompressed.bytes":
assertCounter(m, stats.BytesUncompressedTotal, indexerAttrs)
assertUncompressedFlushed(m, indexerAttrs)
case "elasticsearch.buffer.latency", "elasticsearch.flushed.latency":
// expect this metric name but no assertions done
// as it's histogram and it's checked elsewhere
Expand Down
12 changes: 6 additions & 6 deletions bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ type BulkIndexer struct {
}

type BulkIndexerResponseStat struct {
Indexed int64
RetriedDocs int64
FailedDocs []BulkIndexerResponseItem
FlushedUncompressedSucceeded int64
Indexed int64
RetriedDocs int64
FailedDocs []BulkIndexerResponseItem
FlushedUncompressedBytesOK int64
}

// BulkIndexerResponseItem represents the Elasticsearch response item.
Expand Down Expand Up @@ -392,11 +392,11 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
continue
}
if len(b.writer.bytesWritten) > i {
resp.FlushedUncompressedSucceeded += int64(b.writer.bytesWritten[i])
resp.FlushedUncompressedBytesOK += int64(b.writer.bytesWritten[i])
}
}
} else {
resp.FlushedUncompressedSucceeded = int64(b.bytesUncompFlushed)
resp.FlushedUncompressedBytesOK = int64(b.bytesUncompFlushed)
}

b.writer.reset()
Expand Down
82 changes: 36 additions & 46 deletions bulk_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,55 +127,45 @@ func TestBulkIndexer(t *testing.T) {
require.Equal(t, 0, indexer.UncompressedLen())
})
}
t.Run("stat", func(t *testing.T) {
for _, tc := range []struct {
Name string
CompressionLevel int
}{
// {Name: "no_compression", CompressionLevel: gzip.NoCompression},
{Name: "default_compression", CompressionLevel: gzip.DefaultCompression},
} {
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
_, result := docappendertest.DecodeBulkRequest(r)
for i, itemsMap := range result.Items {
// mark odd item as failed
if i%2 == 0 {
continue
}
for k, item := range itemsMap {
result.HasErrors = true
item.Status = http.StatusTooManyRequests
item.Error.Type = "simulated_es_error"
item.Error.Reason = "for testing"
itemsMap[k] = item
}
t.Run("flushed bytes with success", func(t *testing.T) {
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
_, result := docappendertest.DecodeBulkRequest(r)
for i, itemsMap := range result.Items {
// mark odd item as failed
if i%2 == 0 {
continue
}
json.NewEncoder(w).Encode(result)
})
indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: 0, // disable retry
CompressionLevel: tc.CompressionLevel,
})
require.NoError(t, err)

generateLoad := func(count int) {
for i := 0; i < count; i++ {
require.NoError(t, indexer.Add(docappender.BulkIndexerItem{
Index: "testidx",
Body: newJSONReader(map[string]any{
"foo": "boo",
}),
}))
for k, item := range itemsMap {
result.HasErrors = true
item.Status = http.StatusTooManyRequests
item.Error.Type = "simulated_es_error"
item.Error.Reason = "for testing"
itemsMap[k] = item
}
}
itemCount := 100
generateLoad(itemCount)
stat, err := indexer.Flush(context.Background())
require.NoError(t, err)
require.Equal(t, int64(itemCount/2), stat.Indexed)
require.Equal(t, itemCount/2, len(stat.FailedDocs))
require.Equal(t, int64(indexer.BytesUncompressedFlushed()/2), stat.FlushedUncompressedSucceeded)
json.NewEncoder(w).Encode(result)
})
indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
Client: client,
})
require.NoError(t, err)

generateLoad := func(count int) {
for i := 0; i < count; i++ {
require.NoError(t, indexer.Add(docappender.BulkIndexerItem{
Index: "testidx",
Body: newJSONReader(map[string]any{
"foo": "boo",
}),
}))
}
}
itemCount := 100
generateLoad(itemCount)
stat, err := indexer.Flush(context.Background())
require.NoError(t, err)
require.Equal(t, int64(itemCount/2), stat.Indexed)
require.Equal(t, itemCount/2, len(stat.FailedDocs))
require.Equal(t, int64(indexer.BytesUncompressedFlushed()/2), stat.FlushedUncompressedBytesOK)
})
}

0 comments on commit b696c9a

Please sign in to comment.