From e11935832de18d3035b8af11b6787d61152b08b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 4 Mar 2024 19:41:17 +0100 Subject: [PATCH 1/9] Return proper channel type from `nano::test::fake_channel` --- nano/test_common/testutil.cpp | 2 +- nano/test_common/testutil.hpp | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/nano/test_common/testutil.cpp b/nano/test_common/testutil.cpp index ce80cfeae0..e7a7b97156 100644 --- a/nano/test_common/testutil.cpp +++ b/nano/test_common/testutil.cpp @@ -212,7 +212,7 @@ std::vector nano::test::blocks_to_hashes (std::vector nano::test::fake_channel (nano::node & node, nano::account node_id) +std::shared_ptr nano::test::fake_channel (nano::node & node, nano::account node_id) { auto channel = std::make_shared (node); if (!node_id.is_zero ()) diff --git a/nano/test_common/testutil.hpp b/nano/test_common/testutil.hpp index 60821e7ddd..ee0561ce7d 100644 --- a/nano/test_common/testutil.hpp +++ b/nano/test_common/testutil.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -364,7 +365,7 @@ namespace test /* * Creates a new fake channel associated with `node` */ - std::shared_ptr fake_channel (nano::node & node, nano::account node_id = { 0 }); + std::shared_ptr fake_channel (nano::node & node, nano::account node_id = { 0 }); /* * Start an election on system system_a, node node_a and hash hash_a by reading the block * out of the ledger and adding it to the manual election scheduler queue. From 619258d7c4a1a98ad16753c7d0518b6f97ea77e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 10 Mar 2024 19:57:01 +0100 Subject: [PATCH 2/9] Introduce `fair_queue` class --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/fair_queue.cpp | 275 +++++++++++++++++++++++++++++++ nano/node/CMakeLists.txt | 1 + nano/node/fair_queue.hpp | 296 ++++++++++++++++++++++++++++++++++ 4 files changed, 573 insertions(+) create mode 100644 nano/core_test/fair_queue.cpp create mode 100644 nano/node/fair_queue.hpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index ba92b42681..b460de3ee6 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -21,6 +21,7 @@ add_executable( election_scheduler.cpp enums.cpp epochs.cpp + fair_queue.cpp frontiers_confirmation.cpp ipc.cpp ledger.cpp diff --git a/nano/core_test/fair_queue.cpp b/nano/core_test/fair_queue.cpp new file mode 100644 index 0000000000..b2fa7970bd --- /dev/null +++ b/nano/core_test/fair_queue.cpp @@ -0,0 +1,275 @@ +#include +#include +#include + +#include + +#include + +using namespace std::chrono_literals; + +namespace +{ +enum class source_enum +{ + unknown = 0, + live, + bootstrap, + bootstrap_legacy, + unchecked, + local, + forced, +}; +} + +TEST (fair_queue, construction) +{ + nano::fair_queue queue; + ASSERT_EQ (queue.total_size (), 0); + ASSERT_TRUE (queue.empty ()); +} + +TEST (fair_queue, process_one) +{ + nano::fair_queue queue; + queue.priority_query = [] (auto const &) { return 1; }; + queue.max_size_query = [] (auto const &) { return 1; }; + + queue.push (7, { source_enum::live }); + ASSERT_EQ (queue.total_size (), 1); + ASSERT_EQ (queue.queues_size (), 1); + ASSERT_EQ (queue.size ({ source_enum::live }), 1); + ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 0); + + auto [result, origin] = queue.next (); + ASSERT_EQ (result, 7); + ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.channel, nullptr); + + ASSERT_TRUE (queue.empty ()); +} + +TEST (fair_queue, fifo) +{ + nano::fair_queue queue; + queue.priority_query = [] (auto const &) { return 1; }; + queue.max_size_query = [] (auto const &) { return 999; }; + + queue.push (7, { source_enum::live }); + queue.push (8, { source_enum::live }); + queue.push (9, { source_enum::live }); + ASSERT_EQ (queue.total_size (), 3); + ASSERT_EQ (queue.queues_size (), 1); + ASSERT_EQ (queue.size ({ source_enum::live }), 3); + + { + auto [result, origin] = queue.next (); + ASSERT_EQ (result, 7); + ASSERT_EQ (std::get (origin.sources), source_enum::live); + } + { + auto [result, origin] = queue.next (); + ASSERT_EQ (result, 8); + ASSERT_EQ (std::get (origin.sources), source_enum::live); + } + { + auto [result, origin] = queue.next (); + ASSERT_EQ (result, 9); + ASSERT_EQ (std::get (origin.sources), source_enum::live); + } + + ASSERT_TRUE (queue.empty ()); +} + +TEST (fair_queue, process_many) +{ + nano::fair_queue queue; + queue.priority_query = [] (auto const &) { return 1; }; + queue.max_size_query = [] (auto const &) { return 1; }; + + queue.push (7, { source_enum::live }); + queue.push (8, { source_enum::bootstrap }); + queue.push (9, { source_enum::unchecked }); + ASSERT_EQ (queue.total_size (), 3); + ASSERT_EQ (queue.queues_size (), 3); + ASSERT_EQ (queue.size ({ source_enum::live }), 1); + ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 1); + ASSERT_EQ (queue.size ({ source_enum::unchecked }), 1); + + { + auto [result, origin] = queue.next (); + ASSERT_EQ (result, 7); + ASSERT_EQ (std::get (origin.sources), source_enum::live); + } + { + auto [result, origin] = queue.next (); + ASSERT_EQ (result, 8); + ASSERT_EQ (std::get (origin.sources), source_enum::bootstrap); + } + { + auto [result, origin] = queue.next (); + ASSERT_EQ (result, 9); + ASSERT_EQ (std::get (origin.sources), source_enum::unchecked); + } + + ASSERT_TRUE (queue.empty ()); +} + +TEST (fair_queue, max_queue_size) +{ + nano::fair_queue queue; + queue.priority_query = [] (auto const &) { return 1; }; + queue.max_size_query = [] (auto const &) { return 2; }; + + queue.push (7, { source_enum::live }); + queue.push (8, { source_enum::live }); + queue.push (9, { source_enum::live }); + ASSERT_EQ (queue.total_size (), 2); + ASSERT_EQ (queue.queues_size (), 1); + ASSERT_EQ (queue.size ({ source_enum::live }), 2); + + { + auto [result, origin] = queue.next (); + ASSERT_EQ (result, 7); + ASSERT_EQ (std::get (origin.sources), source_enum::live); + } + { + auto [result, origin] = queue.next (); + ASSERT_EQ (result, 8); + ASSERT_EQ (std::get (origin.sources), source_enum::live); + } + + ASSERT_TRUE (queue.empty ()); +} + +TEST (fair_queue, round_robin_with_priority) +{ + nano::fair_queue queue; + queue.priority_query = [] (auto const & origin) { + switch (std::get (origin.sources)) + { + case source_enum::live: + return 1; + case source_enum::bootstrap: + return 2; + case source_enum::unchecked: + return 3; + default: + return 0; + } + }; + queue.max_size_query = [] (auto const &) { return 999; }; + + queue.push (7, { source_enum::live }); + queue.push (8, { source_enum::live }); + queue.push (9, { source_enum::live }); + queue.push (10, { source_enum::bootstrap }); + queue.push (11, { source_enum::bootstrap }); + queue.push (12, { source_enum::bootstrap }); + queue.push (13, { source_enum::unchecked }); + queue.push (14, { source_enum::unchecked }); + queue.push (15, { source_enum::unchecked }); + ASSERT_EQ (queue.total_size (), 9); + + // Processing 1x live, 2x bootstrap, 3x unchecked before moving to the next source + ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::live); + ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::bootstrap); + ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::bootstrap); + ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::unchecked); + ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::unchecked); + ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::unchecked); + ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::live); + ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::bootstrap); + ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::live); + + ASSERT_TRUE (queue.empty ()); +} + +TEST (fair_queue, source_channel) +{ + nano::test::system system{ 1 }; + + nano::fair_queue queue; + queue.priority_query = [] (auto const &) { return 1; }; + queue.max_size_query = [] (auto const &) { return 999; }; + + auto channel1 = nano::test::fake_channel (system.node (0)); + auto channel2 = nano::test::fake_channel (system.node (0)); + auto channel3 = nano::test::fake_channel (system.node (0)); + + queue.push (6, { source_enum::live, channel1 }); + queue.push (7, { source_enum::live, channel2 }); + queue.push (8, { source_enum::live, channel3 }); + queue.push (9, { source_enum::live, channel1 }); // Channel 1 has multiple entries + ASSERT_EQ (queue.total_size (), 4); + ASSERT_EQ (queue.queues_size (), 3); // Each pair is a separate queue + + ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 2); + ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1); + ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1); + + auto all = queue.next_batch (999); + ASSERT_EQ (all.size (), 4); + + auto filtered = [&] (auto const & channel) { + auto r = all | std::views::filter ([&] (auto const & entry) { + return entry.second.channel == channel; + }); + std::vector vec (r.begin (), r.end ()); + return vec; + }; + + auto channel1_results = filtered (channel1); + ASSERT_EQ (channel1_results.size (), 2); + + { + auto [result, origin] = channel1_results[0]; + ASSERT_EQ (result, 6); + ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.channel, channel1); + } + { + auto [result, origin] = channel1_results[1]; + ASSERT_EQ (result, 9); + ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.channel, channel1); + } + + ASSERT_TRUE (queue.empty ()); +} + +TEST (fair_queue, cleanup) +{ + nano::test::system system{ 1 }; + + nano::fair_queue queue; + queue.priority_query = [] (auto const &) { return 1; }; + queue.max_size_query = [] (auto const &) { return 999; }; + + auto channel1 = nano::test::fake_channel (system.node (0)); + auto channel2 = nano::test::fake_channel (system.node (0)); + auto channel3 = nano::test::fake_channel (system.node (0)); + + queue.push (7, { source_enum::live, channel1 }); + queue.push (8, { source_enum::live, channel2 }); + queue.push (9, { source_enum::live, channel3 }); + ASSERT_EQ (queue.total_size (), 3); + ASSERT_EQ (queue.queues_size (), 3); + + ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 1); + ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1); + ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1); + + channel1->close (); + channel2->close (); + + ASSERT_TRUE (queue.periodic_update ()); + + // Only channel 3 should remain + ASSERT_EQ (queue.total_size (), 1); + ASSERT_EQ (queue.queues_size (), 1); + + ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 0); + ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 0); + ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1); +} diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index f8f3b5407e..e81b62e40a 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -78,6 +78,7 @@ add_library( election_insertion_result.hpp epoch_upgrader.hpp epoch_upgrader.cpp + fair_queue.hpp inactive_cache_information.hpp inactive_cache_information.cpp inactive_cache_status.hpp diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp new file mode 100644 index 0000000000..935fa8dae8 --- /dev/null +++ b/nano/node/fair_queue.hpp @@ -0,0 +1,296 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace nano +{ +template +class fair_queue final +{ +public: + struct source + { + std::tuple sources; + std::shared_ptr channel; + + source (std::tuple sources, std::shared_ptr channel = nullptr) : + sources{ sources }, + channel{ channel } + { + } + + bool alive () const + { + if (channel) + { + return channel->alive (); + } + // Some sources (eg. local RPC) don't have an associated channel, never remove their queue + return true; + } + + auto operator<=> (source const &) const = default; + }; + +private: + struct entry + { + using queue_t = std::deque; + queue_t requests; + + size_t priority; + size_t max_size; + + entry (size_t max_size, size_t priority) : + priority{ priority }, + max_size{ max_size } + { + } + + Request pop () + { + release_assert (!requests.empty ()); + + auto request = std::move (requests.front ()); + requests.pop_front (); + return request; + } + + bool push (Request request) + { + if (requests.size () < max_size) + { + requests.push_back (std::move (request)); + return true; // Added + } + return false; // Dropped + } + + bool empty () const + { + return requests.empty (); + } + + size_t size () const + { + return requests.size (); + } + }; + +public: + using source_type = source; + using value_type = std::pair; + +public: + size_t size (source_type source) const + { + auto it = queues.find (source); + return it == queues.end () ? 0 : it->second.size (); + } + + size_t max_size (source_type source) const + { + auto it = queues.find (source); + return it == queues.end () ? 0 : it->second.max_size; + } + + size_t priority (source_type source) const + { + auto it = queues.find (source); + return it == queues.end () ? 0 : it->second.priority; + } + + size_t total_size () const + { + return std::accumulate (queues.begin (), queues.end (), 0, [] (size_t total, auto const & queue) { + return total + queue.second.size (); + }); + }; + + bool empty () const + { + return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) { + return queue.second.empty (); + }); + } + + size_t queues_size () const + { + return queues.size (); + } + + void clear () + { + queues.clear (); + } + + /** + * Should be called periodically to clean up stale channels and update queue priorities and max sizes + */ + bool periodic_update (std::chrono::milliseconds interval = std::chrono::milliseconds{ 1000 * 30 }) + { + if (elapsed (last_update, interval)) + { + last_update = std::chrono::steady_clock::now (); + + cleanup (); + update (); + + return true; // Updated + } + return false; // Not updated + } + + /** + * Push a request to the appropriate queue based on the source + * Request will be dropped if the queue is full + * @return true if added, false if dropped + */ + bool push (Request request, source_type source) + { + auto it = queues.find (source); + + // Create a new queue if it doesn't exist + if (it == queues.end ()) + { + // TODO: Right now this is constant and initialized when the queue is created, but it could be made dynamic + auto max_size = max_size_query (source); + auto priority = priority_query (source); + + // It's safe to not invalidate current iterator, since std::map container guarantees that iterators are not invalidated by insert operations + it = queues.emplace (source, entry{ max_size, priority }).first; + } + release_assert (it != queues.end ()); + + auto & queue = it->second; + return queue.push (std::move (request)); // True if added, false if dropped + } + +public: + using max_size_query_t = std::function; + using priority_query_t = std::function; + + max_size_query_t max_size_query{ [] (auto const & origin) { debug_assert (false, "max_size_query callback empty"); return 0; } }; + priority_query_t priority_query{ [] (auto const & origin) { debug_assert (false, "priority_query callback empty"); return 0; } }; + +public: + value_type next () + { + debug_assert (!empty ()); // Should be checked before calling next + + auto should_seek = [&, this] () { + if (iterator == queues.end ()) + { + return true; + } + auto & queue = iterator->second; + if (queue.empty ()) + { + return true; + } + // Allow up to `queue.priority` requests to be processed before moving to the next queue + if (counter >= queue.priority) + { + return true; + } + return false; + }; + + if (should_seek ()) + { + seek_next (); + } + + release_assert (iterator != queues.end ()); + + auto & source = iterator->first; + auto & queue = iterator->second; + + ++counter; + return { queue.pop (), source }; + } + + std::deque next_batch (size_t max_count) + { + // TODO: Naive implementation, could be optimized + std::deque result; + while (!empty () && result.size () < max_count) + { + result.emplace_back (next ()); + } + return result; + } + +private: + void seek_next () + { + counter = 0; + do + { + if (iterator != queues.end ()) + { + ++iterator; + } + if (iterator == queues.end ()) + { + iterator = queues.begin (); + } + release_assert (iterator != queues.end ()); + } while (iterator->second.empty ()); + } + + void cleanup () + { + // Invalidate the current iterator + iterator = queues.end (); + + erase_if (queues, [] (auto const & entry) { + return !entry.first.alive (); + }); + } + + void update () + { + for (auto & [source, queue] : queues) + { + queue.max_size = max_size_query (source); + queue.priority = priority_query (source); + } + } + +private: + std::map queues; + std::map::iterator iterator{ queues.end () }; + size_t counter{ 0 }; + + std::chrono::steady_clock::time_point last_update{}; + +public: + std::unique_ptr collect_container_info (std::string const & name) + { + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "queues", queues.size (), sizeof (typename decltype (queues)::value_type) })); + composite->add_component (std::make_unique (container_info{ "total_size", total_size (), sizeof (typename decltype (queues)::value_type) })); + return composite; + } +}; +} From 6602c7c2647ab768833b679a4d84f16d8e835059 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 10 Mar 2024 20:00:20 +0100 Subject: [PATCH 3/9] Use per peer fair queue in block processor --- nano/core_test/network.cpp | 7 +- nano/core_test/node.cpp | 5 +- nano/lib/stats_enums.hpp | 2 + nano/node/blockprocessor.cpp | 188 ++++++++++++++-------- nano/node/blockprocessor.hpp | 41 +++-- nano/node/bootstrap/bootstrap_legacy.cpp | 4 +- nano/node/bootstrap_ascending/service.cpp | 2 +- nano/node/fair_queue.hpp | 2 +- nano/node/network.cpp | 11 +- nano/node/nodeconfig.cpp | 13 +- nano/node/nodeconfig.hpp | 2 + 11 files changed, 180 insertions(+), 97 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index bf9ed2e651..6ada05260b 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -813,10 +813,9 @@ TEST (network, duplicate_detection) TEST (network, duplicate_revert_publish) { nano::test::system system; - nano::node_flags node_flags; - node_flags.block_processor_full_size = 0; - auto & node (*system.add_node (node_flags)); - ASSERT_TRUE (node.block_processor.full ()); + nano::node_config node_config = system.default_config (); + node_config.block_processor.max_peer_queue = 0; + auto & node (*system.add_node (node_config)); nano::publish publish{ nano::dev::network_params.network, nano::dev::genesis }; std::vector bytes; { diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index cf81a51bdb..2886c4a2b7 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -528,6 +528,7 @@ TEST (node, expire) ASSERT_TRUE (node0.expired ()); } +// This test is racy, there is no guarantee that the election won't be confirmed until all forks are fully processed TEST (node, fork_publish) { nano::test::system system (1); @@ -674,6 +675,7 @@ TEST (node, fork_keep) ASSERT_TRUE (node2.ledger.block_exists (transaction1, send1->hash ())); } +// This test is racy, there is no guarantee that the election won't be confirmed until all forks are fully processed TEST (node, fork_flip) { nano::test::system system (2); @@ -699,8 +701,7 @@ TEST (node, fork_flip) .work (*system.work.generate (nano::dev::genesis->hash ())) .build (); nano::publish publish2{ nano::dev::network_params.network, send2 }; - auto ignored_channel{ std::make_shared (node1, std::weak_ptr ()) }; - + auto ignored_channel = nano::test::fake_channel (node1); node1.network.inbound (publish1, ignored_channel); node2.network.inbound (publish2, ignored_channel); ASSERT_TIMELY_EQ (5s, 1, node1.active.size ()); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 6d1ac20b29..648d883132 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -38,6 +38,7 @@ enum class type : uint8_t blockprocessor, blockprocessor_source, blockprocessor_result, + blockprocessor_overfill, bootstrap_server, active, active_started, @@ -80,6 +81,7 @@ enum class detail : uint8_t none, success, unknown, + queue_overflow, // processing queue queue, diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 3bb23c8ee2..c38d2ad016 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include @@ -15,7 +15,7 @@ */ nano::block_processor::context::context (std::shared_ptr block, nano::block_source source_a) : - block{ block }, + block{ std::move (block) }, source{ source_a } { debug_assert (source != nano::block_source::unknown); @@ -36,6 +36,7 @@ void nano::block_processor::context::set_result (result_t const & result) */ nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) : + config{ node_a.config.block_processor }, node (node_a), write_database_queue (write_database_queue_a), next_log (std::chrono::steady_clock::now ()) @@ -47,6 +48,32 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas block_processed.notify (result, context); } }); + + queue.max_size_query = [this] (auto const & origin) { + switch (std::get (origin.sources)) + { + case nano::block_source::live: + return config.max_peer_queue; + default: + return config.max_system_queue; + } + }; + + queue.priority_query = [this] (auto const & origin) -> size_t { + switch (std::get (origin.sources)) + { + case nano::block_source::live: + return config.priority_live; + case nano::block_source::bootstrap: + case nano::block_source::bootstrap_legacy: + case nano::block_source::unchecked: + return config.priority_bootstrap; + case nano::block_source::local: + return config.priority_local; + default: + return 1; + } + }; } nano::block_processor::~block_processor () @@ -78,39 +105,44 @@ void nano::block_processor::stop () } } -std::size_t nano::block_processor::size () +// TODO: Remove and replace all checks with calls to size (block_source) +std::size_t nano::block_processor::size () const +{ + nano::unique_lock lock{ mutex }; + return queue.total_size (); +} + +std::size_t nano::block_processor::size (nano::block_source source) const { nano::unique_lock lock{ mutex }; - return blocks.size () + forced.size (); + return queue.size ({ source }); } -bool nano::block_processor::full () +bool nano::block_processor::full () const { return size () >= node.flags.block_processor_full_size; } -bool nano::block_processor::half_full () +bool nano::block_processor::half_full () const { return size () >= node.flags.block_processor_full_size / 2; } -void nano::block_processor::add (std::shared_ptr const & block, block_source const source) +bool nano::block_processor::add (std::shared_ptr const & block, block_source const source, std::shared_ptr const & channel) { - if (full ()) - { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill); - return; - } if (node.network_params.work.validate_entry (*block)) // true => error { node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work); - return; + return false; // Not added } node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process); - node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source)); + node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})", + block->hash ().to_string (), + to_string (source), + channel ? channel->to_string () : ""); // TODO: Lazy eval - add_impl (context{ block, source }); + return add_impl (context{ block, source }, channel); } std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) @@ -145,11 +177,27 @@ void nano::block_processor::force (std::shared_ptr const & block_a) node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force); node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ()); + add_impl (context{ block_a, block_source::forced }); +} + +bool nano::block_processor::add_impl (context ctx, std::shared_ptr const & channel) +{ + auto const source = ctx.source; + bool added = false; { - nano::lock_guard lock{ mutex }; - forced.emplace_back (context{ block_a, block_source::forced }); + nano::lock_guard guard{ mutex }; + added = queue.push (std::move (ctx), { source, channel }); } - condition.notify_all (); + if (added) + { + condition.notify_all (); + } + else + { + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill); + node.stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (source)); + } + return added; } void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block) @@ -193,7 +241,7 @@ void nano::block_processor::run () nano::unique_lock lock{ mutex }; while (!stopped) { - if (have_blocks_ready ()) + if (!queue.empty ()) { lock.unlock (); @@ -230,47 +278,16 @@ bool nano::block_processor::should_log () return result; } -bool nano::block_processor::have_blocks_ready () -{ - debug_assert (!mutex.try_lock ()); - return !blocks.empty () || !forced.empty (); -} - -bool nano::block_processor::have_blocks () -{ - debug_assert (!mutex.try_lock ()); - return have_blocks_ready (); -} - -void nano::block_processor::add_impl (context ctx) -{ - release_assert (ctx.source != nano::block_source::forced); - { - nano::lock_guard guard{ mutex }; - blocks.emplace_back (std::move (ctx)); - } - condition.notify_all (); -} - auto nano::block_processor::next () -> context { debug_assert (!mutex.try_lock ()); - debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next - - if (!forced.empty ()) - { - auto entry = std::move (forced.front ()); - release_assert (entry.source == nano::block_source::forced); - forced.pop_front (); - return entry; - } + debug_assert (!queue.empty ()); // This should be checked before calling next - if (!blocks.empty ()) + if (!queue.empty ()) { - auto entry = std::move (blocks.front ()); - release_assert (entry.source != nano::block_source::forced); - blocks.pop_front (); - return entry; + auto [request, origin] = queue.next (); + release_assert (std::get (origin.sources) != nano::block_source::forced || request.source == nano::block_source::forced); + return std::move (request); } release_assert (false, "next() called when no blocks are ready"); @@ -286,19 +303,24 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock lock_a.lock (); + queue.periodic_update (); + timer_l.start (); + // Processing blocks unsigned number_of_blocks_processed (0), number_of_forced_processed (0); auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); }; auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; }; auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; }; - while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ()) + while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ()) { // TODO: Cleaner periodical logging - if ((blocks.size () + forced.size () > 64) && should_log ()) + if (should_log ()) { - node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ()); + node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", + queue.total_size (), + queue.size ({ nano::block_source::forced })); } auto ctx = next (); @@ -339,6 +361,7 @@ nano::block_status nano::block_processor::process_one (store::write_transaction node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result)); node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source)); + node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed, nano::log::arg{ "result", result }, nano::log::arg{ "source", context.source }, @@ -434,18 +457,12 @@ void nano::block_processor::queue_unchecked (store::write_transaction const & tr std::unique_ptr nano::block_processor::collect_container_info (std::string const & name) { - std::size_t blocks_count; - std::size_t forced_count; - - { - nano::lock_guard guard{ mutex }; - blocks_count = blocks.size (); - forced_count = forced.size (); - } + nano::lock_guard guard{ mutex }; auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "blocks", blocks_count, sizeof (decltype (blocks)::value_type) })); - composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (forced)::value_type) })); + composite->add_component (std::make_unique (container_info{ "blocks", queue.total_size (), 0 })); + composite->add_component (std::make_unique (container_info{ "forced", queue.size ({ nano::block_source::forced }), 0 })); + composite->add_component (queue.collect_container_info ("queue")); return composite; } @@ -460,3 +477,38 @@ nano::stat::detail nano::to_stat_detail (nano::block_source type) debug_assert (value); return value.value_or (nano::stat::detail{}); } + +/* + * block_processor_config + */ + +nano::block_processor_config::block_processor_config (const nano::network_constants & network_constants) +{ + if (network_constants.is_beta_network ()) + { + // Bump max queue sizes for beta network to allow for more aggressive block propagation for saturation testing + max_peer_queue = 1024; + } +} + +nano::error nano::block_processor_config::serialize (nano::tomlconfig & toml) const +{ + toml.put ("max_peer_queue", max_peer_queue, "Maximum number of blocks to queue from network peers. \ntype:uint64"); + toml.put ("max_system_queue", max_system_queue, "Maximum number of blocks to queue from system components (local RPC, bootstrap). \ntype:uint64"); + toml.put ("priority_live", priority_live, "Priority for live network blocks. Higher priority gets processed more frequently. \ntype:uint64"); + toml.put ("priority_bootstrap", priority_bootstrap, "Priority for bootstrap blocks. Higher priority gets processed more frequently. \ntype:uint64"); + toml.put ("priority_local", priority_local, "Priority for local RPC blocks. Higher priority gets processed more frequently. \ntype:uint64"); + + return toml.get_error (); +} + +nano::error nano::block_processor_config::deserialize (nano::tomlconfig & toml) +{ + toml.get ("max_peer_queue", max_peer_queue); + toml.get ("max_system_queue", max_system_queue); + toml.get ("priority_live", priority_live); + toml.get ("priority_bootstrap", priority_bootstrap); + toml.get ("priority_local", priority_local); + + return toml.get_error (); +} \ No newline at end of file diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 0d020e8b16..a1537092f0 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -23,7 +24,6 @@ class write_transaction; namespace nano { - enum class block_source { unknown = 0, @@ -38,6 +38,26 @@ enum class block_source std::string_view to_string (block_source); nano::stat::detail to_stat_detail (block_source); +class block_processor_config final +{ +public: + explicit block_processor_config (nano::network_constants const &); + + nano::error deserialize (nano::tomlconfig & toml); + nano::error serialize (nano::tomlconfig & toml) const; + +public: + // Maximum number of blocks to queue from network peers + size_t max_peer_queue{ 128 }; + // Maximum number of blocks to queue from system components (local RPC, bootstrap) + size_t max_system_queue{ 16 * 1024 }; + + // Higher priority gets processed more frequently + size_t priority_live{ 1 }; + size_t priority_bootstrap{ 8 }; + size_t priority_local{ 16 }; +}; + /** * Processing blocks is a potentially long IO operation. * This class isolates block insertion from other operations like servicing network operations @@ -72,15 +92,14 @@ class block_processor final void start (); void stop (); - std::size_t size (); - bool full (); - bool half_full (); - void add (std::shared_ptr const &, block_source = block_source::live); + std::size_t size () const; + std::size_t size (block_source) const; + bool full () const; + bool half_full () const; + bool add (std::shared_ptr const &, block_source = block_source::live, std::shared_ptr const & channel = nullptr); std::optional add_blocking (std::shared_ptr const & block, block_source); void force (std::shared_ptr const &); bool should_log (); - bool have_blocks_ready (); - bool have_blocks (); std::unique_ptr collect_container_info (std::string const & name); @@ -103,21 +122,21 @@ class block_processor final void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &); processed_batch_t process_batch (nano::unique_lock &); context next (); - void add_impl (context); + bool add_impl (context, std::shared_ptr const & channel = nullptr); private: // Dependencies + block_processor_config const & config; nano::node & node; nano::write_database_queue & write_database_queue; private: - std::deque blocks; - std::deque forced; + nano::fair_queue queue; std::chrono::steady_clock::time_point next_log; bool stopped{ false }; nano::condition_variable condition; - nano::mutex mutex{ mutex_identifier (mutexes::block_processor) }; + mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) }; std::thread thread; }; } diff --git a/nano/node/bootstrap/bootstrap_legacy.cpp b/nano/node/bootstrap/bootstrap_legacy.cpp index 9badee97cc..98d48e9ead 100644 --- a/nano/node/bootstrap/bootstrap_legacy.cpp +++ b/nano/node/bootstrap/bootstrap_legacy.cpp @@ -225,9 +225,9 @@ void nano::bootstrap_attempt_legacy::run () // TODO: This check / wait is a heuristic and should be improved. auto wait_start = std::chrono::steady_clock::now (); - while (!stopped && node->block_processor.size () != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 })) + while (!stopped && node->block_processor.size (nano::block_source::bootstrap_legacy) != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 })) { - condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size () == 0; }); + condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size (nano::block_source::bootstrap_legacy) == 0; }); } if (start_account.number () != std::numeric_limits::max ()) diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index e75a26c247..83645c7456 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -177,7 +177,7 @@ void nano::bootstrap_ascending::service::inspect (store::transaction const & tx, void nano::bootstrap_ascending::service::wait_blockprocessor () { nano::unique_lock lock{ mutex }; - while (!stopped && block_processor.size () > config.bootstrap_ascending.block_wait_count) + while (!stopped && block_processor.size (nano::block_source::bootstrap) > config.bootstrap_ascending.block_wait_count) { condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions } diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index 935fa8dae8..a23626f0dc 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -34,7 +34,7 @@ class fair_queue final source (std::tuple sources, std::shared_ptr channel = nullptr) : sources{ sources }, - channel{ channel } + channel{ std::move (channel) } { } diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 80660cad00..6c4f649c00 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -253,15 +253,12 @@ class network_message_visitor : public nano::message_visitor } } - void publish (nano::publish const & message_a) override + void publish (nano::publish const & message) override { - if (!node.block_processor.full ()) + bool added = node.block_processor.add (message.block, nano::block_source::live, channel); + if (!added) { - node.process_active (message_a.block); - } - else - { - node.network.publish_filter.clear (message_a.digest); + node.network.publish_filter.clear (message.digest); node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in); } } diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 2d7e23ab44..0b6faa3fb3 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -33,7 +33,8 @@ nano::node_config::node_config (const std::optional & peering_port_a, websocket_config{ network_params.network }, ipc_config{ network_params.network }, external_address{ boost::asio::ip::address_v6{}.to_string () }, - rep_crawler{ network_params.network } + rep_crawler{ network_params.network }, + block_processor{ network_params.network } { if (peering_port == 0) { @@ -210,6 +211,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const rep_crawler.serialize (rep_crawler_l); toml.put_child ("rep_crawler", rep_crawler_l); + nano::tomlconfig block_processor_l; + block_processor.serialize (block_processor_l); + toml.put_child ("block_processor", block_processor_l); + return toml.get_error (); } @@ -285,6 +290,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) rep_crawler.deserialize (config_l); } + if (toml.has_key ("block_processor")) + { + auto config_l = toml.get_required_child ("block_processor"); + block_processor.deserialize (config_l); + } + if (toml.has_key ("work_peers")) { work_peers.clear (); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 252f6e07fb..6b9b757452 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -130,6 +131,7 @@ class node_config unsigned backlog_scan_frequency{ 10 }; nano::vote_cache_config vote_cache; nano::rep_crawler_config rep_crawler; + nano::block_processor_config block_processor; public: std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const; From bf9d18c9b63583a14ba0e6105f5c03fb9533fa4e Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Mon, 11 Mar 2024 10:16:49 +0000 Subject: [PATCH 4/9] Removing unused headers. --- nano/node/fair_queue.hpp | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index a23626f0dc..e9013b44b9 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -1,25 +1,16 @@ #pragma once -#include -#include -#include -#include -#include #include -#include #include -#include - #include #include -#include #include #include -#include -#include +#include +#include +#include #include -#include namespace nano { From 97822348a45e0861da74ef9940313a2e3ef32e48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 11 Mar 2024 15:38:21 +0100 Subject: [PATCH 5/9] Simplify source tuple --- nano/core_test/fair_queue.cpp | 42 +++++++++++++++++------------------ nano/node/blockprocessor.cpp | 6 ++--- nano/node/fair_queue.hpp | 17 ++++++++------ 3 files changed, 34 insertions(+), 31 deletions(-) diff --git a/nano/core_test/fair_queue.cpp b/nano/core_test/fair_queue.cpp index b2fa7970bd..4078dea080 100644 --- a/nano/core_test/fair_queue.cpp +++ b/nano/core_test/fair_queue.cpp @@ -43,7 +43,7 @@ TEST (fair_queue, process_one) auto [result, origin] = queue.next (); ASSERT_EQ (result, 7); - ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.source, source_enum::live); ASSERT_EQ (origin.channel, nullptr); ASSERT_TRUE (queue.empty ()); @@ -65,17 +65,17 @@ TEST (fair_queue, fifo) { auto [result, origin] = queue.next (); ASSERT_EQ (result, 7); - ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.source, source_enum::live); } { auto [result, origin] = queue.next (); ASSERT_EQ (result, 8); - ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.source, source_enum::live); } { auto [result, origin] = queue.next (); ASSERT_EQ (result, 9); - ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.source, source_enum::live); } ASSERT_TRUE (queue.empty ()); @@ -99,17 +99,17 @@ TEST (fair_queue, process_many) { auto [result, origin] = queue.next (); ASSERT_EQ (result, 7); - ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.source, source_enum::live); } { auto [result, origin] = queue.next (); ASSERT_EQ (result, 8); - ASSERT_EQ (std::get (origin.sources), source_enum::bootstrap); + ASSERT_EQ (origin.source, source_enum::bootstrap); } { auto [result, origin] = queue.next (); ASSERT_EQ (result, 9); - ASSERT_EQ (std::get (origin.sources), source_enum::unchecked); + ASSERT_EQ (origin.source, source_enum::unchecked); } ASSERT_TRUE (queue.empty ()); @@ -131,12 +131,12 @@ TEST (fair_queue, max_queue_size) { auto [result, origin] = queue.next (); ASSERT_EQ (result, 7); - ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.source, source_enum::live); } { auto [result, origin] = queue.next (); ASSERT_EQ (result, 8); - ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.source, source_enum::live); } ASSERT_TRUE (queue.empty ()); @@ -146,7 +146,7 @@ TEST (fair_queue, round_robin_with_priority) { nano::fair_queue queue; queue.priority_query = [] (auto const & origin) { - switch (std::get (origin.sources)) + switch (origin.source) { case source_enum::live: return 1; @@ -172,15 +172,15 @@ TEST (fair_queue, round_robin_with_priority) ASSERT_EQ (queue.total_size (), 9); // Processing 1x live, 2x bootstrap, 3x unchecked before moving to the next source - ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::live); - ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::bootstrap); - ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::bootstrap); - ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::unchecked); - ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::unchecked); - ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::unchecked); - ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::live); - ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::bootstrap); - ASSERT_EQ (std::get (queue.next ().second.sources), source_enum::live); + ASSERT_EQ (queue.next ().second.source, source_enum::live); + ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap); + ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap); + ASSERT_EQ (queue.next ().second.source, source_enum::unchecked); + ASSERT_EQ (queue.next ().second.source, source_enum::unchecked); + ASSERT_EQ (queue.next ().second.source, source_enum::unchecked); + ASSERT_EQ (queue.next ().second.source, source_enum::live); + ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap); + ASSERT_EQ (queue.next ().second.source, source_enum::live); ASSERT_TRUE (queue.empty ()); } @@ -225,13 +225,13 @@ TEST (fair_queue, source_channel) { auto [result, origin] = channel1_results[0]; ASSERT_EQ (result, 6); - ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.source, source_enum::live); ASSERT_EQ (origin.channel, channel1); } { auto [result, origin] = channel1_results[1]; ASSERT_EQ (result, 9); - ASSERT_EQ (std::get (origin.sources), source_enum::live); + ASSERT_EQ (origin.source, source_enum::live); ASSERT_EQ (origin.channel, channel1); } diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index c38d2ad016..109aeba091 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -50,7 +50,7 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas }); queue.max_size_query = [this] (auto const & origin) { - switch (std::get (origin.sources)) + switch (origin.source) { case nano::block_source::live: return config.max_peer_queue; @@ -60,7 +60,7 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas }; queue.priority_query = [this] (auto const & origin) -> size_t { - switch (std::get (origin.sources)) + switch (origin.source) { case nano::block_source::live: return config.priority_live; @@ -286,7 +286,7 @@ auto nano::block_processor::next () -> context if (!queue.empty ()) { auto [request, origin] = queue.next (); - release_assert (std::get (origin.sources) != nano::block_source::forced || request.source == nano::block_source::forced); + release_assert (origin.source != nano::block_source::forced || request.source == nano::block_source::forced); return std::move (request); } diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index e9013b44b9..b0af6e84d8 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -14,17 +14,20 @@ namespace nano { -template +template class fair_queue final { public: - struct source + /** + * Holds user supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request. + */ + struct origin { - std::tuple sources; + Source source; std::shared_ptr channel; - source (std::tuple sources, std::shared_ptr channel = nullptr) : - sources{ sources }, + origin (Source source, std::shared_ptr channel = nullptr) : + source{ source }, channel{ std::move (channel) } { } @@ -39,7 +42,7 @@ class fair_queue final return true; } - auto operator<=> (source const &) const = default; + auto operator<=> (origin const &) const = default; }; private: @@ -88,7 +91,7 @@ class fair_queue final }; public: - using source_type = source; + using source_type = origin; using value_type = std::pair; public: From 8513ba89bc9cc031f3ac602e4aaa2cb58ee7550e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 11 Mar 2024 21:37:54 +0100 Subject: [PATCH 6/9] Config tests --- nano/core_test/toml.cpp | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 4e13e7b422..64866d810f 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -117,6 +117,7 @@ TEST (toml, daemon_config_deserialize_defaults) std::stringstream ss; ss << R"toml( [node] + [node.block_processor] [node.diagnostics.txn_tracking] [node.httpcallback] [node.ipc.local] @@ -251,6 +252,12 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.vote_cache.max_size, defaults.node.vote_cache.max_size); ASSERT_EQ (conf.node.vote_cache.max_voters, defaults.node.vote_cache.max_voters); + + ASSERT_EQ (conf.node.block_processor.max_peer_queue, defaults.node.block_processor.max_peer_queue); + ASSERT_EQ (conf.node.block_processor.max_system_queue, defaults.node.block_processor.max_system_queue); + ASSERT_EQ (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live); + ASSERT_EQ (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap); + ASSERT_EQ (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local); } TEST (toml, optional_child) @@ -427,6 +434,13 @@ TEST (toml, daemon_config_deserialize_no_defaults) backlog_scan_batch_size = 999 backlog_scan_frequency = 999 + [node.block_processor] + max_peer_queue = 999 + max_system_queue = 999 + priority_live = 999 + priority_bootstrap = 999 + priority_local = 999 + [node.diagnostics.txn_tracking] enable = true ignore_writes_below_block_processor_max_time = false @@ -671,6 +685,12 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.vote_cache.max_size, defaults.node.vote_cache.max_size); ASSERT_NE (conf.node.vote_cache.max_voters, defaults.node.vote_cache.max_voters); + + ASSERT_NE (conf.node.block_processor.max_peer_queue, defaults.node.block_processor.max_peer_queue); + ASSERT_NE (conf.node.block_processor.max_system_queue, defaults.node.block_processor.max_system_queue); + ASSERT_NE (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live); + ASSERT_NE (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap); + ASSERT_NE (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local); } /** There should be no required values **/ From 00348b34bb33ea42cc18ea56022fbed6fae26f54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 11 Mar 2024 21:48:31 +0100 Subject: [PATCH 7/9] Renamings --- nano/node/fair_queue.hpp | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index b0af6e84d8..e066ec8948 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -91,23 +91,23 @@ class fair_queue final }; public: - using source_type = origin; - using value_type = std::pair; + using origin_type = origin; + using value_type = std::pair; public: - size_t size (source_type source) const + size_t size (origin_type source) const { auto it = queues.find (source); return it == queues.end () ? 0 : it->second.size (); } - size_t max_size (source_type source) const + size_t max_size (origin_type source) const { auto it = queues.find (source); return it == queues.end () ? 0 : it->second.max_size; } - size_t priority (source_type source) const + size_t priority (origin_type source) const { auto it = queues.find (source); return it == queues.end () ? 0 : it->second.priority; @@ -159,14 +159,13 @@ class fair_queue final * Request will be dropped if the queue is full * @return true if added, false if dropped */ - bool push (Request request, source_type source) + bool push (Request request, origin_type source) { auto it = queues.find (source); // Create a new queue if it doesn't exist if (it == queues.end ()) { - // TODO: Right now this is constant and initialized when the queue is created, but it could be made dynamic auto max_size = max_size_query (source); auto priority = priority_query (source); @@ -180,8 +179,8 @@ class fair_queue final } public: - using max_size_query_t = std::function; - using priority_query_t = std::function; + using max_size_query_t = std::function; + using priority_query_t = std::function; max_size_query_t max_size_query{ [] (auto const & origin) { debug_assert (false, "max_size_query callback empty"); return 0; } }; priority_query_t priority_query{ [] (auto const & origin) { debug_assert (false, "priority_query callback empty"); return 0; } }; @@ -272,8 +271,8 @@ class fair_queue final } private: - std::map queues; - std::map::iterator iterator{ queues.end () }; + std::map queues; + std::map::iterator iterator{ queues.end () }; size_t counter{ 0 }; std::chrono::steady_clock::time_point last_update{}; From 378752165253648a7a345898e26f806304b171c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 14 Mar 2024 17:09:28 +0100 Subject: [PATCH 8/9] Remove beta specific config values --- nano/node/blockprocessor.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 109aeba091..b5c5e20e31 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -484,11 +484,6 @@ nano::stat::detail nano::to_stat_detail (nano::block_source type) nano::block_processor_config::block_processor_config (const nano::network_constants & network_constants) { - if (network_constants.is_beta_network ()) - { - // Bump max queue sizes for beta network to allow for more aggressive block propagation for saturation testing - max_peer_queue = 1024; - } } nano::error nano::block_processor_config::serialize (nano::tomlconfig & toml) const From 676f71525bf87b4685e10904ec29a99dd8a02faa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Wed, 3 Apr 2024 22:32:33 +0200 Subject: [PATCH 9/9] Store origin channel as weak_ptr --- nano/core_test/fair_queue.cpp | 3 +- nano/node/fair_queue.hpp | 92 ++++++++++++++++++++++++++++++----- 2 files changed, 82 insertions(+), 13 deletions(-) diff --git a/nano/core_test/fair_queue.cpp b/nano/core_test/fair_queue.cpp index 4078dea080..178365b171 100644 --- a/nano/core_test/fair_queue.cpp +++ b/nano/core_test/fair_queue.cpp @@ -260,8 +260,9 @@ TEST (fair_queue, cleanup) ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1); ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1); + // Either closing or resetting the channel should remove it from the queue channel1->close (); - channel2->close (); + channel2.reset (); ASSERT_TRUE (queue.periodic_update ()); diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index e066ec8948..e13ed847c0 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -18,9 +18,6 @@ template class fair_queue final { public: - /** - * Holds user supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request. - */ struct origin { Source source; @@ -28,24 +25,95 @@ class fair_queue final origin (Source source, std::shared_ptr channel = nullptr) : source{ source }, - channel{ std::move (channel) } + channel{ channel } { } + }; - bool alive () const +private: + /** + * Holds user supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request. + */ + struct origin_entry + { + Source source; + + // Optional is needed to distinguish between a source with no associated channel and a source with an expired channel + // TODO: Store channel as shared_ptr after networking fixes are done + std::optional> maybe_channel; + + origin_entry (Source source, std::shared_ptr channel = nullptr) : + source{ source } { if (channel) { - return channel->alive (); + maybe_channel = std::weak_ptr{ channel }; + } + } + + origin_entry (origin const & origin) : + origin_entry (origin.source, origin.channel) + { + } + + bool alive () const + { + if (maybe_channel) + { + if (auto channel_l = maybe_channel->lock ()) + { + return channel_l->alive (); + } + else + { + return false; + } + } + else + { + // Some sources (eg. local RPC) don't have an associated channel, never remove their queue + return true; + } + } + + // TODO: Store channel as shared_ptr to avoid this mess + auto operator<=> (origin_entry const & other) const + { + // First compare source + if (auto cmp = source <=> other.source; cmp != 0) + return cmp; + + if (maybe_channel && other.maybe_channel) + { + // Then compare channels by ownership, not by the channel's value or state + std::owner_less> less; + if (less (*maybe_channel, *other.maybe_channel)) + return std::strong_ordering::less; + if (less (*other.maybe_channel, *maybe_channel)) + return std::strong_ordering::greater; + + return std::strong_ordering::equivalent; + } + else + { + if (maybe_channel && !other.maybe_channel) + { + return std::strong_ordering::greater; + } + if (!maybe_channel && other.maybe_channel) + { + return std::strong_ordering::less; + } + return std::strong_ordering::equivalent; } - // Some sources (eg. local RPC) don't have an associated channel, never remove their queue - return true; } - auto operator<=> (origin const &) const = default; + operator origin () const + { + return { source, maybe_channel ? maybe_channel->lock () : nullptr }; + } }; -private: struct entry { using queue_t = std::deque; @@ -271,8 +339,8 @@ class fair_queue final } private: - std::map queues; - std::map::iterator iterator{ queues.end () }; + std::map queues; + std::map::iterator iterator{ queues.end () }; size_t counter{ 0 }; std::chrono::steady_clock::time_point last_update{};