From b4f85fcc8a0414eaaa5102b5e9cc41b5ab29979b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 5 Apr 2024 10:14:34 +0200 Subject: [PATCH 01/11] build(deps): bump golang.org/x/sync from 0.6.0 to 0.7.0 (#144) Bumps [golang.org/x/sync](https://github.com/golang/sync) from 0.6.0 to 0.7.0. - [Commits](https://github.com/golang/sync/compare/v0.6.0...v0.7.0) --- updated-dependencies: - dependency-name: golang.org/x/sync dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index c658722..bcc8c5b 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/sdk/metric v1.24.0 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.6.0 + golang.org/x/sync v0.7.0 ) require ( diff --git a/go.sum b/go.sum index b76f05b..bbcd66a 100644 --- a/go.sum +++ b/go.sum @@ -79,8 +79,8 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= From 80aee67a57385a687685089166200cb59a4b87b1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 12:42:00 +0200 Subject: [PATCH 02/11] build(deps): bump the otel group with 3 updates (#145) Bumps the otel group with 3 updates: [go.opentelemetry.io/otel](https://github.com/open-telemetry/opentelemetry-go), [go.opentelemetry.io/otel/metric](https://github.com/open-telemetry/opentelemetry-go) and [go.opentelemetry.io/otel/sdk/metric](https://github.com/open-telemetry/opentelemetry-go). Updates `go.opentelemetry.io/otel` from 1.24.0 to 1.25.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.24.0...v1.25.0) Updates `go.opentelemetry.io/otel/metric` from 1.24.0 to 1.25.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.24.0...v1.25.0) Updates `go.opentelemetry.io/otel/sdk/metric` from 1.24.0 to 1.25.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.24.0...v1.25.0) --- updated-dependencies: - dependency-name: go.opentelemetry.io/otel dependency-type: direct:production update-type: version-update:semver-minor dependency-group: otel - dependency-name: go.opentelemetry.io/otel/metric dependency-type: direct:production update-type: version-update:semver-minor dependency-group: otel - dependency-name: go.opentelemetry.io/otel/sdk/metric dependency-type: direct:production update-type: version-update:semver-minor dependency-group: otel ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 12 ++++++------ go.sum | 24 ++++++++++++------------ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index bcc8c5b..69c7022 100644 --- a/go.mod +++ b/go.mod @@ -11,9 +11,9 @@ require ( go.elastic.co/apm/module/apmzap/v2 v2.5.0 go.elastic.co/apm/v2 v2.5.0 go.elastic.co/fastjson v1.3.0 - go.opentelemetry.io/otel v1.24.0 - go.opentelemetry.io/otel/metric v1.24.0 - go.opentelemetry.io/otel/sdk/metric v1.24.0 + go.opentelemetry.io/otel v1.25.0 + go.opentelemetry.io/otel/metric v1.25.0 + go.opentelemetry.io/otel/sdk/metric v1.25.0 go.uber.org/zap v1.27.0 golang.org/x/sync v0.7.0 ) @@ -34,10 +34,10 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect go.elastic.co/apm/module/apmhttp/v2 v2.5.0 // indirect - go.opentelemetry.io/otel/sdk v1.24.0 // indirect - go.opentelemetry.io/otel/trace v1.24.0 // indirect + go.opentelemetry.io/otel/sdk v1.25.0 // indirect + go.opentelemetry.io/otel/trace v1.25.0 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/sys v0.17.0 // indirect + golang.org/x/sys v0.18.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect howett.net/plist v1.0.0 // indirect ) diff --git a/go.sum b/go.sum index bbcd66a..9588865 100644 --- a/go.sum +++ b/go.sum @@ -61,16 +61,16 @@ go.elastic.co/apm/v2 v2.5.0 h1:UYqdu/bjcubcP9BIy5+os2ExRzw03yOQFG+sRGGhVlQ= go.elastic.co/apm/v2 v2.5.0/go.mod h1:+CiBUdrrAGnGCL9TNx7tQz3BrfYV23L8Ljvotoc87so= go.elastic.co/fastjson v1.3.0 h1:hJO3OsYIhiqiT4Fgu0ZxAECnKASbwgiS+LMW5oCopKs= go.elastic.co/fastjson v1.3.0/go.mod h1:K9vDh7O0ODsVKV2B5e2XYLY277QZaCbB3tS1SnARvko= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= -go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= -go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= -go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= -go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= -go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= -go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= -go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= +go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= +go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA= +go.opentelemetry.io/otel/metric v1.25.0/go.mod h1:rkDLUSd2lC5lq2dFNrX9LGAbINP5B7WBkC78RXCpH5s= +go.opentelemetry.io/otel/sdk v1.25.0 h1:PDryEJPC8YJZQSyLY5eqLeafHtG+X7FWnf3aXMtxbqo= +go.opentelemetry.io/otel/sdk v1.25.0/go.mod h1:oFgzCM2zdsxKzz6zwpTZYLLQsFwc+K0daArPdIhuxkw= +go.opentelemetry.io/otel/sdk/metric v1.25.0 h1:7CiHOy08LbrxMAp4vWpbiPcklunUshVpAvGBrdDRlGw= +go.opentelemetry.io/otel/sdk/metric v1.25.0/go.mod h1:LzwoKptdbBBdYfvtGCzGwk6GWMA3aUzBOwtQpR6Nz7o= +go.opentelemetry.io/otel/trace v1.25.0 h1:tqukZGLwQYRIFtSQM2u2+yfMVTgGVeqRLPUYx1Dq6RM= +go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= @@ -84,8 +84,8 @@ golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= From dbedb28132fc1eff08f60bbb7254182c1df2013d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 11 Apr 2024 13:45:26 +0200 Subject: [PATCH 03/11] build(deps): bump github.com/klauspost/compress from 1.17.7 to 1.17.8 (#147) Bumps [github.com/klauspost/compress](https://github.com/klauspost/compress) from 1.17.7 to 1.17.8. - [Release notes](https://github.com/klauspost/compress/releases) - [Changelog](https://github.com/klauspost/compress/blob/master/.goreleaser.yml) - [Commits](https://github.com/klauspost/compress/compare/v1.17.7...v1.17.8) --- updated-dependencies: - dependency-name: github.com/klauspost/compress dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 69c7022..e3e38d3 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( github.com/elastic/go-elasticsearch/v8 v8.13.0 github.com/json-iterator/go v1.1.12 - github.com/klauspost/compress v1.17.7 + github.com/klauspost/compress v1.17.8 github.com/stretchr/testify v1.9.0 go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0 go.elastic.co/apm/module/apmzap/v2 v2.5.0 diff --git a/go.sum b/go.sum index 9588865..fbe0546 100644 --- a/go.sum +++ b/go.sum @@ -26,8 +26,8 @@ github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8 github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= -github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= From a418a3fa8f51921fa6c6d2b6911ed524c0e5da10 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 12 Apr 2024 08:44:18 +0200 Subject: [PATCH 04/11] build(deps): bump github.com/elastic/go-elasticsearch/v8 (#152) Bumps [github.com/elastic/go-elasticsearch/v8](https://github.com/elastic/go-elasticsearch) from 8.13.0 to 8.13.1. - [Release notes](https://github.com/elastic/go-elasticsearch/releases) - [Changelog](https://github.com/elastic/go-elasticsearch/blob/main/CHANGELOG.md) - [Commits](https://github.com/elastic/go-elasticsearch/compare/v8.13.0...v8.13.1) --- updated-dependencies: - dependency-name: github.com/elastic/go-elasticsearch/v8 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index e3e38d3..2112fe8 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/elastic/go-docappender go 1.21 require ( - github.com/elastic/go-elasticsearch/v8 v8.13.0 + github.com/elastic/go-elasticsearch/v8 v8.13.1 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.8 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index fbe0546..41131bf 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/elastic/elastic-transport-go/v8 v8.5.0 h1:v5membAl7lvQgBTexPRDBO/RdnlQX+FM9fUVDyXxvH0= github.com/elastic/elastic-transport-go/v8 v8.5.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= -github.com/elastic/go-elasticsearch/v8 v8.13.0 h1:YXPAWpvbYX0mWSNG9tnEpvs4h1stgMy5JUeKZECYYB8= -github.com/elastic/go-elasticsearch/v8 v8.13.0/go.mod h1:DIn7HopJs4oZC/w0WoJR13uMUxtHeq92eI5bqv5CRfI= +github.com/elastic/go-elasticsearch/v8 v8.13.1 h1:du5F8IzUUyCkzxyHdrO9AtopcG95I/qwi2WK8Kf1xlg= +github.com/elastic/go-elasticsearch/v8 v8.13.1/go.mod h1:DIn7HopJs4oZC/w0WoJR13uMUxtHeq92eI5bqv5CRfI= github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0= github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= From 31a3978ba8f93c347ddbf193a9a94131d07f8952 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 12 Apr 2024 08:44:36 +0200 Subject: [PATCH 05/11] build(deps): bump the go-agent group with 3 updates (#151) Bumps the go-agent group with 3 updates: [go.elastic.co/apm/module/apmelasticsearch/v2](https://github.com/elastic/apm-agent-go), [go.elastic.co/apm/module/apmzap/v2](https://github.com/elastic/apm-agent-go) and [go.elastic.co/apm/v2](https://github.com/elastic/apm-agent-go). Updates `go.elastic.co/apm/module/apmelasticsearch/v2` from 2.5.0 to 2.6.0 - [Release notes](https://github.com/elastic/apm-agent-go/releases) - [Changelog](https://github.com/elastic/apm-agent-go/blob/main/CHANGELOG.asciidoc) - [Commits](https://github.com/elastic/apm-agent-go/compare/v2.5.0...v2.6.0) Updates `go.elastic.co/apm/module/apmzap/v2` from 2.5.0 to 2.6.0 - [Release notes](https://github.com/elastic/apm-agent-go/releases) - [Changelog](https://github.com/elastic/apm-agent-go/blob/main/CHANGELOG.asciidoc) - [Commits](https://github.com/elastic/apm-agent-go/compare/v2.5.0...v2.6.0) Updates `go.elastic.co/apm/v2` from 2.5.0 to 2.6.0 - [Release notes](https://github.com/elastic/apm-agent-go/releases) - [Changelog](https://github.com/elastic/apm-agent-go/blob/main/CHANGELOG.asciidoc) - [Commits](https://github.com/elastic/apm-agent-go/compare/v2.5.0...v2.6.0) --- updated-dependencies: - dependency-name: go.elastic.co/apm/module/apmelasticsearch/v2 dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-agent - dependency-name: go.elastic.co/apm/module/apmzap/v2 dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-agent - dependency-name: go.elastic.co/apm/v2 dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-agent ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 8 ++++---- go.sum | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 2112fe8..c4eeb37 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,9 @@ require ( github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.8 github.com/stretchr/testify v1.9.0 - go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0 - go.elastic.co/apm/module/apmzap/v2 v2.5.0 - go.elastic.co/apm/v2 v2.5.0 + go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 + go.elastic.co/apm/module/apmzap/v2 v2.6.0 + go.elastic.co/apm/v2 v2.6.0 go.elastic.co/fastjson v1.3.0 go.opentelemetry.io/otel v1.25.0 go.opentelemetry.io/otel/metric v1.25.0 @@ -33,7 +33,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect - go.elastic.co/apm/module/apmhttp/v2 v2.5.0 // indirect + go.elastic.co/apm/module/apmhttp/v2 v2.6.0 // indirect go.opentelemetry.io/otel/sdk v1.25.0 // indirect go.opentelemetry.io/otel/trace v1.25.0 // indirect go.uber.org/multierr v1.10.0 // indirect diff --git a/go.sum b/go.sum index 41131bf..4882589 100644 --- a/go.sum +++ b/go.sum @@ -51,14 +51,14 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0 h1:0S5Vj5/L4EkXQS7YUr+1ylTuB3njTuBNzdmn3mjXAFI= -go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0/go.mod h1:zNEXwAPoThH/bAb3TWKD5Og0Zyk0OWURsEHAja1kra4= -go.elastic.co/apm/module/apmhttp/v2 v2.5.0 h1:4AWlw8giL7hRYBQiwF1/Thm0GDsbQH/Ofe4eySAnURo= -go.elastic.co/apm/module/apmhttp/v2 v2.5.0/go.mod h1:ZP7gLEzY/OAPTqNZjp8AzA06HF82zfwXEpKI2sSVTgk= -go.elastic.co/apm/module/apmzap/v2 v2.5.0 h1:COXqVte4i75XQmV+H4m4g+2JubK3Y1WRIzY/ppKa3bQ= -go.elastic.co/apm/module/apmzap/v2 v2.5.0/go.mod h1:PHKFbSROQPFZ2+X3oZyaF8lie5DhK0gtcRMpz//S54g= -go.elastic.co/apm/v2 v2.5.0 h1:UYqdu/bjcubcP9BIy5+os2ExRzw03yOQFG+sRGGhVlQ= -go.elastic.co/apm/v2 v2.5.0/go.mod h1:+CiBUdrrAGnGCL9TNx7tQz3BrfYV23L8Ljvotoc87so= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 h1:ukMcwyMaDXsS1dRK2qRYXT2AsfwaUy74TOOYCqkWJow= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0/go.mod h1:YpfiTTrqX5LB/CKBwX89oDCBAxuLJTFv40gcfxJyehM= +go.elastic.co/apm/module/apmhttp/v2 v2.6.0 h1:s8UeNFQmVBCNd4eoz7KDD9rEFhQC0HeUFXz3z9gpAmQ= +go.elastic.co/apm/module/apmhttp/v2 v2.6.0/go.mod h1:D0GLppLuI0Ddwvtl595GUxRgn6Z8L5KaDFVMv2H3GK0= +go.elastic.co/apm/module/apmzap/v2 v2.6.0 h1:R/iVORzGu3F9uM43iEVHD0nwiRo59O0bIXdayKsgayQ= +go.elastic.co/apm/module/apmzap/v2 v2.6.0/go.mod h1:B3i/8xRkqLgi6zNuV+Bp7Pt4cutaOObvrVSa7wUTAPw= +go.elastic.co/apm/v2 v2.6.0 h1:VieBMLQFtXua2YxpYxaSdYGnmmxhLT46gosI5yErJgY= +go.elastic.co/apm/v2 v2.6.0/go.mod h1:33rOXgtHwbgZcDgi6I/GtCSMZQqgxkHC0IQT3gudKvo= go.elastic.co/fastjson v1.3.0 h1:hJO3OsYIhiqiT4Fgu0ZxAECnKASbwgiS+LMW5oCopKs= go.elastic.co/fastjson v1.3.0/go.mod h1:K9vDh7O0ODsVKV2B5e2XYLY277QZaCbB3tS1SnARvko= go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= From 6c2adf353ff176c4d4fe218d8e015fa094ad7e5c Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Tue, 16 Apr 2024 16:33:11 +0200 Subject: [PATCH 06/11] feat: increase retry temp buffer size to 4096 (#153) --- bulk_indexer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bulk_indexer.go b/bulk_indexer.go index 4d55bb2..e1aaee1 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -276,7 +276,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error // Only run the retry logic if document retries are enabled if b.maxDocumentRetry > 0 { - buf := make([]byte, 0, 1024) + buf := make([]byte, 0, 4096) // Eliminate previous retry counts that aren't present in the bulk // request response. From b1a51d43a8cbcc30bca793d8f18713fc2359605f Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 17 Apr 2024 12:56:30 +0100 Subject: [PATCH 07/11] Accept elasticsearch clients other than v8 (#146) Make New* accept esapi.Transport instead of v8 implementation. This enables the use of v8 api on any Transport, e.g. v7 client in addition to existing v8 client. This should be safe enough for our very limited use case of bulk request with e.g. v7 client. Add integration tests for v7. --- appender.go | 5 +- bulk_indexer.go | 7 +- .../appender_integration_test.go | 91 +++++++++++++++---- integrationtest/go.mod | 38 ++++++++ integrationtest/go.sum | 91 +++++++++++++++++++ 5 files changed, 207 insertions(+), 25 deletions(-) rename appender_integration_test.go => integrationtest/appender_integration_test.go (50%) create mode 100644 integrationtest/go.mod create mode 100644 integrationtest/go.sum diff --git a/appender.go b/appender.go index 49ca5fa..feaf22f 100644 --- a/appender.go +++ b/appender.go @@ -29,7 +29,7 @@ import ( "sync/atomic" "time" - "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" "go.elastic.co/apm/module/apmzap/v2" "go.elastic.co/apm/v2" "go.opentelemetry.io/otel/attribute" @@ -87,7 +87,8 @@ type Appender struct { } // New returns a new Appender that indexes documents into Elasticsearch. -func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { +// It is only tested with v8 go-elasticsearch client. Use other clients at your own risk. +func New(client esapi.Transport, cfg Config) (*Appender, error) { if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 { return nil, fmt.Errorf( "expected CompressionLevel in range [-1,9], got %d", diff --git a/bulk_indexer.go b/bulk_indexer.go index e1aaee1..e1ba48c 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -30,7 +30,6 @@ import ( "github.com/klauspost/compress/gzip" "go.elastic.co/fastjson" - "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" jsoniter "github.com/json-iterator/go" ) @@ -49,7 +48,7 @@ import ( // maximum possible size, based on configuration and throughput. type BulkIndexer struct { - client *elasticsearch.Client + client esapi.Transport maxDocumentRetry int itemsAdded int bytesFlushed int @@ -137,7 +136,9 @@ func init() { }) } -func NewBulkIndexer(client *elasticsearch.Client, compressionLevel int, maxDocRetry int) *BulkIndexer { +// NewBulkIndexer returns a bulk indexer that issues bulk requests to Elasticsearch. +// It is only tested with v8 go-elasticsearch client. Use other clients at your own risk. +func NewBulkIndexer(client esapi.Transport, compressionLevel int, maxDocRetry int) *BulkIndexer { b := &BulkIndexer{ client: client, maxDocumentRetry: maxDocRetry, diff --git a/appender_integration_test.go b/integrationtest/appender_integration_test.go similarity index 50% rename from appender_integration_test.go rename to integrationtest/appender_integration_test.go index e3dc0fb..84e758e 100644 --- a/appender_integration_test.go +++ b/integrationtest/appender_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package docappender_test +package integrationtest import ( "bytes" @@ -29,59 +29,110 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/go-docappender/docappendertest" - "github.com/elastic/go-elasticsearch/v8" - "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/elastic/go-docappender" + "github.com/elastic/go-docappender/docappendertest" + elasticsearch7 "github.com/elastic/go-elasticsearch/v7" + esapi7 "github.com/elastic/go-elasticsearch/v7/esapi" + elasticsearch8 "github.com/elastic/go-elasticsearch/v8" + esapi8 "github.com/elastic/go-elasticsearch/v8/esapi" ) -func TestAppenderIntegration(t *testing.T) { +const N = 100 + +func sendEvents(t *testing.T, indexer *docappender.Appender, index string) { + for i := 0; i < N; i++ { + encoded, err := json.Marshal(map[string]any{"@timestamp": time.Now().Format(docappendertest.TimestampFormat)}) + require.NoError(t, err) + err = indexer.Add(context.Background(), index, bytes.NewReader(encoded)) + require.NoError(t, err) + } + + // Closing the indexer flushes enqueued events. + err := indexer.Close(context.Background()) + require.NoError(t, err) +} + +func TestAppenderIntegrationV8(t *testing.T) { switch strings.ToLower(os.Getenv("INTEGRATION_TESTS")) { case "1", "true": default: t.Skip("Skipping integration test, export INTEGRATION_TESTS=1 to run") } - config := elasticsearch.Config{} + const index = "logs-generic-testing.v8" + + config := elasticsearch8.Config{} config.Username = "admin" config.Password = "changeme" - client, err := elasticsearch.NewClient(config) + client, err := elasticsearch8.NewClient(config) require.NoError(t, err) indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Second}) require.NoError(t, err) defer indexer.Close(context.Background()) - index := "logs-generic-testing" deleteIndex := func() { - resp, err := esapi.IndicesDeleteDataStreamRequest{Name: []string{index}}.Do(context.Background(), client) + resp, err := esapi8.IndicesDeleteDataStreamRequest{Name: []string{index}}.Do(context.Background(), client) require.NoError(t, err) defer resp.Body.Close() } deleteIndex() defer deleteIndex() - const N = 100 - for i := 0; i < N; i++ { - encoded, err := json.Marshal(map[string]any{"@timestamp": time.Now().Format(docappendertest.TimestampFormat)}) - require.NoError(t, err) - err = indexer.Add(context.Background(), index, bytes.NewReader(encoded)) - require.NoError(t, err) + sendEvents(t, indexer, index) + + // Check that docs are indexed. + resp, err := esapi8.IndicesRefreshRequest{Index: []string{index}}.Do(context.Background(), client) + require.NoError(t, err) + resp.Body.Close() + + var result struct { + Count int } + resp, err = esapi8.CountRequest{Index: []string{index}}.Do(context.Background(), client) + require.NoError(t, err) + defer resp.Body.Close() + err = json.NewDecoder(resp.Body).Decode(&result) + require.NoError(t, err) + assert.Equal(t, N, result.Count) +} - // Closing the indexer flushes enqueued events. - err = indexer.Close(context.Background()) +func TestAppenderIntegrationV7(t *testing.T) { + switch strings.ToLower(os.Getenv("INTEGRATION_TESTS")) { + case "1", "true": + default: + t.Skip("Skipping integration test, export INTEGRATION_TESTS=1 to run") + } + + const index = "logs-generic-testing.v7" + + config := elasticsearch7.Config{} + config.Username = "admin" + config.Password = "changeme" + client, err := elasticsearch7.NewClient(config) + require.NoError(t, err) + indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Second}) require.NoError(t, err) + defer indexer.Close(context.Background()) + + deleteIndex := func() { + resp, err := esapi7.IndicesDeleteDataStreamRequest{Name: []string{index}}.Do(context.Background(), client) + require.NoError(t, err) + defer resp.Body.Close() + } + deleteIndex() + defer deleteIndex() + + sendEvents(t, indexer, index) // Check that docs are indexed. - resp, err := esapi.IndicesRefreshRequest{Index: []string{index}}.Do(context.Background(), client) + resp, err := esapi7.IndicesRefreshRequest{Index: []string{index}}.Do(context.Background(), client) require.NoError(t, err) resp.Body.Close() var result struct { Count int } - resp, err = esapi.CountRequest{Index: []string{index}}.Do(context.Background(), client) + resp, err = esapi7.CountRequest{Index: []string{index}}.Do(context.Background(), client) require.NoError(t, err) defer resp.Body.Close() err = json.NewDecoder(resp.Body).Decode(&result) diff --git a/integrationtest/go.mod b/integrationtest/go.mod new file mode 100644 index 0000000..a827dfb --- /dev/null +++ b/integrationtest/go.mod @@ -0,0 +1,38 @@ +module integrationtest + +go 1.22.0 + +require ( + github.com/elastic/go-docappender v1.1.0 + github.com/elastic/go-elasticsearch/v7 v7.17.10 + github.com/elastic/go-elasticsearch/v8 v8.13.1 + github.com/stretchr/testify v1.9.0 +) + +require ( + github.com/armon/go-radix v1.0.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/elastic/elastic-transport-go/v8 v8.5.0 // indirect + github.com/elastic/go-sysinfo v1.7.1 // indirect + github.com/elastic/go-windows v1.0.1 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/procfs v0.7.3 // indirect + go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 // indirect + go.elastic.co/apm/module/apmhttp/v2 v2.6.0 // indirect + go.elastic.co/apm/v2 v2.6.0 // indirect + go.elastic.co/fastjson v1.3.0 // indirect + go.opentelemetry.io/otel v1.25.0 // indirect + go.opentelemetry.io/otel/metric v1.25.0 // indirect + go.opentelemetry.io/otel/sdk v1.25.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.25.0 // indirect + go.opentelemetry.io/otel/trace v1.25.0 // indirect + golang.org/x/sys v0.18.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + howett.net/plist v1.0.0 // indirect +) + +replace github.com/elastic/go-docappender => ../ diff --git a/integrationtest/go.sum b/integrationtest/go.sum new file mode 100644 index 0000000..7cf707f --- /dev/null +++ b/integrationtest/go.sum @@ -0,0 +1,91 @@ +github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= +github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/elastic-transport-go/v8 v8.5.0 h1:v5membAl7lvQgBTexPRDBO/RdnlQX+FM9fUVDyXxvH0= +github.com/elastic/elastic-transport-go/v8 v8.5.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-docappender v1.1.0 h1:wuA3Im+Y0PuQQ/FzLZUb0+6eT64oLhGCqQV49OvR9EU= +github.com/elastic/go-docappender v1.1.0/go.mod h1:u0hkrzDr9w81uNFWUxeOyM0IX9aZUag/gHlOnHyCrzA= +github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= +github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/elastic/go-elasticsearch/v8 v8.13.1 h1:du5F8IzUUyCkzxyHdrO9AtopcG95I/qwi2WK8Kf1xlg= +github.com/elastic/go-elasticsearch/v8 v8.13.1/go.mod h1:DIn7HopJs4oZC/w0WoJR13uMUxtHeq92eI5bqv5CRfI= +github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0= +github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= +github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= +github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0= +github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4= +github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/procfs v0.0.0-20190425082905-87a4384529e0/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= +github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0 h1:0S5Vj5/L4EkXQS7YUr+1ylTuB3njTuBNzdmn3mjXAFI= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0/go.mod h1:zNEXwAPoThH/bAb3TWKD5Og0Zyk0OWURsEHAja1kra4= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0/go.mod h1:YpfiTTrqX5LB/CKBwX89oDCBAxuLJTFv40gcfxJyehM= +go.elastic.co/apm/module/apmhttp/v2 v2.5.0 h1:4AWlw8giL7hRYBQiwF1/Thm0GDsbQH/Ofe4eySAnURo= +go.elastic.co/apm/module/apmhttp/v2 v2.5.0/go.mod h1:ZP7gLEzY/OAPTqNZjp8AzA06HF82zfwXEpKI2sSVTgk= +go.elastic.co/apm/module/apmhttp/v2 v2.6.0/go.mod h1:D0GLppLuI0Ddwvtl595GUxRgn6Z8L5KaDFVMv2H3GK0= +go.elastic.co/apm/v2 v2.5.0 h1:UYqdu/bjcubcP9BIy5+os2ExRzw03yOQFG+sRGGhVlQ= +go.elastic.co/apm/v2 v2.5.0/go.mod h1:+CiBUdrrAGnGCL9TNx7tQz3BrfYV23L8Ljvotoc87so= +go.elastic.co/apm/v2 v2.6.0/go.mod h1:33rOXgtHwbgZcDgi6I/GtCSMZQqgxkHC0IQT3gudKvo= +go.elastic.co/fastjson v1.3.0 h1:hJO3OsYIhiqiT4Fgu0ZxAECnKASbwgiS+LMW5oCopKs= +go.elastic.co/fastjson v1.3.0/go.mod h1:K9vDh7O0ODsVKV2B5e2XYLY277QZaCbB3tS1SnARvko= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/metric v1.25.0/go.mod h1:rkDLUSd2lC5lq2dFNrX9LGAbINP5B7WBkC78RXCpH5s= +go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= +go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/sdk v1.25.0/go.mod h1:oFgzCM2zdsxKzz6zwpTZYLLQsFwc+K0daArPdIhuxkw= +go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= +go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= +go.opentelemetry.io/otel/sdk/metric v1.25.0/go.mod h1:LzwoKptdbBBdYfvtGCzGwk6GWMA3aUzBOwtQpR6Nz7o= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa3CI79GS0ol3YnhVnKP89i0kNg= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0= +howett.net/plist v1.0.0 h1:7CrbWYbPPO/PyNy38b2EB/+gYbjCe2DXBxgtOOZbSQM= +howett.net/plist v1.0.0/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g= From c45c6aa4c5a28bbeabad5127e1a01915561076aa Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 18 Apr 2024 14:21:26 +0100 Subject: [PATCH 08/11] Change NewBulkIndexer signature to accept BulkIndexerConfig with a new Pipeline option; Release v2 (#155) Changes NewBulkIndexer signature to accept a config. The config includes a new Pipeline option, which will then be included in the bulk request. Release v2 as NewBulkIndexer signature changed. --- appender.go | 15 +++- appender_bench_test.go | 4 +- appender_test.go | 34 ++++++++- bulk_indexer.go | 77 ++++++++++++++------ config.go | 5 ++ go.mod | 2 +- integrationtest/appender_integration_test.go | 4 +- integrationtest/go.mod | 5 +- integrationtest/go.sum | 33 +++------ 9 files changed, 125 insertions(+), 54 deletions(-) diff --git a/appender.go b/appender.go index feaf22f..2ed3032 100644 --- a/appender.go +++ b/appender.go @@ -89,6 +89,10 @@ type Appender struct { // New returns a new Appender that indexes documents into Elasticsearch. // It is only tested with v8 go-elasticsearch client. Use other clients at your own risk. func New(client esapi.Transport, cfg Config) (*Appender, error) { + if client == nil { + return nil, errors.New("client is nil") + } + if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 { return nil, fmt.Errorf( "expected CompressionLevel in range [-1,9], got %d", @@ -142,7 +146,16 @@ func New(client esapi.Transport, cfg Config) (*Appender, error) { } available := make(chan *BulkIndexer, cfg.MaxRequests) for i := 0; i < cfg.MaxRequests; i++ { - available <- NewBulkIndexer(client, cfg.CompressionLevel, cfg.MaxDocumentRetries) + bi, err := NewBulkIndexer(BulkIndexerConfig{ + client: client, + MaxDocumentRetries: cfg.MaxDocumentRetries, + CompressionLevel: cfg.CompressionLevel, + Pipeline: cfg.Pipeline, + }) + if err != nil { + return nil, fmt.Errorf("error creating bulk indexer: %w", err) + } + available <- bi } if cfg.Logger == nil { cfg.Logger = zap.NewNop() diff --git a/appender_bench_test.go b/appender_bench_test.go index 013a73b..e1aac0c 100644 --- a/appender_bench_test.go +++ b/appender_bench_test.go @@ -33,8 +33,8 @@ import ( "go.elastic.co/fastjson" "go.uber.org/zap" - "github.com/elastic/go-docappender" - "github.com/elastic/go-docappender/docappendertest" + "github.com/elastic/go-docappender/v2" + "github.com/elastic/go-docappender/v2/docappendertest" ) func BenchmarkAppender(b *testing.B) { diff --git a/appender_test.go b/appender_test.go index 3ee76b2..301c340 100644 --- a/appender_test.go +++ b/appender_test.go @@ -46,8 +46,8 @@ import ( "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" - "github.com/elastic/go-docappender" - "github.com/elastic/go-docappender/docappendertest" + "github.com/elastic/go-docappender/v2" + "github.com/elastic/go-docappender/v2/docappendertest" "github.com/elastic/go-elasticsearch/v8/esutil" ) @@ -1038,6 +1038,36 @@ func TestAppenderCloseBusyIndexer(t *testing.T) { IndexersActive: 0}, indexer.Stats()) } +func TestAppenderPipeline(t *testing.T) { + const expected = "my_pipeline" + var actual string + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + actual = r.URL.Query().Get("pipeline") + _, result := docappendertest.DecodeBulkRequest(r) + json.NewEncoder(w).Encode(result) + }) + indexer, err := docappender.New(client, docappender.Config{ + FlushInterval: time.Minute, + Pipeline: expected, + }) + require.NoError(t, err) + defer indexer.Close(context.Background()) + + err = indexer.Add(context.Background(), "logs-foo-testing", newJSONReader(map[string]any{ + "@timestamp": time.Unix(123, 456789111).UTC().Format(docappendertest.TimestampFormat), + "data_stream.type": "logs", + "data_stream.dataset": "foo", + "data_stream.namespace": "testing", + })) + require.NoError(t, err) + + // Closing the indexer flushes enqueued documents. + err = indexer.Close(context.Background()) + require.NoError(t, err) + + assert.Equal(t, expected, actual) +} + func TestAppenderScaling(t *testing.T) { newIndexer := func(t *testing.T, cfg docappender.Config) *docappender.Appender { t.Helper() diff --git a/bulk_indexer.go b/bulk_indexer.go index e1ba48c..1a1d1a9 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -20,6 +20,7 @@ package docappender import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -47,17 +48,36 @@ import ( // of concurrent bulk requests. This way we can ensure bulk requests have the // maximum possible size, based on configuration and throughput. +// BulkIndexerConfig holds configuration for BulkIndexer. +type BulkIndexerConfig struct { + // client holds the Elasticsearch client. + client esapi.Transport + + // MaxDocumentRetries holds the maximum number of document retries + MaxDocumentRetries int + + // CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression) + // to 9 (gzip.BestCompression). Higher values provide greater compression, at a + // greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the + // default compression level. + CompressionLevel int + + // Pipeline holds the ingest pipeline ID. + // + // If Pipeline is empty, no ingest pipeline will be specified in the Bulk request. + Pipeline string +} + type BulkIndexer struct { - client esapi.Transport - maxDocumentRetry int - itemsAdded int - bytesFlushed int - jsonw fastjson.Writer - writer io.Writer - gzipw *gzip.Writer - copyBuf []byte - buf bytes.Buffer - retryCounts map[int]int + config BulkIndexerConfig + itemsAdded int + bytesFlushed int + jsonw fastjson.Writer + writer io.Writer + gzipw *gzip.Writer + copyBuf []byte + buf bytes.Buffer + retryCounts map[int]int } type BulkIndexerResponseStat struct { @@ -138,22 +158,32 @@ func init() { // NewBulkIndexer returns a bulk indexer that issues bulk requests to Elasticsearch. // It is only tested with v8 go-elasticsearch client. Use other clients at your own risk. -func NewBulkIndexer(client esapi.Transport, compressionLevel int, maxDocRetry int) *BulkIndexer { +func NewBulkIndexer(cfg BulkIndexerConfig) (*BulkIndexer, error) { + if cfg.client == nil { + return nil, errors.New("client is nil") + } + + if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 { + return nil, fmt.Errorf( + "expected CompressionLevel in range [-1,9], got %d", + cfg.CompressionLevel, + ) + } + b := &BulkIndexer{ - client: client, - maxDocumentRetry: maxDocRetry, - retryCounts: make(map[int]int), + config: cfg, + retryCounts: make(map[int]int), } - if compressionLevel != gzip.NoCompression { - b.gzipw, _ = gzip.NewWriterLevel(&b.buf, compressionLevel) + if cfg.CompressionLevel != gzip.NoCompression { + b.gzipw, _ = gzip.NewWriterLevel(&b.buf, cfg.CompressionLevel) b.writer = b.gzipw } else { b.writer = &b.buf } - return b + return b, nil } -// BulkIndexer resets b, ready for a new request. +// Reset resets bulk indexer, ready for a new request. func (b *BulkIndexer) Reset() { b.bytesFlushed = 0 } @@ -166,7 +196,7 @@ func (b *BulkIndexer) resetBuf() { } } -// Added returns the number of buffered items. +// Items returns the number of buffered items. func (b *BulkIndexer) Items() int { return b.itemsAdded } @@ -230,7 +260,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error } } - if b.maxDocumentRetry > 0 { + if b.config.MaxDocumentRetries > 0 { if cap(b.copyBuf) < b.buf.Len() { b.copyBuf = slices.Grow(b.copyBuf, b.buf.Len()-cap(b.copyBuf)) b.copyBuf = b.copyBuf[:cap(b.copyBuf)] @@ -243,13 +273,14 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error Body: &b.buf, Header: make(http.Header), FilterPath: []string{"items.*._index", "items.*.status", "items.*.error.type", "items.*.error.reason"}, + Pipeline: b.config.Pipeline, } if b.gzipw != nil { req.Header.Set("Content-Encoding", "gzip") } bytesFlushed := b.buf.Len() - res, err := req.Do(ctx, b.client) + res, err := req.Do(ctx, b.config.client) if err != nil { b.resetBuf() return BulkIndexerResponseStat{}, fmt.Errorf("failed to execute the request: %w", err) @@ -276,7 +307,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error } // Only run the retry logic if document retries are enabled - if b.maxDocumentRetry > 0 { + if b.config.MaxDocumentRetries > 0 { buf := make([]byte, 0, 4096) // Eliminate previous retry counts that aren't present in the bulk @@ -327,7 +358,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error // Increment 429 count for the positions found. count := b.retryCounts[res.Position] + 1 // check if we are above the maxDocumentRetry setting - if count > b.maxDocumentRetry { + if count > b.config.MaxDocumentRetries { // do not retry, return the document as failed tmp = append(tmp, res) continue diff --git a/config.go b/config.go index 51451d6..62f428d 100644 --- a/config.go +++ b/config.go @@ -80,6 +80,11 @@ type Config struct { // If DocumentBufferSize is zero, the default 1024 will be used. DocumentBufferSize int + // Pipeline holds the ingest pipeline ID. + // + // If Pipeline is empty, no ingest pipeline will be specified in the Bulk request. + Pipeline string + // Scaling configuration for the docappender. // // If unspecified, scaling is enabled by default. diff --git a/go.mod b/go.mod index c4eeb37..b7d2ab4 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/elastic/go-docappender +module github.com/elastic/go-docappender/v2 go 1.21 diff --git a/integrationtest/appender_integration_test.go b/integrationtest/appender_integration_test.go index 84e758e..b02b2a0 100644 --- a/integrationtest/appender_integration_test.go +++ b/integrationtest/appender_integration_test.go @@ -29,8 +29,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/go-docappender" - "github.com/elastic/go-docappender/docappendertest" + "github.com/elastic/go-docappender/v2" + "github.com/elastic/go-docappender/v2/docappendertest" elasticsearch7 "github.com/elastic/go-elasticsearch/v7" esapi7 "github.com/elastic/go-elasticsearch/v7/esapi" elasticsearch8 "github.com/elastic/go-elasticsearch/v8" diff --git a/integrationtest/go.mod b/integrationtest/go.mod index a827dfb..fe1c92c 100644 --- a/integrationtest/go.mod +++ b/integrationtest/go.mod @@ -3,7 +3,7 @@ module integrationtest go 1.22.0 require ( - github.com/elastic/go-docappender v1.1.0 + github.com/elastic/go-docappender/v2 v2.0.0 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-elasticsearch/v8 v8.13.1 github.com/stretchr/testify v1.9.0 @@ -18,6 +18,7 @@ require ( github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect @@ -35,4 +36,4 @@ require ( howett.net/plist v1.0.0 // indirect ) -replace github.com/elastic/go-docappender => ../ +replace github.com/elastic/go-docappender/v2 => ../ diff --git a/integrationtest/go.sum b/integrationtest/go.sum index 7cf707f..8897b9e 100644 --- a/integrationtest/go.sum +++ b/integrationtest/go.sum @@ -1,12 +1,11 @@ github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/elastic/elastic-transport-go/v8 v8.5.0 h1:v5membAl7lvQgBTexPRDBO/RdnlQX+FM9fUVDyXxvH0= github.com/elastic/elastic-transport-go/v8 v8.5.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= -github.com/elastic/go-docappender v1.1.0 h1:wuA3Im+Y0PuQQ/FzLZUb0+6eT64oLhGCqQV49OvR9EU= -github.com/elastic/go-docappender v1.1.0/go.mod h1:u0hkrzDr9w81uNFWUxeOyM0IX9aZUag/gHlOnHyCrzA= github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elastic/go-elasticsearch/v8 v8.13.1 h1:du5F8IzUUyCkzxyHdrO9AtopcG95I/qwi2WK8Kf1xlg= @@ -30,8 +29,9 @@ github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -44,39 +44,30 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0 h1:0S5Vj5/L4EkXQS7YUr+1ylTuB3njTuBNzdmn3mjXAFI= -go.elastic.co/apm/module/apmelasticsearch/v2 v2.5.0/go.mod h1:zNEXwAPoThH/bAb3TWKD5Og0Zyk0OWURsEHAja1kra4= +go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 h1:ukMcwyMaDXsS1dRK2qRYXT2AsfwaUy74TOOYCqkWJow= go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0/go.mod h1:YpfiTTrqX5LB/CKBwX89oDCBAxuLJTFv40gcfxJyehM= -go.elastic.co/apm/module/apmhttp/v2 v2.5.0 h1:4AWlw8giL7hRYBQiwF1/Thm0GDsbQH/Ofe4eySAnURo= -go.elastic.co/apm/module/apmhttp/v2 v2.5.0/go.mod h1:ZP7gLEzY/OAPTqNZjp8AzA06HF82zfwXEpKI2sSVTgk= +go.elastic.co/apm/module/apmhttp/v2 v2.6.0 h1:s8UeNFQmVBCNd4eoz7KDD9rEFhQC0HeUFXz3z9gpAmQ= go.elastic.co/apm/module/apmhttp/v2 v2.6.0/go.mod h1:D0GLppLuI0Ddwvtl595GUxRgn6Z8L5KaDFVMv2H3GK0= -go.elastic.co/apm/v2 v2.5.0 h1:UYqdu/bjcubcP9BIy5+os2ExRzw03yOQFG+sRGGhVlQ= -go.elastic.co/apm/v2 v2.5.0/go.mod h1:+CiBUdrrAGnGCL9TNx7tQz3BrfYV23L8Ljvotoc87so= +go.elastic.co/apm/v2 v2.6.0 h1:VieBMLQFtXua2YxpYxaSdYGnmmxhLT46gosI5yErJgY= go.elastic.co/apm/v2 v2.6.0/go.mod h1:33rOXgtHwbgZcDgi6I/GtCSMZQqgxkHC0IQT3gudKvo= go.elastic.co/fastjson v1.3.0 h1:hJO3OsYIhiqiT4Fgu0ZxAECnKASbwgiS+LMW5oCopKs= go.elastic.co/fastjson v1.3.0/go.mod h1:K9vDh7O0ODsVKV2B5e2XYLY277QZaCbB3tS1SnARvko= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= -go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= -go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA= go.opentelemetry.io/otel/metric v1.25.0/go.mod h1:rkDLUSd2lC5lq2dFNrX9LGAbINP5B7WBkC78RXCpH5s= -go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= -go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/sdk v1.25.0 h1:PDryEJPC8YJZQSyLY5eqLeafHtG+X7FWnf3aXMtxbqo= go.opentelemetry.io/otel/sdk v1.25.0/go.mod h1:oFgzCM2zdsxKzz6zwpTZYLLQsFwc+K0daArPdIhuxkw= -go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= -go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= +go.opentelemetry.io/otel/sdk/metric v1.25.0 h1:7CiHOy08LbrxMAp4vWpbiPcklunUshVpAvGBrdDRlGw= go.opentelemetry.io/otel/sdk/metric v1.25.0/go.mod h1:LzwoKptdbBBdYfvtGCzGwk6GWMA3aUzBOwtQpR6Nz7o= -go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= -go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.opentelemetry.io/otel/trace v1.25.0 h1:tqukZGLwQYRIFtSQM2u2+yfMVTgGVeqRLPUYx1Dq6RM= go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 87e9abcd4e95c174ecdc9ad4c9503e83e74e4462 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 18 Apr 2024 15:21:20 +0100 Subject: [PATCH 09/11] Fix BulkIndexerConfig client is not public (#158) BulkIndexerConfig client was not public and therefore not configurable. Fix it. --- appender.go | 2 +- bulk_indexer.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/appender.go b/appender.go index 2ed3032..0dede9c 100644 --- a/appender.go +++ b/appender.go @@ -147,7 +147,7 @@ func New(client esapi.Transport, cfg Config) (*Appender, error) { available := make(chan *BulkIndexer, cfg.MaxRequests) for i := 0; i < cfg.MaxRequests; i++ { bi, err := NewBulkIndexer(BulkIndexerConfig{ - client: client, + Client: client, MaxDocumentRetries: cfg.MaxDocumentRetries, CompressionLevel: cfg.CompressionLevel, Pipeline: cfg.Pipeline, diff --git a/bulk_indexer.go b/bulk_indexer.go index 1a1d1a9..958c8f0 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -50,8 +50,8 @@ import ( // BulkIndexerConfig holds configuration for BulkIndexer. type BulkIndexerConfig struct { - // client holds the Elasticsearch client. - client esapi.Transport + // Client holds the Elasticsearch client. + Client esapi.Transport // MaxDocumentRetries holds the maximum number of document retries MaxDocumentRetries int @@ -159,7 +159,7 @@ func init() { // NewBulkIndexer returns a bulk indexer that issues bulk requests to Elasticsearch. // It is only tested with v8 go-elasticsearch client. Use other clients at your own risk. func NewBulkIndexer(cfg BulkIndexerConfig) (*BulkIndexer, error) { - if cfg.client == nil { + if cfg.Client == nil { return nil, errors.New("client is nil") } @@ -280,7 +280,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error } bytesFlushed := b.buf.Len() - res, err := req.Do(ctx, b.config.client) + res, err := req.Do(ctx, b.config.Client) if err != nil { b.resetBuf() return BulkIndexerResponseStat{}, fmt.Errorf("failed to execute the request: %w", err) From 36ab11b6e72ea0dabc3c58648982fb0c7207d14e Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 22 Apr 2024 07:06:48 +0100 Subject: [PATCH 10/11] Add RetryOnDocumentStatus config (#159) Make document level retry status configurable with a default of 429 only. --- appender.go | 9 ++++--- appender_test.go | 64 ++++++++++++++++++++++++++++++++++++++++++++++++ bulk_indexer.go | 31 +++++++++++++++++++---- config.go | 5 ++++ 4 files changed, 100 insertions(+), 9 deletions(-) diff --git a/appender.go b/appender.go index 0dede9c..5f64048 100644 --- a/appender.go +++ b/appender.go @@ -147,10 +147,11 @@ func New(client esapi.Transport, cfg Config) (*Appender, error) { available := make(chan *BulkIndexer, cfg.MaxRequests) for i := 0; i < cfg.MaxRequests; i++ { bi, err := NewBulkIndexer(BulkIndexerConfig{ - Client: client, - MaxDocumentRetries: cfg.MaxDocumentRetries, - CompressionLevel: cfg.CompressionLevel, - Pipeline: cfg.Pipeline, + Client: client, + MaxDocumentRetries: cfg.MaxDocumentRetries, + RetryOnDocumentStatus: cfg.RetryOnDocumentStatus, + CompressionLevel: cfg.CompressionLevel, + Pipeline: cfg.Pipeline, }) if err != nil { return nil, fmt.Errorf("error creating bulk indexer: %w", err) diff --git a/appender_test.go b/appender_test.go index 301c340..27f7362 100644 --- a/appender_test.go +++ b/appender_test.go @@ -848,6 +848,70 @@ func TestAppenderRetryDocument(t *testing.T) { } } +func TestAppenderRetryDocument_RetryOnDocumentStatus(t *testing.T) { + testCases := map[string]struct { + status int + expectedDocsInRequest []int // at index i stores the number of documents in the i-th request + cfg docappender.Config + }{ + "should retry": { + status: 500, + expectedDocsInRequest: []int{1, 2, 1}, // 3rd request is triggered by indexer close + cfg: docappender.Config{ + MaxRequests: 1, + MaxDocumentRetries: 1, + FlushInterval: 100 * time.Millisecond, + RetryOnDocumentStatus: []int{429, 500}, + }, + }, + "should not retry": { + status: 500, + expectedDocsInRequest: []int{1, 1}, + cfg: docappender.Config{ + MaxRequests: 1, + MaxDocumentRetries: 1, + FlushInterval: 100 * time.Millisecond, + RetryOnDocumentStatus: []int{429}, + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + var failedCount atomic.Int32 + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + _, result := docappendertest.DecodeBulkRequest(r) + attempt := failedCount.Add(1) - 1 + require.Len(t, result.Items, tc.expectedDocsInRequest[attempt]) + for _, item := range result.Items { + itemResp := item["create"] + itemResp.Status = tc.status + item["create"] = itemResp + } + json.NewEncoder(w).Encode(result) + }) + + indexer, err := docappender.New(client, tc.cfg) + require.NoError(t, err) + defer indexer.Close(context.Background()) + + addMinimalDoc(t, indexer, "logs-foo-testing1") + + require.Eventually(t, func() bool { + return failedCount.Load() == 1 + }, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail") + + addMinimalDoc(t, indexer, "logs-foo-testing2") + + require.Eventually(t, func() bool { + return failedCount.Load() == 2 + }, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail") + + err = indexer.Close(context.Background()) + assert.NoError(t, err) + }) + } +} + func TestAppenderCloseFlushContext(t *testing.T) { srvctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/bulk_indexer.go b/bulk_indexer.go index 958c8f0..4cd970a 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -56,6 +56,11 @@ type BulkIndexerConfig struct { // MaxDocumentRetries holds the maximum number of document retries MaxDocumentRetries int + // RetryOnDocumentStatus holds the document level statuses that will trigger a document retry. + // + // If RetryOnDocumentStatus is empty or nil, the default of [429] will be used. + RetryOnDocumentStatus []int + // CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression) // to 9 (gzip.BestCompression). Higher values provide greater compression, at a // greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the @@ -174,6 +179,13 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (*BulkIndexer, error) { config: cfg, retryCounts: make(map[int]int), } + + // use a len check instead of a nil check because document level retries + // should be disabled using MaxDocumentRetries instead. + if len(b.config.RetryOnDocumentStatus) == 0 { + b.config.RetryOnDocumentStatus = []int{http.StatusTooManyRequests} + } + if cfg.CompressionLevel != gzip.NoCompression { b.gzipw, _ = gzip.NewWriterLevel(&b.buf, cfg.CompressionLevel) b.writer = b.gzipw @@ -287,8 +299,8 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error } defer res.Body.Close() - // Reset the buffer and gzip writer so they can be reused in case 429s - // were received. + // Reset the buffer and gzip writer so they can be reused in case + // document level retries are needed. b.resetBuf() // Record the number of flushed bytes only when err == nil. The body may @@ -344,7 +356,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error seen := 0 for _, res := range resp.FailedDocs { - if res.Status == http.StatusTooManyRequests { + if b.shouldRetryOnStatus(res.Status) { // there are two lines for each document: // - action // - document @@ -355,7 +367,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error startln := res.Position * 2 endln := startln + 2 - // Increment 429 count for the positions found. + // Increment retry count for the positions found. count := b.retryCounts[res.Position] + 1 // check if we are above the maxDocumentRetry setting if count > b.config.MaxDocumentRetries { @@ -436,7 +448,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error resp.RetriedDocs++ b.itemsAdded++ } else { - // If it's not a 429 treat the document as failed + // If it's not a retriable error, treat the document as failed tmp = append(tmp, res) } } @@ -449,6 +461,15 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error return resp, nil } +func (b *BulkIndexer) shouldRetryOnStatus(docStatus int) bool { + for _, status := range b.config.RetryOnDocumentStatus { + if docStatus == status { + return true + } + } + return false +} + // indexnth returns the index of the nth instance of sep in s. // It returns -1 if sep is not present in s or nth is 0. func indexnth(s []byte, nth int, sep rune) int { diff --git a/config.go b/config.go index 62f428d..83ab497 100644 --- a/config.go +++ b/config.go @@ -58,6 +58,11 @@ type Config struct { // MaxDocumentRetries holds the maximum number of document retries MaxDocumentRetries int + // RetryOnDocumentStatus holds the document level statuses that will trigger a document retry. + // + // If RetryOnDocumentStatus is empty or nil, the default of [429] will be used. + RetryOnDocumentStatus []int + // FlushBytes holds the flush threshold in bytes. If Compression is enabled, // The number of documents that can be buffered will be greater. // From 71966cb3fef67f3ac2a814d837d07e1feaf4c566 Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Mon, 22 Apr 2024 11:41:39 +0200 Subject: [PATCH 11/11] feat: add document retries metrics (#157) * feat: add document retries metrics Add a metric which tracks the number of document retries Update retry test to assert metrics count * refactor: update metric name * docs: update metrics description Co-authored-by: Carson Ip --------- Co-authored-by: Carson Ip --- appender.go | 6 ++++++ appender_test.go | 31 +++++++++++++++++++++++++++++++ metric.go | 6 ++++++ 3 files changed, 43 insertions(+) diff --git a/appender.go b/appender.go index 5f64048..2d57bd1 100644 --- a/appender.go +++ b/appender.go @@ -391,6 +391,12 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { if docsFailed > 0 { atomic.AddInt64(&a.docsFailed, docsFailed) } + if resp.RetriedDocs > 0 { + a.addCount(resp.RetriedDocs, + nil, + a.metrics.docsRetried, + ) + } if docsIndexed > 0 { a.addCount(docsIndexed, &a.docsIndexed, diff --git a/appender_test.go b/appender_test.go index 27f7362..4db78ce 100644 --- a/appender_test.go +++ b/appender_test.go @@ -766,6 +766,13 @@ func TestAppenderRetryDocument(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + }, + )) + var rm metricdata.ResourceMetrics + var failedCount atomic.Int32 var done atomic.Bool client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { @@ -816,6 +823,7 @@ func TestAppenderRetryDocument(t *testing.T) { done.Store(true) }) + tc.cfg.MeterProvider = sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)) indexer, err := docappender.New(client, tc.cfg) require.NoError(t, err) defer indexer.Close(context.Background()) @@ -829,6 +837,18 @@ func TestAppenderRetryDocument(t *testing.T) { return failedCount.Load() == 1 }, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail") + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + var asserted atomic.Int64 + assertCounter := docappendertest.NewAssertCounter(t, &asserted) + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch m.Name { + case "elasticsearch.events.retried": + assertCounter(m, 5, *attribute.EmptySet()) + } + }) + assert.Equal(t, int64(1), asserted.Load()) + addMinimalDoc(t, indexer, "logs-foo-testing10") addMinimalDoc(t, indexer, "logs-foo-testing11") @@ -836,6 +856,17 @@ func TestAppenderRetryDocument(t *testing.T) { return failedCount.Load() == 2 }, 2*time.Second, 50*time.Millisecond, "timed out waiting for first flush request to fail") + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + assertCounter = docappendertest.NewAssertCounter(t, &asserted) + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch m.Name { + case "elasticsearch.events.retried": + assertCounter(m, 5, *attribute.EmptySet()) + } + }) + assert.Equal(t, int64(2), asserted.Load()) + addMinimalDoc(t, indexer, "logs-foo-testing12") require.Eventually(t, func() bool { diff --git a/metric.go b/metric.go index b39216e..2da9cbe 100644 --- a/metric.go +++ b/metric.go @@ -32,6 +32,7 @@ type metrics struct { docsAdded metric.Int64Counter docsActive metric.Int64UpDownCounter docsIndexed metric.Int64Counter + docsRetried metric.Int64Counter bytesTotal metric.Int64Counter availableBulkRequests metric.Int64UpDownCounter activeCreated metric.Int64Counter @@ -103,6 +104,11 @@ func newMetrics(cfg Config) (metrics, error) { description: "Number of APM Events flushed to Elasticsearch. Attributes are used to report separate counts for different outcomes - success, client failure, etc.", p: &ms.docsIndexed, }, + { + name: "elasticsearch.events.retried", + description: "The number of document retries. A single document may be retried more than once.", + p: &ms.docsRetried, + }, { name: "elasticsearch.flushed.bytes", description: "The total number of bytes written to the request body",