-
Notifications
You must be signed in to change notification settings - Fork 3
/
example_7_batch_logger_tracer_metric_test.go
50 lines (46 loc) · 1.24 KB
/
example_7_batch_logger_tracer_metric_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package example
import (
"context"
"fmt"
kafka_go_worker "github.com/sellsuki/kafka-go-worker"
"github.com/sellsuki/kafka-go-worker/handler"
"testing"
)
func Test_Example_7(t *testing.T) {
initLogger()
initTracer()
worker := kafka_go_worker.NewKafkaWorker(workerConfig,
handler.WithLoggerZap("batch_process", workerConfig.WorkerName, workerConfig.TopicName),
handler.WithMetricPrometheus(
"batch_Test_Example_7",
prom, workerConfig.WorkerName,
workerConfig.TopicName,
workerConfig.BatchSize,
),
handler.WithTracerOtel(
"kafka_consumer_batch",
"kafka_consumer_worker_example_7",
workerConfig.WorkerName,
false,
),
handler.WithAtLeastOnceCommitter,
handler.WithSerialWorker(demoWorker, false,
handler.WithRecovery,
handler.WithLoggerZap("worker_process", workerConfig.WorkerName, workerConfig.TopicName),
handler.WithMetricPrometheus(
fmt.Sprintf("worker_%s", workerConfig.WorkerName),
prom, workerConfig.WorkerName,
workerConfig.TopicName,
workerConfig.BatchSize,
),
handler.WithTracerOtel(
"kafka_consumer_worker",
"kafka_consumer_worker_example_7_worker",
workerConfig.WorkerName,
true,
),
),
)
// Run util context get cancelled
worker.Start(context.Background())
}