tests: bump kgo-verifier version and support new features
See redpanda-data/kgo-verifier#60, which added `--tombstone-probability` and
`--compacted` as input parameters.

Also, use `errors='replace'` in `kafka_cat.py` to avoid UTF-8 decoding issues
with randomly generated bytes in `kgo-verifier` records.
WillemKauf committed Nov 18, 2024
1 parent 9da76a2 commit d1f0226
Showing 3 changed files with 41 additions and 12 deletions.
4 changes: 2 additions & 2 deletions tests/docker/ducktape-deps/kgo-verifier
@@ -1,7 +1,7 @@
 #!/usr/bin/env bash
 set -e
-git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git
+git -C /opt clone https://github.com/WillemKauf/kgo-verifier.git
 cd /opt/kgo-verifier
-git reset --hard 7bbf8c883d1807cdf297fdb589d92f436604772b
+git reset --hard 937e85753c6d153e303ac8cead9c5ee0d250f1a8
 go mod tidy
 make
3 changes: 2 additions & 1 deletion tests/rptest/clients/kafka_cat.py
@@ -84,7 +84,8 @@ def _cmd_raw(self, cmd: list[str], input: str | None = None):
             res = subprocess.check_output(
                 ["kcat", "-b", self._redpanda.brokers()] + cmd,
                 text=True,
-                input=input)
+                input=input,
+                errors='replace')
             self._redpanda.logger.debug(res)
             return res
         except subprocess.CalledProcessError as e:
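For context, a minimal sketch (not part of the diff) of the decoding behaviour this change relies on: with `text=True`, `subprocess.check_output` decodes kcat's stdout as text (UTF-8 in these test containers), so arbitrary bytes from kgo-verifier records could otherwise raise `UnicodeDecodeError`; passing `errors='replace'` makes undecodable bytes come back as U+FFFD instead.

# Illustrative only: the byte string below is a stand-in for arbitrary
# record bytes; it mirrors the strict-vs-replace decoding choice made above.
raw = b"key-\xff\xfe-value"  # not valid UTF-8

try:
    raw.decode("utf-8")  # strict decoding, as before this change
except UnicodeDecodeError as e:
    print(f"strict decode fails: {e}")

print(raw.decode("utf-8", errors="replace"))  # -> 'key-\ufffd\ufffd-value'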
46 changes: 37 additions & 9 deletions tests/rptest/services/kgo_verifier_services.py
@@ -475,8 +475,8 @@ class ValidatorStatus:
 
     def __init__(self, name: str, valid_reads: int, invalid_reads: int,
                  out_of_scope_invalid_reads: int,
-                 max_offsets_consumed: Optional[int], lost_offsets: Dict[str,
-                                                                         int]):
+                 max_offsets_consumed: Optional[int],
+                 lost_offsets: Dict[str, int], tombstones_consumed: int):
         # Validator name is just a unique name per worker thread in kgo-verifier: useful in logging
         # but we mostly don't care
         self.name = name
@@ -486,6 +486,7 @@ def __init__(self, name: str, valid_reads: int, invalid_reads: int,
         self.out_of_scope_invalid_reads = out_of_scope_invalid_reads
         self.max_offsets_consumed = max_offsets_consumed
         self.lost_offsets = lost_offsets
+        self.tombstones_consumed = tombstones_consumed
 
     @property
     def total_reads(self):
@@ -510,7 +511,8 @@ def __str__(self):
                f"valid_reads={self.valid_reads}, " \
                f"invalid_reads={self.invalid_reads}, " \
                f"out_of_scope_invalid_reads={self.out_of_scope_invalid_reads}, " \
-               f"lost_offsets={self.lost_offsets}>"
+               f"lost_offsets={self.lost_offsets}, " \
+               f"tombstones_consumed={self.tombstones_consumed}>"
 
 
 class ConsumerStatus:
@@ -531,7 +533,8 @@ def __init__(self,
                 'out_of_scope_invalid_reads': 0,
                 'name': "",
                 'max_offsets_consumed': dict(),
-                'lost_offsets': dict()
+                'lost_offsets': dict(),
+                'tombstones_consumed': 0
             }
 
         self.validator = ValidatorStatus(**validator)
@@ -571,7 +574,8 @@ def __init__(self,
                  msgs_per_producer_id=None,
                  max_buffered_records=None,
                  tolerate_data_loss=False,
-                 tolerate_failed_produce=False):
+                 tolerate_failed_produce=False,
+                 tombstone_probability=0.0):
         super(KgoVerifierProducer,
               self).__init__(context, redpanda, topic, msg_size, custom_node,
                              debug_logs, trace_logs, username, password,
@@ -590,6 +594,7 @@ def __init__(self,
         self._max_buffered_records = max_buffered_records
         self._tolerate_data_loss = tolerate_data_loss
         self._tolerate_failed_produce = tolerate_failed_produce
+        self._tombstone_probability = tombstone_probability
 
     @property
     def produce_status(self):
@@ -697,6 +702,9 @@ def start_node(self, node, clean=False):
         if self._tolerate_failed_produce:
             cmd += " --tolerate-failed-produce"
 
+        if self._tombstone_probability is not None:
+            cmd += f" --tombstone-probability {self._tombstone_probability}"
+
         self.spawn(cmd, node)
 
         self._status_thread = StatusThread(self, node, ProduceStatus)
@@ -745,7 +753,9 @@ def __init__(
             username: Optional[str] = None,
             password: Optional[str] = None,
             enable_tls: Optional[bool] = False,
-            use_transactions: Optional[bool] = False):
+            use_transactions: Optional[bool] = False,
+            compacted: Optional[bool] = False,
+            expect_fully_compacted: Optional[bool] = False):
         super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs,
                          trace_logs, username, password, enable_tls)
         self._max_msgs = max_msgs
@@ -755,6 +765,8 @@ def __init__(
         self._tolerate_data_loss = tolerate_data_loss
         self._producer = producer
         self._use_transactions = use_transactions
+        self._compacted = compacted
+        self._expect_fully_compacted = expect_fully_compacted
 
     def start_node(self, node, clean=False):
         if clean:
@@ -778,6 +790,11 @@ def start_node(self, node, clean=False):
             cmd += " --tolerate-data-loss"
         if self._use_transactions:
             cmd += " --use-transactions"
+        if self._compacted:
+            cmd += " --compacted"
+        if self._expect_fully_compacted:
+            cmd += " --expect-fully-compacted"
+
         self.spawn(cmd, node)
 
         self._status_thread = StatusThread(self, node, ConsumerStatus)
@@ -877,7 +894,9 @@ def __init__(self,
                  continuous=False,
                  tolerate_data_loss=False,
                  group_name=None,
-                 use_transactions=False):
+                 use_transactions=False,
+                 compacted=False,
+                 expect_fully_compacted=False):
         super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs,
                          trace_logs, username, password, enable_tls)
 
@@ -889,6 +908,8 @@ def __init__(self,
         self._continuous = continuous
         self._tolerate_data_loss = tolerate_data_loss
         self._use_transactions = use_transactions
+        self._compacted = compacted
+        self._expect_fully_compacted = expect_fully_compacted
 
     def start_node(self, node, clean=False):
         if clean:
@@ -915,6 +936,11 @@ def start_node(self, node, clean=False):
             cmd += f" --consumer_group_name {self._group_name}"
         if self._use_transactions:
             cmd += " --use-transactions"
+        if self._compacted:
+            cmd += " --compacted"
+        if self._expect_fully_compacted:
+            cmd += " --expect-fully-compacted"
+
         self.spawn(cmd, node)
 
         self._status_thread = StatusThread(self, node, ConsumerStatus)
@@ -933,7 +959,8 @@ def __init__(self,
                  active=False,
                  failed_transactions=0,
                  aborted_transaction_msgs=0,
-                 fails=0):
+                 fails=0,
+                 tombstones_produced=0):
         self.topic = topic
         self.sent = sent
         self.acked = acked
@@ -947,7 +974,8 @@ def __init__(self,
         self.failed_transactions = failed_transactions
         self.aborted_transaction_messages = aborted_transaction_msgs
         self.fails = fails
+        self.tombstones_produced = tombstones_produced
 
     def __str__(self):
         l = self.latency
-        return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {self.fails} {l['p50']}/{l['p90']}/{l['p99']}>"
+        return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {self.fails} {self.tombstones_produced} {l['p50']}/{l['p90']}/{l['p99']}>"
