Merge pull request #1056 from AntelopeIO/GH-570-pause-at-block
Refactor for pause_at_block functionality
heifner authored Dec 9, 2024
2 parents 631fed0 + 4d909e5 commit 3b4eaa6
Showing 7 changed files with 122 additions and 50 deletions.
82 changes: 58 additions & 24 deletions libraries/chain/controller.cpp
@@ -992,6 +992,7 @@ struct controller_impl {
const chain_id_type chain_id; // read by thread_pool threads, value will not be changed
bool replaying = false;
bool is_producer_node = false; // true if node is configured as a block producer
block_num_type pause_at_block_num = std::numeric_limits<block_num_type>::max();
const db_read_mode read_mode;
bool in_trx_requiring_checks = false; ///< if true, checks that are normally skipped on replay (e.g. auth checks) cannot be skipped
std::optional<fc::microseconds> subjective_cpu_leeway;
@@ -1357,9 +1358,9 @@ struct controller_impl {

// When in IRREVERSIBLE mode fork_db blocks are applied and marked valid when they become irreversible
template<typename ForkDB, typename BSP>
bool apply_irreversible_block(ForkDB& fork_db, const BSP& bsp) {
controller::apply_blocks_result apply_irreversible_block(ForkDB& fork_db, const BSP& bsp) {
if (read_mode != db_read_mode::IRREVERSIBLE)
return true; // ignore
return controller::apply_blocks_result::complete; // ignore
if constexpr (std::is_same_v<block_state_legacy_ptr, std::decay_t<decltype(bsp)>>) {
// before transition to savanna
return apply_block(bsp, controller::block_status::complete, trx_meta_cache_lookup{});
@@ -1375,22 +1376,23 @@
return apply_block(bsp, controller::block_status::complete, trx_meta_cache_lookup{});
}
// only called during transition when not a proper savanna block
return fork_db_.apply_l<bool>([&](const auto& fork_db_l) {
return fork_db_.apply_l<controller::apply_blocks_result>([&](const auto& fork_db_l) {
block_state_legacy_ptr legacy = fork_db_l.get_block(bsp->id());
fork_db_.switch_to(fork_database::in_use_t::legacy); // apply_block uses this to know which types to create
block_state_ptr prev = fork_db.get_block(legacy->previous(), include_root_t::yes);
assert(prev);
if( apply_block(legacy, controller::block_status::complete, trx_meta_cache_lookup{}) ) {
controller::apply_blocks_result r = apply_block(legacy, controller::block_status::complete, trx_meta_cache_lookup{});
if( r == controller::apply_blocks_result::complete) {
fc::scoped_exit<std::function<void()>> e([&]{fork_db_.switch_to(fork_database::in_use_t::both);});
// irreversible apply was just done, calculate new_valid here instead of in transition_to_savanna()
assert(legacy->action_mroot_savanna);
transition_add_to_savanna_fork_db(fork_db, legacy, bsp, prev);
return true;
return r;
}
// add to fork_db as it expects root != head
transition_add_to_savanna_fork_db(fork_db, legacy, bsp, prev);
fork_db_.switch_to(fork_database::in_use_t::legacy);
return false;
return r;
});
}
}
@@ -1573,7 +1575,7 @@ struct controller_impl {
auto it = v.begin();

for( auto bitr = branch.rbegin(); bitr != branch.rend() && should_process(*bitr); ++bitr ) {
if (!apply_irreversible_block(fork_db, *bitr))
if (apply_irreversible_block(fork_db, *bitr) != controller::apply_blocks_result::complete)
break;

emit( irreversible_block, std::tie((*bitr)->block, (*bitr)->id()), __FILE__, __LINE__ );
@@ -3681,13 +3683,16 @@ struct controller_impl {
}

template<class BSP>
bool apply_block( const BSP& bsp, controller::block_status s,
const trx_meta_cache_lookup& trx_lookup ) {
controller::apply_blocks_result apply_block( const BSP& bsp, controller::block_status s,
const trx_meta_cache_lookup& trx_lookup ) {
try {
try {
if (should_terminate()) {
shutdown();
return false;
return controller::apply_blocks_result::incomplete;
}
if (should_pause()) {
return controller::apply_blocks_result::paused;
}

auto start = fc::time_point::now(); // want to report total time of applying a block
@@ -3845,7 +3850,7 @@ struct controller_impl {

commit_block(s);

return true;
return controller::apply_blocks_result::complete;
} catch ( const std::bad_alloc& ) {
throw;
} catch ( const boost::interprocess::bad_alloc& ) {
@@ -4317,7 +4322,7 @@ struct controller_impl {

BSP bsp = std::make_shared<typename BSP::element_type>(*head, b, protocol_features.get_protocol_feature_set(), validator, skip_validate_signee);

if (apply_block(bsp, controller::block_status::irreversible, trx_meta_cache_lookup{})) {
if (apply_block(bsp, controller::block_status::irreversible, trx_meta_cache_lookup{}) == controller::apply_blocks_result::complete) {
// On replay, log_irreversible is not called and so no irreversible_block signal is emitted.
// So emit it explicitly here.
emit( irreversible_block, std::tie(bsp->block, bsp->id()), __FILE__, __LINE__ );
@@ -4341,7 +4346,7 @@

log_irreversible();
transition_to_savanna_if_needed();
return controller::apply_blocks_result::complete;
return should_pause() ? controller::apply_blocks_result::paused : controller::apply_blocks_result::complete;
} catch (fc::exception& e) {
if (e.code() != interrupt_exception::code_value) {
wlog("${d}", ("d",e.to_detail_string()));
@@ -4403,20 +4408,26 @@ struct controller_impl {
auto except = std::exception_ptr{};
const auto& bsp = *ritr;
try {
bool applied = apply_block( bsp, bsp->is_valid() ? controller::block_status::validated
: controller::block_status::complete, trx_lookup );
if (!switch_fork) { // always complete a switch fork
if (!applied || check_shutdown()) {
result = apply_block( bsp, bsp->is_valid() ? controller::block_status::validated
: controller::block_status::complete, trx_lookup );
if (!switch_fork) {
if (check_shutdown()) {
shutdown();
break; // result should be complete since we are shutting down
}
// Break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run. User expected
// to call apply_blocks again if this returns incomplete.
const bool more_blocks_to_process = ritr + 1 != new_head_branch.rend();
if (!replaying && more_blocks_to_process && fc::time_point::now() - start_apply_blocks_loop > fc::milliseconds(500)) {
result = controller::apply_blocks_result::incomplete;
result = controller::apply_blocks_result::incomplete; // doesn't really matter since we are shutting down
break;
}
if (result == controller::apply_blocks_result::complete) {
// Break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run. User expected
// to call apply_blocks again if this returns incomplete.
const bool more_blocks_to_process = ritr + 1 != new_head_branch.rend();
if (!replaying && more_blocks_to_process && fc::time_point::now() - start_apply_blocks_loop > fc::milliseconds(500)) {
result = controller::apply_blocks_result::incomplete;
break;
}
}
}
if (result != controller::apply_blocks_result::complete) {
break;
}
} catch ( const std::bad_alloc& ) {
throw;
@@ -4940,6 +4951,20 @@ struct controller_impl {
return should_terminate(chain_head.block_num());
}

bool should_pause() const {
if (chain_head.block_num() == pause_at_block_num) {
// while paused, new blocks can still arrive and trigger repeated checks of whether we are still paused; do not spam the log
static fc::time_point log_time;
fc::time_point now = fc::time_point::now();
if (log_time < now - fc::seconds(1)) {
ilog("Pausing at block #${b}", ("b", pause_at_block_num));
log_time = now;
}
return true;
}
return false;
}

bool is_builtin_activated( builtin_protocol_feature_t f )const {
uint32_t current_block_num = chain_head.block_num();

@@ -6032,6 +6057,15 @@ bool controller::is_producer_node()const {
return my->is_producer_node;
}

void controller::set_pause_at_block_num(block_num_type block_num) {
my->pause_at_block_num = block_num;
}

block_num_type controller::get_pause_at_block_num()const {
return my->pause_at_block_num;
}


void controller::set_db_read_only_mode() {
mutable_db().set_read_only_mode();
}
8 changes: 6 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
@@ -244,8 +244,9 @@ namespace eosio::chain {

/// Apply any blocks that are ready from the fork_db
enum class apply_blocks_result {
complete, // all ready blocks in forkdb have been applied
incomplete // time limit reached, additional blocks may be available in forkdb to process
complete, // all ready blocks in forkdb have been applied
incomplete, // time limit reached, additional blocks may be available in forkdb to process
paused // apply blocks currently paused
};
apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup);

@@ -479,6 +480,9 @@ namespace eosio::chain {
void set_producer_node(bool is_producer_node);
bool is_producer_node()const; // thread safe, set at program initialization

void set_pause_at_block_num(block_num_type block_num);
block_num_type get_pause_at_block_num()const;

void set_db_read_only_mode();
void unset_db_read_only_mode();
void init_thread_local_data();
2 changes: 2 additions & 0 deletions libraries/chain/include/eosio/chain/exceptions.hpp
@@ -609,6 +609,8 @@ namespace eosio { namespace chain {
3170015, "Invalid snapshot request" )
FC_DECLARE_DERIVED_EXCEPTION( snapshot_execution_exception, producer_exception,
3170016, "Snapshot execution exception" )
FC_DECLARE_DERIVED_EXCEPTION( invalid_pause_at_block_request, producer_exception,
3170017, "Invalid pause at block request" )

FC_DECLARE_DERIVED_EXCEPTION( reversible_blocks_exception, chain_exception,
3180000, "Reversible Blocks exception" )
26 changes: 3 additions & 23 deletions plugins/net_plugin/net_plugin.cpp
@@ -457,7 +457,6 @@ namespace eosio {
void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr<connection> from_connection);
void start_expire_timer();
void start_monitors();
void process_blocks();

void expire();
/** \name Peer Timestamps
@@ -3774,34 +3773,15 @@ namespace eosio {

if (fork_db_add_result == fork_db_add_t::appended_to_head || fork_db_add_result == fork_db_add_t::fork_switch) {
++c->unique_blocks_rcvd_count;
fc_dlog(logger, "posting incoming_block to app thread, block ${n}", ("n", ptr->block_num()));
my_impl->process_blocks();

fc_dlog(logger, "post process_incoming_block to app thread, block ${n}", ("n", ptr->block_num()));
my_impl->producer_plug->process_blocks();

// ready to process immediately, so signal producer to interrupt start_block
my_impl->producer_plug->received_block(block_num, fork_db_add_result);
}
});
}

void net_plugin_impl::process_blocks() {
auto process_incoming_blocks = [](auto self) -> void {
try {
auto r = my_impl->producer_plug->on_incoming_block();
if (r == controller::apply_blocks_result::incomplete) {
app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, [self]() {
self(self);
});
}
} catch (...) {} // errors on applied blocks logged in controller
};

app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write,
[process_incoming_blocks]() {
process_incoming_blocks(process_incoming_blocks);
});
}

// thread safe
void net_plugin_impl::start_expire_timer() {
fc::lock_guard g( expire_timer_mtx );
@@ -4517,7 +4497,7 @@ namespace eosio {
// peers configured. This is a bit of a hack for Spring 1.0.0 until we can add a proper
// pause-at-block (issue #570) which could be used to explicitly request a node to not process beyond
// a specified block.
my_impl->process_blocks();
my_impl->producer_plug->process_blocks();
}
}

5 changes: 4 additions & 1 deletion plugins/producer_api_plugin/producer_api_plugin.cpp
@@ -119,7 +119,10 @@ void producer_api_plugin::plugin_startup() {
app().get_plugin<http_plugin>().add_api({
CALL_WITH_400(producer, producer_rw, producer, pause,
INVOKE_V_V(producer, pause), 201),
CALL_WITH_400(producer, producer_rw, producer, resume,
// TODO: Enable for Spring 2.0.0
// CALL_WITH_400(producer, producer_rw, producer, pause_at_block,
// INVOKE_V_R(producer, pause_at_block, producer_plugin::pause_at_block_params), 201),
CALL_WITH_400(producer, producer_rw, producer, resume,
INVOKE_V_V(producer, resume), 201),
CALL_WITH_400(producer, producer_rw, producer, update_runtime_options,
INVOKE_V_R(producer, update_runtime_options, producer_plugin::runtime_options), 201),
9 changes: 9 additions & 0 deletions plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp
@@ -83,7 +83,12 @@ class producer_plugin : public appbase::plugin<producer_plugin> {

controller::apply_blocks_result on_incoming_block();

struct pause_at_block_params {
chain::block_num_type block_num{0}; // block height to pause block evaluation/production
};

void pause();
void pause_at_block(const pause_at_block_params& params);
void resume();
bool paused() const;
void update_runtime_options(const runtime_options& options);
@@ -140,6 +145,9 @@ class producer_plugin : public appbase::plugin<producer_plugin> {

void log_failed_transaction(const transaction_id_type& trx_id, const chain::packed_transaction_ptr& packed_trx_ptr, const char* reason) const;

// initiate calls to process_incoming_block to process all queued blocks
void process_blocks();

// thread-safe, called when a new block is received
void received_block(uint32_t block_num, chain::fork_db_add_t fork_db_add_result);

@@ -168,3 +176,4 @@ FC_REFLECT(eosio::producer_plugin::get_account_ram_corrections_result, (rows)(mo
FC_REFLECT(eosio::producer_plugin::get_unapplied_transactions_params, (lower_bound)(limit)(time_limit_ms))
FC_REFLECT(eosio::producer_plugin::unapplied_trx, (trx_id)(expiration)(trx_type)(first_auth)(first_receiver)(first_action)(total_actions)(billed_cpu_time_us)(size))
FC_REFLECT(eosio::producer_plugin::get_unapplied_transactions_result, (size)(incoming_size)(trxs)(more))
FC_REFLECT(eosio::producer_plugin::pause_at_block_params, (block_num));
40 changes: 40 additions & 0 deletions plugins/producer_plugin/producer_plugin.cpp
@@ -1161,7 +1161,21 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}
}

void pause_at_block(block_num_type block_num) {
auto& chain = chain_plug->chain();

auto head = chain.head();
EOS_ASSERT(block_num > head.block_num(), invalid_pause_at_block_request,
"Pause at block ${bn} <= chain head ${h}", ("bn", block_num)("h", head.block_num()));

fc_ilog(_log, "Set pause at block #${bn}", ("bn", block_num));
chain.set_pause_at_block_num(block_num);
}

void resume() {
auto& chain = chain_plug->chain();
chain.set_pause_at_block_num(std::numeric_limits<block_num_type>::max());

_pause_production = false;
// reset vote received so production can be explicitly resumed, will pause again when received vote time limit hit again
if (_is_savanna_active)
@@ -1680,6 +1694,10 @@ void producer_plugin::pause() {
my->_pause_production = true;
}

void producer_plugin::pause_at_block(const pause_at_block_params& params) {
my->pause_at_block(params.block_num);
}

void producer_plugin::resume() {
my->resume();
}
@@ -2172,6 +2190,10 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
_time_tracker.clear(); // make sure we start tracking block time after `apply_blocks()`

block_handle head = chain.head();

if (head.block_num() == chain.get_pause_at_block_num())
return start_block_result::waiting_for_block;

fc::time_point now = fc::time_point::now();
block_timestamp_type block_time = calculate_pending_block_time();
producer_authority scheduled_producer = chain.head_active_producers(block_time).get_scheduled_producer(block_time);
@@ -2998,6 +3020,24 @@ void producer_plugin_impl::produce_block() {
_time_tracker.clear();
}

void producer_plugin::process_blocks() {
auto process_incoming_blocks = [this](auto self) -> void {
try {
auto r = on_incoming_block();
if (r == controller::apply_blocks_result::incomplete) {
app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, [self]() {
self(self);
});
}
} catch (...) {} // errors on applied blocks logged in controller
};

app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write,
[process_incoming_blocks]() {
process_incoming_blocks(process_incoming_blocks);
});
}

void producer_plugin::received_block(uint32_t block_num, chain::fork_db_add_t fork_db_add_result) {
my->_received_block = block_num;
// fork_db_add_t::fork_switch means head block of best fork (different from the current branch) is received.
