Skip to content

Commit

Permalink
Simplify dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Oct 20, 2024
1 parent 4cfda02 commit 58f7169
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 101 deletions.
57 changes: 38 additions & 19 deletions nano/core_test/confirming_set.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/node/active_elections.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/election.hpp>
#include <nano/node/make_store.hpp>
#include <nano/node/unchecked_map.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_confirmed.hpp>
#include <nano/test_common/ledger_context.hpp>
Expand All @@ -16,44 +18,61 @@

using namespace std::chrono_literals;

namespace
{
struct confirming_set_context
{
nano::logger & logger;
nano::stats & stats;
nano::ledger & ledger;

nano::unchecked_map unchecked;
nano::block_processor block_processor;
nano::confirming_set confirming_set;

explicit confirming_set_context (nano::test::ledger_context & ledger_context, nano::node_config node_config = {}) :
logger{ ledger_context.logger () },
stats{ ledger_context.stats () },
ledger{ ledger_context.ledger () },
unchecked{ 0, stats, false },
block_processor{ node_config, ledger, unchecked, stats, logger },
confirming_set{ node_config.confirming_set, ledger, block_processor, stats, logger }
{
}
};
}

TEST (confirming_set, construction)
{
nano::test::system system;
auto & node = *system.add_node ();
auto ctx = nano::test::ledger_empty ();
nano::confirming_set_config config{};
nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () };
auto ledger_ctx = nano::test::ledger_empty ();
confirming_set_context ctx{ ledger_ctx };
}

TEST (confirming_set, add_exists)
{
nano::test::system system;
auto & node = *system.add_node ();
auto ctx = nano::test::ledger_send_receive ();
nano::confirming_set_config config{};
nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () };
auto send = ctx.blocks ()[0];
auto ledger_ctx = nano::test::ledger_send_receive ();
confirming_set_context ctx{ ledger_ctx };
nano::confirming_set & confirming_set = ctx.confirming_set;
auto send = ledger_ctx.blocks ()[0];
confirming_set.add (send->hash ());
ASSERT_TRUE (confirming_set.contains (send->hash ()));
}

TEST (confirming_set, process_one)
{
nano::test::system system;
auto & node = *system.add_node ();
auto ctx = nano::test::ledger_send_receive ();
nano::confirming_set_config config{};
nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () };
auto ledger_ctx = nano::test::ledger_send_receive ();
confirming_set_context ctx{ ledger_ctx };
nano::confirming_set & confirming_set = ctx.confirming_set;
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
confirming_set.cemented_observers.add ([&] (auto const &) { ++count; condition.notify_all (); });
confirming_set.add (ctx.blocks ()[0]->hash ());
confirming_set.add (ledger_ctx.blocks ()[0]->hash ());
nano::test::start_stop_guard guard{ confirming_set };
std::unique_lock lock{ mutex };
ASSERT_TRUE (condition.wait_for (lock, 5s, [&] () { return count == 1; }));
ASSERT_EQ (1, ctx.stats ().count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (2, ctx.ledger ().cemented_count ());
ASSERT_EQ (1, ctx.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (2, ctx.ledger.cemented_count ());
}

TEST (confirming_set, process_multiple)
Expand Down
8 changes: 8 additions & 0 deletions nano/node/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
publish (context.block);
}
});

// Stop all rolled back active transactions except initial
block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) {
if (block->qualified_root () != rollback_root)
{
erase (block->qualified_root ());
}
});
}

nano::active_elections::~active_elections ()
Expand Down
124 changes: 61 additions & 63 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <nano/node/blockprocessor.hpp>
#include <nano/node/local_vote_history.hpp>
#include <nano/node/node.hpp>
#include <nano/node/unchecked_map.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
#include <nano/store/component.hpp>
Expand Down Expand Up @@ -38,10 +39,13 @@ void nano::block_processor::context::set_result (result_t const & result)
* block_processor
*/

nano::block_processor::block_processor (nano::node & node_a) :
config{ node_a.config.block_processor },
node (node_a),
next_log (std::chrono::steady_clock::now ())
nano::block_processor::block_processor (nano::node_config const & node_config, nano::ledger & ledger_a, nano::unchecked_map & unchecked_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ node_config.block_processor },
network_params{ node_config.network_params },
ledger{ ledger_a },
unchecked{ unchecked_a },
stats{ stats_a },
logger{ logger_a }
{
batch_processed.add ([this] (auto const & items) {
// For every batch item: notify the 'processed' observer.
Expand Down Expand Up @@ -78,6 +82,11 @@ nano::block_processor::block_processor (nano::node & node_a) :
return 1;
}
};

// Requeue blocks that could not be immediately processed
unchecked.satisfied.add ([this] (nano::unchecked_info const & info) {
add (info.block, nano::block_source::unchecked);
});
}

nano::block_processor::~block_processor ()
Expand Down Expand Up @@ -124,14 +133,14 @@ std::size_t nano::block_processor::size (nano::block_source source) const

bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> const & channel, std::function<void (nano::block_status)> callback)
{
if (node.network_params.work.validate_entry (*block)) // true => error
if (network_params.work.validate_entry (*block)) // true => error
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
return false; // Not added
}

node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
block->hash ().to_string (),
to_string (source),
channel ? channel->to_string () : "<unknown>"); // TODO: Lazy eval
Expand All @@ -141,8 +150,8 @@ bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, blo

std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source));
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking);
logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source));

context ctx{ block, source };
auto future = ctx.get_future ();
Expand All @@ -155,17 +164,17 @@ std::optional<nano::block_status> nano::block_processor::add_blocking (std::shar
}
catch (std::future_error const &)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout);
node.logger.error (nano::log::type::blockprocessor, "Block dropped when processing: {}", block->hash ().to_string ());
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout);
logger.error (nano::log::type::blockprocessor, "Block dropped when processing: {}", block->hash ().to_string ());
}

return std::nullopt;
}

void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());

add_impl (context{ block_a, block_source::forced });
}
Expand All @@ -184,45 +193,38 @@ bool nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transpo
}
else
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
node.stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (source));
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (source));
}
return added;
}

void nano::block_processor::rollback_competitor (secure::write_transaction const & transaction, nano::block const & block)
void nano::block_processor::rollback_competitor (secure::write_transaction const & transaction, nano::block const & fork_block)
{
auto hash = block.hash ();
auto successor_hash = node.ledger.any.block_successor (transaction, block.qualified_root ());
auto successor = successor_hash ? node.ledger.any.block_get (transaction, successor_hash.value ()) : nullptr;
auto const hash = fork_block.hash ();
auto const successor_hash = ledger.any.block_successor (transaction, fork_block.qualified_root ());
auto const successor = successor_hash ? ledger.any.block_get (transaction, successor_hash.value ()) : nullptr;
if (successor != nullptr && successor->hash () != hash)
{
// Replace our block with the winner and roll back any dependent blocks
node.logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ());
logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ());

std::vector<std::shared_ptr<nano::block>> rollback_list;
if (node.ledger.rollback (transaction, successor->hash (), rollback_list))
if (ledger.rollback (transaction, successor->hash (), rollback_list))
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed);
node.logger.error (nano::log::type::blockprocessor, "Failed to roll back: {} because it or a successor was confirmed", successor->hash ().to_string ());
stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed);
logger.error (nano::log::type::blockprocessor, "Failed to roll back: {} because it or a successor was confirmed", successor->hash ().to_string ());
}
else
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback);
node.logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ());
stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback);
logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ());
}

// Deleting from votes cache, stop active transaction
for (auto & i : rollback_list)
// Notify observers of the rolled back blocks
for (auto const & block : rollback_list)
{
rolled_back.notify (i);

node.history.erase (i->root ());
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
{
node.active.erase (*i);
}
rolled_back.notify (block, fork_block.qualified_root ());
}
}
}
Expand All @@ -237,7 +239,7 @@ void nano::block_processor::run ()
// TODO: Cleaner periodical logging
if (should_log ())
{
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
queue.size ({ nano::block_source::forced }));
}
Expand Down Expand Up @@ -319,7 +321,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock

lock.unlock ();

auto transaction = node.ledger.tx_begin_write (nano::store::writer::blockprocessor);
auto transaction = ledger.tx_begin_write (nano::store::writer::blockprocessor);

nano::timer<std::chrono::milliseconds> timer;
timer.start ();
Expand Down Expand Up @@ -350,7 +352,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock

if (number_of_blocks_processed != 0 && timer.stop () > std::chrono::milliseconds (100))
{
node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ());
logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ());
}

return processed;
Expand All @@ -360,12 +362,12 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction
{
auto block = context.block;
auto const hash = block->hash ();
nano::block_status result = node.ledger.process (transaction_a, block);
nano::block_status result = ledger.process (transaction_a, block);

node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result));
node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source));
stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result));
stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source));

node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed,
logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed,
nano::log::arg{ "result", result },
nano::log::arg{ "source", context.source },
nano::log::arg{ "arrival", nano::log::microseconds (context.arrival) },
Expand All @@ -376,40 +378,41 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction
{
case nano::block_status::progress:
{
queue_unchecked (transaction_a, hash);
/* For send blocks check epoch open unchecked (gap pending).
For state blocks check only send subtype and only if block epoch is not last epoch.
If epoch is last, then pending entry shouldn't trigger same epoch open block for destination account. */
unchecked.trigger (hash);

/*
* For send blocks check epoch open unchecked (gap pending).
* For state blocks check only send subtype and only if block epoch is not last epoch.
* If epoch is last, then pending entry shouldn't trigger same epoch open block for destination account.
*/
if (block->type () == nano::block_type::send || (block->type () == nano::block_type::state && block->is_send () && std::underlying_type_t<nano::epoch> (block->sideband ().details.epoch) < std::underlying_type_t<nano::epoch> (nano::epoch::max)))
{
/* block->destination () for legacy send blocks
block->link () for state blocks (send subtype) */
queue_unchecked (transaction_a, block->destination ());
unchecked.trigger (block->destination ());
}
break;
}
case nano::block_status::gap_previous:
{
node.unchecked.put (block->previous (), block);
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_previous);
unchecked.put (block->previous (), block);
stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_previous);
break;
}
case nano::block_status::gap_source:
{
release_assert (block->source_field () || block->link_field ());
node.unchecked.put (block->source_field ().value_or (block->link_field ().value_or (0).as_block_hash ()), block);
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source);
unchecked.put (block->source_field ().value_or (block->link_field ().value_or (0).as_block_hash ()), block);
stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source);
break;
}
case nano::block_status::gap_epoch_open_pending:
{
node.unchecked.put (block->account_field ().value_or (0), block); // Specific unchecked key starting with epoch open block account public key
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source);
unchecked.put (block->account_field ().value_or (0), block); // Specific unchecked key starting with epoch open block account public key
stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source);
break;
}
case nano::block_status::old:
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::old);
stats.inc (nano::stat::type::ledger, nano::stat::detail::old);
break;
}
case nano::block_status::bad_signature:
Expand All @@ -426,7 +429,7 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction
}
case nano::block_status::fork:
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::fork);
stats.inc (nano::stat::type::ledger, nano::stat::detail::fork);
break;
}
case nano::block_status::opened_burn_account:
Expand All @@ -453,11 +456,6 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction
return result;
}

void nano::block_processor::queue_unchecked (secure::write_transaction const & transaction_a, nano::hash_or_account const & hash_or_account_a)
{
node.unchecked.trigger (hash_or_account_a);
}

nano::container_info nano::block_processor::container_info () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
Expand Down
Loading

0 comments on commit 58f7169

Please sign in to comment.