diff --git a/appender.go b/appender.go index ce61972..52c4561 100644 --- a/appender.go +++ b/appender.go @@ -25,7 +25,6 @@ import ( "math" "net/http" "runtime" - "strings" "sync" "sync/atomic" "time" @@ -149,11 +148,12 @@ func New(client esapi.Transport, cfg Config) (*Appender, error) { available := make(chan *BulkIndexer, cfg.MaxRequests) for i := 0; i < cfg.MaxRequests; i++ { bi, err := NewBulkIndexer(BulkIndexerConfig{ - Client: client, - MaxDocumentRetries: cfg.MaxDocumentRetries, - RetryOnDocumentStatus: cfg.RetryOnDocumentStatus, - CompressionLevel: cfg.CompressionLevel, - Pipeline: cfg.Pipeline, + Client: client, + MaxDocumentRetries: cfg.MaxDocumentRetries, + RetryOnDocumentStatus: cfg.RetryOnDocumentStatus, + CompressionLevel: cfg.CompressionLevel, + Pipeline: cfg.Pipeline, + CaptureFullErrorReason: cfg.CaptureFullErrorReason, }) if err != nil { return nil, fmt.Errorf("error creating bulk indexer: %w", err) @@ -385,12 +385,6 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { if info.Status >= 500 { serverFailed++ } - if !a.config.CaptureFullErrorReason { - // Match Elasticsearch field mapper field value: - // failed to parse field [%s] of type [%s] in %s. Preview of field's value: '%s' - // https://github.com/elastic/elasticsearch/blob/588eabe185ad319c0268a13480465966cef058cd/server/src/main/java/org/elasticsearch/index/mapper/FieldMapper.java#L234 - info.Error.Reason, _, _ = strings.Cut(info.Error.Reason, ". Preview") - } info.Position = 0 // reset position so that the response item can be used as key in the map failedCount[info]++ if a.tracingEnabled() { diff --git a/bulk_indexer.go b/bulk_indexer.go index 3a53fb1..5569753 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -25,6 +25,7 @@ import ( "io" "net/http" "slices" + "strings" "unsafe" "github.com/klauspost/compress/gzip" @@ -70,6 +71,10 @@ type BulkIndexerConfig struct { // // If Pipeline is empty, no ingest pipeline will be specified in the Bulk request. Pipeline string + + // CaptureFullErrorReason enables the logger to collect the full error.reason + // returned by Elasticsearch when failed to index. default to false + CaptureFullErrorReason bool } // BulkIndexer issues bulk requests to Elasticsearch. It is NOT safe for concurrent use @@ -345,11 +350,20 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error } return resp, fmt.Errorf("flush failed: %s", res.String()) } - - if err := jsoniter.NewDecoder(res.Body).Decode(&resp); err != nil { + decoder := jsoniter.NewDecoder(res.Body) + if err := decoder.Decode(&resp); err != nil { return resp, fmt.Errorf("error decoding bulk response: %w", err) } + if !b.config.CaptureFullErrorReason { + for i, f := range resp.FailedDocs { + // Match Elasticsearch field mapper field value: + // failed to parse field [%s] of type [%s] in %s. Preview of field's value: '%s' + // https://github.com/elastic/elasticsearch/blob/588eabe185ad319c0268a13480465966cef058cd/server/src/main/java/org/elasticsearch/index/mapper/FieldMapper.java#L234 + resp.FailedDocs[i].Error.Reason, _, _ = strings.Cut(f.Error.Reason, ". Preview") + } + } + // Only run the retry logic if document retries are enabled if b.config.MaxDocumentRetries > 0 { buf := make([]byte, 0, 4096) diff --git a/bulk_indexer_test.go b/bulk_indexer_test.go index 8aa9478..e771388 100644 --- a/bulk_indexer_test.go +++ b/bulk_indexer_test.go @@ -128,3 +128,58 @@ func TestBulkIndexer(t *testing.T) { }) } } + +func TestLogErrorReason(t *testing.T) { + errMsg := "error_reason_invalid. Preview of field's value: 'failed to parse value'" + tests := []struct { + name string + fullReason bool + message string + }{ + { + name: "redact by default", + message: "error_reason_invalid", + }, + { + name: "return full error reason", + fullReason: true, + message: "error_reason_invalid. Preview of field's value: 'failed to parse value'", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + _, result := docappendertest.DecodeBulkRequest(r) + for _, itemsMap := range result.Items { + for k, item := range itemsMap { + result.HasErrors = true + item.Status = http.StatusBadRequest + item.Index = "an_index" + item.Error.Type = "error_type" + item.Error.Reason = errMsg + itemsMap[k] = item + } + } + json.NewEncoder(w).Encode(result) + }) + indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ + Client: client, + CaptureFullErrorReason: tc.fullReason, + }) + require.NoError(t, err) + + require.NoError(t, indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + })) + + stat, err := indexer.Flush(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, len(stat.FailedDocs)) + require.Equal(t, tc.message, stat.FailedDocs[0].Error.Reason) + }) + } +}