Skip to content

Commit

Permalink
Add an API to schedule snapshot creation (#483)
Browse files Browse the repository at this point in the history
* Added `schedule_snapshot_creation()` API to manually create a
snapshot.

* Unlike `create_snapshot()`, if snapshot creation is already in
progress, it will wait and create another snapshot on the next
available log index number.
  • Loading branch information
greensky00 authored Feb 13, 2024
1 parent 843efd3 commit b54b09c
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 7 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ We welcome contributions. If you find any bugs, potential flaws and edge cases,

Contact
-------
* Jung-Sang Ahn <[email protected]>
* Gene Zhang <[email protected]>
* Jung-Sang Ahn <[email protected]>


License Information
Expand Down
30 changes: 29 additions & 1 deletion include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -797,10 +797,27 @@ public:
* Manually create a snapshot based on the latest committed
* log index of the state machine.
*
* Note that snapshot creation will fail immediately if the previous
* snapshot task is still running.
*
* @return Log index number of the created snapshot or `0` if failed.
*/
ulong create_snapshot();

/**
 * Manually and asynchronously create a snapshot on the next earliest
 * available committed log index.
 *
 * Unlike `create_snapshot`, if the previous snapshot task is running,
 * it will wait until the previous task is done. Once the snapshot
 * creation is finished, it will be notified via the returned
 * `cmd_result` with the log index number of the snapshot.
 * If the snapshot creation fails, the result is set with log index `0`
 * and a `FAILED` result code.
 *
 * Only one scheduled creation can be pending at a time; callers must
 * check the returned pointer before using it.
 *
 * @return `cmd_result` instance.
 *         `nullptr` if there is already a scheduled snapshot creation.
 */
ptr< cmd_result<uint64_t> > schedule_snapshot_creation();

/**
* Get the log index number of the last snapshot.
*
Expand Down Expand Up @@ -954,7 +971,8 @@ protected:
void invite_srv_to_join_cluster();
void rm_srv_from_cluster(int32 srv_id);
int get_snapshot_sync_block_size() const;
void on_snapshot_completed(ptr<snapshot>& s,
void on_snapshot_completed(ptr<snapshot> s,
ptr<cmd_result<uint64_t>> manual_creation_cb,
bool result,
ptr<std::exception>& err);
void on_log_compacted(ulong log_idx,
Expand Down Expand Up @@ -1247,6 +1265,16 @@ protected:
*/
std::atomic<bool> snp_in_progress_;

/**
 * `true` if a manual snapshot creation is scheduled by the user.
 * Set by `schedule_snapshot_creation()` and cleared in
 * `on_snapshot_completed()` once the scheduled snapshot is done.
 */
std::atomic<bool> snp_creation_scheduled_;

/**
 * Non-null if a manual snapshot creation is scheduled by the user.
 * Holds the result handle returned to the user; it is reset in
 * `on_snapshot_completed()` after the result has been set.
 */
ptr< cmd_result<uint64_t> > sched_snp_creation_result_;

/**
* (Read-only, but its contents will change)
* Server context.
Expand Down
42 changes: 40 additions & 2 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,18 @@ ulong raft_server::create_snapshot() {
return snapshot_and_compact(committed_idx, true) ? committed_idx : 0;
}

/**
 * Schedule a manual snapshot creation.
 *
 * At most one request can be pending: the first caller flips
 * `snp_creation_scheduled_` from `false` to `true` and receives a
 * result handle; any caller arriving while the flag is still set
 * gets a null pointer.
 *
 * @return Result handle that will be completed by
 *         `on_snapshot_completed()`, or null if a snapshot creation
 *         is already scheduled.
 */
ptr< cmd_result<uint64_t> > raft_server::schedule_snapshot_creation() {
    // Atomically claim the single scheduling slot.
    bool exp = false;
    if (!snp_creation_scheduled_.compare_exchange_strong(exp, true)) {
        p_wn("snapshot creation is already scheduled");
        return nilptr;
    }

    // Keep a local handle and return that one. `on_snapshot_completed()`
    // resets `sched_snp_creation_result_`, so re-reading the member for
    // the return value could hand a null pointer to a caller whose
    // request was actually accepted.
    ptr< cmd_result<uint64_t> > ret = cs_new< cmd_result<uint64_t> >();
    sched_snp_creation_result_ = ret;
    // NOTE(review): the member itself is a plain `ptr` written here
    // without a lock visible in this function -- confirm how access from
    // the snapshot path is synchronized.
    p_in("schedule snapshot creation");
    return ret;
}

ulong raft_server::get_last_snapshot_idx() const {
std::lock_guard<std::mutex> l(last_snapshot_lock_);
return last_snapshot_ ? last_snapshot_->get_last_log_idx(): 0;
Expand All @@ -494,7 +506,7 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
snapshot_distance = first_snapshot_distance_;
}

if (!forced_creation) {
if (!forced_creation && !snp_creation_scheduled_) {
// If `forced_creation == true`, ignore below conditions.
if ( params->snapshot_distance_ == 0 ||
( committed_idx - log_store_->start_index() + 1 ) < snapshot_distance ) {
Expand Down Expand Up @@ -522,6 +534,7 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
}

if ( ( forced_creation ||
snp_creation_scheduled_ ||
!local_snp ||
committed_idx >= snapshot_distance + local_snp->get_last_log_idx() ) &&
snp_in_progress_.compare_exchange_strong(f, true) )
Expand All @@ -546,6 +559,14 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
return false;
}

ptr<cmd_result<uint64_t>> manual_creation_cb = nullptr;
if (snp_creation_scheduled_) {
// User scheduled a new snapshot creation.
// Due to `snp_in_progress_` it will happen only once.
manual_creation_cb = sched_snp_creation_result_;
p_in("snapshot creation is scheduled by user");
}

while ( conf->get_log_idx() > committed_idx &&
conf->get_prev_log_idx() >= log_store_->start_index() ) {
ptr<log_entry> conf_log
Expand Down Expand Up @@ -593,6 +614,7 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
std::bind( &raft_server::on_snapshot_completed,
this,
new_snapshot,
manual_creation_cb,
std::placeholders::_1,
std::placeholders::_2 );
timer_helper tt;
Expand All @@ -618,7 +640,10 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
}

void raft_server::on_snapshot_completed
( ptr<snapshot>& s, bool result, ptr<std::exception>& err )
( ptr<snapshot> s,
ptr<cmd_result<uint64_t>> manual_creation_cb,
bool result,
ptr<std::exception>& err )
{
do { // Dummy loop
if (err != nilptr) {
Expand Down Expand Up @@ -660,6 +685,19 @@ void raft_server::on_snapshot_completed
}
} while (false);

if (manual_creation_cb.get()) {
// This was a manual request scheduled by the user.
uint64_t idx = 0;
cmd_result_code code = cmd_result_code::FAILED;
if (err == nilptr && result) {
idx = s->get_last_log_idx();
code = cmd_result_code::OK;
}
manual_creation_cb->set_result(idx, err, code);
sched_snp_creation_result_.reset();
snp_creation_scheduled_ = false;
}

snp_in_progress_.store(false);
}

Expand Down
16 changes: 14 additions & 2 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ raft_server::raft_server(context* ctx, const init_options& opt)
, serving_req_(false)
, steps_to_down_(0)
, snp_in_progress_(false)
, snp_creation_scheduled_(false)
, sched_snp_creation_result_(nullptr)
, ctx_(ctx)
, scheduler_(ctx->scheduler_)
, election_exec_(std::bind(&raft_server::handle_election_timeout, this))
Expand Down Expand Up @@ -1120,14 +1122,17 @@ void raft_server::check_leadership_transfer() {
ptr<raft_params> params = ctx_->get_params();
if (!params->leadership_transfer_min_wait_time_) {
// Transferring leadership is disabled.
p_tr("leadership transfer is disabled");
return;
}
if (!leadership_transfer_timer_.timeout()) {
// Leadership period is too short.
p_tr("leadership period is too short: %zu ms",
leadership_transfer_timer_.get_duration_us() / 1000);
return;
}

size_t hb_interval_ms = ctx_->get_params()->heart_beat_interval_;
size_t election_lower = ctx_->get_params()->election_timeout_lower_bound_;

recur_lock(lock_);

Expand All @@ -1146,24 +1151,31 @@ void raft_server::check_leadership_transfer() {
if (peer_elem->get_matched_idx() + params->stale_log_gap_ <
cur_commit_idx) {
// This peer is lagging behind.
p_tr("peer %d is lagging behind, %lu < %lu",
s_conf.get_id(), peer_elem->get_matched_idx(),
cur_commit_idx);
return;
}

uint64_t last_resp_ms = peer_elem->get_resp_timer_us() / 1000;
if (last_resp_ms > hb_interval_ms) {
if (last_resp_ms > election_lower) {
// This replica is not responding.
p_tr("peer %d is not responding, %lu ms ago",
s_conf.get_id(), last_resp_ms);
return;
}
}

if (my_priority_ >= max_priority || successor_id == -1) {
// This leader already has the highest priority.
p_tr("my priority %d is already the highest", my_priority_);
return;
}

if (!state_machine_->allow_leadership_transfer()) {
// Although all conditions are met,
// user does not want to transfer the leadership.
p_tr("state machine does not allow leadership transfer");
return;
}

Expand Down
107 changes: 107 additions & 0 deletions tests/unit/raft_server_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2238,6 +2238,110 @@ int snapshot_creation_index_inversion_test() {
return 0;
}

int snapshot_scheduled_creation_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

std::string s1_addr = "S1";
std::string s2_addr = "S2";
std::string s3_addr = "S3";

RaftPkg s1(f_base, 1, s1_addr);
RaftPkg s2(f_base, 2, s2_addr);
RaftPkg s3(f_base, 3, s3_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2, &s3};

CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );

// Append a message using separate thread.
ExecArgs exec_args(&s1);
TestSuite::ThreadHolder hh(&exec_args, fake_executer, fake_executer_killer);

for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
pp->raftServer->update_params(param);
}

const size_t NUM = 5;

// Append messages asynchronously.
std::list< ptr< cmd_result< ptr<buffer> > > > handlers;
for (size_t ii = 0; ii < NUM; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->append_entries( {msg} );

CHK_TRUE( ret->get_accepted() );

handlers.push_back(ret);
}

s1.fNet->execReqResp(); // replication.
s1.fNet->execReqResp(); // commit.
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); // commit execution.

// One more time to make sure.
s1.fNet->execReqResp();
s1.fNet->execReqResp();
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) );

// Manually create a snapshot.
uint64_t log_idx = s1.raftServer->create_snapshot();
CHK_GT(log_idx, 0);

// Schedule snapshot creation and wait 500ms, there shouldn't be any progress.
auto sched_ret = s1.raftServer->schedule_snapshot_creation();
TestSuite::sleep_ms(500, "wait for async snapshot creation");
CHK_FALSE(sched_ret->has_result());

uint64_t last_idx = s1.raftServer->get_last_log_idx();

// Append more messages asynchronously.
for (size_t ii = NUM; ii < NUM * 2; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->append_entries( {msg} );

CHK_TRUE( ret->get_accepted() );

handlers.push_back(ret);
}

s1.fNet->execReqResp(); // replication.
s1.fNet->execReqResp(); // commit.
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); // commit execution.

// One more time to make sure.
s1.fNet->execReqResp();
s1.fNet->execReqResp();
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) );

// Now it should have the result.
CHK_TRUE(sched_ret->has_result());
CHK_EQ(last_idx + 1, sched_ret->get());

print_stats(pkgs);

s1.raftServer->shutdown();
s2.raftServer->shutdown();
s3.raftServer->shutdown();

fake_executer_killer(&exec_args);
hh.join();
CHK_Z( hh.getResult() );

f_base->destroy();

return 0;
}

int snapshot_randomized_creation_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();
Expand Down Expand Up @@ -3387,6 +3491,9 @@ int main(int argc, char** argv) {
ts.doTest( "snapshot creation index inversion test",
snapshot_creation_index_inversion_test );

ts.doTest( "snapshot scheduled creation test",
snapshot_scheduled_creation_test );

ts.doTest( "snapshot randomized creation test",
snapshot_randomized_creation_test );

Expand Down

0 comments on commit b54b09c

Please sign in to comment.