diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index fe873e4530..f94e2427f5 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -992,6 +992,7 @@ struct controller_impl { const chain_id_type chain_id; // read by thread_pool threads, value will not be changed bool replaying = false; bool is_producer_node = false; // true if node is configured as a block producer + block_num_type pause_at_block_num = std::numeric_limits::max(); const db_read_mode read_mode; bool in_trx_requiring_checks = false; ///< if true, checks that are normally skipped on replay (e.g. auth checks) cannot be skipped std::optional subjective_cpu_leeway; @@ -1357,9 +1358,9 @@ struct controller_impl { // When in IRREVERSIBLE mode fork_db blocks are applied and marked valid when they become irreversible template - bool apply_irreversible_block(ForkDB& fork_db, const BSP& bsp) { + controller::apply_blocks_result apply_irreversible_block(ForkDB& fork_db, const BSP& bsp) { if (read_mode != db_read_mode::IRREVERSIBLE) - return true; // ignore + return controller::apply_blocks_result::complete; // ignore if constexpr (std::is_same_v>) { // before transition to savanna return apply_block(bsp, controller::block_status::complete, trx_meta_cache_lookup{}); @@ -1375,22 +1376,23 @@ struct controller_impl { return apply_block(bsp, controller::block_status::complete, trx_meta_cache_lookup{}); } // only called during transition when not a proper savanna block - return fork_db_.apply_l([&](const auto& fork_db_l) { + return fork_db_.apply_l([&](const auto& fork_db_l) { block_state_legacy_ptr legacy = fork_db_l.get_block(bsp->id()); fork_db_.switch_to(fork_database::in_use_t::legacy); // apply block uses to know what types to create block_state_ptr prev = fork_db.get_block(legacy->previous(), include_root_t::yes); assert(prev); - if( apply_block(legacy, controller::block_status::complete, trx_meta_cache_lookup{}) ) { + controller::apply_blocks_result r = apply_block(legacy, controller::block_status::complete, trx_meta_cache_lookup{}); + if( r == controller::apply_blocks_result::complete) { fc::scoped_exit> e([&]{fork_db_.switch_to(fork_database::in_use_t::both);}); // irreversible apply was just done, calculate new_valid here instead of in transition_to_savanna() assert(legacy->action_mroot_savanna); transition_add_to_savanna_fork_db(fork_db, legacy, bsp, prev); - return true; + return r; } // add to fork_db as it expects root != head transition_add_to_savanna_fork_db(fork_db, legacy, bsp, prev); fork_db_.switch_to(fork_database::in_use_t::legacy); - return false; + return r; }); } } @@ -1573,7 +1575,7 @@ struct controller_impl { auto it = v.begin(); for( auto bitr = branch.rbegin(); bitr != branch.rend() && should_process(*bitr); ++bitr ) { - if (!apply_irreversible_block(fork_db, *bitr)) + if (apply_irreversible_block(fork_db, *bitr) != controller::apply_blocks_result::complete) break; emit( irreversible_block, std::tie((*bitr)->block, (*bitr)->id()), __FILE__, __LINE__ ); @@ -3681,13 +3683,16 @@ struct controller_impl { } template - bool apply_block( const BSP& bsp, controller::block_status s, - const trx_meta_cache_lookup& trx_lookup ) { + controller::apply_blocks_result apply_block( const BSP& bsp, controller::block_status s, + const trx_meta_cache_lookup& trx_lookup ) { try { try { if (should_terminate()) { shutdown(); - return false; + return controller::apply_blocks_result::incomplete; + } + if (should_pause()) { + return controller::apply_blocks_result::paused; } auto start = fc::time_point::now(); // want to report total time of applying a block @@ -3845,7 +3850,7 @@ struct controller_impl { commit_block(s); - return true; + return controller::apply_blocks_result::complete; } catch ( const std::bad_alloc& ) { throw; } catch ( const boost::interprocess::bad_alloc& ) { @@ -4317,7 +4322,7 @@ struct controller_impl { BSP bsp = std::make_shared(*head, b, protocol_features.get_protocol_feature_set(), validator, skip_validate_signee); - if (apply_block(bsp, controller::block_status::irreversible, trx_meta_cache_lookup{})) { + if (apply_block(bsp, controller::block_status::irreversible, trx_meta_cache_lookup{}) == controller::apply_blocks_result::complete) { // On replay, log_irreversible is not called and so no irreversible_block signal is emitted. // So emit it explicitly here. emit( irreversible_block, std::tie(bsp->block, bsp->id()), __FILE__, __LINE__ ); @@ -4341,7 +4346,7 @@ struct controller_impl { log_irreversible(); transition_to_savanna_if_needed(); - return controller::apply_blocks_result::complete; + return should_pause() ? controller::apply_blocks_result::paused : controller::apply_blocks_result::complete; } catch (fc::exception& e) { if (e.code() != interrupt_exception::code_value) { wlog("${d}", ("d",e.to_detail_string())); @@ -4403,20 +4408,26 @@ struct controller_impl { auto except = std::exception_ptr{}; const auto& bsp = *ritr; try { - bool applied = apply_block( bsp, bsp->is_valid() ? controller::block_status::validated - : controller::block_status::complete, trx_lookup ); - if (!switch_fork) { // always complete a switch fork - if (!applied || check_shutdown()) { + result = apply_block( bsp, bsp->is_valid() ? controller::block_status::validated + : controller::block_status::complete, trx_lookup ); + if (!switch_fork) { + if (check_shutdown()) { shutdown(); - break; // result should be complete since we are shutting down - } - // Break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run. User expected - // to call apply_blocks again if this returns incomplete. - const bool more_blocks_to_process = ritr + 1 != new_head_branch.rend(); - if (!replaying && more_blocks_to_process && fc::time_point::now() - start_apply_blocks_loop > fc::milliseconds(500)) { - result = controller::apply_blocks_result::incomplete; + result = controller::apply_blocks_result::incomplete; // doesn't really matter since we are shutting down break; } + if (result == controller::apply_blocks_result::complete) { + // Break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run. User expected + // to call apply_blocks again if this returns incomplete. + const bool more_blocks_to_process = ritr + 1 != new_head_branch.rend(); + if (!replaying && more_blocks_to_process && fc::time_point::now() - start_apply_blocks_loop > fc::milliseconds(500)) { + result = controller::apply_blocks_result::incomplete; + break; + } + } + } + if (result != controller::apply_blocks_result::complete) { + break; } } catch ( const std::bad_alloc& ) { throw; @@ -4940,6 +4951,20 @@ struct controller_impl { return should_terminate(chain_head.block_num()); } + bool should_pause() const { + if (chain_head.block_num() == pause_at_block_num) { + // when paused new blocks can come in which causes check if we are still paused, do not spam the log + static fc::time_point log_time; + fc::time_point now = fc::time_point::now(); + if (log_time < now - fc::seconds(1)) { + ilog("Pausing at block #${b}", ("b", pause_at_block_num)); + log_time = now; + } + return true; + } + return false; + } + bool is_builtin_activated( builtin_protocol_feature_t f )const { uint32_t current_block_num = chain_head.block_num(); @@ -6032,6 +6057,15 @@ bool controller::is_producer_node()const { return my->is_producer_node; } +void controller::set_pause_at_block_num(block_num_type block_num) { + my->pause_at_block_num = block_num; +} + +block_num_type controller::get_pause_at_block_num()const { + return my->pause_at_block_num; +} + + void controller::set_db_read_only_mode() { mutable_db().set_read_only_mode(); } diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index fc485486a0..d9ab86b993 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -244,8 +244,9 @@ namespace eosio::chain { /// Apply any blocks that are ready from the fork_db enum class apply_blocks_result { - complete, // all ready blocks in forkdb have been applied - incomplete // time limit reached, additional blocks may be available in forkdb to process + complete, // all ready blocks in forkdb have been applied + incomplete, // time limit reached, additional blocks may be available in forkdb to process + paused // apply blocks currently paused }; apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup); @@ -479,6 +480,9 @@ namespace eosio::chain { void set_producer_node(bool is_producer_node); bool is_producer_node()const; // thread safe, set at program initialization + void set_pause_at_block_num(block_num_type block_num); + block_num_type get_pause_at_block_num()const; + void set_db_read_only_mode(); void unset_db_read_only_mode(); void init_thread_local_data(); diff --git a/libraries/chain/include/eosio/chain/exceptions.hpp b/libraries/chain/include/eosio/chain/exceptions.hpp index 70e658d303..82f07a829e 100644 --- a/libraries/chain/include/eosio/chain/exceptions.hpp +++ b/libraries/chain/include/eosio/chain/exceptions.hpp @@ -609,6 +609,8 @@ namespace eosio { namespace chain { 3170015, "Invalid snapshot request" ) FC_DECLARE_DERIVED_EXCEPTION( snapshot_execution_exception, producer_exception, 3170016, "Snapshot execution exception" ) + FC_DECLARE_DERIVED_EXCEPTION( invalid_pause_at_block_request, producer_exception, + 3170017, "Invalid pause at block request" ) FC_DECLARE_DERIVED_EXCEPTION( reversible_blocks_exception, chain_exception, 3180000, "Reversible Blocks exception" ) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 0a49072766..57104faecf 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -457,7 +457,6 @@ namespace eosio { void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); void start_expire_timer(); void start_monitors(); - void process_blocks(); void expire(); /** \name Peer Timestamps @@ -3774,9 +3773,8 @@ namespace eosio { if (fork_db_add_result == fork_db_add_t::appended_to_head || fork_db_add_result == fork_db_add_t::fork_switch) { ++c->unique_blocks_rcvd_count; - fc_dlog(logger, "posting incoming_block to app thread, block ${n}", ("n", ptr->block_num())); - my_impl->process_blocks(); - + fc_dlog(logger, "post process_incoming_block to app thread, block ${n}", ("n", ptr->block_num())); + my_impl->producer_plug->process_blocks(); // ready to process immediately, so signal producer to interrupt start_block my_impl->producer_plug->received_block(block_num, fork_db_add_result); @@ -3784,24 +3782,6 @@ namespace eosio { }); } - void net_plugin_impl::process_blocks() { - auto process_incoming_blocks = [](auto self) -> void { - try { - auto r = my_impl->producer_plug->on_incoming_block(); - if (r == controller::apply_blocks_result::incomplete) { - app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, [self]() { - self(self); - }); - } - } catch (...) {} // errors on applied blocks logged in controller - }; - - app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, - [process_incoming_blocks]() { - process_incoming_blocks(process_incoming_blocks); - }); - } - // thread safe void net_plugin_impl::start_expire_timer() { fc::lock_guard g( expire_timer_mtx ); @@ -4517,7 +4497,7 @@ namespace eosio { // peers configured. This is a bit of a hack for Spring 1.0.0 until we can add a proper // pause-at-block (issue #570) which could be used to explicitly request a node to not process beyond // a specified block. - my_impl->process_blocks(); + my_impl->producer_plug->process_blocks(); } } diff --git a/plugins/producer_api_plugin/producer_api_plugin.cpp b/plugins/producer_api_plugin/producer_api_plugin.cpp index 0439351f04..299676d32f 100644 --- a/plugins/producer_api_plugin/producer_api_plugin.cpp +++ b/plugins/producer_api_plugin/producer_api_plugin.cpp @@ -119,7 +119,10 @@ void producer_api_plugin::plugin_startup() { app().get_plugin().add_api({ CALL_WITH_400(producer, producer_rw, producer, pause, INVOKE_V_V(producer, pause), 201), - CALL_WITH_400(producer, producer_rw, producer, resume, + // TODO: Enable for Spring 2.0.0 + // CALL_WITH_400(producer, producer_rw, producer, pause_at_block, + // INVOKE_V_R(producer, pause_at_block, producer_plugin::pause_at_block_params), 201), + CALL_WITH_400(producer, producer_rw, producer, resume, INVOKE_V_V(producer, resume), 201), CALL_WITH_400(producer, producer_rw, producer, update_runtime_options, INVOKE_V_R(producer, update_runtime_options, producer_plugin::runtime_options), 201), diff --git a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp index ffe586cff7..03302b7c33 100644 --- a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp +++ b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp @@ -83,7 +83,12 @@ class producer_plugin : public appbase::plugin { controller::apply_blocks_result on_incoming_block(); + struct pause_at_block_params { + chain::block_num_type block_num{0}; // block height to pause block evaluation/production + }; + void pause(); + void pause_at_block(const pause_at_block_params& params); void resume(); bool paused() const; void update_runtime_options(const runtime_options& options); @@ -140,6 +145,9 @@ class producer_plugin : public appbase::plugin { void log_failed_transaction(const transaction_id_type& trx_id, const chain::packed_transaction_ptr& packed_trx_ptr, const char* reason) const; + // initiate calls to process_incoming_block to process all queued blocks + void process_blocks(); + // thread-safe, called when a new block is received void received_block(uint32_t block_num, chain::fork_db_add_t fork_db_add_result); @@ -168,3 +176,4 @@ FC_REFLECT(eosio::producer_plugin::get_account_ram_corrections_result, (rows)(mo FC_REFLECT(eosio::producer_plugin::get_unapplied_transactions_params, (lower_bound)(limit)(time_limit_ms)) FC_REFLECT(eosio::producer_plugin::unapplied_trx, (trx_id)(expiration)(trx_type)(first_auth)(first_receiver)(first_action)(total_actions)(billed_cpu_time_us)(size)) FC_REFLECT(eosio::producer_plugin::get_unapplied_transactions_result, (size)(incoming_size)(trxs)(more)) +FC_REFLECT(eosio::producer_plugin::pause_at_block_params, (block_num)); diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index a85e87f9cf..ef62b642c6 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -1161,7 +1161,21 @@ class producer_plugin_impl : public std::enable_shared_from_thischain(); + + auto head = chain.head(); + EOS_ASSERT(block_num > head.block_num(), invalid_pause_at_block_request, + "Pause at block ${bn} <= chain head ${h}", ("bn", block_num)("h", head.block_num())); + + fc_ilog(_log, "Set pause at block #${bn}", ("bn", block_num)); + chain.set_pause_at_block_num(block_num); + } + void resume() { + auto& chain = chain_plug->chain(); + chain.set_pause_at_block_num(std::numeric_limits::max()); + _pause_production = false; // reset vote received so production can be explicitly resumed, will pause again when received vote time limit hit again if (_is_savanna_active) @@ -1680,6 +1694,10 @@ void producer_plugin::pause() { my->_pause_production = true; } +void producer_plugin::pause_at_block(const pause_at_block_params& params) { + my->pause_at_block(params.block_num); +} + void producer_plugin::resume() { my->resume(); } @@ -2172,6 +2190,10 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { _time_tracker.clear(); // make sure we start tracking block time after `apply_blocks()` block_handle head = chain.head(); + + if (head.block_num() == chain.get_pause_at_block_num()) + return start_block_result::waiting_for_block; + fc::time_point now = fc::time_point::now(); block_timestamp_type block_time = calculate_pending_block_time(); producer_authority scheduled_producer = chain.head_active_producers(block_time).get_scheduled_producer(block_time); @@ -2998,6 +3020,24 @@ void producer_plugin_impl::produce_block() { _time_tracker.clear(); } +void producer_plugin::process_blocks() { + auto process_incoming_blocks = [this](auto self) -> void { + try { + auto r = on_incoming_block(); + if (r == controller::apply_blocks_result::incomplete) { + app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, [self]() { + self(self); + }); + } + } catch (...) {} // errors on applied blocks logged in controller + }; + + app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, + [process_incoming_blocks]() { + process_incoming_blocks(process_incoming_blocks); + }); +} + void producer_plugin::received_block(uint32_t block_num, chain::fork_db_add_t fork_db_add_result) { my->_received_block = block_num; // fork_db_add_t::fork_switch means head block of best fork (different from the current branch) is received.