diff --git a/bulk_indexer.go b/bulk_indexer.go
index 5788f57..41c359d 100644
--- a/bulk_indexer.go
+++ b/bulk_indexer.go
@@ -78,6 +78,7 @@ type BulkIndexer struct {
 	itemsAdded         int
 	bytesFlushed       int
 	bytesUncompFlushed int
+	pendingUncomBytes  int
 	jsonw              fastjson.Writer
 	writer             *countWriter
 	gzipw              *gzip.Writer
@@ -203,22 +204,21 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (*BulkIndexer, error) {
 	} else {
 		writer = &b.buf
 	}
-	b.writer = &countWriter{
-		0,
-		writer,
-	}
+	b.writer = &countWriter{0, writer}
 	return b, nil
 }
 
 // Reset resets bulk indexer, ready for a new request.
 func (b *BulkIndexer) Reset() {
 	b.bytesFlushed = 0
-	b.writer.count = 0
+	b.bytesUncompFlushed = 0
 }
 
+// resetBuf resets the compressed buffer after it has been flushed to Elasticsearch.
 func (b *BulkIndexer) resetBuf() {
 	b.itemsAdded = 0
 	b.writer.count = 0
+	b.pendingUncomBytes = 0
 	b.buf.Reset()
 	if b.gzipw != nil {
 		b.gzipw.Reset(&b.buf)
@@ -235,6 +235,11 @@ func (b *BulkIndexer) Len() int {
 	return b.buf.Len()
 }
 
+// UncompressedLen returns the number of uncompressed buffered bytes.
+func (b *BulkIndexer) UncompressedLen() int {
+	return b.writer.count + b.pendingUncomBytes
+}
+
 // BytesFlushed returns the number of bytes flushed by the bulk indexer.
 func (b *BulkIndexer) BytesFlushed() int {
 	return b.bytesFlushed
@@ -284,6 +289,7 @@ func (b *BulkIndexer) writeMeta(index, documentID string) {
 
 // Flush executes a bulk request if there are any items buffered, and clears out the buffer.
 func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error) {
+
 	if b.itemsAdded == 0 {
 		return BulkIndexerResponseStat{}, nil
 	}
@@ -314,7 +320,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 	}
 
 	bytesFlushed := b.buf.Len()
-	bytesUncompFlushed := b.writer.count
+	bytesUncompFlushed := b.writer.count + b.pendingUncomBytes
 	res, err := req.Do(ctx, b.config.Client)
 	if err != nil {
 		b.resetBuf()
@@ -409,7 +415,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 		if len(buf) == 0 {
 			n, err := gr.Read(buf[:cap(buf)])
 			if err != nil && err != io.EOF {
-				return resp, fmt.Errorf("failed to read from compressed buffer: %w", err)
+				return resp, fmt.Errorf("failed to read from uncompressed buffer: %w", err)
 			}
 			buf = buf[:n]
 		}
@@ -422,7 +428,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 			seen += newlines
 			n, err := gr.Read(buf[:cap(buf)])
 			if err != nil && err != io.EOF {
-				return resp, fmt.Errorf("failed to read from compressed buffer: %w", err)
+				return resp, fmt.Errorf("failed to read from uncompressed buffer: %w", err)
 			}
 			buf = buf[:n]
 			newlines = bytes.Count(buf, []byte{'\n'})
@@ -441,7 +447,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 			seen += newlines
 			n, err := gr.Read(buf[:cap(buf)])
 			if err != nil && err != io.EOF {
-				return resp, fmt.Errorf("failed to read from compressed buffer: %w", err)
+				return resp, fmt.Errorf("failed to read from uncompressed buffer: %w", err)
 			}
 			buf = buf[:n]
 			newlines = bytes.Count(buf, []byte{'\n'})
@@ -480,6 +486,11 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 		if len(tmp) > 0 {
 			resp.FailedDocs = tmp
 		}
+
+		// Record the uncompressed bytes written to b.buf so far; they are
+		// added to bytesUncompFlushed on the next flush.
+		b.pendingUncomBytes = b.writer.count
+		b.writer.count = 0
 	}
 
 	return resp, nil
diff --git a/bulk_indexer_test.go b/bulk_indexer_test.go
index 3ee05a8..d6e0c18 100644
--- a/bulk_indexer_test.go
+++ b/bulk_indexer_test.go
@@ -79,11 +79,16 @@ func TestBulkIndexer(t *testing.T) {
 
 		itemCount := 1_000
 		generateLoad(itemCount)
-
-		// All items should be successfully flushed
+		uncompressed := indexer.UncompressedLen()
+		uncompressedDocSize := uncompressed / itemCount
 		stat, err := indexer.Flush(context.Background())
 		require.NoError(t, err)
 		require.Equal(t, int64(itemCount), stat.Indexed)
+		require.Equal(t, uncompressed, indexer.BytesUncompFlushed())
+
+		// Nothing is left in the buffer if all items succeeded.
+		require.Equal(t, 0, indexer.Len())
+		require.Equal(t, 0, indexer.UncompressedLen())
 
 		// Simulate ES failure, all items should be enqueued for retries
 		esFailing.Store(true)
@@ -97,17 +102,26 @@ func TestBulkIndexer(t *testing.T) {
 			require.Equal(t, itemCount, len(stat.FailedDocs))
 			require.Equal(t, int64(itemCount), stat.RetriedDocs)
+			// All the flushed bytes are now back in the buffer to be retried.
+			require.Equal(t, indexer.UncompressedLen(), indexer.BytesUncompFlushed())
 
 			// Generate more load, all these items should be enqueued for retries
 			generateLoad(10)
 			itemCount += 10
 			require.Equal(t, itemCount, indexer.Items())
+			expectedBufferedSize := indexer.BytesUncompFlushed() + (10 * uncompressedDocSize)
+			require.Equal(t, expectedBufferedSize, indexer.UncompressedLen())
 		}
 
+		uncompressedSize := indexer.UncompressedLen()
 		// Recover ES and ensure all items are indexed
 		esFailing.Store(false)
 		stat, err = indexer.Flush(context.Background())
 		require.NoError(t, err)
 		require.Equal(t, int64(itemCount), stat.Indexed)
+		require.Equal(t, uncompressedSize, indexer.BytesUncompFlushed())
+		// No documents to retry, so the buffer is empty.
+		require.Equal(t, 0, indexer.Len())
+		require.Equal(t, 0, indexer.UncompressedLen())
 	})
 }
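For reviewers who want to sanity-check the accounting outside the indexer, here is a minimal, self-contained sketch. The `countWriter` definition is an assumption reconstructed from the `&countWriter{0, writer}` literal in the diff, not the package's actual source, and `main` imitates the pending-bytes hand-off that `Flush` performs when failed documents are re-buffered for retry.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// countWriter is a hypothetical reconstruction based on the
// &countWriter{0, writer} literal in the diff: it wraps an io.Writer and
// tracks how many uncompressed bytes have been written through it.
type countWriter struct {
	count int
	w     io.Writer
}

// Write forwards to the wrapped writer and accumulates the byte count.
func (cw *countWriter) Write(p []byte) (int, error) {
	n, err := cw.w.Write(p)
	cw.count += n
	return n, err
}

func main() {
	var buf bytes.Buffer
	cw := &countWriter{0, &buf}

	// Buffer two documents; count tracks their uncompressed size even when,
	// as in the real indexer, buf would hold gzip-compressed bytes.
	io.WriteString(cw, `{"index":{}}`+"\n")
	io.WriteString(cw, `{"field":"value"}`+"\n")
	fmt.Println("buffered:", cw.count) // buffered: 31

	// After a flush whose response requires retries, the diff stashes the
	// current count in pendingUncomBytes and zeroes the writer, so that
	// UncompressedLen = writer.count + pendingUncomBytes stays accurate
	// when new documents arrive before the retry flush.
	pendingUncomBytes := cw.count
	cw.count = 0
	io.WriteString(cw, `{"index":{}}`+"\n") // a new document post-failure
	fmt.Println("uncompressed len:", cw.count+pendingUncomBytes) // 44
}
```

This mirrors the property the updated test exercises: across a failed flush, retried bytes move from `writer.count` into `pendingUncomBytes`, so `UncompressedLen` keeps counting them alongside newly added documents, and both components are folded into `bytesUncompFlushed` on the next flush.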