Skip to content

Commit

Permalink
fix: redact the error message in bulk indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
kyungeunni committed May 3, 2024
1 parent 3251a77 commit 05c2ba3
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 14 deletions.
18 changes: 6 additions & 12 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"math"
"net/http"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -149,11 +148,12 @@ func New(client esapi.Transport, cfg Config) (*Appender, error) {
available := make(chan *BulkIndexer, cfg.MaxRequests)
for i := 0; i < cfg.MaxRequests; i++ {
bi, err := NewBulkIndexer(BulkIndexerConfig{
Client: client,
MaxDocumentRetries: cfg.MaxDocumentRetries,
RetryOnDocumentStatus: cfg.RetryOnDocumentStatus,
CompressionLevel: cfg.CompressionLevel,
Pipeline: cfg.Pipeline,
Client: client,
MaxDocumentRetries: cfg.MaxDocumentRetries,
RetryOnDocumentStatus: cfg.RetryOnDocumentStatus,
CompressionLevel: cfg.CompressionLevel,
Pipeline: cfg.Pipeline,
CaptureFullErrorReason: cfg.CaptureFullErrorReason,
})
if err != nil {
return nil, fmt.Errorf("error creating bulk indexer: %w", err)
Expand Down Expand Up @@ -385,12 +385,6 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
if info.Status >= 500 {
serverFailed++
}
if !a.config.CaptureFullErrorReason {
// Match Elasticsearch field mapper field value:
// failed to parse field [%s] of type [%s] in %s. Preview of field's value: '%s'
// https://github.com/elastic/elasticsearch/blob/588eabe185ad319c0268a13480465966cef058cd/server/src/main/java/org/elasticsearch/index/mapper/FieldMapper.java#L234
info.Error.Reason, _, _ = strings.Cut(info.Error.Reason, ". Preview")
}
info.Position = 0 // reset position so that the response item can be used as key in the map
failedCount[info]++
if a.tracingEnabled() {
Expand Down
18 changes: 16 additions & 2 deletions bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"io"
"net/http"
"slices"
"strings"
"unsafe"

"github.com/klauspost/compress/gzip"
Expand Down Expand Up @@ -70,6 +71,10 @@ type BulkIndexerConfig struct {
//
// If Pipeline is empty, no ingest pipeline will be specified in the Bulk request.
Pipeline string

// CaptureFullErrorReason enables the logger to collect the full error.reason
// returned by Elasticsearch when failed to index. default to false
CaptureFullErrorReason bool
}

// BulkIndexer issues bulk requests to Elasticsearch. It is NOT safe for concurrent use
Expand Down Expand Up @@ -345,11 +350,20 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
}
return resp, fmt.Errorf("flush failed: %s", res.String())
}

if err := jsoniter.NewDecoder(res.Body).Decode(&resp); err != nil {
decoder := jsoniter.NewDecoder(res.Body)
if err := decoder.Decode(&resp); err != nil {
return resp, fmt.Errorf("error decoding bulk response: %w", err)
}

if !b.config.CaptureFullErrorReason {
for i, f := range resp.FailedDocs {
// Match Elasticsearch field mapper field value:
// failed to parse field [%s] of type [%s] in %s. Preview of field's value: '%s'
// https://github.com/elastic/elasticsearch/blob/588eabe185ad319c0268a13480465966cef058cd/server/src/main/java/org/elasticsearch/index/mapper/FieldMapper.java#L234
resp.FailedDocs[i].Error.Reason, _, _ = strings.Cut(f.Error.Reason, ". Preview")
}
}

// Only run the retry logic if document retries are enabled
if b.config.MaxDocumentRetries > 0 {
buf := make([]byte, 0, 4096)
Expand Down
55 changes: 55 additions & 0 deletions bulk_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,58 @@ func TestBulkIndexer(t *testing.T) {
})
}
}

func TestLogErrorReason(t *testing.T) {
errMsg := "error_reason_invalid. Preview of field's value: 'failed to parse value'"
tests := []struct {
name string
fullReason bool
message string
}{
{
name: "redact by default",
message: "error_reason_invalid",
},
{
name: "return full error reason",
fullReason: true,
message: "error_reason_invalid. Preview of field's value: 'failed to parse value'",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
_, result := docappendertest.DecodeBulkRequest(r)
for _, itemsMap := range result.Items {
for k, item := range itemsMap {
result.HasErrors = true
item.Status = http.StatusBadRequest
item.Index = "an_index"
item.Error.Type = "error_type"
item.Error.Reason = errMsg
itemsMap[k] = item
}
}
json.NewEncoder(w).Encode(result)
})
indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
Client: client,
CaptureFullErrorReason: tc.fullReason,
})
require.NoError(t, err)

require.NoError(t, indexer.Add(docappender.BulkIndexerItem{
Index: "testidx",
Body: newJSONReader(map[string]any{
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
}),
}))

stat, err := indexer.Flush(context.Background())
require.NoError(t, err)
require.Equal(t, 1, len(stat.FailedDocs))
require.Equal(t, tc.message, stat.FailedDocs[0].Error.Reason)
})
}
}

0 comments on commit 05c2ba3

Please sign in to comment.