Skip to content

Commit

Permalink
tests: bump kgo-verifier version and support new features
Browse files Browse the repository at this point in the history
See redpanda-data/kgo-verifier#60,
which added `--tombstone-probability`, `--compacted`,
`--validate-latest-values` as input parameters.
  • Loading branch information
WillemKauf committed Dec 4, 2024
1 parent ce8d016 commit e91e68e
Show file tree
Hide file tree
Showing 3 changed files with 333 additions and 11 deletions.
4 changes: 2 additions & 2 deletions tests/docker/ducktape-deps/kgo-verifier
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env bash
set -e
git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git
git -C /opt clone https://github.com/WillemKauf/kgo-verifier.git
cd /opt/kgo-verifier
git reset --hard 7bbf8c883d1807cdf297fdb589d92f436604772b
git reset --hard 01dcf480ea33758630643be7bd24b2bba0fd6c0e
go mod tidy
make
51 changes: 42 additions & 9 deletions tests/rptest/services/kgo_verifier_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def clean_node(self, node: ClusterNode):
self._redpanda.logger.info(f"{self.__class__.__name__}.clean_node")
node.account.kill_process("kgo-verifier", clean_shutdown=False)
node.account.remove("valid_offsets*json", True)
node.account.remove("latest_value*json", True)
node.account.remove(f"/tmp/{self.__class__.__name__}*", True)

def _remote(self, node, action, timeout=60):
Expand Down Expand Up @@ -475,8 +476,8 @@ class ValidatorStatus:

def __init__(self, name: str, valid_reads: int, invalid_reads: int,
out_of_scope_invalid_reads: int,
max_offsets_consumed: Optional[int], lost_offsets: Dict[str,
int]):
max_offsets_consumed: Optional[int],
lost_offsets: Dict[str, int], tombstones_consumed: int):
# Validator name is just a unique name per worker thread in kgo-verifier: useful in logging
# but we mostly don't care
self.name = name
Expand All @@ -486,6 +487,7 @@ def __init__(self, name: str, valid_reads: int, invalid_reads: int,
self.out_of_scope_invalid_reads = out_of_scope_invalid_reads
self.max_offsets_consumed = max_offsets_consumed
self.lost_offsets = lost_offsets
self.tombstones_consumed = tombstones_consumed

@property
def total_reads(self):
Expand All @@ -510,7 +512,8 @@ def __str__(self):
f"valid_reads={self.valid_reads}, " \
f"invalid_reads={self.invalid_reads}, " \
f"out_of_scope_invalid_reads={self.out_of_scope_invalid_reads}, " \
f"lost_offsets={self.lost_offsets}>"
f"lost_offsets={self.lost_offsets}, " \
f"tombstones_consumed={self.tombstones_consumed}>"


class ConsumerStatus:
Expand All @@ -531,7 +534,8 @@ def __init__(self,
'out_of_scope_invalid_reads': 0,
'name': "",
'max_offsets_consumed': dict(),
'lost_offsets': dict()
'lost_offsets': dict(),
'tombstones_consumed': 0
}

self.validator = ValidatorStatus(**validator)
Expand Down Expand Up @@ -571,7 +575,9 @@ def __init__(self,
msgs_per_producer_id=None,
max_buffered_records=None,
tolerate_data_loss=False,
tolerate_failed_produce=False):
tolerate_failed_produce=False,
tombstone_probability=0.0,
validate_latest_values=False):
super(KgoVerifierProducer,
self).__init__(context, redpanda, topic, msg_size, custom_node,
debug_logs, trace_logs, username, password,
Expand All @@ -590,6 +596,8 @@ def __init__(self,
self._max_buffered_records = max_buffered_records
self._tolerate_data_loss = tolerate_data_loss
self._tolerate_failed_produce = tolerate_failed_produce
self._tombstone_probability = tombstone_probability
self._validate_latest_values = validate_latest_values

@property
def produce_status(self):
Expand Down Expand Up @@ -697,6 +705,11 @@ def start_node(self, node, clean=False):
if self._tolerate_failed_produce:
cmd += " --tolerate-failed-produce"

if self._tombstone_probability is not None:
cmd += f" --tombstone-probability {self._tombstone_probability}"
if self._validate_latest_values:
cmd += " --validate-latest-values"

self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ProduceStatus)
Expand Down Expand Up @@ -745,7 +758,9 @@ def __init__(
username: Optional[str] = None,
password: Optional[str] = None,
enable_tls: Optional[bool] = False,
use_transactions: Optional[bool] = False):
use_transactions: Optional[bool] = False,
compacted: Optional[bool] = False,
validate_latest_values: Optional[bool] = False):
super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs,
trace_logs, username, password, enable_tls)
self._max_msgs = max_msgs
Expand All @@ -755,6 +770,8 @@ def __init__(
self._tolerate_data_loss = tolerate_data_loss
self._producer = producer
self._use_transactions = use_transactions
self._compacted = compacted
self._validate_latest_values = validate_latest_values

def start_node(self, node, clean=False):
if clean:
Expand All @@ -778,6 +795,11 @@ def start_node(self, node, clean=False):
cmd += " --tolerate-data-loss"
if self._use_transactions:
cmd += " --use-transactions"
if self._compacted:
cmd += " --compacted"
if self._validate_latest_values:
cmd += " --validate-latest-values"

self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ConsumerStatus)
Expand Down Expand Up @@ -877,7 +899,9 @@ def __init__(self,
continuous=False,
tolerate_data_loss=False,
group_name=None,
use_transactions=False):
use_transactions=False,
compacted=False,
validate_latest_values=False):
super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs,
trace_logs, username, password, enable_tls)

Expand All @@ -889,6 +913,8 @@ def __init__(self,
self._continuous = continuous
self._tolerate_data_loss = tolerate_data_loss
self._use_transactions = use_transactions
self._compacted = compacted
self._validate_latest_values = validate_latest_values

def start_node(self, node, clean=False):
if clean:
Expand All @@ -915,6 +941,11 @@ def start_node(self, node, clean=False):
cmd += f" --consumer_group_name {self._group_name}"
if self._use_transactions:
cmd += " --use-transactions"
if self._compacted:
cmd += " --compacted"
if self._validate_latest_values:
cmd += " --validate-latest-values"

self.spawn(cmd, node)

self._status_thread = StatusThread(self, node, ConsumerStatus)
Expand All @@ -933,7 +964,8 @@ def __init__(self,
active=False,
failed_transactions=0,
aborted_transaction_msgs=0,
fails=0):
fails=0,
tombstones_produced=0):
self.topic = topic
self.sent = sent
self.acked = acked
Expand All @@ -947,7 +979,8 @@ def __init__(self,
self.failed_transactions = failed_transactions
self.aborted_transaction_messages = aborted_transaction_msgs
self.fails = fails
self.tombstones_produced = tombstones_produced

def __str__(self):
l = self.latency
return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {self.fails} {l['p50']}/{l['p90']}/{l['p99']}>"
return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {self.fails} {self.tombstones_produced} {l['p50']}/{l['p90']}/{l['p99']}>"
Loading

0 comments on commit e91e68e

Please sign in to comment.