diff --git a/tests/docker/ducktape-deps/kgo-verifier b/tests/docker/ducktape-deps/kgo-verifier
index 176b425778ada..131f878942d35 100644
--- a/tests/docker/ducktape-deps/kgo-verifier
+++ b/tests/docker/ducktape-deps/kgo-verifier
@@ -1,7 +1,7 @@
 #!/usr/bin/env bash
 set -e
-git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git
+git -C /opt clone https://github.com/WillemKauf/kgo-verifier.git
 cd /opt/kgo-verifier
-git reset --hard 7bbf8c883d1807cdf297fdb589d92f436604772b
+git reset --hard 01dcf480ea33758630643be7bd24b2bba0fd6c0e
 go mod tidy
 make
diff --git a/tests/rptest/services/kgo_verifier_services.py b/tests/rptest/services/kgo_verifier_services.py
index efbd4cab43930..286d904c53833 100644
--- a/tests/rptest/services/kgo_verifier_services.py
+++ b/tests/rptest/services/kgo_verifier_services.py
@@ -228,6 +228,7 @@ def clean_node(self, node: ClusterNode):
         self._redpanda.logger.info(f"{self.__class__.__name__}.clean_node")
         node.account.kill_process("kgo-verifier", clean_shutdown=False)
         node.account.remove("valid_offsets*json", True)
+        node.account.remove("latest_value*json", True)
         node.account.remove(f"/tmp/{self.__class__.__name__}*", True)
 
     def _remote(self, node, action, timeout=60):
@@ -475,8 +476,8 @@ class ValidatorStatus:
     def __init__(self, name: str, valid_reads: int, invalid_reads: int,
                  out_of_scope_invalid_reads: int,
-                 max_offsets_consumed: Optional[int], lost_offsets: Dict[str,
-                                                                         int]):
+                 max_offsets_consumed: Optional[int],
+                 lost_offsets: Dict[str, int], tombstones_consumed: int):
         # Validator name is just a unique name per worker thread in kgo-verifier: useful in logging
         # but we mostly don't care
         self.name = name
@@ -486,6 +487,7 @@ def __init__(self, name: str, valid_reads: int, invalid_reads: int,
         self.out_of_scope_invalid_reads = out_of_scope_invalid_reads
         self.max_offsets_consumed = max_offsets_consumed
         self.lost_offsets = lost_offsets
+        self.tombstones_consumed = tombstones_consumed
 
     @property
     def total_reads(self):
@@ -510,7 +512,8 @@ def __str__(self):
            f"valid_reads={self.valid_reads}, " \
            f"invalid_reads={self.invalid_reads}, " \
            f"out_of_scope_invalid_reads={self.out_of_scope_invalid_reads}, " \
-           f"lost_offsets={self.lost_offsets}>"
+           f"lost_offsets={self.lost_offsets}, " \
+           f"tombstones_consumed={self.tombstones_consumed}>"
 
 
 class ConsumerStatus:
@@ -531,7 +534,8 @@ def __init__(self,
                 'out_of_scope_invalid_reads': 0,
                 'name': "",
                 'max_offsets_consumed': dict(),
-                'lost_offsets': dict()
+                'lost_offsets': dict(),
+                'tombstones_consumed': 0
             }
 
         self.validator = ValidatorStatus(**validator)
@@ -571,7 +575,9 @@ def __init__(self,
                  msgs_per_producer_id=None,
                  max_buffered_records=None,
                  tolerate_data_loss=False,
-                 tolerate_failed_produce=False):
+                 tolerate_failed_produce=False,
+                 tombstone_probability=0.0,
+                 validate_latest_values=False):
         super(KgoVerifierProducer,
               self).__init__(context, redpanda, topic, msg_size, custom_node,
                              debug_logs, trace_logs, username, password,
@@ -590,6 +596,8 @@ def __init__(self,
         self._max_buffered_records = max_buffered_records
         self._tolerate_data_loss = tolerate_data_loss
         self._tolerate_failed_produce = tolerate_failed_produce
+        self._tombstone_probability = tombstone_probability
+        self._validate_latest_values = validate_latest_values
 
     @property
     def produce_status(self):
@@ -697,6 +705,11 @@ def start_node(self, node, clean=False):
         if self._tolerate_failed_produce:
             cmd += " --tolerate-failed-produce"
 
+        if self._tombstone_probability is not None:
+            cmd += f" --tombstone-probability {self._tombstone_probability}"
+        if self._validate_latest_values:
+            cmd += " --validate-latest-values"
+
         self.spawn(cmd, node)
 
         self._status_thread = StatusThread(self, node, ProduceStatus)
@@ -745,7 +758,9 @@ def __init__(
             username: Optional[str] = None,
             password: Optional[str] = None,
             enable_tls: Optional[bool] = False,
-            use_transactions: Optional[bool] = False):
+            use_transactions: Optional[bool] = False,
+            compacted: Optional[bool] = False,
+            validate_latest_values: Optional[bool] = False):
         super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs,
                          trace_logs, username, password, enable_tls)
         self._max_msgs = max_msgs
@@ -755,6 +770,8 @@ def __init__(
         self._tolerate_data_loss = tolerate_data_loss
         self._producer = producer
         self._use_transactions = use_transactions
+        self._compacted = compacted
+        self._validate_latest_values = validate_latest_values
 
     def start_node(self, node, clean=False):
         if clean:
@@ -778,6 +795,11 @@ def start_node(self, node, clean=False):
             cmd += " --tolerate-data-loss"
         if self._use_transactions:
             cmd += " --use-transactions"
+        if self._compacted:
+            cmd += " --compacted"
+        if self._validate_latest_values:
+            cmd += " --validate-latest-values"
+
         self.spawn(cmd, node)
 
         self._status_thread = StatusThread(self, node, ConsumerStatus)
@@ -877,7 +899,9 @@ def __init__(self,
                  continuous=False,
                  tolerate_data_loss=False,
                  group_name=None,
-                 use_transactions=False):
+                 use_transactions=False,
+                 compacted=False,
+                 validate_latest_values=False):
         super().__init__(context, redpanda, topic, msg_size, nodes,
                          debug_logs, trace_logs, username, password,
                          enable_tls)
@@ -889,6 +913,8 @@ def __init__(self,
         self._continuous = continuous
         self._tolerate_data_loss = tolerate_data_loss
         self._use_transactions = use_transactions
+        self._compacted = compacted
+        self._validate_latest_values = validate_latest_values
 
     def start_node(self, node, clean=False):
         if clean:
@@ -915,6 +941,11 @@ def start_node(self, node, clean=False):
             cmd += f" --consumer_group_name {self._group_name}"
         if self._use_transactions:
             cmd += " --use-transactions"
+        if self._compacted:
+            cmd += " --compacted"
+        if self._validate_latest_values:
+            cmd += " --validate-latest-values"
+
         self.spawn(cmd, node)
 
         self._status_thread = StatusThread(self, node, ConsumerStatus)
@@ -933,7 +964,8 @@ def __init__(self,
                  active=False,
                  failed_transactions=0,
                  aborted_transaction_msgs=0,
-                 fails=0):
+                 fails=0,
+                 tombstones_produced=0):
         self.topic = topic
         self.sent = sent
         self.acked = acked
@@ -947,7 +979,8 @@ def __init__(self,
         self.failed_transactions = failed_transactions
         self.aborted_transaction_messages = aborted_transaction_msgs
         self.fails = fails
+        self.tombstones_produced = tombstones_produced
 
     def __str__(self):
         l = self.latency
-        return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {self.fails} {l['p50']}/{l['p90']}/{l['p99']}>"
+        return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {self.fails} {self.tombstones_produced} {l['p50']}/{l['p90']}/{l['p99']}>"
diff --git a/tests/rptest/tests/datalake/compaction_test.py b/tests/rptest/tests/datalake/compaction_test.py
new file mode 100644
index 0000000000000..1038caee6918b
--- /dev/null
+++ b/tests/rptest/tests/datalake/compaction_test.py
@@ -0,0 +1,289 @@
+# Copyright 2024 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+import time
+from rptest.clients.rpk import RpkTool
+from rptest.clients.kafka_cat import KafkaCat
+from rptest.clients.types import TopicSpec
+from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer
+from rptest.tests.datalake.datalake_services import DatalakeServices
+from rptest.tests.datalake.query_engine_base import QueryEngineType
+from rptest.services.redpanda import MetricsEndpoint, SISettings
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.tests.datalake.utils import supported_storage_types
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from rptest.services.cluster import cluster
+from rptest.utils.mode_checks import skip_debug_mode
+from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
+
+
+class CompactionGapsTest(RedpandaTest):
+    def __init__(self, test_ctx, *args, **kwargs):
+        super(CompactionGapsTest, self).__init__(
+            test_ctx,
+            num_brokers=1,
+            si_settings=SISettings(test_context=test_ctx, fast_uploads=True),
+            extra_rp_conf={
+                "iceberg_enabled": "true",
+                "iceberg_catalog_commit_interval_ms": 5000,
+                "datalake_coordinator_snapshot_max_delay_secs": 10,
+                "log_compaction_interval_ms": 2000
+            },
+            *args,
+            **kwargs)
+        self.test_ctx = test_ctx
+        self.topic_name = "test"
+        self.segment_size = 5 * 1024 * 1024
+        self.kafka_cat = KafkaCat(self.redpanda)
+
+    def setUp(self):
+        # Redpanda will be started by DatalakeServices.
+        pass
+
+    def partition_segments(self) -> int:
+        assert len(self.redpanda.nodes) == 1, self.redpanda.nodes
+        node = self.redpanda.nodes[0]
+        storage = self.redpanda.node_storage(node)
+        topic_partitions = storage.partitions("kafka", self.topic_name)
+        assert len(topic_partitions) == 1, len(topic_partitions)
+        segment_count = len(topic_partitions[0].segments)
+        self.redpanda.logger.debug(f"Current segment count: {segment_count}")
+        return segment_count
+
+    def wait_until_segment_count(self, count):
+        wait_until(
+            lambda: self.partition_segments() == count,
+            timeout_sec=30,
+            backoff_sec=3,
+            err_msg=f"Timed out waiting for segment count to reach {count}")
+
+    def produce_until_segment_count(self, count):
+        timeout_sec = 30
+        deadline = time.time() + timeout_sec
+        while True:
+            current_segment_count = self.partition_segments()
+            if current_segment_count >= count:
+                return
+            if time.time() > deadline:
+                assert False, f"Unable to reach segment count {count} in {timeout_sec}s, current count {current_segment_count}"
+            KgoVerifierProducer.oneshot(self.test_ctx,
+                                        self.redpanda,
+                                        self.topic_name,
+                                        2024,
+                                        10000,
+                                        key_set_cardinality=2)
+
+    def ensure_translation(self, dl: DatalakeServices):
+        (_, max_offset) = self.kafka_cat.list_offsets(topic=self.topic_name,
+                                                      partition=0)
+        self.redpanda.logger.debug(
+            f"Ensuring translation until: {max_offset - 1}")
+        dl.wait_for_translation_until_offset(self.topic_name, max_offset - 1)
+
+    def do_test_no_gaps(self, dl: DatalakeServices):
+        dl.create_iceberg_enabled_topic(self.topic_name,
+                                        iceberg_mode="key_value",
+                                        config={
+                                            "cleanup.policy":
+                                            TopicSpec.CLEANUP_COMPACT,
+                                            "segment.bytes": self.segment_size
+                                        })
+
+        for _ in range(5):
+            self.produce_until_segment_count(5)
+            # Ensure everything is translated
+            self.ensure_translation(dl)
+            # Disable iceberg
+            dl.set_iceberg_mode_on_topic(self.topic_name, "disabled")
+            # Append more data
+            self.produce_until_segment_count(8)
+            # Compact the data
+            # One closed segment and one open (current) segment
+            self.wait_until_segment_count(2)
+            # Enable iceberg again
+            dl.set_iceberg_mode_on_topic(self.topic_name, "key_value")
+
+    @cluster(num_nodes=4)
+    @skip_debug_mode
+    @matrix(cloud_storage_type=supported_storage_types())
+    def test_translation_no_gaps(self, cloud_storage_type):
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              include_query_engines=[QueryEngineType.TRINO
+                                                     ]) as dl:
+            self.do_test_no_gaps(dl)
+
+
+class CompactionTest(RedpandaTest):
+    def __init__(self, test_ctx, *args, **kwargs):
+        self.extra_rp_conf = {
+            "iceberg_enabled": "true",
+            "iceberg_catalog_commit_interval_ms": 5000,
+            "datalake_coordinator_snapshot_max_delay_secs": 10,
+            "log_compaction_interval_ms": 2000,
+            "log_segment_size": 2 * 1024**2,  # 2 MiB
+            "compacted_log_segment_size": 1024**2  # 1 MiB
+        }
+
+        super(CompactionTest,
+              self).__init__(test_ctx,
+                             num_brokers=1,
+                             si_settings=SISettings(test_context=test_ctx,
+                                                    fast_uploads=True),
+                             extra_rp_conf=self.extra_rp_conf,
+                             *args,
+                             **kwargs)
+        self.test_ctx = test_ctx
+        self.topic_name = "test"
+        self.msg_size = 1024  # 1 KiB
+        self.total_data = 100 * 1024**2  # 100 MiB
+        self.msg_count = int(self.total_data / self.msg_size)
+        self.kafka_cat = KafkaCat(self.redpanda)
+
+    def setUp(self):
+        # Redpanda will be started by DatalakeServices.
+        pass
+
+    def produce(self):
+        KgoVerifierProducer.oneshot(self.test_ctx,
+                                    self.redpanda,
+                                    self.topic_name,
+                                    msg_size=self.msg_size,
+                                    msg_count=self.msg_count,
+                                    key_set_cardinality=100,
+                                    validate_latest_values=True,
+                                    clean=True)
+
+    def ensure_translation(self, dl: DatalakeServices):
+        (_, max_offset) = self.kafka_cat.list_offsets(topic=self.topic_name,
+                                                      partition=0)
+        self.redpanda.logger.debug(
+            f"Ensuring translation until: {max_offset - 1}")
+        dl.wait_for_translation_until_offset(self.topic_name, max_offset - 1)
+
+    def get_max_translated_offset(self, dl: DatalakeServices):
+        return list(dl.get_max_translated_offset(self.topic_name,
+                                                 0).values())[0]
+
+    def partition_segments(self) -> int:
+        assert len(self.redpanda.nodes) == 1, self.redpanda.nodes
+        node = self.redpanda.nodes[0]
+        storage = self.redpanda.node_storage(node)
+        topic_partitions = storage.partitions("kafka", self.topic_name)
+        assert len(topic_partitions) == 1, len(topic_partitions)
+        segment_count = len(topic_partitions[0].segments)
+        self.redpanda.logger.debug(f"Current segment count: {segment_count}")
+        return segment_count
+
+    def wait_until_segment_count(self, count):
+        # Restarting the brokers here would force the open segment to roll:
+        #self.redpanda.restart_nodes(self.redpanda.nodes)
+
+        wait_until(
+            lambda: self.partition_segments() == count,
+            timeout_sec=120,
+            backoff_sec=3,
+            err_msg=f"Timed out waiting for segment count to reach {count}")
+
+    def wait_for_compaction(self):
+        # Restart each redpanda broker to force roll segments
+        self.redpanda.restart_nodes(self.redpanda.nodes)
+
+        def get_complete_sliding_window_rounds():
+            return self.redpanda.metric_sum(
+                metric_name=
+                "vectorized_storage_log_complete_sliding_window_rounds_total",
+                metrics_endpoint=MetricsEndpoint.METRICS,
+                topic=self.topic_name)
+
+        # Wait until the log has been fully compacted, i.e. until the number
+        # of completed sliding window compaction rounds stops increasing.
+        self.prev_sliding_window_rounds = -1
+
+        def compaction_has_completed():
+            new_sliding_window_rounds = get_complete_sliding_window_rounds()
+            res = self.prev_sliding_window_rounds == new_sliding_window_rounds
+            self.prev_sliding_window_rounds = new_sliding_window_rounds
+            return res
+
+        wait_until(
+            compaction_has_completed,
+            timeout_sec=120,
+            backoff_sec=self.extra_rp_conf['log_compaction_interval_ms'] /
+            1000 * 10,
+            err_msg="Compaction did not stabilize.")
+        self.redpanda.logger.debug(
+            f"Compaction stabilized with {get_complete_sliding_window_rounds()} rounds of sliding window compaction"
+        )
+
+    def verify_log_and_table(self, dl: DatalakeServices):
+        # Best-effort dump of the partition's end offset for debugging.
+        try:
+            rpk = RpkTool(self.redpanda)
+            self.redpanda.logger.debug(
+                rpk.consume(self.topic_name,
+                            meta_only=True,
+                            offset=":end",
+                            timeout=10))
+        except Exception:
+            pass
+
+        # Verify a fully compacted log with a sequential consumer
+        consumer = KgoVerifierSeqConsumer(
+            self.test_ctx,
+            self.redpanda,
+            self.topic_name,
+            compacted=True,
+            validate_latest_values=True,
+        )
+        consumer.start(clean=False)
+        consumer.wait(timeout_sec=120)
+        consumer.free()
+
+        max_consumed_offset = consumer.consumer_status.validator.max_offsets_consumed[
+            '0']
+        max_translated_offset = self.get_max_translated_offset(dl)
+        assert max_consumed_offset == max_translated_offset, \
+            f"Max consumed offset {max_consumed_offset} != max translated offset {max_translated_offset}"
+
+        verifier = DatalakeVerifier(self.redpanda,
+                                    self.topic_name,
+                                    dl.trino(),
+                                    compacted=True)
+        verifier.start()
+        verifier.wait()
+
+    def do_test_compaction(self, dl: DatalakeServices):
+        dl.create_iceberg_enabled_topic(
+            self.topic_name,
+            iceberg_mode="key_value",
+            config={"cleanup.policy": TopicSpec.CLEANUP_COMPACT})
+
+        for _ in range(5):
+            self.produce()
+            # Ensure everything is translated
+            self.ensure_translation(dl)
+            # Compact the data
+            self.wait_for_compaction()
+            #self.wait_until_segment_count(2)
+            # Assert on the read log and on the Iceberg table. Compaction
+            # settling indicates that everything should have been translated
+            # into the Iceberg table already.
+            self.verify_log_and_table(dl)
+
+    @cluster(num_nodes=4)
+    #@skip_debug_mode
+    @matrix(cloud_storage_type=supported_storage_types())
+    def test_compaction(self, cloud_storage_type):
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              include_query_engines=[QueryEngineType.TRINO
+                                                     ]) as dl:
+            self.do_test_compaction(dl)