Skip to content

Commit

Permalink
Merge pull request #1047 from AntelopeIO/GH-1039-interrupt-block-vali…
Browse files Browse the repository at this point in the history
…dation

Interrupt block validation on new best head
  • Loading branch information
heifner authored Dec 5, 2024
2 parents 5be1d8b + 3684035 commit 631fed0
Show file tree
Hide file tree
Showing 21 changed files with 326 additions and 194 deletions.
61 changes: 9 additions & 52 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1870,12 +1870,8 @@ struct controller_impl {
fork_db_reset_root_to_chain_head();
} else if( !except_ptr && !check_shutdown() && !irreversible_mode() ) {
if (auto fork_db_head = fork_db.head()) {
// applies all blocks up to fork_db head from fork_db, shouldn't return incomplete, but if it does loop until complete
ilog("applying ${n} fork database blocks from ${ch} to ${fh}",
ilog("fork database contains ${n} blocks after head from ${ch} to ${fh}",
("n", fork_db_head->block_num() - chain_head.block_num())("ch", chain_head.block_num())("fh", fork_db_head->block_num()));
while (maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}) == controller::apply_blocks_result::incomplete)
;
ilog( "reversible blocks replayed to ${bn} : ${id}", ("bn", fork_db_head->block_num())("id", fork_db_head->id()) );
}
}

Expand Down Expand Up @@ -2042,42 +2038,6 @@ struct controller_impl {
// Furthermore, fork_db.root()->block_num() <= lib_num.
// Also, even though blog.head() may still be nullptr, blog.first_block_num() is guaranteed to be lib_num + 1.

auto finish_init = [&](auto& fork_db) {
if( read_mode != db_read_mode::IRREVERSIBLE ) {
auto pending_head = fork_db.head();
if ( pending_head && pending_head->id() != chain_head.id() ) {
// chain_head equal to root means that read_mode was changed from irreversible mode to head/speculative
bool chain_head_is_root = chain_head.id() == fork_db.root()->id();
if (chain_head_is_root) {
ilog( "read_mode has changed from irreversible: applying best branch from fork database" );
}

// See comment below about pause-at-block for why `|| conf.num_configured_p2p_peers > 0`
if (chain_head_is_root || conf.num_configured_p2p_peers > 0) {
ilog("applying branch from fork database ending with block: ${id}", ("id", pending_head->id()));
// applies all blocks up to forkdb head from forkdb, shouldn't return incomplete, but if it does loop until complete
while (maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}) == controller::apply_blocks_result::incomplete)
;
}
}
} else {
// It is possible that the node was shutdown with blocks to process in the fork database. For example, if
// it was syncing and had processed blocks into the fork database but not yet applied them. In general,
// it makes sense to process those blocks on startup. However, if the node was shutdown via
// terminate-at-block, the current expectation is that the node can be restarted to examine the state at
// which it was shutdown. For now, we will only process these blocks if there are peers configured. This
// is a bit of a hack for Spring 1.0.0 until we can add a proper pause-at-block (issue #570) which could
// be used to explicitly request a node to not process beyond a specified block.
if (conf.num_configured_p2p_peers > 0) {
ilog("Process blocks out of fork_db if needed");
log_irreversible();
transition_to_savanna_if_needed();
}
}
};

fork_db_.apply<void>(finish_init);

// At Leap startup, we want to provide to our local finalizers the correct safety information
// to use if they don't already have one.
// If we start at a block prior to the IF transition, that information will be provided when
Expand Down Expand Up @@ -4232,11 +4192,11 @@ struct controller_impl {
assert(!verify_qc_future.valid());
}

bool best_head = fork_db.add(bsp, ignore_duplicate_t::yes);
fork_db_add_t add_result = fork_db.add(bsp, ignore_duplicate_t::yes);
if constexpr (is_proper_savanna_block)
vote_processor.notify_new_block(async_aggregation);

return controller::accepted_block_result{best_head, block_handle{std::move(bsp)}};
return controller::accepted_block_result{add_result, block_handle{std::move(bsp)}};
}

// thread safe, expected to be called from thread other than the main thread
Expand Down Expand Up @@ -4440,7 +4400,6 @@ struct controller_impl {

const auto start_apply_blocks_loop = fc::time_point::now();
for( auto ritr = new_head_branch.rbegin(); ritr != new_head_branch.rend(); ++ritr ) {
const auto start_apply_block = fc::time_point::now();
auto except = std::exception_ptr{};
const auto& bsp = *ritr;
try {
Expand All @@ -4465,11 +4424,9 @@ struct controller_impl {
throw;
} catch (const fc::exception& e) {
if (e.code() == interrupt_exception::code_value) {
if (fc::time_point::now() - start_apply_block < fc::milliseconds(2 * config::block_interval_ms)) {
ilog("interrupt while applying block ${bn} : ${id}", ("bn", bsp->block_num())("id", bsp->id()));
throw; // do not want to remove block from fork_db if not interrupting a long, maybe infinite, block
}
ilog("interrupt while applying block, removing block ${bn} : ${id}", ("bn", bsp->block_num())("id", bsp->id()));
// do not want to remove block from fork_db if interrupted
ilog("interrupt while applying block ${bn} : ${id}", ("bn", bsp->block_num())("id", bsp->id()));
throw;
} else {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
("bn", bsp->block_num())("id", bsp->id())("p", bsp->previous())("e", e.to_detail_string()));
Expand Down Expand Up @@ -4538,7 +4495,7 @@ struct controller_impl {
return applied_trxs;
}

void interrupt_transaction() {
void interrupt_apply_block_transaction() {
// Only interrupt transaction if applying a block. Speculative trxs already have a deadline set so they
// have limited run time already. This is to allow killing a long-running transaction in a block being
// validated.
Expand Down Expand Up @@ -5308,8 +5265,8 @@ deque<transaction_metadata_ptr> controller::abort_block() {
return my->abort_block();
}

void controller::interrupt_transaction() {
my->interrupt_transaction();
void controller::interrupt_apply_block_transaction() {
my->interrupt_apply_block_transaction();
}

boost::asio::io_context& controller::get_thread_pool() {
Expand Down
20 changes: 15 additions & 5 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ namespace eosio::chain {

void open_impl( const char* desc, const std::filesystem::path& fork_db_file, fc::cfile_datastream& ds, validator_t& validator );
void close_impl( std::ofstream& out );
bool add_impl( const bsp_t& n, ignore_duplicate_t ignore_duplicate, bool validate, validator_t& validator );
fork_db_add_t add_impl( const bsp_t& n, ignore_duplicate_t ignore_duplicate, bool validate, validator_t& validator );
bool is_valid() const;

bsp_t get_block_impl( const block_id_type& id, include_root_t include_root = include_root_t::no ) const;
Expand Down Expand Up @@ -241,8 +241,8 @@ namespace eosio::chain {
}

template <class BSP>
bool fork_database_impl<BSP>::add_impl(const bsp_t& n, ignore_duplicate_t ignore_duplicate,
bool validate, validator_t& validator) {
fork_db_add_t fork_database_impl<BSP>::add_impl(const bsp_t& n, ignore_duplicate_t ignore_duplicate,
bool validate, validator_t& validator) {
EOS_ASSERT( root, fork_database_exception, "root not yet set" );
EOS_ASSERT( n, fork_database_exception, "attempt to add null block state" );

Expand Down Expand Up @@ -278,15 +278,25 @@ namespace eosio::chain {
EOS_RETHROW_EXCEPTIONS( fork_database_exception, "serialized fork database is incompatible with configured protocol features" )
}

auto prev_head = head_impl(include_root_t::yes);

auto inserted = index.insert(n);
EOS_ASSERT(ignore_duplicate == ignore_duplicate_t::yes || inserted.second, fork_database_exception,
"duplicate block added: ${id}", ("id", n->id()));

return inserted.second && n == head_impl(include_root_t::no);
if (!inserted.second)
return fork_db_add_t::duplicate;
const bool new_head = n == head_impl(include_root_t::no);
if (new_head && n->previous() == prev_head->id())
return fork_db_add_t::appended_to_head;
if (new_head)
return fork_db_add_t::fork_switch;

return fork_db_add_t::added;
}

template<class BSP>
bool fork_database_t<BSP>::add( const bsp_t& n, ignore_duplicate_t ignore_duplicate ) {
fork_db_add_t fork_database_t<BSP>::add( const bsp_t& n, ignore_duplicate_t ignore_duplicate ) {
std::lock_guard g( my->mtx );
return my->add_impl(n, ignore_duplicate, false,
[](block_timestamp_type timestamp,
Expand Down
7 changes: 4 additions & 3 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ namespace eosio::chain {
using resource_limits::resource_limits_manager;
using apply_handler = std::function<void(apply_context&)>;

enum class fork_db_add_t;
using forked_callback_t = std::function<void(const transaction_metadata_ptr&)>;

// lookup transaction_metadata via supplied function to avoid re-creation
Expand Down Expand Up @@ -207,8 +208,8 @@ namespace eosio::chain {
*/
deque<transaction_metadata_ptr> abort_block();

/// Expected to be called from signal handler
void interrupt_transaction();
/// Expected to be called from signal handler, or producer_plugin
void interrupt_apply_block_transaction();

/**
*
Expand All @@ -235,7 +236,7 @@ namespace eosio::chain {
void set_async_aggregation(async_t val);

struct accepted_block_result {
const bool is_new_best_head = false; // true if new best head
const fork_db_add_t add_result;
std::optional<block_handle> block; // empty optional if block is unlinkable
};
// thread-safe
Expand Down
16 changes: 14 additions & 2 deletions libraries/chain/include/eosio/chain/fork_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ namespace eosio::chain {
using block_branch_t = std::vector<signed_block_ptr>;
enum class ignore_duplicate_t { no, yes };
enum class include_root_t { no, yes };
enum class fork_db_add_t {
failure, // add failed
duplicate, // already added and ignore_duplicate=true
added, // inserted into an existing branch or started a new branch, but not best branch
appended_to_head, // new best head of current best branch; no fork switch
fork_switch // new best head of new branch, fork switch to new branch
};

// Used for logging of comparison values used for best fork determination
std::string log_fork_comparison(const block_state& bs);
Expand Down Expand Up @@ -67,9 +74,11 @@ namespace eosio::chain {
/**
* Add block state to fork database.
* Must link to existing block in fork database or the root.
* @return true if n becomes the new best head (and was not the best head before)
* @returns fork_db_add_t - result of the add
* @throws unlinkable_block_exception - unlinkable to any branch
* @throws fork_database_exception - no root, n is nullptr, protocol feature error, duplicate when ignore_duplicate=false
*/
bool add( const bsp_t& n, ignore_duplicate_t ignore_duplicate );
fork_db_add_t add( const bsp_t& n, ignore_duplicate_t ignore_duplicate );

void remove( const block_id_type& id );

Expand Down Expand Up @@ -306,3 +315,6 @@ namespace eosio::chain {
static constexpr uint32_t max_supported_version = 3;
};
} /// eosio::chain

FC_REFLECT_ENUM( eosio::chain::fork_db_add_t,
(failure)(duplicate)(added)(appended_to_head)(fork_switch) )
1 change: 1 addition & 0 deletions libraries/testing/include/eosio/testing/tester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ namespace eosio::testing {
// producer become inactive
void produce_min_num_of_blocks_to_spend_time_wo_inactive_prod(const fc::microseconds target_elapsed_time = fc::microseconds());
void push_block(const signed_block_ptr& b);
void apply_blocks();

/**
* These transaction IDs represent transactions available in the head chain state as scheduled
Expand Down
13 changes: 11 additions & 2 deletions libraries/testing/tester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,9 @@ namespace eosio::testing {
case block_signal::accepted_block:
// should get accepted_block signal after accepted_block_header signal
// or after accepted_block (on fork switch, accepted block signaled when block re-applied)
return present && (itr->second == block_signal::accepted_block_header ||
itr->second == block_signal::accepted_block);
// or first thing on restart if applying out of the forkdb
return !present || (present && (itr->second == block_signal::accepted_block_header ||
itr->second == block_signal::accepted_block));

case block_signal::irreversible_block:
// can be signaled on restart as the first thing since other signals happened before shutdown
Expand Down Expand Up @@ -423,13 +424,15 @@ namespace eosio::testing {
open(std::move(pfs), snapshot_chain_id, [&snapshot,&control=this->control]() {
control->startup( [](){}, []() { return false; }, snapshot );
});
apply_blocks();
}

void base_tester::open( protocol_feature_set&& pfs, const genesis_state& genesis, call_startup_t call_startup ) {
if (call_startup == call_startup_t::yes) {
open(std::move(pfs), genesis.compute_chain_id(), [&genesis,&control=this->control]() {
control->startup( [](){}, []() { return false; }, genesis );
});
apply_blocks();
} else {
open(std::move(pfs), genesis.compute_chain_id(), nullptr);
}
Expand All @@ -439,6 +442,7 @@ namespace eosio::testing {
open(std::move(pfs), expected_chain_id, [&control=this->control]() {
control->startup( [](){}, []() { return false; } );
});
apply_blocks();
}

void base_tester::push_block(const signed_block_ptr& b) {
Expand All @@ -460,6 +464,11 @@ namespace eosio::testing {
_check_for_vote_if_needed(*control, bh);
}

void base_tester::apply_blocks() {
while (control->apply_blocks( {}, {} ) == controller::apply_blocks_result::incomplete)
;
}

signed_block_ptr base_tester::_produce_block( fc::microseconds skip_time, bool skip_pending_trxs ) {
auto res = _produce_block( skip_time, skip_pending_trxs, false );
return res.block;
Expand Down
5 changes: 4 additions & 1 deletion plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,10 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
void chain_plugin_impl::plugin_startup()
{ try {
try {
auto shutdown = [](){ return app().quit(); };
auto shutdown = []() {
dlog("controller shutdown, quitting...");
return app().quit();
};
auto check_shutdown = [](){ return app().is_quiting(); };
if (snapshot_path)
chain->startup(shutdown, check_shutdown, std::make_shared<threaded_snapshot_reader>(*snapshot_path));
Expand Down
3 changes: 3 additions & 0 deletions plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ namespace eosio {
void register_increment_failed_p2p_connections(std::function<void()>&&);
void register_increment_dropped_trxs(std::function<void()>&&);

// for testing
void broadcast_block(const signed_block_ptr& b, const block_id_type& id);

private:
std::shared_ptr<class net_plugin_impl> my;
};
Expand Down
Loading

0 comments on commit 631fed0

Please sign in to comment.