Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor for pause_at_block functionality #1056

Merged
merged 9 commits into from
Dec 9, 2024
82 changes: 58 additions & 24 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ struct controller_impl {
const chain_id_type chain_id; // read by thread_pool threads, value will not be changed
bool replaying = false;
bool is_producer_node = false; // true if node is configured as a block producer
block_num_type pause_at_block_num = std::numeric_limits<block_num_type>::max();
const db_read_mode read_mode;
bool in_trx_requiring_checks = false; ///< if true, checks that are normally skipped on replay (e.g. auth checks) cannot be skipped
std::optional<fc::microseconds> subjective_cpu_leeway;
Expand Down Expand Up @@ -1357,9 +1358,9 @@ struct controller_impl {

// When in IRREVERSIBLE mode fork_db blocks are applied and marked valid when they become irreversible
template<typename ForkDB, typename BSP>
bool apply_irreversible_block(ForkDB& fork_db, const BSP& bsp) {
controller::apply_blocks_result apply_irreversible_block(ForkDB& fork_db, const BSP& bsp) {
if (read_mode != db_read_mode::IRREVERSIBLE)
return true; // ignore
return controller::apply_blocks_result::complete; // ignore
if constexpr (std::is_same_v<block_state_legacy_ptr, std::decay_t<decltype(bsp)>>) {
// before transition to savanna
return apply_block(bsp, controller::block_status::complete, trx_meta_cache_lookup{});
Expand All @@ -1375,22 +1376,23 @@ struct controller_impl {
return apply_block(bsp, controller::block_status::complete, trx_meta_cache_lookup{});
}
// only called during transition when not a proper savanna block
return fork_db_.apply_l<bool>([&](const auto& fork_db_l) {
return fork_db_.apply_l<controller::apply_blocks_result>([&](const auto& fork_db_l) {
block_state_legacy_ptr legacy = fork_db_l.get_block(bsp->id());
fork_db_.switch_to(fork_database::in_use_t::legacy); // apply block uses to know what types to create
block_state_ptr prev = fork_db.get_block(legacy->previous(), include_root_t::yes);
assert(prev);
if( apply_block(legacy, controller::block_status::complete, trx_meta_cache_lookup{}) ) {
controller::apply_blocks_result r = apply_block(legacy, controller::block_status::complete, trx_meta_cache_lookup{});
if( r == controller::apply_blocks_result::complete) {
fc::scoped_exit<std::function<void()>> e([&]{fork_db_.switch_to(fork_database::in_use_t::both);});
// irreversible apply was just done, calculate new_valid here instead of in transition_to_savanna()
assert(legacy->action_mroot_savanna);
transition_add_to_savanna_fork_db(fork_db, legacy, bsp, prev);
return true;
return r;
}
// add to fork_db as it expects root != head
transition_add_to_savanna_fork_db(fork_db, legacy, bsp, prev);
fork_db_.switch_to(fork_database::in_use_t::legacy);
return false;
return r;
});
}
}
Expand Down Expand Up @@ -1573,7 +1575,7 @@ struct controller_impl {
auto it = v.begin();

for( auto bitr = branch.rbegin(); bitr != branch.rend() && should_process(*bitr); ++bitr ) {
if (!apply_irreversible_block(fork_db, *bitr))
if (apply_irreversible_block(fork_db, *bitr) != controller::apply_blocks_result::complete)
break;

emit( irreversible_block, std::tie((*bitr)->block, (*bitr)->id()), __FILE__, __LINE__ );
Expand Down Expand Up @@ -3681,13 +3683,16 @@ struct controller_impl {
}

template<class BSP>
bool apply_block( const BSP& bsp, controller::block_status s,
const trx_meta_cache_lookup& trx_lookup ) {
controller::apply_blocks_result apply_block( const BSP& bsp, controller::block_status s,
const trx_meta_cache_lookup& trx_lookup ) {
try {
try {
if (should_terminate()) {
shutdown();
return false;
return controller::apply_blocks_result::incomplete;
}
if (should_pause()) {
return controller::apply_blocks_result::paused;
}

auto start = fc::time_point::now(); // want to report total time of applying a block
Expand Down Expand Up @@ -3845,7 +3850,7 @@ struct controller_impl {

commit_block(s);

return true;
return controller::apply_blocks_result::complete;
} catch ( const std::bad_alloc& ) {
throw;
} catch ( const boost::interprocess::bad_alloc& ) {
Expand Down Expand Up @@ -4317,7 +4322,7 @@ struct controller_impl {

BSP bsp = std::make_shared<typename BSP::element_type>(*head, b, protocol_features.get_protocol_feature_set(), validator, skip_validate_signee);

if (apply_block(bsp, controller::block_status::irreversible, trx_meta_cache_lookup{})) {
if (apply_block(bsp, controller::block_status::irreversible, trx_meta_cache_lookup{}) == controller::apply_blocks_result::complete) {
// On replay, log_irreversible is not called and so no irreversible_block signal is emitted.
// So emit it explicitly here.
emit( irreversible_block, std::tie(bsp->block, bsp->id()), __FILE__, __LINE__ );
Expand All @@ -4341,7 +4346,7 @@ struct controller_impl {

log_irreversible();
transition_to_savanna_if_needed();
return controller::apply_blocks_result::complete;
return should_pause() ? controller::apply_blocks_result::paused : controller::apply_blocks_result::complete;
} catch (fc::exception& e) {
if (e.code() != interrupt_exception::code_value) {
wlog("${d}", ("d",e.to_detail_string()));
Expand Down Expand Up @@ -4403,20 +4408,26 @@ struct controller_impl {
auto except = std::exception_ptr{};
const auto& bsp = *ritr;
try {
bool applied = apply_block( bsp, bsp->is_valid() ? controller::block_status::validated
: controller::block_status::complete, trx_lookup );
if (!switch_fork) { // always complete a switch fork
if (!applied || check_shutdown()) {
result = apply_block( bsp, bsp->is_valid() ? controller::block_status::validated
: controller::block_status::complete, trx_lookup );
if (!switch_fork) {
if (check_shutdown()) {
shutdown();
break; // result should be complete since we are shutting down
}
// Break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run. User expected
// to call apply_blocks again if this returns incomplete.
const bool more_blocks_to_process = ritr + 1 != new_head_branch.rend();
if (!replaying && more_blocks_to_process && fc::time_point::now() - start_apply_blocks_loop > fc::milliseconds(500)) {
result = controller::apply_blocks_result::incomplete;
result = controller::apply_blocks_result::incomplete; // doesn't really matter since we are shutting down
break;
}
if (result == controller::apply_blocks_result::complete) {
// Break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run. User expected
// to call apply_blocks again if this returns incomplete.
const bool more_blocks_to_process = ritr + 1 != new_head_branch.rend();
if (!replaying && more_blocks_to_process && fc::time_point::now() - start_apply_blocks_loop > fc::milliseconds(500)) {
result = controller::apply_blocks_result::incomplete;
break;
}
}
}
if (result != controller::apply_blocks_result::complete) {
break;
}
} catch ( const std::bad_alloc& ) {
throw;
Expand Down Expand Up @@ -4940,6 +4951,20 @@ struct controller_impl {
return should_terminate(chain_head.block_num());
}

bool should_pause() const {
if (chain_head.block_num() == pause_at_block_num) {
// when paused new blocks can come in which causes check if we are still paused, do not spam the log
static fc::time_point log_time;
fc::time_point now = fc::time_point::now();
if (log_time < now - fc::seconds(1)) {
linh2931 marked this conversation as resolved.
Show resolved Hide resolved
ilog("Pausing at block #${b}", ("b", pause_at_block_num));
log_time = now;
}
return true;
}
return false;
}

bool is_builtin_activated( builtin_protocol_feature_t f )const {
uint32_t current_block_num = chain_head.block_num();

Expand Down Expand Up @@ -6032,6 +6057,15 @@ bool controller::is_producer_node()const {
return my->is_producer_node;
}

// Stores block_num as the impl's pause_at_block_num, the value should_pause() compares against chain head.
// std::numeric_limits<block_num_type>::max() is the member's initial value and is what resume() restores.
void controller::set_pause_at_block_num(block_num_type block_num) {
my->pause_at_block_num = block_num;
}

// Returns the currently configured pause_at_block_num
// (std::numeric_limits<block_num_type>::max() is the member's initial value).
block_num_type controller::get_pause_at_block_num()const {
return my->pause_at_block_num;
}


// Forwards to set_read_only_mode() on the object returned by mutable_db().
void controller::set_db_read_only_mode() {
mutable_db().set_read_only_mode();
}
Expand Down
8 changes: 6 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ namespace eosio::chain {

/// Apply any blocks that are ready from the fork_db
enum class apply_blocks_result {
   // status reported by apply_blocks()
   complete,   // all ready blocks in forkdb have been applied
   incomplete, // time limit reached, additional blocks may be available in forkdb to process
   paused      // apply blocks currently paused
};
apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup);

Expand Down Expand Up @@ -479,6 +480,9 @@ namespace eosio::chain {
void set_producer_node(bool is_producer_node);
bool is_producer_node()const; // thread safe, set at program initialization

void set_pause_at_block_num(block_num_type block_num);
block_num_type get_pause_at_block_num()const;

void set_db_read_only_mode();
void unset_db_read_only_mode();
void init_thread_local_data();
Expand Down
2 changes: 2 additions & 0 deletions libraries/chain/include/eosio/chain/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,8 @@ namespace eosio { namespace chain {
3170015, "Invalid snapshot request" )
FC_DECLARE_DERIVED_EXCEPTION( snapshot_execution_exception, producer_exception,
3170016, "Snapshot execution exception" )
FC_DECLARE_DERIVED_EXCEPTION( invalid_pause_at_block_request, producer_exception,
3170017, "Invalid pause at block request" )

FC_DECLARE_DERIVED_EXCEPTION( reversible_blocks_exception, chain_exception,
3180000, "Reversible Blocks exception" )
Expand Down
26 changes: 3 additions & 23 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,6 @@ namespace eosio {
void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr<connection> from_connection);
void start_expire_timer();
void start_monitors();
void process_blocks();

void expire();
/** \name Peer Timestamps
Expand Down Expand Up @@ -3774,34 +3773,15 @@ namespace eosio {

if (fork_db_add_result == fork_db_add_t::appended_to_head || fork_db_add_result == fork_db_add_t::fork_switch) {
++c->unique_blocks_rcvd_count;
fc_dlog(logger, "posting incoming_block to app thread, block ${n}", ("n", ptr->block_num()));
my_impl->process_blocks();

fc_dlog(logger, "post process_incoming_block to app thread, block ${n}", ("n", ptr->block_num()));
linh2931 marked this conversation as resolved.
Show resolved Hide resolved
my_impl->producer_plug->process_blocks();

// ready to process immediately, so signal producer to interrupt start_block
my_impl->producer_plug->received_block(block_num, fork_db_add_result);
}
});
}

// Posts a task to the read_write executor queue (medium priority) that calls
// producer_plug->on_incoming_block(). The task re-posts itself for as long as the call
// returns apply_blocks_result::incomplete; any other result ends the chain of posts.
void net_plugin_impl::process_blocks() {
// generic lambda that receives a copy of itself as `self` so it can schedule itself again
auto process_incoming_blocks = [](auto self) -> void {
try {
auto r = my_impl->producer_plug->on_incoming_block();
if (r == controller::apply_blocks_result::incomplete) {
// more blocks may remain: queue another pass instead of looping here
app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, [self]() {
self(self);
});
}
} catch (...) {} // errors on applied blocks logged in controller
};

// initial post; the lambda is copied into the posted task
app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write,
[process_incoming_blocks]() {
process_incoming_blocks(process_incoming_blocks);
});
}

// thread safe
void net_plugin_impl::start_expire_timer() {
fc::lock_guard g( expire_timer_mtx );
Expand Down Expand Up @@ -4517,7 +4497,7 @@ namespace eosio {
// peers configured. This is a bit of a hack for Spring 1.0.0 until we can add a proper
// pause-at-block (issue #570) which could be used to explicitly request a node to not process beyond
// a specified block.
my_impl->process_blocks();
my_impl->producer_plug->process_blocks();
}
}

Expand Down
5 changes: 4 additions & 1 deletion plugins/producer_api_plugin/producer_api_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ void producer_api_plugin::plugin_startup() {
app().get_plugin<http_plugin>().add_api({
CALL_WITH_400(producer, producer_rw, producer, pause,
INVOKE_V_V(producer, pause), 201),
CALL_WITH_400(producer, producer_rw, producer, resume,
// TODO: Enable for Spring 2.0.0
// CALL_WITH_400(producer, producer_rw, producer, pause_at_block,
// INVOKE_V_R(producer, pause_at_block, producer_plugin::pause_at_block_params), 201),
CALL_WITH_400(producer, producer_rw, producer, resume,
INVOKE_V_V(producer, resume), 201),
CALL_WITH_400(producer, producer_rw, producer, update_runtime_options,
INVOKE_V_R(producer, update_runtime_options, producer_plugin::runtime_options), 201),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ class producer_plugin : public appbase::plugin<producer_plugin> {

controller::apply_blocks_result on_incoming_block();

// Parameter object taken by pause_at_block(); block_num defaults to 0 when not supplied.
struct pause_at_block_params {
chain::block_num_type block_num{0}; // block height to pause block evaluation/production
};

void pause();
void pause_at_block(const pause_at_block_params& params);
void resume();
bool paused() const;
void update_runtime_options(const runtime_options& options);
Expand Down Expand Up @@ -140,6 +145,9 @@ class producer_plugin : public appbase::plugin<producer_plugin> {

void log_failed_transaction(const transaction_id_type& trx_id, const chain::packed_transaction_ptr& packed_trx_ptr, const char* reason) const;

// initiate calls to process_incoming_block to process all queued blocks
void process_blocks();

// thread-safe, called when a new block is received
void received_block(uint32_t block_num, chain::fork_db_add_t fork_db_add_result);

Expand Down Expand Up @@ -168,3 +176,4 @@ FC_REFLECT(eosio::producer_plugin::get_account_ram_corrections_result, (rows)(mo
FC_REFLECT(eosio::producer_plugin::get_unapplied_transactions_params, (lower_bound)(limit)(time_limit_ms))
FC_REFLECT(eosio::producer_plugin::unapplied_trx, (trx_id)(expiration)(trx_type)(first_auth)(first_receiver)(first_action)(total_actions)(billed_cpu_time_us)(size))
FC_REFLECT(eosio::producer_plugin::get_unapplied_transactions_result, (size)(incoming_size)(trxs)(more))
FC_REFLECT(eosio::producer_plugin::pause_at_block_params, (block_num));
40 changes: 40 additions & 0 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,21 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}
}

// Records block_num on the controller as the block at which to pause.
// block_num must be greater than the current chain head block number; otherwise the EOS_ASSERT
// below fails with invalid_pause_at_block_request and the controller setting is left unchanged.
void pause_at_block(block_num_type block_num) {
   auto& chain = chain_plug->chain();

   auto head = chain.head();
   EOS_ASSERT(block_num > head.block_num(), invalid_pause_at_block_request,
              "Pause at block ${bn} <= chain head ${h}", ("bn", block_num)("h", head.block_num()));

   fc_ilog(_log, "Set pause at block #${bn}", ("bn", block_num));
   chain.set_pause_at_block_num(block_num);
}

void resume() {
auto& chain = chain_plug->chain();
chain.set_pause_at_block_num(std::numeric_limits<block_num_type>::max());
linh2931 marked this conversation as resolved.
Show resolved Hide resolved

_pause_production = false;
// reset vote received so production can be explicitly resumed, will pause again when received vote time limit hit again
if (_is_savanna_active)
Expand Down Expand Up @@ -1680,6 +1694,10 @@ void producer_plugin::pause() {
my->_pause_production = true;
}

// Public entry point: forwards params.block_num to the impl's pause_at_block().
void producer_plugin::pause_at_block(const pause_at_block_params& params) {
my->pause_at_block(params.block_num);
}

// Public entry point: forwards to the impl's resume().
void producer_plugin::resume() {
my->resume();
}
Expand Down Expand Up @@ -2172,6 +2190,10 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
_time_tracker.clear(); // make sure we start tracking block time after `apply_blocks()`

block_handle head = chain.head();

if (head.block_num() == chain.get_pause_at_block_num())
return start_block_result::waiting_for_block;

fc::time_point now = fc::time_point::now();
block_timestamp_type block_time = calculate_pending_block_time();
producer_authority scheduled_producer = chain.head_active_producers(block_time).get_scheduled_producer(block_time);
Expand Down Expand Up @@ -2998,6 +3020,24 @@ void producer_plugin_impl::produce_block() {
_time_tracker.clear();
}

// Schedule processing of queued blocks: a task is posted to the read_write executor queue that calls
// on_incoming_block() and keeps re-posting itself while the result is apply_blocks_result::incomplete.
void producer_plugin::process_blocks() {
   // generic lambda handed a copy of itself (`again`) so it can queue another pass
   auto apply_queued = [this](auto again) -> void {
      try {
         if (on_incoming_block() != controller::apply_blocks_result::incomplete)
            return; // nothing further to schedule
         app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write,
                               [again]() { again(again); });
      } catch (...) {} // errors on applied blocks logged in controller
   };

   app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write,
                         [apply_queued]() { apply_queued(apply_queued); });
}

void producer_plugin::received_block(uint32_t block_num, chain::fork_db_add_t fork_db_add_result) {
my->_received_block = block_num;
// fork_db_add_t::fork_switch means head block of best fork (different from the current branch) is received.
Expand Down
Loading