Skip to content

Commit

Permalink
feat: track uncompressed data
Browse files Browse the repository at this point in the history
  • Loading branch information
kyungeunni committed Apr 23, 2024
1 parent 965e904 commit e0ff384
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 13 deletions.
12 changes: 12 additions & 0 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Appender struct {
docsIndexed int64
tooManyRequests int64
bytesTotal int64
bytesUncompTotal int64
availableBulkRequests int64
activeCreated int64
activeDestroyed int64
Expand Down Expand Up @@ -220,6 +221,7 @@ func (a *Appender) Stats() Stats {
Indexed: atomic.LoadInt64(&a.docsIndexed),
TooManyRequests: atomic.LoadInt64(&a.tooManyRequests),
BytesTotal: atomic.LoadInt64(&a.bytesTotal),
BytesUncompTotal: atomic.LoadInt64(&a.bytesUncompTotal),
AvailableBulkRequests: atomic.LoadInt64(&a.availableBulkRequests),
IndexersActive: a.scalingInformation().activeIndexers,
IndexersCreated: atomic.LoadInt64(&a.activeCreated),
Expand Down Expand Up @@ -318,6 +320,11 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
if flushed := bulkIndexer.BytesFlushed(); flushed > 0 {
a.addCount(int64(flushed), &a.bytesTotal, a.metrics.bytesTotal)
}
// Record the number of uncompressed bytes written to the request body as
// the bytesUncompTotal metric after the request has been flushed.
if flushed := bulkIndexer.BytesUncompressedFlushed(); flushed > 0 {

Check failure on line 325 in appender.go

View workflow job for this annotation

GitHub Actions / lint

bulkIndexer.BytesUncompressedFlushed undefined (type *BulkIndexer has no field or method BytesUncompressedFlushed) (compile)

Check failure on line 325 in appender.go

View workflow job for this annotation

GitHub Actions / run-benchdiff

bulkIndexer.BytesUncompressedFlushed undefined (type *BulkIndexer has no field or method BytesUncompressedFlushed)

Check failure on line 325 in appender.go

View workflow job for this annotation

GitHub Actions / run-benchdiff

bulkIndexer.BytesUncompressedFlushed undefined (type *BulkIndexer has no field or method BytesUncompressedFlushed)

Check failure on line 325 in appender.go

View workflow job for this annotation

GitHub Actions / run-tests

bulkIndexer.BytesUncompressedFlushed undefined (type *BulkIndexer has no field or method BytesUncompressedFlushed)
a.addCount(int64(flushed), &a.bytesUncompTotal, a.metrics.bytesUncompTotal)
}
if err != nil {
atomic.AddInt64(&a.docsFailed, int64(n))
logger.Error("bulk indexing request failed", zap.Error(err))
Expand Down Expand Up @@ -719,6 +726,11 @@ type Stats struct {
// which counts bytes at the transport level.
BytesTotal int64

// BytesUncompTotal represents the total number of bytes written to
// the request body before compression.
// The number of bytes written will be equal to BytesTotal if compression is disabled.
BytesUncompTotal int64

// AvailableBulkRequests represents the number of bulk indexers
// available for making bulk index requests.
AvailableBulkRequests int64
Expand Down
6 changes: 5 additions & 1 deletion appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ loop:
TooManyRequests: 1,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
BytesUncompTotal: 880,
}, stats)

var rm metricdata.ResourceMetrics
Expand Down Expand Up @@ -187,6 +188,8 @@ loop:
assertCounter(m, stats.AvailableBulkRequests, indexerAttrs)
case "elasticsearch.flushed.bytes":
assertCounter(m, stats.BytesTotal, indexerAttrs)
case "elasticsearch.flushed.uncompressed.bytes":
assertCounter(m, stats.BytesUncompTotal, indexerAttrs)
case "elasticsearch.buffer.latency", "elasticsearch.flushed.latency":
// expect this metric name but no assertions done
// as it's histogram and it's checked elsewhere
Expand All @@ -196,7 +199,7 @@ loop:
})

assert.Empty(t, unexpectedMetrics)
assert.Equal(t, int64(6), asserted.Load())
assert.Equal(t, int64(7), asserted.Load())
assert.Equal(t, 4, processedAsserted)
}

Expand Down Expand Up @@ -311,6 +314,7 @@ func TestAppenderCompressionLevel(t *testing.T) {
TooManyRequests: 0,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
BytesUncompTotal: 88, // get this value programmatically?
}, stats)
}

Expand Down
49 changes: 37 additions & 12 deletions bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,17 @@ import (
// maximum possible size, based on configuration and throughput.

type BulkIndexer struct {
	client           *elasticsearch.Client
	maxDocumentRetry int
	itemsAdded       int
	// bytesFlushed is the size of the last successfully flushed request
	// body; bytesUncompFlushed is its size before compression (the two
	// are equal when compression is disabled).
	bytesFlushed       int
	bytesUncompFlushed int
	jsonw              fastjson.Writer
	// writer wraps either gzipw or buf directly (depending on whether
	// compression is enabled) to count uncompressed bytes per request.
	writer      *countWriter
	gzipw       *gzip.Writer
	copyBuf     []byte
	buf         bytes.Buffer
	retryCounts map[int]int
}

type BulkIndexerResponseStat struct {
Expand Down Expand Up @@ -137,28 +138,45 @@ func init() {
})
}

type countWriter struct {
count int
io.Writer
}

func (cw *countWriter) Write(p []byte) (int, error) {
cw.count += len(p)
return cw.Writer.Write(p)
}

// NewBulkIndexer returns a BulkIndexer that indexes documents with the
// given client. If compressionLevel is not gzip.NoCompression, request
// bodies are gzip-compressed at that level; an invalid level is silently
// coerced by ignoring gzip.NewWriterLevel's error (pre-existing behavior).
// maxDocRetry bounds per-document retries.
func NewBulkIndexer(client *elasticsearch.Client, compressionLevel int, maxDocRetry int) *BulkIndexer {
	b := &BulkIndexer{
		client:           client,
		maxDocumentRetry: maxDocRetry,
		retryCounts:      make(map[int]int),
	}
	var writer io.Writer
	if compressionLevel != gzip.NoCompression {
		b.gzipw, _ = gzip.NewWriterLevel(&b.buf, compressionLevel)
		writer = b.gzipw
	} else {
		writer = &b.buf
	}
	// Wrap the destination so the uncompressed size of each request body
	// can be reported via BytesUncompFlushed.
	b.writer = &countWriter{
		count:  0,
		Writer: writer,
	}
	return b
}

// Reset resets b, ready for a new request. Both flushed-bytes stats are
// cleared together so BytesFlushed and BytesUncompFlushed stay consistent
// after a reset.
func (b *BulkIndexer) Reset() {
	b.bytesFlushed = 0
	b.bytesUncompFlushed = 0
	b.writer.count = 0
}

func (b *BulkIndexer) resetBuf() {
b.itemsAdded = 0
b.writer.count = 0
b.buf.Reset()
if b.gzipw != nil {
b.gzipw.Reset(&b.buf)
Expand All @@ -180,6 +198,11 @@ func (b *BulkIndexer) BytesFlushed() int {
return b.bytesFlushed
}

// BytesUncompFlushed returns the number of uncompressed bytes flushed by the
// bulk indexer. It is recorded by Flush alongside bytesFlushed only when the
// request succeeded, so it reflects the pre-compression size of the last
// flushed request body; with compression disabled it matches BytesFlushed.
func (b *BulkIndexer) BytesUncompFlushed() int {
	return b.bytesUncompFlushed
}

type BulkIndexerItem struct {
Index string
DocumentID string
Expand Down Expand Up @@ -248,6 +271,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
}

bytesFlushed := b.buf.Len()
bytesUncompFlushed := b.writer.count
res, err := req.Do(ctx, b.client)
if err != nil {
b.resetBuf()
Expand All @@ -262,6 +286,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
// Record the number of flushed bytes only when err == nil. The body may
// not have been sent otherwise.
b.bytesFlushed = bytesFlushed
b.bytesUncompFlushed = bytesUncompFlushed
var resp BulkIndexerResponseStat
if res.IsError() {
if res.StatusCode == http.StatusTooManyRequests {
Expand Down
7 changes: 7 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type metrics struct {
docsActive metric.Int64UpDownCounter
docsIndexed metric.Int64Counter
bytesTotal metric.Int64Counter
bytesUncompTotal metric.Int64Counter
availableBulkRequests metric.Int64UpDownCounter
activeCreated metric.Int64Counter
activeDestroyed metric.Int64Counter
Expand Down Expand Up @@ -109,6 +110,12 @@ func newMetrics(cfg Config) (metrics, error) {
unit: "by",
p: &ms.bytesTotal,
},
{
name: "elasticsearch.flushed.uncompressed.bytes",
description: "The total number of uncompressed bytes written to the request body",
unit: "by",
p: &ms.bytesUncompTotal,
},
{
name: "elasticsearch.indexer.created",
description: "The number of active indexer creations.",
Expand Down

0 comments on commit e0ff384

Please sign in to comment.