diff --git a/nano/core_test/confirming_set.cpp b/nano/core_test/confirming_set.cpp index 31d28f2f69..59451d96e3 100644 --- a/nano/core_test/confirming_set.cpp +++ b/nano/core_test/confirming_set.cpp @@ -1,9 +1,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -16,44 +18,61 @@ using namespace std::chrono_literals; +namespace +{ +struct confirming_set_context +{ + nano::logger & logger; + nano::stats & stats; + nano::ledger & ledger; + + nano::unchecked_map unchecked; + nano::block_processor block_processor; + nano::confirming_set confirming_set; + + explicit confirming_set_context (nano::test::ledger_context & ledger_context, nano::node_config node_config = {}) : + logger{ ledger_context.logger () }, + stats{ ledger_context.stats () }, + ledger{ ledger_context.ledger () }, + unchecked{ 0, stats, false }, + block_processor{ node_config, ledger, unchecked, stats, logger }, + confirming_set{ node_config.confirming_set, ledger, block_processor, stats, logger } + { + } +}; +} + TEST (confirming_set, construction) { - nano::test::system system; - auto & node = *system.add_node (); - auto ctx = nano::test::ledger_empty (); - nano::confirming_set_config config{}; - nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () }; + auto ledger_ctx = nano::test::ledger_empty (); + confirming_set_context ctx{ ledger_ctx }; } TEST (confirming_set, add_exists) { - nano::test::system system; - auto & node = *system.add_node (); - auto ctx = nano::test::ledger_send_receive (); - nano::confirming_set_config config{}; - nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () }; - auto send = ctx.blocks ()[0]; + auto ledger_ctx = nano::test::ledger_send_receive (); + confirming_set_context ctx{ ledger_ctx }; + nano::confirming_set & confirming_set = ctx.confirming_set; + auto send = ledger_ctx.blocks ()[0]; confirming_set.add (send->hash ()); ASSERT_TRUE (confirming_set.contains (send->hash ())); } TEST (confirming_set, process_one) { - nano::test::system system; - auto & node = *system.add_node (); - auto ctx = nano::test::ledger_send_receive (); - nano::confirming_set_config config{}; - nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () }; + auto ledger_ctx = nano::test::ledger_send_receive (); + confirming_set_context ctx{ ledger_ctx }; + nano::confirming_set & confirming_set = ctx.confirming_set; std::atomic count = 0; std::mutex mutex; std::condition_variable condition; confirming_set.cemented_observers.add ([&] (auto const &) { ++count; condition.notify_all (); }); - confirming_set.add (ctx.blocks ()[0]->hash ()); + confirming_set.add (ledger_ctx.blocks ()[0]->hash ()); nano::test::start_stop_guard guard{ confirming_set }; std::unique_lock lock{ mutex }; ASSERT_TRUE (condition.wait_for (lock, 5s, [&] () { return count == 1; })); - ASSERT_EQ (1, ctx.stats ().count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); - ASSERT_EQ (2, ctx.ledger ().cemented_count ()); + ASSERT_EQ (1, ctx.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); + ASSERT_EQ (2, ctx.ledger.cemented_count ()); } TEST (confirming_set, process_multiple) diff --git a/nano/node/active_elections.cpp b/nano/node/active_elections.cpp index 0cc4953f0e..24cacfbba6 100644 --- a/nano/node/active_elections.cpp +++ b/nano/node/active_elections.cpp @@ -45,6 +45,14 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_ publish (context.block); } }); + + // Stop all rolled back active transactions except initial + block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) { + if (block->qualified_root () != rollback_root) + { + erase (block->qualified_root ()); + } + }); } nano::active_elections::~active_elections () diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index d0e8716494..0836d62b39 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -38,10 +39,13 @@ void nano::block_processor::context::set_result (result_t const & result) * block_processor */ -nano::block_processor::block_processor (nano::node & node_a) : - config{ node_a.config.block_processor }, - node (node_a), - next_log (std::chrono::steady_clock::now ()) +nano::block_processor::block_processor (nano::node_config const & node_config, nano::ledger & ledger_a, nano::unchecked_map & unchecked_a, nano::stats & stats_a, nano::logger & logger_a) : + config{ node_config.block_processor }, + network_params{ node_config.network_params }, + ledger{ ledger_a }, + unchecked{ unchecked_a }, + stats{ stats_a }, + logger{ logger_a } { batch_processed.add ([this] (auto const & items) { // For every batch item: notify the 'processed' observer. @@ -78,6 +82,11 @@ nano::block_processor::block_processor (nano::node & node_a) : return 1; } }; + + // Requeue blocks that could not be immediately processed + unchecked.satisfied.add ([this] (nano::unchecked_info const & info) { + add (info.block, nano::block_source::unchecked); + }); } nano::block_processor::~block_processor () @@ -124,14 +133,14 @@ std::size_t nano::block_processor::size (nano::block_source source) const bool nano::block_processor::add (std::shared_ptr const & block, block_source const source, std::shared_ptr const & channel, std::function callback) { - if (node.network_params.work.validate_entry (*block)) // true => error + if (network_params.work.validate_entry (*block)) // true => error { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work); + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work); return false; // Not added } - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process); - node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})", + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process); + logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})", block->hash ().to_string (), to_string (source), channel ? channel->to_string () : ""); // TODO: Lazy eval @@ -141,8 +150,8 @@ bool nano::block_processor::add (std::shared_ptr const & block, blo std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking); - node.logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source)); + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking); + logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source)); context ctx{ block, source }; auto future = ctx.get_future (); @@ -155,8 +164,8 @@ std::optional nano::block_processor::add_blocking (std::shar } catch (std::future_error const &) { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout); - node.logger.error (nano::log::type::blockprocessor, "Block dropped when processing: {}", block->hash ().to_string ()); + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout); + logger.error (nano::log::type::blockprocessor, "Block dropped when processing: {}", block->hash ().to_string ()); } return std::nullopt; @@ -164,8 +173,8 @@ std::optional nano::block_processor::add_blocking (std::shar void nano::block_processor::force (std::shared_ptr const & block_a) { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force); - node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ()); + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force); + logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ()); add_impl (context{ block_a, block_source::forced }); } @@ -184,45 +193,38 @@ bool nano::block_processor::add_impl (context ctx, std::shared_ptrhash () != hash) { // Replace our block with the winner and roll back any dependent blocks - node.logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ()); + logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ()); std::vector> rollback_list; - if (node.ledger.rollback (transaction, successor->hash (), rollback_list)) + if (ledger.rollback (transaction, successor->hash (), rollback_list)) { - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed); - node.logger.error (nano::log::type::blockprocessor, "Failed to roll back: {} because it or a successor was confirmed", successor->hash ().to_string ()); + stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed); + logger.error (nano::log::type::blockprocessor, "Failed to roll back: {} because it or a successor was confirmed", successor->hash ().to_string ()); } else { - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback); - node.logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ()); + stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback); + logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ()); } - // Deleting from votes cache, stop active transaction - for (auto & i : rollback_list) + // Notify observers of the rolled back blocks + for (auto const & block : rollback_list) { - rolled_back.notify (i); - - node.history.erase (i->root ()); - // Stop all rolled back active transactions except initial - if (i->hash () != successor->hash ()) - { - node.active.erase (*i); - } + rolled_back.notify (block, fork_block.qualified_root ()); } } } @@ -237,7 +239,7 @@ void nano::block_processor::run () // TODO: Cleaner periodical logging if (should_log ()) { - node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", + logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", queue.size (), queue.size ({ nano::block_source::forced })); } @@ -319,7 +321,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock lock.unlock (); - auto transaction = node.ledger.tx_begin_write (nano::store::writer::blockprocessor); + auto transaction = ledger.tx_begin_write (nano::store::writer::blockprocessor); nano::timer timer; timer.start (); @@ -350,7 +352,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock if (number_of_blocks_processed != 0 && timer.stop () > std::chrono::milliseconds (100)) { - node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ()); + logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ()); } return processed; @@ -360,12 +362,12 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction { auto block = context.block; auto const hash = block->hash (); - nano::block_status result = node.ledger.process (transaction_a, block); + nano::block_status result = ledger.process (transaction_a, block); - node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result)); - node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source)); + stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result)); + stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source)); - node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed, + logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed, nano::log::arg{ "result", result }, nano::log::arg{ "source", context.source }, nano::log::arg{ "arrival", nano::log::microseconds (context.arrival) }, @@ -376,40 +378,41 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction { case nano::block_status::progress: { - queue_unchecked (transaction_a, hash); - /* For send blocks check epoch open unchecked (gap pending). - For state blocks check only send subtype and only if block epoch is not last epoch. - If epoch is last, then pending entry shouldn't trigger same epoch open block for destination account. */ + unchecked.trigger (hash); + + /* + * For send blocks check epoch open unchecked (gap pending). + * For state blocks check only send subtype and only if block epoch is not last epoch. + * If epoch is last, then pending entry shouldn't trigger same epoch open block for destination account. + */ if (block->type () == nano::block_type::send || (block->type () == nano::block_type::state && block->is_send () && std::underlying_type_t (block->sideband ().details.epoch) < std::underlying_type_t (nano::epoch::max))) { - /* block->destination () for legacy send blocks - block->link () for state blocks (send subtype) */ - queue_unchecked (transaction_a, block->destination ()); + unchecked.trigger (block->destination ()); } break; } case nano::block_status::gap_previous: { - node.unchecked.put (block->previous (), block); - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_previous); + unchecked.put (block->previous (), block); + stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_previous); break; } case nano::block_status::gap_source: { release_assert (block->source_field () || block->link_field ()); - node.unchecked.put (block->source_field ().value_or (block->link_field ().value_or (0).as_block_hash ()), block); - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source); + unchecked.put (block->source_field ().value_or (block->link_field ().value_or (0).as_block_hash ()), block); + stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source); break; } case nano::block_status::gap_epoch_open_pending: { - node.unchecked.put (block->account_field ().value_or (0), block); // Specific unchecked key starting with epoch open block account public key - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source); + unchecked.put (block->account_field ().value_or (0), block); // Specific unchecked key starting with epoch open block account public key + stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source); break; } case nano::block_status::old: { - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::old); + stats.inc (nano::stat::type::ledger, nano::stat::detail::old); break; } case nano::block_status::bad_signature: @@ -426,7 +429,7 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction } case nano::block_status::fork: { - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::fork); + stats.inc (nano::stat::type::ledger, nano::stat::detail::fork); break; } case nano::block_status::opened_burn_account: @@ -453,11 +456,6 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction return result; } -void nano::block_processor::queue_unchecked (secure::write_transaction const & transaction_a, nano::hash_or_account const & hash_or_account_a) -{ - node.unchecked.trigger (hash_or_account_a); -} - nano::container_info nano::block_processor::container_info () const { nano::lock_guard guard{ mutex }; diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 837631b0ba..4fcae4118a 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -78,7 +78,7 @@ class block_processor final }; public: - explicit block_processor (nano::node &); + block_processor (nano::node_config const &, nano::ledger &, nano::unchecked_map &, nano::stats &, nano::logger &); ~block_processor (); void start (); @@ -102,27 +102,32 @@ class block_processor final // The batch observer feeds the processed observer nano::observer_set block_processed; nano::observer_set batch_processed; - nano::observer_set const &> rolled_back; + + // Rolled back blocks + nano::observer_set const &, nano::qualified_root const &> rolled_back; + +private: // Dependencies + block_processor_config const & config; + nano::network_params const & network_params; + nano::ledger & ledger; + nano::unchecked_map & unchecked; + nano::stats & stats; + nano::logger & logger; private: void run (); // Roll back block in the ledger that conflicts with 'block' void rollback_competitor (secure::write_transaction const &, nano::block const & block); nano::block_status process_one (secure::write_transaction const &, context const &, bool forced = false); - void queue_unchecked (secure::write_transaction const &, nano::hash_or_account const &); processed_batch_t process_batch (nano::unique_lock &); std::deque next_batch (size_t max_count); context next (); bool add_impl (context, std::shared_ptr const & channel = nullptr); -private: // Dependencies - block_processor_config const & config; - nano::node & node; - private: nano::fair_queue queue; - std::chrono::steady_clock::time_point next_log; + std::chrono::steady_clock::time_point next_log{ std::chrono::steady_clock::now () }; bool stopped{ false }; nano::condition_variable condition; diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 57b2169655..1c290ca8f2 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -32,6 +32,8 @@ class recently_cemented_cache; class recently_confirmed_cache; class rep_crawler; class rep_tiers; +class telemetry; +class unchecked_map; class stats; class vote_cache; class vote_generator; diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index ad1a2201fe..8202087705 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -54,7 +54,7 @@ nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_ } }); - block_processor.rolled_back.add ([this] (auto const & block) { + block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) { nano::lock_guard guard{ mutex }; auto erased = local_blocks.get ().erase (block->hash ()); stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index af3cadfc39..db3f98939c 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -114,7 +114,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy application_path (application_path_a), port_mapping_impl{ std::make_unique (*this) }, port_mapping{ *port_mapping_impl }, - block_processor (*this), + block_processor_impl{ std::make_unique (config, ledger, unchecked, stats, logger) }, + block_processor{ *block_processor_impl }, confirming_set_impl{ std::make_unique (config.confirming_set, ledger, block_processor, stats, logger) }, confirming_set{ *confirming_set_impl }, active_impl{ std::make_unique (*this, confirming_set, block_processor) }, @@ -162,10 +163,6 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy process_live_dispatcher.connect (block_processor); - unchecked.satisfied.add ([this] (nano::unchecked_info const & info) { - block_processor.add (info.block, nano::block_source::unchecked); - }); - vote_cache.rep_weight_query = [this] (nano::account const & rep) { return ledger.weight (rep); }; @@ -190,6 +187,11 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy active.recently_confirmed.erase (hash); }); + // Do some cleanup of rolled back blocks + block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) { + history.erase (block->root ()); + }); + if (!init_error ()) { // Notify election schedulers when AEC frees election slot diff --git a/nano/node/node.hpp b/nano/node/node.hpp index a1a4538ae8..f3b918fb12 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -184,7 +183,8 @@ class node final : public std::enable_shared_from_this nano::node_observers observers; std::unique_ptr port_mapping_impl; nano::port_mapping & port_mapping; - nano::block_processor block_processor; + std::unique_ptr block_processor_impl; + nano::block_processor & block_processor; std::unique_ptr confirming_set_impl; nano::confirming_set & confirming_set; std::unique_ptr active_impl; diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 45b8ed3fcf..809cef33f0 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -1120,8 +1120,6 @@ TEST (confirmation_height, many_accounts_send_receive_self) TEST (confirmation_height, many_accounts_send_receive_self_no_elections) { nano::test::system system; - auto & node = *system.nodes[0]; - if (nano::rocksdb_config::using_rocksdb_in_tests ()) { // Don't test this in rocksdb mode @@ -1140,8 +1138,12 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections) nano::block_hash block_hash_being_processed{ 0 }; nano::store::write_queue write_queue; + + nano::node_config node_config; + nano::unchecked_map unchecked{ 0, stats, false }; + nano::block_processor block_processor{ node_config, ledger, unchecked, stats, logger }; nano::confirming_set_config confirming_set_config{}; - nano::confirming_set confirming_set{ confirming_set_config, ledger, node.block_processor, stats, logger }; + nano::confirming_set confirming_set{ confirming_set_config, ledger, block_processor, stats, logger }; auto const num_accounts = 100000;