diff --git a/appender.go b/appender.go
index 49ca5fa..a182d2f 100644
--- a/appender.go
+++ b/appender.go
@@ -69,6 +69,7 @@ type Appender struct {
 	docsIndexed           int64
 	tooManyRequests       int64
 	bytesTotal            int64
+	bytesUncompTotal      int64
 	availableBulkRequests int64
 	activeCreated         int64
 	activeDestroyed       int64
@@ -220,6 +221,7 @@ func (a *Appender) Stats() Stats {
 		Indexed:               atomic.LoadInt64(&a.docsIndexed),
 		TooManyRequests:       atomic.LoadInt64(&a.tooManyRequests),
 		BytesTotal:            atomic.LoadInt64(&a.bytesTotal),
+		BytesUncompTotal:      atomic.LoadInt64(&a.bytesUncompTotal),
 		AvailableBulkRequests: atomic.LoadInt64(&a.availableBulkRequests),
 		IndexersActive:        a.scalingInformation().activeIndexers,
 		IndexersCreated:       atomic.LoadInt64(&a.activeCreated),
@@ -318,6 +320,11 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
 	if flushed := bulkIndexer.BytesFlushed(); flushed > 0 {
 		a.addCount(int64(flushed), &a.bytesTotal, a.metrics.bytesTotal)
 	}
+	// Record the number of uncompressed bytes written to the request body
+	// as the bytesUncompTotal metric after the request has been flushed.
+	if flushed := bulkIndexer.BytesUncompFlushed(); flushed > 0 {
+		a.addCount(int64(flushed), &a.bytesUncompTotal, a.metrics.bytesUncompTotal)
+	}
 	if err != nil {
 		atomic.AddInt64(&a.docsFailed, int64(n))
 		logger.Error("bulk indexing request failed", zap.Error(err))
@@ -719,6 +726,11 @@ type Stats struct {
 	// which counts bytes at the transport level.
 	BytesTotal int64
 
+	// BytesUncompTotal represents the total number of bytes written to
+	// the request body before compression.
+	// The number of bytes written will be equal to BytesTotal if compression is disabled.
+	BytesUncompTotal int64
+
 	// AvailableBulkRequests represents the number of bulk indexers
 	// available for making bulk index requests.
	AvailableBulkRequests int64
diff --git a/appender_test.go b/appender_test.go
index 3ee76b2..96bc16e 100644
--- a/appender_test.go
+++ b/appender_test.go
@@ -138,6 +138,7 @@ loop:
 		TooManyRequests:       1,
 		AvailableBulkRequests: 10,
 		BytesTotal:            bytesTotal,
+		BytesUncompTotal:      880,
 	}, stats)
 
 	var rm metricdata.ResourceMetrics
@@ -187,6 +188,8 @@ loop:
 			assertCounter(m, stats.AvailableBulkRequests, indexerAttrs)
 		case "elasticsearch.flushed.bytes":
 			assertCounter(m, stats.BytesTotal, indexerAttrs)
+		case "elasticsearch.flushed.uncompressed.bytes":
+			assertCounter(m, stats.BytesUncompTotal, indexerAttrs)
 		case "elasticsearch.buffer.latency", "elasticsearch.flushed.latency":
 			// expect this metric name but no assertions done
 			// as it's histogram and it's checked elsewhere
@@ -196,7 +199,7 @@ loop:
 	})
 	assert.Empty(t, unexpectedMetrics)
-	assert.Equal(t, int64(6), asserted.Load())
+	assert.Equal(t, int64(7), asserted.Load())
 	assert.Equal(t, 4, processedAsserted)
 }
 
@@ -311,6 +314,7 @@ func TestAppenderCompressionLevel(t *testing.T) {
 		TooManyRequests:       0,
 		AvailableBulkRequests: 10,
 		BytesTotal:            bytesTotal,
+		BytesUncompTotal:      88, // fixed pre-compression size of the documents this test indexes
 	}, stats)
 }
 
diff --git a/bulk_indexer.go b/bulk_indexer.go
index 4d55bb2..c214d34 100644
--- a/bulk_indexer.go
+++ b/bulk_indexer.go
@@ -49,16 +49,17 @@ import (
 // maximum possible size, based on configuration and throughput.
type BulkIndexer struct {
-	client           *elasticsearch.Client
-	maxDocumentRetry int
-	itemsAdded       int
-	bytesFlushed     int
-	jsonw            fastjson.Writer
-	writer           io.Writer
-	gzipw            *gzip.Writer
-	copyBuf          []byte
-	buf              bytes.Buffer
-	retryCounts      map[int]int
+	client             *elasticsearch.Client
+	maxDocumentRetry   int
+	itemsAdded         int
+	bytesFlushed       int
+	bytesUncompFlushed int
+	jsonw              fastjson.Writer
+	writer             *countWriter
+	gzipw              *gzip.Writer
+	copyBuf            []byte
+	buf                bytes.Buffer
+	retryCounts        map[int]int
 }
 
 type BulkIndexerResponseStat struct {
@@ -137,17 +138,32 @@ func init() {
 	})
 }
 
+type countWriter struct {
+	count int
+	io.Writer
+}
+
+func (cw *countWriter) Write(p []byte) (int, error) {
+	cw.count += len(p)
+	return cw.Writer.Write(p)
+}
+
 func NewBulkIndexer(client *elasticsearch.Client, compressionLevel int, maxDocRetry int) *BulkIndexer {
 	b := &BulkIndexer{
 		client:           client,
 		maxDocumentRetry: maxDocRetry,
 		retryCounts:      make(map[int]int),
 	}
+	var writer io.Writer
 	if compressionLevel != gzip.NoCompression {
 		b.gzipw, _ = gzip.NewWriterLevel(&b.buf, compressionLevel)
-		b.writer = b.gzipw
+		writer = b.gzipw
 	} else {
-		b.writer = &b.buf
+		writer = &b.buf
+	}
+	// Wrap the destination writer so the pre-compression byte count is tracked.
+	b.writer = &countWriter{
+		count:  0,
+		Writer: writer,
 	}
 	return b
 }
@@ -155,10 +171,12 @@ func NewBulkIndexer(client *elasticsearch.Client, compressionLevel int, maxDocRe
 // BulkIndexer resets b, ready for a new request.
 func (b *BulkIndexer) Reset() {
 	b.bytesFlushed = 0
+	b.bytesUncompFlushed = 0
 }
 
 func (b *BulkIndexer) resetBuf() {
 	b.itemsAdded = 0
+	b.writer.count = 0
 	b.buf.Reset()
 	if b.gzipw != nil {
 		b.gzipw.Reset(&b.buf)
@@ -180,6 +198,11 @@ func (b *BulkIndexer) BytesFlushed() int {
 	return b.bytesFlushed
 }
 
+// BytesUncompFlushed returns the number of uncompressed bytes flushed by the bulk indexer.
+func (b *BulkIndexer) BytesUncompFlushed() int {
+	return b.bytesUncompFlushed
+}
+
 type BulkIndexerItem struct {
 	Index      string
 	DocumentID string
@@ -248,6 +271,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 	}
 
 	bytesFlushed := b.buf.Len()
+	bytesUncompFlushed := b.writer.count // capture before resetBuf zeroes the counter
 	res, err := req.Do(ctx, b.client)
 	if err != nil {
 		b.resetBuf()
@@ -262,6 +286,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 	// Record the number of flushed bytes only when err == nil. The body may
 	// not have been sent otherwise.
 	b.bytesFlushed = bytesFlushed
+	b.bytesUncompFlushed = bytesUncompFlushed
 	var resp BulkIndexerResponseStat
 	if res.IsError() {
 		if res.StatusCode == http.StatusTooManyRequests {
diff --git a/metric.go b/metric.go
index b39216e..89424ae 100644
--- a/metric.go
+++ b/metric.go
@@ -33,6 +33,7 @@ type metrics struct {
 	docsActive            metric.Int64UpDownCounter
 	docsIndexed           metric.Int64Counter
 	bytesTotal            metric.Int64Counter
+	bytesUncompTotal      metric.Int64Counter
 	availableBulkRequests metric.Int64UpDownCounter
 	activeCreated         metric.Int64Counter
 	activeDestroyed       metric.Int64Counter
@@ -109,6 +110,12 @@ func newMetrics(cfg Config) (metrics, error) {
 			unit:        "by",
 			p:           &ms.bytesTotal,
 		},
+		{
+			name:        "elasticsearch.flushed.uncompressed.bytes",
+			description: "The total number of uncompressed bytes written to the request body",
+			unit:        "by",
+			p:           &ms.bytesUncompTotal,
+		},
 		{
 			name:        "elasticsearch.indexer.created",
 			description: "The number of active indexer creations.",