diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt
index b4111eead3..793c7c2617 100644
--- a/nano/core_test/CMakeLists.txt
+++ b/nano/core_test/CMakeLists.txt
@@ -21,6 +21,7 @@ add_executable(
   election_scheduler.cpp
   enums.cpp
   epochs.cpp
+  fair_queue.cpp
   frontiers_confirmation.cpp
   ipc.cpp
   ledger.cpp
diff --git a/nano/core_test/fair_queue.cpp b/nano/core_test/fair_queue.cpp
new file mode 100644
index 0000000000..178365b171
--- /dev/null
+++ b/nano/core_test/fair_queue.cpp
@@ -0,0 +1,276 @@
+#include <nano/node/fair_queue.hpp>
+#include <nano/test_common/system.hpp>
+#include <nano/test_common/testutil.hpp>
+
+#include <gtest/gtest.h>
+
+#include <ranges>
+
+using namespace std::chrono_literals;
+
+namespace
+{
+enum class source_enum
+{
+	unknown = 0,
+	live,
+	bootstrap,
+	bootstrap_legacy,
+	unchecked,
+	local,
+	forced,
+};
+}
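+
+// Note: fair_queue's priority_query and max_size_query callbacks default to debug_assert
+// stubs (see fair_queue.hpp), so every test below assigns both before pushing any requests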
+
+TEST (fair_queue, construction)
+{
+	nano::fair_queue<int, source_enum> queue;
+	ASSERT_EQ (queue.total_size (), 0);
+	ASSERT_TRUE (queue.empty ());
+}
+
+TEST (fair_queue, process_one)
+{
+	nano::fair_queue<int, source_enum> queue;
+	queue.priority_query = [] (auto const &) { return 1; };
+	queue.max_size_query = [] (auto const &) { return 1; };
+
+	queue.push (7, { source_enum::live });
+	ASSERT_EQ (queue.total_size (), 1);
+	ASSERT_EQ (queue.queues_size (), 1);
+	ASSERT_EQ (queue.size ({ source_enum::live }), 1);
+	ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 0);
+
+	auto [result, origin] = queue.next ();
+	ASSERT_EQ (result, 7);
+	ASSERT_EQ (origin.source, source_enum::live);
+	ASSERT_EQ (origin.channel, nullptr);
+
+	ASSERT_TRUE (queue.empty ());
+}
+
+TEST (fair_queue, fifo)
+{
+	nano::fair_queue<int, source_enum> queue;
+	queue.priority_query = [] (auto const &) { return 1; };
+	queue.max_size_query = [] (auto const &) { return 999; };
+
+	queue.push (7, { source_enum::live });
+	queue.push (8, { source_enum::live });
+	queue.push (9, { source_enum::live });
+	ASSERT_EQ (queue.total_size (), 3);
+	ASSERT_EQ (queue.queues_size (), 1);
+	ASSERT_EQ (queue.size ({ source_enum::live }), 3);
+
+	{
+		auto [result, origin] = queue.next ();
+		ASSERT_EQ (result, 7);
+		ASSERT_EQ (origin.source, source_enum::live);
+	}
+	{
+		auto [result, origin] = queue.next ();
+		ASSERT_EQ (result, 8);
+		ASSERT_EQ (origin.source, source_enum::live);
+	}
+	{
+		auto [result, origin] = queue.next ();
+		ASSERT_EQ (result, 9);
+		ASSERT_EQ (origin.source, source_enum::live);
+	}
+
+	ASSERT_TRUE (queue.empty ());
+}
+
+TEST (fair_queue, process_many)
+{
+	nano::fair_queue<int, source_enum> queue;
+	queue.priority_query = [] (auto const &) { return 1; };
+	queue.max_size_query = [] (auto const &) { return 1; };
+
+	queue.push (7, { source_enum::live });
+	queue.push (8, { source_enum::bootstrap });
+	queue.push (9, { source_enum::unchecked });
+	ASSERT_EQ (queue.total_size (), 3);
+	ASSERT_EQ (queue.queues_size (), 3);
+	ASSERT_EQ (queue.size ({ source_enum::live }), 1);
+	ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 1);
+	ASSERT_EQ (queue.size ({ source_enum::unchecked }), 1);
+
+	{
+		auto [result, origin] = queue.next ();
+		ASSERT_EQ (result, 7);
+		ASSERT_EQ (origin.source, source_enum::live);
+	}
+	{
+		auto [result, origin] = queue.next ();
+		ASSERT_EQ (result, 8);
+		ASSERT_EQ (origin.source, source_enum::bootstrap);
+	}
+	{
+		auto [result, origin] = queue.next ();
+		ASSERT_EQ (result, 9);
+		ASSERT_EQ (origin.source, source_enum::unchecked);
+	}
+
+	ASSERT_TRUE (queue.empty ());
+}
+
+TEST (fair_queue, max_queue_size)
+{
+	nano::fair_queue<int, source_enum> queue;
+	queue.priority_query = [] (auto const &) { return 1; };
+	queue.max_size_query = [] (auto const &) { return 2; };
+
+	queue.push (7, { source_enum::live });
+	queue.push (8, { source_enum::live });
+	queue.push (9, { source_enum::live });
+	ASSERT_EQ (queue.total_size (), 2);
+	ASSERT_EQ (queue.queues_size (), 1);
+	ASSERT_EQ (queue.size ({ source_enum::live }), 2);
+
+	{
+		auto [result, origin] = queue.next ();
+		ASSERT_EQ (result, 7);
+		ASSERT_EQ (origin.source, source_enum::live);
+	}
+	{
+		auto [result, origin] = queue.next ();
+		ASSERT_EQ (result, 8);
+		ASSERT_EQ (origin.source, source_enum::live);
+	}
+
+	ASSERT_TRUE (queue.empty ());
+}
+
+TEST (fair_queue, round_robin_with_priority)
+{
+	nano::fair_queue<int, source_enum> queue;
+	queue.priority_query = [] (auto const & origin) {
+		switch (origin.source)
+		{
+			case source_enum::live:
+				return 1;
+			case source_enum::bootstrap:
+				return 2;
+			case source_enum::unchecked:
+				return 3;
+			default:
+				return 0;
+		}
+	};
+	queue.max_size_query = [] (auto const &) { return 999; };
+
+	queue.push (7, { source_enum::live });
+	queue.push (8, { source_enum::live });
+	queue.push (9, { source_enum::live });
+	queue.push (10, { source_enum::bootstrap });
+	queue.push (11, { source_enum::bootstrap });
+	queue.push (12, { source_enum::bootstrap });
+	queue.push (13, { source_enum::unchecked });
+	queue.push (14, { source_enum::unchecked });
+	queue.push (15, { source_enum::unchecked });
+	ASSERT_EQ (queue.total_size (), 9);
+
+	// Processing 1x live, 2x bootstrap, 3x unchecked before moving to the next source
+	ASSERT_EQ (queue.next ().second.source, source_enum::live);
+	ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap);
+	ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap);
+	ASSERT_EQ (queue.next ().second.source, source_enum::unchecked);
+	ASSERT_EQ (queue.next ().second.source, source_enum::unchecked);
+	ASSERT_EQ (queue.next ().second.source, source_enum::unchecked);
+	ASSERT_EQ (queue.next ().second.source, source_enum::live);
+	ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap);
+	ASSERT_EQ (queue.next ().second.source, source_enum::live);
+
+	ASSERT_TRUE (queue.empty ());
+}
+
+TEST (fair_queue, source_channel)
+{
+	nano::test::system system{ 1 };
+
+	nano::fair_queue<int, source_enum> queue;
+	queue.priority_query = [] (auto const &) { return 1; };
+	queue.max_size_query = [] (auto const &) { return 999; };
+
+	auto channel1 = nano::test::fake_channel (system.node (0));
+	auto channel2 = nano::test::fake_channel (system.node (0));
+	auto channel3 = nano::test::fake_channel (system.node (0));
+
+	queue.push (6, { source_enum::live, channel1 });
+	queue.push (7, { source_enum::live, channel2 });
+	queue.push (8, { source_enum::live, channel3 });
+	queue.push (9, { source_enum::live, channel1 }); // Channel 1 has multiple entries
+	ASSERT_EQ (queue.total_size (), 4);
+	ASSERT_EQ (queue.queues_size (), 3); // Each <source, channel> pair is a separate queue
+
+	ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 2);
+	ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1);
+	ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);
+
+	auto all = queue.next_batch (999);
+	ASSERT_EQ (all.size (), 4);
+
+	auto filtered = [&] (auto const & channel) {
+		auto r = all | std::views::filter ([&] (auto const & entry) {
+			return entry.second.channel == channel;
+		});
+		std::vector vec (r.begin (), r.end ());
+		return vec;
+	};
+
+	auto channel1_results = filtered (channel1);
+	ASSERT_EQ (channel1_results.size (), 2);
+
+	{
+		auto [result, origin] = channel1_results[0];
+		ASSERT_EQ (result, 6);
+		ASSERT_EQ (origin.source, source_enum::live);
+		ASSERT_EQ (origin.channel, channel1);
+	}
+	{
+		auto [result, origin] = channel1_results[1];
+		ASSERT_EQ (result, 9);
+		ASSERT_EQ (origin.source, source_enum::live);
+		ASSERT_EQ (origin.channel, channel1);
+	}
+
+	ASSERT_TRUE (queue.empty ());
+}
+
+TEST (fair_queue, cleanup)
+{
+	nano::test::system system{ 1 };
+
+	nano::fair_queue<int, source_enum> queue;
+	queue.priority_query = [] (auto const &) { return 1; };
+	queue.max_size_query = [] (auto const &) { return 999; };
+
+	auto channel1 = nano::test::fake_channel (system.node (0));
+	auto channel2 = nano::test::fake_channel (system.node (0));
+	auto channel3 = nano::test::fake_channel (system.node (0));
+
+	queue.push (7, { source_enum::live, channel1 });
+	queue.push (8, { source_enum::live, channel2 });
+	queue.push (9, { source_enum::live, channel3 });
+	ASSERT_EQ (queue.total_size (), 3);
+	ASSERT_EQ (queue.queues_size (), 3);
+
+	ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 1);
+	ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1);
+	ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);
+
+	// Either closing or resetting the channel should remove it from the queue
+	channel1->close ();
+	channel2.reset ();
+
+	ASSERT_TRUE (queue.periodic_update ());
+
+	// Only channel 3 should remain
+	ASSERT_EQ (queue.total_size (), 1);
+	ASSERT_EQ (queue.queues_size (), 1);
+
+	ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 0);
+	ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 0);
+	ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);
+}
diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp
index cafb622826..9410d0c011 100644
--- a/nano/core_test/network.cpp
+++ b/nano/core_test/network.cpp
@@ -768,10 +768,9 @@ TEST (network, duplicate_detection)
 TEST (network, duplicate_revert_publish)
 {
 	nano::test::system system;
-	nano::node_flags node_flags;
-	node_flags.block_processor_full_size = 0;
-	auto & node (*system.add_node (node_flags));
-	ASSERT_TRUE (node.block_processor.full ());
+	nano::node_config node_config = system.default_config ();
+	node_config.block_processor.max_peer_queue = 0;
+	auto & node (*system.add_node (node_config));
 	nano::publish publish{ nano::dev::network_params.network, nano::dev::genesis };
 	std::vector<uint8_t> bytes;
 	{
diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp
index 2aa0526ab8..ba48a347cb 100644
--- a/nano/core_test/node.cpp
+++ b/nano/core_test/node.cpp
@@ -525,6 +525,7 @@ TEST (node, expire)
 	ASSERT_TRUE (node0.expired ());
 }
 
+// This test is racy; there is no guarantee that the election won't be confirmed until all forks are fully processed
 TEST (node, fork_publish)
 {
 	nano::test::system system (1);
@@ -671,6 +672,7 @@ TEST (node, fork_keep)
 	ASSERT_TRUE (node2.ledger.block_exists (transaction1, send1->hash ()));
 }
 
+// This test is racy; there is no guarantee that the election won't be confirmed until all forks are fully processed
 TEST (node, fork_flip)
 {
 	nano::test::system system (2);
@@ -696,8 +698,7 @@ TEST (node, fork_flip)
 		.work (*system.work.generate (nano::dev::genesis->hash ()))
 		.build ();
 	nano::publish publish2{ nano::dev::network_params.network, send2 };
-	auto ignored_channel{ std::make_shared<nano::transport::channel_tcp> (node1, std::weak_ptr<nano::transport::socket> ()) };
-
+	auto ignored_channel = nano::test::fake_channel (node1);
 	node1.network.inbound (publish1, ignored_channel);
 	node2.network.inbound (publish2, ignored_channel);
 	ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp
index ae6f432a7e..7de673fbd8 100644
--- a/nano/core_test/toml.cpp
+++ b/nano/core_test/toml.cpp
@@ -117,6 +117,7 @@ TEST (toml, daemon_config_deserialize_defaults)
 	std::stringstream ss;
 	ss << R"toml(
 	[node]
+	[node.block_processor]
 	[node.diagnostics.txn_tracking]
 	[node.httpcallback]
 	[node.ipc.local]
@@ -254,6 +255,12 @@ TEST (toml, daemon_config_deserialize_defaults)
 	ASSERT_EQ (conf.node.vote_cache.max_size, defaults.node.vote_cache.max_size);
 	ASSERT_EQ (conf.node.vote_cache.max_voters, defaults.node.vote_cache.max_voters);
+
+	ASSERT_EQ (conf.node.block_processor.max_peer_queue, defaults.node.block_processor.max_peer_queue);
+	ASSERT_EQ (conf.node.block_processor.max_system_queue, defaults.node.block_processor.max_system_queue);
+	ASSERT_EQ (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live);
+	ASSERT_EQ (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap);
+	ASSERT_EQ (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local);
 }
 
 TEST (toml, optional_child)
@@ -432,6 +439,13 @@ TEST (toml, daemon_config_deserialize_no_defaults)
 	backlog_scan_batch_size = 999
 	backlog_scan_frequency = 999
 
+	[node.block_processor]
+	max_peer_queue = 999
+	max_system_queue = 999
+	priority_live = 999
+	priority_bootstrap = 999
+	priority_local = 999
+
 	[node.diagnostics.txn_tracking]
 	enable = true
 	ignore_writes_below_block_processor_max_time = false
@@ -680,6 +694,12 @@ TEST (toml, daemon_config_deserialize_no_defaults)
 	ASSERT_NE (conf.node.vote_cache.max_size, defaults.node.vote_cache.max_size);
 	ASSERT_NE (conf.node.vote_cache.max_voters, defaults.node.vote_cache.max_voters);
+
+	ASSERT_NE (conf.node.block_processor.max_peer_queue, defaults.node.block_processor.max_peer_queue);
+	ASSERT_NE (conf.node.block_processor.max_system_queue, defaults.node.block_processor.max_system_queue);
+	ASSERT_NE (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live);
+	ASSERT_NE (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap);
+	ASSERT_NE (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local);
 }
 
 /** There should be no required values **/
diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp
index 01bf4b62d1..709902d59e 100644
--- a/nano/lib/stats_enums.hpp
+++ b/nano/lib/stats_enums.hpp
@@ -40,6 +40,7 @@ enum class type : uint8_t
 	blockprocessor,
 	blockprocessor_source,
 	blockprocessor_result,
+	blockprocessor_overfill,
 	bootstrap_server,
 	active,
 	active_started,
@@ -85,6 +86,7 @@ enum class detail : uint8_t
 	success,
 	unknown,
 	cache,
+	queue_overflow,
 
 	// processing queue
 	queue,
diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt
index 7600ef9d18..c50f4393bf 100644
--- a/nano/node/CMakeLists.txt
+++ b/nano/node/CMakeLists.txt
@@ -76,6 +76,7 @@ add_library(
   election_status.hpp
   epoch_upgrader.hpp
   epoch_upgrader.cpp
+  fair_queue.hpp
   ipc/action_handler.hpp
   ipc/action_handler.cpp
   ipc/flatbuffers_handler.hpp
diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp
index 5e1e4b8cda..8443493557 100644
--- a/nano/node/blockprocessor.cpp
+++ b/nano/node/blockprocessor.cpp
@@ -8,7 +8,7 @@
 #include
 #include
-#include
+#include
 #include
@@ -17,7 +17,7 @@
 */
 
 nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a) :
-	block{ block },
+	block{ std::move (block) },
 	source{ source_a }
 {
 	debug_assert (source != nano::block_source::unknown);
@@ -38,6 +38,7 @@ void nano::block_processor::context::set_result (result_t const & result)
 */
 
 nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
+	config{ node_a.config.block_processor },
 	node (node_a),
 	write_database_queue (write_database_queue_a),
 	next_log (std::chrono::steady_clock::now ())
@@ -49,6 +50,32 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
 			block_processed.notify (result, context);
 		}
 	});
+
+	queue.max_size_query = [this] (auto const & origin) {
+		switch (origin.source)
+		{
+			case nano::block_source::live:
+				return config.max_peer_queue;
+			default:
+				return config.max_system_queue;
+		}
+	};
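+
+	// Weights below are per-round budgets, not strict precedence: fair_queue::next () pops up
+	// to `priority` entries from one queue before rotating, so with the defaults (live = 1,
+	// bootstrap = 8, local = 16) a saturated round admits at most 1 live block for every
+	// 8 bootstrap/unchecked and 16 local ones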
+	queue.priority_query = [this] (auto const & origin) -> size_t {
+		switch (origin.source)
+		{
+			case nano::block_source::live:
+				return config.priority_live;
+			case nano::block_source::bootstrap:
+			case nano::block_source::bootstrap_legacy:
+			case nano::block_source::unchecked:
+				return config.priority_bootstrap;
+			case nano::block_source::local:
+				return config.priority_local;
+			default:
+				return 1;
+		}
+	};
 }
 
 nano::block_processor::~block_processor ()
@@ -80,39 +107,44 @@ void nano::block_processor::stop ()
 	}
 }
 
-std::size_t nano::block_processor::size ()
+// TODO: Remove and replace all checks with calls to size (block_source)
+std::size_t nano::block_processor::size () const
+{
+	nano::unique_lock<nano::mutex> lock{ mutex };
+	return queue.total_size ();
+}
+
+std::size_t nano::block_processor::size (nano::block_source source) const
 {
 	nano::unique_lock<nano::mutex> lock{ mutex };
-	return blocks.size () + forced.size ();
+	return queue.size ({ source });
 }
 
-bool nano::block_processor::full ()
+bool nano::block_processor::full () const
 {
 	return size () >= node.flags.block_processor_full_size;
 }
 
-bool nano::block_processor::half_full ()
+bool nano::block_processor::half_full () const
 {
 	return size () >= node.flags.block_processor_full_size / 2;
 }
 
-void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source)
+bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> const & channel)
 {
-	if (full ())
-	{
-		node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
-		return;
-	}
 	if (node.network_params.work.validate_entry (*block)) // true => error
 	{
 		node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
-		return;
+		return false; // Not added
 	}
 
 	node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
-	node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source));
+	node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
+	block->hash ().to_string (),
+	to_string (source),
+	channel ? channel->to_string () : ""); // TODO: Lazy eval
 
-	add_impl (context{ block, source });
+	return add_impl (context{ block, source }, channel);
 }
 
 std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@@ -147,11 +179,27 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
 	node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
 	node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());
 
+	add_impl (context{ block_a, block_source::forced });
+}
+
+bool nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transport::channel> const & channel)
+{
+	auto const source = ctx.source;
+	bool added = false;
 	{
-		nano::lock_guard<nano::mutex> lock{ mutex };
-		forced.emplace_back (context{ block_a, block_source::forced });
+		nano::lock_guard<nano::mutex> guard{ mutex };
+		added = queue.push (std::move (ctx), { source, channel });
 	}
-	condition.notify_all ();
+	if (added)
+	{
+		condition.notify_all ();
+	}
+	else
+	{
+		node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
+		node.stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (source));
+	}
+	return added;
 }
 
 void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block)
@@ -196,7 +244,7 @@ void nano::block_processor::run ()
 		nano::unique_lock<nano::mutex> lock{ mutex };
 		while (!stopped)
 		{
-			if (have_blocks_ready ())
+			if (!queue.empty ())
 			{
 				lock.unlock ();
@@ -233,47 +281,16 @@ bool nano::block_processor::should_log ()
 	return result;
 }
 
-bool nano::block_processor::have_blocks_ready ()
-{
-	debug_assert (!mutex.try_lock ());
-	return !blocks.empty () || !forced.empty ();
-}
-
-bool nano::block_processor::have_blocks ()
-{
-	debug_assert (!mutex.try_lock ());
-	return have_blocks_ready ();
-}
-
-void nano::block_processor::add_impl (context ctx)
-{
-	release_assert (ctx.source != nano::block_source::forced);
-	{
-		nano::lock_guard<nano::mutex> guard{ mutex };
-		blocks.emplace_back (std::move (ctx));
-	}
-	condition.notify_all ();
-}
-
 auto nano::block_processor::next () -> context
 {
 	debug_assert (!mutex.try_lock ());
-	debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next
+	debug_assert (!queue.empty ()); // This should be checked before calling next
 
-	if (!forced.empty ())
+	if (!queue.empty ())
 	{
-		auto entry = std::move (forced.front ());
-		release_assert (entry.source == nano::block_source::forced);
-		forced.pop_front ();
-		return entry;
-	}
-
-	if (!blocks.empty ())
-	{
-		auto entry = std::move (blocks.front ());
-		release_assert (entry.source != nano::block_source::forced);
-		blocks.pop_front ();
-		return entry;
+		auto [request, origin] = queue.next ();
+		release_assert (origin.source != nano::block_source::forced || request.source == nano::block_source::forced);
+		return std::move (request);
 	}
 
 	release_assert (false, "next() called when no blocks are ready");
@@ -289,19 +306,24 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
 	lock_a.lock ();
 
+	queue.periodic_update ();
+
 	timer_l.start ();
+
 	// Processing blocks
 	unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
 
 	auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
 	auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
 	auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
 
-	while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
+	while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
 	{
 		// TODO: Cleaner periodical logging
-		if ((blocks.size () + forced.size () > 64) && should_log ())
+		if (should_log ())
 		{
-			node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ());
+			node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
+			queue.total_size (),
+			queue.size ({ nano::block_source::forced }));
 		}
 
 		auto ctx = next ();
@@ -342,6 +364,7 @@ nano::block_status nano::block_processor::process_one (store::write_transaction
 
 	node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result));
 	node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source));
+
 	node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed,
 	nano::log::arg{ "result", result },
 	nano::log::arg{ "source", context.source },
@@ -437,18 +460,12 @@ void nano::block_processor::queue_unchecked (store::write_transaction const & tr
 
 std::unique_ptr<nano::container_info_component> nano::block_processor::collect_container_info (std::string const & name)
 {
-	std::size_t blocks_count;
-	std::size_t forced_count;
-
-	{
-		nano::lock_guard<nano::mutex> guard{ mutex };
-		blocks_count = blocks.size ();
-		forced_count = forced.size ();
-	}
+	nano::lock_guard<nano::mutex> guard{ mutex };
 
 	auto composite = std::make_unique<container_info_composite> (name);
-	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (blocks)::value_type) }));
-	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (forced)::value_type) }));
+	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", queue.total_size (), 0 }));
+	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", queue.size ({ nano::block_source::forced }), 0 }));
+	composite->add_component (queue.collect_container_info ("queue"));
 	return composite;
 }
@@ -463,3 +480,33 @@ nano::stat::detail nano::to_stat_detail (nano::block_source type)
 	debug_assert (value);
 	return value.value_or (nano::stat::detail{});
 }
+
+/*
+ * block_processor_config
+ */
+
+nano::block_processor_config::block_processor_config (const nano::network_constants & network_constants)
+{
+}
+
+nano::error nano::block_processor_config::serialize (nano::tomlconfig & toml) const
+{
+	toml.put ("max_peer_queue", max_peer_queue, "Maximum number of blocks to queue from network peers. \ntype:uint64");
+	toml.put ("max_system_queue", max_system_queue, "Maximum number of blocks to queue from system components (local RPC, bootstrap). \ntype:uint64");
+	toml.put ("priority_live", priority_live, "Priority for live network blocks. Higher priority gets processed more frequently. \ntype:uint64");
+	toml.put ("priority_bootstrap", priority_bootstrap, "Priority for bootstrap blocks. Higher priority gets processed more frequently. \ntype:uint64");
+	toml.put ("priority_local", priority_local, "Priority for local RPC blocks. Higher priority gets processed more frequently. \ntype:uint64");
+
+	return toml.get_error ();
+}
+
+nano::error nano::block_processor_config::deserialize (nano::tomlconfig & toml)
+{
+	toml.get ("max_peer_queue", max_peer_queue);
+	toml.get ("max_system_queue", max_system_queue);
+	toml.get ("priority_live", priority_live);
+	toml.get ("priority_bootstrap", priority_bootstrap);
+	toml.get ("priority_local", priority_local);
+
+	return toml.get_error ();
+}
\ No newline at end of file
diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp
index 0d020e8b16..a1537092f0 100644
--- a/nano/node/blockprocessor.hpp
+++ b/nano/node/blockprocessor.hpp
@@ -1,6 +1,7 @@
 #pragma once
 
 #include
+#include <nano/node/fair_queue.hpp>
 #include
 #include
@@ -23,7 +24,6 @@ class write_transaction;
 
 namespace nano
 {
-
 enum class block_source
 {
 	unknown = 0,
@@ -38,6 +38,26 @@ enum class block_source
 std::string_view to_string (block_source);
 nano::stat::detail to_stat_detail (block_source);
 
+class block_processor_config final
+{
+public:
+	explicit block_processor_config (nano::network_constants const &);
+
+	nano::error deserialize (nano::tomlconfig & toml);
+	nano::error serialize (nano::tomlconfig & toml) const;
+
+public:
+	// Maximum number of blocks to queue from network peers
+	size_t max_peer_queue{ 128 };
+	// Maximum number of blocks to queue from system components (local RPC, bootstrap)
+	size_t max_system_queue{ 16 * 1024 };
+
+	// Higher priority gets processed more frequently
+	size_t priority_live{ 1 };
+	size_t priority_bootstrap{ 8 };
+	size_t priority_local{ 16 };
+};
+
 /**
  * Processing blocks is a potentially long IO operation.
  * This class isolates block insertion from other operations like servicing network operations
@@ -72,15 +92,14 @@ class block_processor final
 	void start ();
 	void stop ();
 
-	std::size_t size ();
-	bool full ();
-	bool half_full ();
-	void add (std::shared_ptr<nano::block> const &, block_source = block_source::live);
+	std::size_t size () const;
+	std::size_t size (block_source) const;
+	bool full () const;
+	bool half_full () const;
+	bool add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
 	std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
 	void force (std::shared_ptr<nano::block> const &);
 	bool should_log ();
-	bool have_blocks_ready ();
-	bool have_blocks ();
 
 	std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
@@ -103,21 +122,21 @@ class block_processor final
 	void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &);
 	processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
 	context next ();
-	void add_impl (context);
+	bool add_impl (context, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
 
 private: // Dependencies
+	block_processor_config const & config;
 	nano::node & node;
 	nano::write_database_queue & write_database_queue;
 
 private:
-	std::deque<context> blocks;
-	std::deque<context> forced;
+	nano::fair_queue<context, nano::block_source> queue;
 
 	std::chrono::steady_clock::time_point next_log;
 
 	bool stopped{ false };
 	nano::condition_variable condition;
-	nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
+	mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
 	std::thread thread;
 };
 }
diff --git a/nano/node/bootstrap/bootstrap_legacy.cpp b/nano/node/bootstrap/bootstrap_legacy.cpp
index 9badee97cc..98d48e9ead 100644
--- a/nano/node/bootstrap/bootstrap_legacy.cpp
+++ b/nano/node/bootstrap/bootstrap_legacy.cpp
@@ -225,9 +225,9 @@ void nano::bootstrap_attempt_legacy::run ()
 	// TODO: This check / wait is a heuristic and should be improved.
 	auto wait_start = std::chrono::steady_clock::now ();
-	while (!stopped && node->block_processor.size () != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
+	while (!stopped && node->block_processor.size (nano::block_source::bootstrap_legacy) != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
 	{
-		condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size () == 0; });
+		condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size (nano::block_source::bootstrap_legacy) == 0; });
 	}
 
 	if (start_account.number () != std::numeric_limits<nano::uint256_t>::max ())
diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp
index 3dd31bfd13..66510ba272 100644
--- a/nano/node/bootstrap_ascending/service.cpp
+++ b/nano/node/bootstrap_ascending/service.cpp
@@ -177,7 +177,7 @@ void nano::bootstrap_ascending::service::inspect (store::transaction const & tx,
 void nano::bootstrap_ascending::service::wait_blockprocessor ()
 {
 	nano::unique_lock<nano::mutex> lock{ mutex };
-	while (!stopped && block_processor.size () > config.bootstrap_ascending.block_wait_count)
+	while (!stopped && block_processor.size (nano::block_source::bootstrap) > config.bootstrap_ascending.block_wait_count)
 	{
 		condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions
 	}
diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp
new file mode 100644
index 0000000000..e13ed847c0
--- /dev/null
+++ b/nano/node/fair_queue.hpp
@@ -0,0 +1,357 @@
+#pragma once
+
+#include <nano/lib/utility.hpp>
+#include <nano/node/transport/channel.hpp>
+
+#include <algorithm>
+#include <chrono>
+#include <deque>
+#include <functional>
+#include <map>
+#include <memory>
+#include <numeric>
+#include <optional>
+
+namespace nano
+{
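+/**
+ * fair_queue keeps one bounded FIFO per origin (a user supplied Source value plus an optional
+ * channel) and drains them in weighted round-robin order: next () pops up to `priority` entries
+ * from a queue before rotating onwards. Minimal usage sketch (illustrative only; `source_enum`
+ * stands for any caller-defined enum, as in the core_test cases):
+ *
+ *   nano::fair_queue<std::string, source_enum> queue;
+ *   queue.max_size_query = [] (auto const & origin) { return 1024; };
+ *   queue.priority_query = [] (auto const & origin) { return origin.source == source_enum::live ? 1 : 4; };
+ *
+ *   queue.push ("request", { source_enum::live });
+ *   auto [request, origin] = queue.next ();
+ */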
+template <typename Request, typename Source>
+class fair_queue final
+{
+public:
+	struct origin
+	{
+		Source source;
+		std::shared_ptr<nano::transport::channel> channel;
+
+		origin (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
+			source{ source },
+			channel{ channel }
+		{
+		}
+	};
+
+private:
+	/**
+	 * Holds user supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request.
+	 */
+	struct origin_entry
+	{
+		Source source;
+
+		// Optional is needed to distinguish between a source with no associated channel and a source with an expired channel
+		// TODO: Store channel as shared_ptr after networking fixes are done
+		std::optional<std::weak_ptr<nano::transport::channel>> maybe_channel;
+
+		origin_entry (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
+			source{ source }
+		{
+			if (channel)
+			{
+				maybe_channel = std::weak_ptr{ channel };
+			}
+		}
+
+		origin_entry (origin const & origin) :
+			origin_entry (origin.source, origin.channel)
+		{
+		}
+
+		bool alive () const
+		{
+			if (maybe_channel)
+			{
+				if (auto channel_l = maybe_channel->lock ())
+				{
+					return channel_l->alive ();
+				}
+				else
+				{
+					return false;
+				}
+			}
+			else
+			{
+				// Some sources (e.g. local RPC) don't have an associated channel; never remove their queue
+				return true;
+			}
+		}
+
+		// TODO: Store channel as shared_ptr to avoid this mess
+		auto operator<=> (origin_entry const & other) const
+		{
+			// First compare source
+			if (auto cmp = source <=> other.source; cmp != 0)
+				return cmp;
+
+			if (maybe_channel && other.maybe_channel)
+			{
+				// Then compare channels by ownership, not by the channel's value or state
+				std::owner_less<std::weak_ptr<nano::transport::channel>> less;
+				if (less (*maybe_channel, *other.maybe_channel))
+					return std::strong_ordering::less;
+				if (less (*other.maybe_channel, *maybe_channel))
+					return std::strong_ordering::greater;
+
+				return std::strong_ordering::equivalent;
+			}
+			else
+			{
+				if (maybe_channel && !other.maybe_channel)
+				{
+					return std::strong_ordering::greater;
+				}
+				if (!maybe_channel && other.maybe_channel)
+				{
+					return std::strong_ordering::less;
+				}
+				return std::strong_ordering::equivalent;
+			}
+		}
+
+		operator origin () const
+		{
+			return { source, maybe_channel ? maybe_channel->lock () : nullptr };
+		}
+	};
+
+	struct entry
+	{
+		using queue_t = std::deque<Request>;
+		queue_t requests;
+
+		size_t priority;
+		size_t max_size;
+
+		entry (size_t max_size, size_t priority) :
+			priority{ priority },
+			max_size{ max_size }
+		{
+		}
+
+		Request pop ()
+		{
+			release_assert (!requests.empty ());
+
+			auto request = std::move (requests.front ());
+			requests.pop_front ();
+			return request;
+		}
+
+		bool push (Request request)
+		{
+			if (requests.size () < max_size)
+			{
+				requests.push_back (std::move (request));
+				return true; // Added
+			}
+			return false; // Dropped
+		}
+
+		bool empty () const
+		{
+			return requests.empty ();
+		}
+
+		size_t size () const
+		{
+			return requests.size ();
+		}
+	};
+
+public:
+	using origin_type = origin;
+	using value_type = std::pair<Request, origin_type>;
+
+public:
+	size_t size (origin_type source) const
+	{
+		auto it = queues.find (source);
+		return it == queues.end () ? 0 : it->second.size ();
+	}
+
+	size_t max_size (origin_type source) const
+	{
+		auto it = queues.find (source);
+		return it == queues.end () ? 0 : it->second.max_size;
+	}
+
+	size_t priority (origin_type source) const
+	{
+		auto it = queues.find (source);
+		return it == queues.end () ? 0 : it->second.priority;
+	}
+
+	size_t total_size () const
+	{
+		return std::accumulate (queues.begin (), queues.end (), 0, [] (size_t total, auto const & queue) {
+			return total + queue.second.size ();
+		});
+	};
+
+	bool empty () const
+	{
+		return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) {
+			return queue.second.empty ();
+		});
+	}
+
+	size_t queues_size () const
+	{
+		return queues.size ();
+	}
+
+	void clear ()
+	{
+		queues.clear ();
+	}
+
+	/**
+	 * Should be called periodically to clean up stale channels and update queue priorities and max sizes
+	 */
+	bool periodic_update (std::chrono::milliseconds interval = std::chrono::milliseconds{ 1000 * 30 })
+	{
+		if (elapsed (last_update, interval))
+		{
+			last_update = std::chrono::steady_clock::now ();
+
+			cleanup ();
+			update ();
+
+			return true; // Updated
+		}
+		return false; // Not updated
+	}
+
+	/**
+	 * Push a request to the appropriate queue based on the source
+	 * Request will be dropped if the queue is full
+	 * @return true if added, false if dropped
+	 */
+	bool push (Request request, origin_type source)
+	{
+		auto it = queues.find (source);
+
+		// Create a new queue if it doesn't exist
+		if (it == queues.end ())
+		{
+			auto max_size = max_size_query (source);
+			auto priority = priority_query (source);
+
+			// The current round-robin iterator stays valid: std::map guarantees that insert operations do not invalidate iterators
+			it = queues.emplace (source, entry{ max_size, priority }).first;
+		}
+		release_assert (it != queues.end ());
+
+		auto & queue = it->second;
+		return queue.push (std::move (request)); // True if added, false if dropped
+	}
+
+public:
+	using max_size_query_t = std::function<size_t (origin_type const &)>;
+	using priority_query_t = std::function<size_t (origin_type const &)>;
+
+	max_size_query_t max_size_query{ [] (auto const & origin) { debug_assert (false, "max_size_query callback empty"); return 0; } };
+	priority_query_t priority_query{ [] (auto const & origin) { debug_assert (false, "priority_query callback empty"); return 0; } };
+
+public:
+	value_type next ()
+	{
+		debug_assert (!empty ()); // Should be checked before calling next
+
+		auto should_seek = [&, this] () {
+			if (iterator == queues.end ())
+			{
+				return true;
+			}
+			auto & queue = iterator->second;
+			if (queue.empty ())
+			{
+				return true;
+			}
+			// Allow up to `queue.priority` requests to be processed before moving to the next queue
+			if (counter >= queue.priority)
+			{
+				return true;
+			}
+			return false;
+		};
+
+		if (should_seek ())
+		{
+			seek_next ();
+		}
+
+		release_assert (iterator != queues.end ());
+
+		auto & source = iterator->first;
+		auto & queue = iterator->second;
+
+		++counter;
+		return { queue.pop (), source };
+	}
+
+	std::deque<value_type> next_batch (size_t max_count)
+	{
+		// TODO: Naive implementation, could be optimized
+		std::deque<value_type> result;
+		while (!empty () && result.size () < max_count)
+		{
+			result.emplace_back (next ());
+		}
+		return result;
+	}
+
+private:
+	void seek_next ()
+	{
+		counter = 0;
+		do
+		{
+			if (iterator != queues.end ())
+			{
+				++iterator;
+			}
+			if (iterator == queues.end ())
+			{
+				iterator = queues.begin ();
+			}
+			release_assert (iterator != queues.end ());
+		} while (iterator->second.empty ());
+	}
+
+	void cleanup ()
+	{
+		// Invalidate the current iterator
+		iterator = queues.end ();
+
+		erase_if (queues, [] (auto const & entry) {
+			return !entry.first.alive ();
+		});
+	}
+
+	void update ()
+	{
+		for (auto & [source, queue] : queues)
+		{
+			queue.max_size = max_size_query (source);
+			queue.priority = priority_query (source);
+		}
+	}
+
+private:
+	std::map<origin_entry, entry> queues;
+	std::map<origin_entry, entry>::iterator iterator{ queues.end () };
+	size_t counter{ 0 };
+
+	std::chrono::steady_clock::time_point last_update{};
+
+public:
+	std::unique_ptr<container_info_component> collect_container_info (std::string const & name)
+	{
+		auto composite = std::make_unique<container_info_composite> (name);
+		composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queues", queues.size (), sizeof (typename decltype (queues)::value_type) }));
+		composite->add_component (std::make_unique<container_info_leaf> (container_info{ "total_size", total_size (), sizeof (typename decltype (queues)::value_type) }));
+		return composite;
+	}
+};
+}
diff --git a/nano/node/network.cpp b/nano/node/network.cpp
index 95381bcad8..f7f373b10d 100644
--- a/nano/node/network.cpp
+++ b/nano/node/network.cpp
@@ -336,15 +336,12 @@ class network_message_visitor : public nano::message_visitor
 		}
 	}
 
-	void publish (nano::publish const & message_a) override
+	void publish (nano::publish const & message) override
 	{
-		if (!node.block_processor.full ())
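+		// Queue full for this channel: clear the publish filter so a later retransmission of
+		// the same block is not silently dropped as a duplicate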
+		bool added = node.block_processor.add (message.block, nano::block_source::live, channel);
+		if (!added)
 		{
-			node.process_active (message_a.block);
-		}
-		else
-		{
-			node.network.publish_filter.clear (message_a.digest);
+			node.network.publish_filter.clear (message.digest);
 			node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in);
 		}
 	}
diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp
index ad597389b8..cfa5fcbf66 100644
--- a/nano/node/nodeconfig.cpp
+++ b/nano/node/nodeconfig.cpp
@@ -33,7 +33,8 @@ nano::node_config::node_config (const std::optional<uint16_t> & peering_port_a,
 	websocket_config{ network_params.network },
 	ipc_config{ network_params.network },
 	external_address{ boost::asio::ip::address_v6{}.to_string () },
-	rep_crawler{ network_params.network }
+	rep_crawler{ network_params.network },
+	block_processor{ network_params.network }
 {
 	if (peering_port == 0)
 	{
@@ -212,6 +213,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
 	rep_crawler.serialize (rep_crawler_l);
 	toml.put_child ("rep_crawler", rep_crawler_l);
 
+	nano::tomlconfig block_processor_l;
+	block_processor.serialize (block_processor_l);
+	toml.put_child ("block_processor", block_processor_l);
+
 	return toml.get_error ();
 }
@@ -287,6 +292,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
 		rep_crawler.deserialize (config_l);
 	}
 
+	if (toml.has_key ("block_processor"))
+	{
+		auto config_l = toml.get_required_child ("block_processor");
+		block_processor.deserialize (config_l);
+	}
+
 	if (toml.has_key ("work_peers"))
 	{
 		work_peers.clear ();
diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp
index 2b8f453577..b8c953b270 100644
--- a/nano/node/nodeconfig.hpp
+++ b/nano/node/nodeconfig.hpp
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include <nano/node/blockprocessor.hpp>
 #include
 #include
 #include
@@ -136,6 +137,7 @@ class node_config
 	unsigned backlog_scan_frequency{ 10 };
 	nano::vote_cache_config vote_cache;
 	nano::rep_crawler_config rep_crawler;
+	nano::block_processor_config block_processor;
 
 public:
 	std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;
diff --git a/nano/test_common/testutil.cpp b/nano/test_common/testutil.cpp
index 75f26546f4..98fdc98b86 100644
--- a/nano/test_common/testutil.cpp
+++ b/nano/test_common/testutil.cpp
@@ -214,7 +214,7 @@ std::vector<nano::block_hash> nano::test::blocks_to_hashes (std::vector<std::shared_ptr<nano::block>> const & blocks)
 
-std::shared_ptr<nano::transport::channel> nano::test::fake_channel (nano::node & node, nano::account node_id)
+std::shared_ptr<nano::transport::fake_channel> nano::test::fake_channel (nano::node & node, nano::account node_id)
 {
 	auto channel = std::make_shared<nano::transport::fake_channel> (node);
 	if (!node_id.is_zero ())
diff --git a/nano/test_common/testutil.hpp b/nano/test_common/testutil.hpp
index a2706dc3f8..e55f53535b 100644
--- a/nano/test_common/testutil.hpp
+++ b/nano/test_common/testutil.hpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include <nano/node/transport/fake.hpp>
 #include
 #include
@@ -382,7 +383,7 @@ namespace test
 	/*
 	 * Creates a new fake channel associated with `node`
 	 */
-	std::shared_ptr<nano::transport::channel> fake_channel (nano::node & node, nano::account node_id = { 0 });
+	std::shared_ptr<nano::transport::fake_channel> fake_channel (nano::node & node, nano::account node_id = { 0 });
 
 	/*
 	 * Start an election on system system_a, node node_a and hash hash_a by reading the block
 	 * out of the ledger and adding it to the manual election scheduler queue.