Skip to content

Commit

Permalink
Merge pull request #4535 from clemahieu/move_write_database_queue
Browse files Browse the repository at this point in the history
Move write database queue
  • Loading branch information
clemahieu authored Apr 4, 2024
2 parents ec6207f + f4a04ea commit 85c3238
Show file tree
Hide file tree
Showing 22 changed files with 71 additions and 77 deletions.
22 changes: 9 additions & 13 deletions nano/core_test/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ using namespace std::chrono_literals;
TEST (confirming_set, construction)
{
auto ctx = nano::test::context::ledger_empty ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
nano::confirming_set confirming_set (ctx.ledger ());
}

TEST (confirming_set, add_exists)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
nano::confirming_set confirming_set (ctx.ledger ());
auto send = ctx.blocks ()[0];
confirming_set.add (send->hash ());
ASSERT_TRUE (confirming_set.exists (send->hash ()));
Expand All @@ -35,8 +33,7 @@ TEST (confirming_set, add_exists)
TEST (confirming_set, process_one)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
nano::confirming_set confirming_set (ctx.ledger ());
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
Expand All @@ -52,8 +49,7 @@ TEST (confirming_set, process_one)
TEST (confirming_set, process_multiple)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
nano::confirming_set confirming_set (ctx.ledger ());
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
Expand Down Expand Up @@ -118,7 +114,7 @@ TEST (confirmation_callback, confirmed_history)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.force_use_write_database_queue = true;
node_flags.force_use_write_queue = true;
node_flags.disable_ascending_bootstrap = true;
nano::node_config node_config = system.default_config ();
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
Expand Down Expand Up @@ -155,7 +151,7 @@ TEST (confirmation_callback, confirmed_history)
ASSERT_TIMELY (5s, election = nano::test::start_election (system, *node, send1->hash ()));
{
// The write guard prevents the confirmation height processor doing any writes
auto write_guard = node->write_database_queue.wait (nano::writer::testing);
auto write_guard = node->store.write_queue.wait (nano::store::writer::testing);

// Confirm send1
election->force_confirm ();
Expand All @@ -166,13 +162,13 @@ TEST (confirmation_callback, confirmed_history)
auto transaction = node->store.tx_begin_read ();
ASSERT_FALSE (node->ledger.block_confirmed (transaction, send->hash ()));

ASSERT_TIMELY (10s, node->write_database_queue.contains (nano::writer::confirmation_height));
ASSERT_TIMELY (10s, node->store.write_queue.contains (nano::store::writer::confirmation_height));

// Confirm that no inactive callbacks have been called when the confirmation height processor has already iterated over it, waiting to write
ASSERT_EQ (0, node->stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::inactive_conf_height, nano::stat::dir::out));
}

ASSERT_TIMELY (10s, !node->write_database_queue.contains (nano::writer::confirmation_height));
ASSERT_TIMELY (10s, !node->store.write_queue.contains (nano::store::writer::confirmation_height));

auto transaction = node->store.tx_begin_read ();
ASSERT_TRUE (node->ledger.block_confirmed (transaction, send->hash ()));
Expand All @@ -196,7 +192,7 @@ TEST (confirmation_callback, dependent_election)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.force_use_write_database_queue = true;
node_flags.force_use_write_queue = true;
nano::node_config node_config = system.default_config ();
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node = system.add_node (node_config, node_flags);
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/ledger_confirm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ TEST (ledger_confirm, pruned_source)
nano::stats stats;
nano::ledger ledger (*store, stats, nano::dev::constants);
ledger.pruning = true;
nano::write_database_queue write_database_queue (false);
nano::store::write_queue write_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::keypair key1, key2;
nano::block_builder builder;
Expand Down Expand Up @@ -868,7 +868,7 @@ TEST (ledger_confirmDeathTest, rollback_added_block)
ASSERT_TRUE (!store->init_error ());
nano::stats stats;
nano::ledger ledger (*store, stats, nano::dev::constants);
nano::write_database_queue write_database_queue (false);
nano::store::write_queue write_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::keypair key1;
nano::block_builder builder;
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2663,7 +2663,7 @@ TEST (node, block_processor_full)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.force_use_write_database_queue = true;
node_flags.force_use_write_queue = true;
node_flags.block_processor_full_size = 3;
auto & node = *system.add_node (nano::node_config (system.get_available_port ()), node_flags);
nano::state_block_builder builder;
Expand Down Expand Up @@ -2709,7 +2709,7 @@ TEST (node, block_processor_half_full)
nano::test::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 6;
node_flags.force_use_write_database_queue = true;
node_flags.force_use_write_queue = true;
auto & node = *system.add_node (nano::node_config (system.get_available_port ()), node_flags);
nano::state_block_builder builder;
auto send1 = builder.make_block ()
Expand Down Expand Up @@ -2740,7 +2740,7 @@ TEST (node, block_processor_half_full)
.work (*node.work_generate_blocking (send2->hash ()))
.build ();
// The write guard prevents block processor doing any writes
auto write_guard = node.write_database_queue.wait (nano::writer::testing);
auto write_guard = node.store.write_queue.wait (nano::store::writer::testing);
node.block_processor.add (send1);
ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send2);
Expand Down Expand Up @@ -3097,7 +3097,7 @@ TEST (node, rollback_vote_self)

{
// The write guard prevents the block processor from performing the rollback
auto write_guard = node.write_database_queue.wait (nano::writer::testing);
auto write_guard = node.store.write_queue.wait (nano::store::writer::testing);

ASSERT_EQ (0, election->votes_with_weight ().size ());
// Vote with key to switch the winner
Expand Down
2 changes: 0 additions & 2 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ add_library(
websocketconfig.cpp
websocket_stream.hpp
websocket_stream.cpp
write_database_queue.hpp
write_database_queue.cpp
messages.hpp
messages.cpp
xorshift.hpp)
Expand Down
7 changes: 3 additions & 4 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ void nano::block_processor::context::set_result (result_t const & result)
* block_processor
*/

nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
nano::block_processor::block_processor (nano::node & node_a) :
config{ node_a.config.block_processor },
node (node_a),
write_database_queue (write_database_queue_a),
next_log (std::chrono::steady_clock::now ())
{
batch_processed.add ([this] (auto const & items) {
Expand Down Expand Up @@ -300,7 +299,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
{
processed_batch_t processed;

auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
auto scoped_write_guard = node.store.write_queue.wait (nano::store::writer::process_batch);
auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }));
nano::timer<std::chrono::milliseconds> timer_l;

Expand Down Expand Up @@ -509,4 +508,4 @@ nano::error nano::block_processor_config::deserialize (nano::tomlconfig & toml)
toml.get ("priority_local", priority_local);

return toml.get_error ();
}
}
4 changes: 1 addition & 3 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace nano
{
class block;
class node;
class write_database_queue;
}

namespace nano::store
Expand Down Expand Up @@ -86,7 +85,7 @@ class block_processor final
};

public:
block_processor (nano::node &, nano::write_database_queue &);
block_processor (nano::node &);
~block_processor ();

void start ();
Expand Down Expand Up @@ -127,7 +126,6 @@ class block_processor final
private: // Dependencies
block_processor_config const & config;
nano::node & node;
nano::write_database_queue & write_database_queue;

private:
nano::fair_queue<context, block_source> queue;
Expand Down
7 changes: 3 additions & 4 deletions nano/node/confirming_set.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#include <nano/lib/thread_roles.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/write_database_queue.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/store/component.hpp>
#include <nano/store/write_queue.hpp>

nano::confirming_set::confirming_set (nano::ledger & ledger, nano::write_database_queue & write_queue, std::chrono::milliseconds batch_time) :
nano::confirming_set::confirming_set (nano::ledger & ledger, std::chrono::milliseconds batch_time) :
ledger{ ledger },
write_queue{ write_queue },
batch_time{ batch_time }
{
}
Expand Down Expand Up @@ -72,7 +71,7 @@ void nano::confirming_set::run ()
for (auto i = processing.begin (), n = processing.end (); !stopped && i != n;)
{
lock.unlock (); // Waiting for db write is potentially slow
auto guard = write_queue.wait (nano::writer::confirmation_height);
auto guard = ledger.store.write_queue.wait (nano::store::writer::confirmation_height);
auto tx = ledger.store.tx_begin_write ({ nano::tables::confirmation_height });
lock.lock ();
// Process items in the back buffer within a single transaction for a limited amount of time
Expand Down
4 changes: 1 addition & 3 deletions nano/node/confirming_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ namespace nano
{
class block;
class ledger;
class write_database_queue;
}

namespace nano
Expand All @@ -27,7 +26,7 @@ class confirming_set final
friend class confirmation_height_pruned_source_Test;

public:
confirming_set (nano::ledger & ledger, nano::write_database_queue & write_queue, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 });
confirming_set (nano::ledger & ledger, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 });
~confirming_set ();
// Adds a block to the set of blocks to be confirmed
void add (nano::block_hash const & hash);
Expand All @@ -45,7 +44,6 @@ class confirming_set final
private:
void run ();
nano::ledger & ledger;
nano::write_database_queue & write_queue;
std::chrono::milliseconds batch_time;
std::unordered_set<nano::block_hash> set;
std::unordered_set<nano::block_hash> processing;
Expand Down
4 changes: 2 additions & 2 deletions nano/node/make_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
#include <nano/store/lmdb/lmdb.hpp>
#include <nano/store/rocksdb/rocksdb.hpp>

std::unique_ptr<nano::store::component> nano::make_store (nano::logger & logger, std::filesystem::path const & path, nano::ledger_constants & constants, bool read_only, bool add_db_postfix, nano::rocksdb_config const & rocksdb_config, nano::txn_tracking_config const & txn_tracking_config_a, std::chrono::milliseconds block_processor_batch_max_time_a, nano::lmdb_config const & lmdb_config_a, bool backup_before_upgrade)
std::unique_ptr<nano::store::component> nano::make_store (nano::logger & logger, std::filesystem::path const & path, nano::ledger_constants & constants, bool read_only, bool add_db_postfix, nano::rocksdb_config const & rocksdb_config, nano::txn_tracking_config const & txn_tracking_config_a, std::chrono::milliseconds block_processor_batch_max_time_a, nano::lmdb_config const & lmdb_config_a, bool backup_before_upgrade, bool force_use_write_queue)
{
if (rocksdb_config.enable)
{
return std::make_unique<nano::store::rocksdb::component> (logger, add_db_postfix ? path / "rocksdb" : path, constants, rocksdb_config, read_only);
return std::make_unique<nano::store::rocksdb::component> (logger, add_db_postfix ? path / "rocksdb" : path, constants, rocksdb_config, read_only, force_use_write_queue);
}

return std::make_unique<nano::store::lmdb::component> (logger, add_db_postfix ? path / "data.ldb" : path, constants, txn_tracking_config_a, block_processor_batch_max_time_a, lmdb_config_a, backup_before_upgrade);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/make_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ class component;

namespace nano
{
std::unique_ptr<nano::store::component> make_store (nano::logger &, std::filesystem::path const & path, nano::ledger_constants & constants, bool open_read_only = false, bool add_db_postfix = true, nano::rocksdb_config const & rocksdb_config = nano::rocksdb_config{}, nano::txn_tracking_config const & txn_tracking_config_a = nano::txn_tracking_config{}, std::chrono::milliseconds block_processor_batch_max_time_a = std::chrono::milliseconds (5000), nano::lmdb_config const & lmdb_config_a = nano::lmdb_config{}, bool backup_before_upgrade = false);
std::unique_ptr<nano::store::component> make_store (nano::logger &, std::filesystem::path const & path, nano::ledger_constants & constants, bool open_read_only = false, bool add_db_postfix = true, nano::rocksdb_config const & rocksdb_config = nano::rocksdb_config{}, nano::txn_tracking_config const & txn_tracking_config_a = nano::txn_tracking_config{}, std::chrono::milliseconds block_processor_batch_max_time_a = std::chrono::milliseconds (5000), nano::lmdb_config const & lmdb_config_a = nano::lmdb_config{}, bool backup_before_upgrade = false, bool force_use_write_queue = false);
}
9 changes: 4 additions & 5 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
io_ctx_shared{ io_ctx_a },
io_ctx{ *io_ctx_shared },
node_id{ load_or_create_node_id (application_path_a) },
write_database_queue (!flags_a.force_use_write_database_queue && (config_a.rocksdb_config.enable)),
node_initialized_latch (1),
config (config_a),
network_params{ config.network_params },
Expand All @@ -146,7 +145,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
flags (flags_a),
work (work_a),
distributed_work (*this),
store_impl (nano::make_store (logger, application_path_a, network_params.ledger, flags.read_only, true, config_a.rocksdb_config, config_a.diagnostics_config.txn_tracking, config_a.block_processor_batch_max_time, config_a.lmdb_config, config_a.backup_before_upgrade)),
store_impl (nano::make_store (logger, application_path_a, network_params.ledger, flags.read_only, true, config_a.rocksdb_config, config_a.diagnostics_config.txn_tracking, config_a.block_processor_batch_max_time, config_a.lmdb_config, config_a.backup_before_upgrade, flags.force_use_write_queue)),
store (*store_impl),
unchecked{ config.max_unchecked_blocks, stats, flags.disable_block_processor_unchecked_deletion },
wallets_store_impl (std::make_unique<nano::mdb_wallets_store> (application_path_a / "wallets.ldb", config_a.lmdb_config)),
Expand All @@ -171,8 +170,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
tcp_listener{ std::make_shared<nano::transport::tcp_listener> (network.port, *this, config.tcp_incoming_connections_max) },
application_path (application_path_a),
port_mapping (*this),
block_processor (*this, write_database_queue),
confirming_set_impl{ std::make_unique<nano::confirming_set> (ledger, write_database_queue, config.confirming_set_batch_time) },
block_processor (*this),
confirming_set_impl{ std::make_unique<nano::confirming_set> (ledger, config.confirming_set_batch_time) },
confirming_set{ *confirming_set_impl },
active_impl{ std::make_unique<nano::active_transactions> (*this, confirming_set, block_processor) },
active{ *active_impl },
Expand Down Expand Up @@ -1005,7 +1004,7 @@ void nano::node::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_wei
transaction_write_count = 0;
if (!pruning_targets.empty () && !stopped)
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::pruning);
auto scoped_write_guard = store.write_queue.wait (nano::store::writer::pruning);
auto write_transaction (store.tx_begin_write ({ tables::blocks, tables::pruned }));
while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped)
{
Expand Down
2 changes: 0 additions & 2 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <nano/node/vote_processor.hpp>
#include <nano/node/wallet.hpp>
#include <nano/node/websocket.hpp>
#include <nano/node/write_database_queue.hpp>
#include <nano/secure/utility.hpp>

#include <boost/program_options.hpp>
Expand Down Expand Up @@ -138,7 +137,6 @@ class node final : public std::enable_shared_from_this<node>

public:
const nano::keypair node_id;
nano::write_database_queue write_database_queue;
std::shared_ptr<boost::asio::io_context> io_ctx_shared;
boost::asio::io_context & io_ctx;
boost::latch node_initialized_latch;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class node_flags final
bool allow_bootstrap_peers_duplicates{ false };
bool disable_max_peers_per_ip{ false }; // For testing only
bool disable_max_peers_per_subnetwork{ false }; // For testing only
bool force_use_write_database_queue{ false }; // For testing only. RocksDB does not use the database queue, but some tests rely on it being used.
bool force_use_write_queue{ false }; // For testing only. RocksDB does not use the database queue, but some tests rely on it being used.
bool disable_search_pending{ false }; // For testing only
bool enable_pruning{ false };
bool fast_bootstrap{ false };
Expand Down
6 changes: 3 additions & 3 deletions nano/slow_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1136,14 +1136,14 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
ASSERT_TRUE (!store->init_error ());
nano::stats stats;
nano::ledger ledger (*store, stats, nano::dev::constants);
nano::write_database_queue write_database_queue (false);
nano::store::write_queue write_database_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
std::atomic<bool> stopped{ false };
boost::latch initialized_latch{ 0 };

nano::block_hash block_hash_being_processed{ 0 };
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set{ ledger, write_queue };
nano::store::write_queue write_queue{ false };
nano::confirming_set confirming_set{ ledger };

auto const num_accounts = 100000;

Expand Down
4 changes: 3 additions & 1 deletion nano/store/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ add_library(
rocksdb/version.cpp
transaction.cpp
version.cpp
versioning.cpp)
versioning.cpp
write_queue.hpp
write_queue.cpp)

target_link_libraries(
nano_store
Expand Down
3 changes: 2 additions & 1 deletion nano/store/component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <nano/store/confirmation_height.hpp>
#include <nano/store/rep_weight.hpp>

nano::store::component::component (nano::store::block & block_store_a, nano::store::account & account_store_a, nano::store::pending & pending_store_a, nano::store::online_weight & online_weight_store_a, nano::store::pruned & pruned_store_a, nano::store::peer & peer_store_a, nano::store::confirmation_height & confirmation_height_store_a, nano::store::final_vote & final_vote_store_a, nano::store::version & version_store_a, nano::store::rep_weight & rep_weight_a) :
nano::store::component::component (nano::store::block & block_store_a, nano::store::account & account_store_a, nano::store::pending & pending_store_a, nano::store::online_weight & online_weight_store_a, nano::store::pruned & pruned_store_a, nano::store::peer & peer_store_a, nano::store::confirmation_height & confirmation_height_store_a, nano::store::final_vote & final_vote_store_a, nano::store::version & version_store_a, nano::store::rep_weight & rep_weight_a, bool use_noops_a) :
block (block_store_a),
account (account_store_a),
pending (pending_store_a),
Expand All @@ -17,6 +17,7 @@ nano::store::component::component (nano::store::block & block_store_a, nano::sto
confirmation_height (confirmation_height_store_a),
final_vote (final_vote_store_a),
version (version_store_a),
write_queue (use_noops_a),
rep_weight (rep_weight_a)
{
}
Expand Down
Loading

0 comments on commit 85c3238

Please sign in to comment.