Skip to content

Commit

Permalink
tests/migration: tolerate skipped transient states, test both out and in
Browse files Browse the repository at this point in the history
1. Pass if we haven't seen a migration in a transient state, as it may just have
gone through it quicker than we can notice
2. Test both outbound and inbound migrations avoiding topic name collisions
  • Loading branch information
bashtanov committed Jul 31, 2024
1 parent ae01133 commit a88a800
Showing 1 changed file with 77 additions and 27 deletions.
104 changes: 77 additions & 27 deletions tests/rptest/tests/data_migrations_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,21 @@ def get_migrations_map(self, node=None):
migrations = self.admin.list_data_migrations(node).json()
return {migration["id"]: migration for migration in migrations}

def wait_for_migration_state(self, id: int, state: str):
def migration_in_state():
def wait_for_migration_states(self, id: int, states: list[str]):
def migration_in_one_of_states():
for n in self.redpanda.nodes:
migrations = self.get_migrations_map(n)
if id not in migrations or migrations[id]["state"] != state:
if id not in migrations or migrations[id][
"state"] not in states:
return False
return True

wait_until(
migration_in_state,
migration_in_one_of_states,
timeout_sec=90,
backoff_sec=1,
err_msg=f"Failed waiting for migration {id} to reach {state} state"
err_msg=
f"Failed waiting for migration {id} to reach on of {states} states"
)

def create_and_wait(self, migration: InboundDataMigration
Expand Down Expand Up @@ -83,25 +85,16 @@ def test_creating_and_listing_migrations(self):

assert len(migrations_map) == 0, "There should be no data migrations"

topics_to_migrate = [NamespacedTopic(t.name) for t in topics]
out_migration = OutboundDataMigration(topics_to_migrate,
# out
outbound_topics = [NamespacedTopic(t.name) for t in topics]
out_migration = OutboundDataMigration(outbound_topics,
consumer_groups=[])

out_migration_id = self.create_and_wait(out_migration)
inbound_topics = [
InboundTopic(
NamespacedTopic(f"topic-{i}"),
alias=None if i == 0 else NamespacedTopic(f"topic-{i}-alias"))
for i in range(3)
]
in_migration = InboundDataMigration(topics=inbound_topics,
consumer_groups=["g-1", "g-2"])

in_migration_id = self.create_and_wait(in_migration)

migrations_map = self.get_migrations_map()
self.logger.info(f"migrations: {migrations_map}")
assert len(migrations_map) == 2, "There should be two data migrations"
assert len(migrations_map) == 1, "There should be one data migration"

assert migrations_map[out_migration_id]['state'] == 'planned'

Expand All @@ -111,20 +104,77 @@ def test_creating_and_listing_migrations(self):

admin.execute_data_migration_action(out_migration_id,
MigrationAction.prepare)
self.logger.info('waiting for preparing')
self.wait_for_migration_state(out_migration_id, 'preparing')
self.logger.info('waiting for preparing or prepared')
self.wait_for_migration_states(out_migration_id,
['preparing', 'prepared'])
self.logger.info('waiting for prepared')
self.wait_for_migration_state(out_migration_id, 'prepared')
self.wait_for_migration_states(out_migration_id, ['prepared'])
admin.execute_data_migration_action(out_migration_id,
MigrationAction.execute)
self.logger.info('waiting for executing')
self.wait_for_migration_state(out_migration_id, 'executing')
self.wait_for_migration_state(out_migration_id, 'executed')
self.logger.info('waiting for executing or executed')
self.wait_for_migration_states(out_migration_id,
['executing', 'executed'])
self.logger.info('waiting for executed')
self.wait_for_migration_states(out_migration_id, ['executed'])
admin.execute_data_migration_action(out_migration_id,
MigrationAction.finish)
self.wait_for_migration_state(out_migration_id, 'cut_over')
self.wait_for_migration_state(out_migration_id, 'finished')
self.logger.info('done')
self.logger.info('waiting for cut_over or finished')
self.wait_for_migration_states(out_migration_id,
['cut_over', 'finished'])
self.wait_for_migration_states(out_migration_id, ['finished'])

assert migrations_map[out_migration_id]['state'] == 'planned'

for t in topics:
assert self.client().describe_topic(t.name).partitions == []
# in
#alias=None if i == 0 else NamespacedTopic(f"topic-{i}-alias"))
inbound_topics = [
InboundTopic(NamespacedTopic(t.name),
alias=NamespacedTopic(f"{t.name}-alias"))
for t in topics[:3]
]
in_migration = InboundDataMigration(topics=inbound_topics,
consumer_groups=["g-1", "g-2"])
in_migration_id = self.create_and_wait(in_migration)

migrations_map = self.get_migrations_map()
self.logger.info(f"migrations: {migrations_map}")
assert len(migrations_map) == 2, "There should be two data migrations"

assert len(
migrations_map[in_migration_id]['migration']['topics']) == len(
inbound_topics), "migration should contain all topics"

for t in inbound_topics:
self.logger.info(
f"inbound topic: {self.client().describe_topic(t.src_topic.topic)}"
)

admin.execute_data_migration_action(in_migration_id,
MigrationAction.prepare)
self.logger.info('waiting for preparing or prepared')
self.wait_for_migration_states(in_migration_id,
['preparing', 'prepared'])
self.logger.info('waiting for prepared')
self.wait_for_migration_states(in_migration_id, ['prepared'])
admin.execute_data_migration_action(in_migration_id,
MigrationAction.execute)
self.logger.info('waiting for executing or executed')
self.wait_for_migration_states(in_migration_id,
['executing', 'executed'])
self.logger.info('waiting for executed')
self.wait_for_migration_states(in_migration_id, ['executed'])
admin.execute_data_migration_action(in_migration_id,
MigrationAction.finish)
self.logger.info('waiting for cut_over or finished')
self.wait_for_migration_states(in_migration_id,
['cut_over', 'finished'])
self.wait_for_migration_states(in_migration_id, ['finished'])

for t in topics:
self.logger.info(
f'topic {t.name} is {self.client().describe_topic(t.name)}')

# TODO: check unhappy scenarios like this
# admin.execute_data_migration_action(out_migration_id,
Expand Down

0 comments on commit a88a800

Please sign in to comment.