Skip to content

Commit

Permalink
test: add tests for bulk_indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
kyungeunni committed Apr 26, 2024
1 parent 29645a4 commit 01613c9
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
29 changes: 20 additions & 9 deletions bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type BulkIndexer struct {
itemsAdded int
bytesFlushed int
bytesUncompFlushed int
pendingUncomBytes int
jsonw fastjson.Writer
writer *countWriter
gzipw *gzip.Writer
Expand Down Expand Up @@ -203,22 +204,21 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (*BulkIndexer, error) {
} else {
writer = &b.buf
}
b.writer = &countWriter{
0,
writer,
}
b.writer = &countWriter{0, writer}
return b, nil
}

// Reset resets bulk indexer, ready for a new request.
func (b *BulkIndexer) Reset() {
b.bytesFlushed = 0
b.writer.count = 0
b.bytesUncompFlushed = 0
}

// resetBuf resets compressed buffer after flushing it to Elasticsearch
func (b *BulkIndexer) resetBuf() {
b.itemsAdded = 0
b.writer.count = 0
b.pendingUncomBytes = 0
b.buf.Reset()
if b.gzipw != nil {
b.gzipw.Reset(&b.buf)
Expand All @@ -235,6 +235,11 @@ func (b *BulkIndexer) Len() int {
return b.buf.Len()
}

// UncompressedLen returns the number of uncompressed buffered bytes:
// the bytes written through the counting writer since the last flush,
// plus any uncompressed bytes carried over from the previous flush
// (pendingUncomBytes) that are still awaiting retry.
func (b *BulkIndexer) UncompressedLen() int {
	return b.writer.count + b.pendingUncomBytes
}

// BytesFlushed returns the number of bytes flushed by the bulk indexer.
func (b *BulkIndexer) BytesFlushed() int {
return b.bytesFlushed
Expand Down Expand Up @@ -284,6 +289,7 @@ func (b *BulkIndexer) writeMeta(index, documentID string) {

// Flush executes a bulk request if there are any items buffered, and clears out the buffer.
func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error) {

if b.itemsAdded == 0 {
return BulkIndexerResponseStat{}, nil
}
Expand Down Expand Up @@ -314,7 +320,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
}

bytesFlushed := b.buf.Len()
bytesUncompFlushed := b.writer.count
bytesUncompFlushed := b.writer.count + b.pendingUncomBytes
res, err := req.Do(ctx, b.config.Client)
if err != nil {
b.resetBuf()
Expand Down Expand Up @@ -409,7 +415,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
if len(buf) == 0 {
n, err := gr.Read(buf[:cap(buf)])
if err != nil && err != io.EOF {
return resp, fmt.Errorf("failed to read from compressed buffer: %w", err)
return resp, fmt.Errorf("failed to read from uncompressed buffer: %w", err)
}
buf = buf[:n]
}
Expand All @@ -422,7 +428,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
seen += newlines
n, err := gr.Read(buf[:cap(buf)])
if err != nil && err != io.EOF {
return resp, fmt.Errorf("failed to read from compressed buffer: %w", err)
return resp, fmt.Errorf("failed to read from uncompressed buffer: %w", err)
}
buf = buf[:n]
newlines = bytes.Count(buf, []byte{'\n'})
Expand All @@ -441,7 +447,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
seen += newlines
n, err := gr.Read(buf[:cap(buf)])
if err != nil && err != io.EOF {
return resp, fmt.Errorf("failed to read from compressed buffer: %w", err)
return resp, fmt.Errorf("failed to read from uncompressed buffer: %w", err)
}
buf = buf[:n]
newlines = bytes.Count(buf, []byte{'\n'})
Expand Down Expand Up @@ -480,6 +486,11 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
if len(tmp) > 0 {
resp.FailedDocs = tmp
}

// record uncompressed bytes written to b.buf
// this should be added to the bytesUncompFlushed on next flush
b.pendingUncomBytes = b.writer.count
b.writer.count = 0
}

return resp, nil
Expand Down
18 changes: 16 additions & 2 deletions bulk_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,16 @@ func TestBulkIndexer(t *testing.T) {

itemCount := 1_000
generateLoad(itemCount)

// All items should be successfully flushed
uncompressed := indexer.UncompressedLen()
uncompressedDocSize := uncompressed / itemCount
stat, err := indexer.Flush(context.Background())
require.NoError(t, err)
require.Equal(t, int64(itemCount), stat.Indexed)
require.Equal(t, uncompressed, indexer.BytesUncompFlushed())

// nothing is in the buffer if all succeeded
require.Equal(t, 0, indexer.Len())
require.Equal(t, 0, indexer.UncompressedLen())

// Simulate ES failure, all items should be enqueued for retries
esFailing.Store(true)
Expand All @@ -97,17 +102,26 @@ func TestBulkIndexer(t *testing.T) {
require.Equal(t, itemCount, len(stat.FailedDocs))
require.Equal(t, int64(itemCount), stat.RetriedDocs)

// all the flushed bytes are now in the buffer again to be retried
require.Equal(t, indexer.UncompressedLen(), indexer.BytesUncompFlushed())
// Generate more load, all these items should be enqueued for retries
generateLoad(10)
itemCount += 10
require.Equal(t, itemCount, indexer.Items())
expectedBufferedSize := indexer.BytesUncompFlushed() + (10 * uncompressedDocSize)
require.Equal(t, expectedBufferedSize, indexer.UncompressedLen())
}

uncompressedSize := indexer.UncompressedLen()
// Recover ES and ensure all items are indexed
esFailing.Store(false)
stat, err = indexer.Flush(context.Background())
require.NoError(t, err)
require.Equal(t, int64(itemCount), stat.Indexed)
require.Equal(t, uncompressedSize, indexer.BytesUncompFlushed())
// no documents to retry so buffer is empty
require.Equal(t, 0, indexer.Len())
require.Equal(t, 0, indexer.UncompressedLen())
})
}
}

0 comments on commit 01613c9

Please sign in to comment.