Skip to content

Commit

Permalink
Merge pull request redpanda-data#24522 from mmaslankaprv/CORE-8485-re…
Browse files Browse the repository at this point in the history
…set-translation-state-on-snapshot

[CORE-8485] Reset translation state on snapshot
  • Loading branch information
mmaslankaprv authored Dec 12, 2024
2 parents 10bc74b + 8985f57 commit f4c472f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 10 deletions.
9 changes: 8 additions & 1 deletion src/v/datalake/translation/state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,14 @@ translation_stm::take_local_snapshot(ssx::semaphore_units apply_units) {
co_return raft::stm_snapshot::create(0, snapshot_offset, std::move(result));
}

ss::future<> translation_stm::apply_raft_snapshot(const iobuf&) { co_return; }
// Handles a Raft snapshot delivered to this state machine. The snapshot
// payload is not read (the parameter is unnamed); the only effects are a debug
// log line and a reset of the tracked translated offset.
ss::future<> translation_stm::apply_raft_snapshot(const iobuf&) {
// Reset the offset to "not initialized" when handling a Raft snapshot; this
// way the state machine will not hold any obsolete state that should be
// overridden with the snapshot.
vlog(_log.debug, "Applying raft snapshot, resetting state");
// A default-constructed kafka::offset is the value used for the reset.
_highest_translated_offset = kafka::offset{};
co_return;
}

ss::future<iobuf> translation_stm::take_snapshot(model::offset) {
co_return iobuf{};
Expand Down
1 change: 1 addition & 0 deletions src/v/datalake/translation/tests/state_machine_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ struct translator_stm_fixture : stm_raft_fixture<stm> {
};

TEST_F_CORO(translator_stm_fixture, state_machine_ops) {
enable_offset_translation();
co_await initialize_state_machines();
co_await wait_for_leader(5s);
scoped_config config;
Expand Down
24 changes: 17 additions & 7 deletions src/v/raft/tests/raft_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ raft_node_instance::raft_node_instance(
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection,
config::binding<std::chrono::milliseconds> election_timeout,
config::binding<std::chrono::milliseconds> heartbeat_interval)
config::binding<std::chrono::milliseconds> heartbeat_interval,
bool with_offset_translation)
: raft_node_instance(
id,
revision,
Expand All @@ -357,7 +358,8 @@ raft_node_instance::raft_node_instance(
std::move(leader_update_clb),
enable_longest_log_detection,
std::move(election_timeout),
std::move(heartbeat_interval)) {}
std::move(heartbeat_interval),
with_offset_translation) {}

raft_node_instance::raft_node_instance(
model::node_id id,
Expand All @@ -368,7 +370,8 @@ raft_node_instance::raft_node_instance(
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection,
config::binding<std::chrono::milliseconds> election_timeout,
config::binding<std::chrono::milliseconds> heartbeat_interval)
config::binding<std::chrono::milliseconds> heartbeat_interval,
bool with_offset_translation)
: _id(id)
, _revision(revision)
, _logger(test_log, fmt::format("[node: {}]", _id))
Expand All @@ -391,7 +394,8 @@ raft_node_instance::raft_node_instance(
, _leader_clb(std::move(leader_update_clb))
, _enable_longest_log_detection(enable_longest_log_detection)
, _election_timeout(std::move(election_timeout))
, _heartbeat_interval(std::move(heartbeat_interval)) {
, _heartbeat_interval(std::move(heartbeat_interval))
, _with_offset_translation(with_offset_translation) {
config::shard_local_cfg().disable_metrics.set_value(true);
}

Expand Down Expand Up @@ -424,7 +428,11 @@ raft_node_instance::initialise(std::vector<raft::vnode> initial_nodes) {
co_await _storage.invoke_on_all(&storage::api::start);
storage::ntp_config ntp_cfg(ntp(), _base_directory);

auto log = co_await _storage.local().log_mgr().manage(std::move(ntp_cfg));
auto log = co_await _storage.local().log_mgr().manage(
std::move(ntp_cfg),
test_group,
_with_offset_translation ? model::offset_translator_batch_types()
: std::vector<model::record_batch_type>{});

_raft = ss::make_lw_shared<consensus>(
_id,
Expand Down Expand Up @@ -591,7 +599,8 @@ raft_fixture::add_node(model::node_id id, model::revision_id rev) {
},
_enable_longest_log_detection,
_election_timeout.bind(),
_heartbeat_interval.bind());
_heartbeat_interval.bind(),
_with_offset_translation);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
Expand All @@ -613,7 +622,8 @@ raft_node_instance& raft_fixture::add_node(
},
_enable_longest_log_detection,
_election_timeout.bind(),
_heartbeat_interval.bind());
_heartbeat_interval.bind(),
_with_offset_translation);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
Expand Down
10 changes: 8 additions & 2 deletions src/v/raft/tests/raft_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection,
config::binding<std::chrono::milliseconds> election_timeout,
config::binding<std::chrono::milliseconds> heartbeat_interval);
config::binding<std::chrono::milliseconds> heartbeat_interval,
bool with_offset_translation = false);

raft_node_instance(
model::node_id id,
Expand All @@ -168,7 +169,8 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection,
config::binding<std::chrono::milliseconds> election_timeout,
config::binding<std::chrono::milliseconds> heartbeat_interval);
config::binding<std::chrono::milliseconds> heartbeat_interval,
bool with_offset_translation = false);

raft_node_instance(const raft_node_instance&) = delete;
raft_node_instance(raft_node_instance&&) noexcept = delete;
Expand Down Expand Up @@ -265,6 +267,7 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
bool _enable_longest_log_detection;
config::binding<std::chrono::milliseconds> _election_timeout;
config::binding<std::chrono::milliseconds> _heartbeat_interval;
bool _with_offset_translation;
};

class raft_fixture
Expand Down Expand Up @@ -530,6 +533,8 @@ class raft_fixture
_heartbeat_interval.update(std::move(timeout));
}

// Turns on offset translation for this fixture by setting
// _with_offset_translation to true.
// NOTE(review): the flag appears to be handed to each raft_node_instance by
// value at construction time, so this presumably has to be called before
// nodes are added to take effect — confirm against add_node().
void enable_offset_translation() { _with_offset_translation = true; }

protected:
// Exception type declared by the fixture; presumably raised when an operation
// is attempted on a node that is not the leader (inferred from the name —
// verify against the throw sites).
//
// The base is inherited publicly: with the `class` keyword the default is
// private inheritance, which makes std::exception an inaccessible base, so
// `catch (const std::exception&)` handlers would not match a thrown instance.
class raft_not_leader_exception : public std::exception {};

Expand Down Expand Up @@ -561,6 +566,7 @@ class raft_fixture
std::optional<leader_update_clb_t> _leader_clb;
config::mock_property<std::chrono::milliseconds> _election_timeout{500ms};
config::mock_property<std::chrono::milliseconds> _heartbeat_interval{50ms};
bool _with_offset_translation = false;
};

template<class... STM>
Expand Down

0 comments on commit f4c472f

Please sign in to comment.