Skip to content

Commit

Permalink
tests/migrations: reduce parameter matrix
Browse files Browse the repository at this point in the history
do not vary use_alias if canceling before inbound migration
  • Loading branch information
bashtanov committed Sep 12, 2024
1 parent 9a3ec5e commit 3d517c5
Showing 1 changed file with 43 additions and 21 deletions.
64 changes: 43 additions & 21 deletions tests/rptest/tests/data_migrations_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import random
import threading
import time
from typing import Callable
from enum import Enum
from typing import Callable, NamedTuple, Literal, List
import typing
from requests.exceptions import ConnectionError

Expand Down Expand Up @@ -68,6 +69,31 @@ def _loop(self):
)


class CancellationStage(NamedTuple):
dir: Literal['in', 'out']
stage: Literal['planned', 'preparing', 'prepared', 'executing', 'executed']


class TmtpdiParams(NamedTuple):
cancellation: CancellationStage | None
use_alias: bool


def generate_tmptpdi_params() -> List[TmtpdiParams]:
cancelation_stages = [
CancellationStage(dir, stage)
for dir in typing.get_args(CancellationStage.dir)
for stage in typing.get_args(CancellationStage.stage)
]
return [
TmtpdiParams(cancelation, use_alias)
for cancelation in [None] + cancelation_stages
for use_alias in (True, False)
# alias only affects inbound, pointless to vary if cancel earlier
if not use_alias or cancelation is None or cancelation.dir == 'in'
]


class DataMigrationsApiTest(RedpandaTest):
log_segment_size = 10 * 1024

Expand Down Expand Up @@ -420,14 +446,10 @@ def cancel_inbound(self, migration_id, topic_name):
self.assert_no_topics()

@cluster(num_nodes=4, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
@matrix(use_alias=[True, False],
transfer_leadership=[True, False],
cancellation=[None] +
[(dir, stage) for dir in ('in', 'out')
for stage in ('planned', 'preparing', 'prepared', 'executing',
'executed')])
def test_migrated_topic_data_integrity(self, use_alias,
transfer_leadership, cancellation):
@matrix(transfer_leadership=[True, False],
params=generate_tmptpdi_params())
def test_migrated_topic_data_integrity(self, transfer_leadership: bool,
params: TmtpdiParams):
rpk = RpkTool(self.redpanda)
self.redpanda.si_settings.set_expected_damage(
{"ntr_no_topic_manifest", "missing_segments"})
Expand All @@ -448,13 +470,13 @@ def test_migrated_topic_data_integrity(self, use_alias,
consumer_groups=[])
out_migration_id = self.create_and_wait(out_migration)

if cancellation == ('out', 'planned'):
if params.cancellation == CancellationStage('out', 'planned'):
return self.cancel_outbound(out_migration_id,
workload_topic.name, producer)

admin.execute_data_migration_action(out_migration_id,
MigrationAction.prepare)
if cancellation == ('out', 'preparing'):
if params.cancellation == CancellationStage('out', 'preparing'):
self.wait_for_migration_states(out_migration_id,
['preparing', 'prepared'])
return self.cancel_outbound(out_migration_id,
Expand All @@ -471,13 +493,13 @@ def test_migrated_topic_data_integrity(self, use_alias,
metadata_locked=True,
read_blocked=False,
produce_blocked=False)
if cancellation == ('out', 'prepared'):
if params.cancellation == CancellationStage('out', 'prepared'):
return self.cancel_outbound(out_migration_id,
workload_topic.name, producer)

admin.execute_data_migration_action(out_migration_id,
MigrationAction.execute)
if cancellation == ('out', 'executing'):
if params.cancellation == CancellationStage('out', 'executing'):
self.wait_for_migration_states(out_migration_id,
['executing', 'executed'])
return self.cancel_outbound(out_migration_id,
Expand All @@ -493,7 +515,7 @@ def test_migrated_topic_data_integrity(self, use_alias,
metadata_locked=True,
read_blocked=False,
produce_blocked=True)
if cancellation == ('out', 'executed'):
if params.cancellation == CancellationStage('out', 'executed'):
return self.cancel_outbound(out_migration_id,
workload_topic.name, producer)

Expand Down Expand Up @@ -523,9 +545,9 @@ def test_migrated_topic_data_integrity(self, use_alias,
admin.delete_data_migration(out_migration_id)

# attach topic back
inbound_topic_name = "aliased-workload-topic" if use_alias else workload_topic.name
inbound_topic_name = "aliased-workload-topic" if params.use_alias else workload_topic.name
alias = None
if use_alias:
if params.use_alias:
alias = NamespacedTopic(topic=inbound_topic_name)

in_tl_thread = TransferLeadersBackgroundThread(
Expand All @@ -549,15 +571,15 @@ def test_migrated_topic_data_integrity(self, use_alias,
expected_to_pass=False,
operation=lambda topic: rpk.create_topic(topic=topic,
replicas=3))
if cancellation == ('in', 'planned'):
if params.cancellation == CancellationStage('in', 'planned'):
cancellation = None
self.cancel_inbound(in_migration_id, inbound_topic_name)
continue

admin.execute_data_migration_action(in_migration_id,
MigrationAction.prepare)

if cancellation == ('in', 'preparing'):
if params.cancellation == CancellationStage('in', 'preparing'):
cancellation = None
self.wait_for_migration_states(in_migration_id,
['preparing', 'prepared'])
Expand All @@ -577,7 +599,7 @@ def test_migrated_topic_data_integrity(self, use_alias,
read_blocked=True,
produce_blocked=True)

if cancellation == ('in', 'prepared'):
if params.cancellation == CancellationStage('in', 'prepared'):
cancellation = None
self.cancel_inbound(in_migration_id, inbound_topic_name)
continue
Expand All @@ -590,7 +612,7 @@ def test_migrated_topic_data_integrity(self, use_alias,

admin.execute_data_migration_action(in_migration_id,
MigrationAction.execute)
if cancellation == ('in', 'executing'):
if params.cancellation == CancellationStage('in', 'executing'):
cancellation = None
self.wait_for_migration_states(in_migration_id,
['executing', 'executed'])
Expand All @@ -608,7 +630,7 @@ def test_migrated_topic_data_integrity(self, use_alias,
metadata_locked=True,
read_blocked=True,
produce_blocked=True)
if cancellation == ('in', 'executed'):
if params.cancellation == CancellationStage('in', 'executed'):
cancellation = None
self.cancel_inbound(in_migration_id, inbound_topic_name)
continue
Expand Down

0 comments on commit 3d517c5

Please sign in to comment.