Skip to content

Commit

Permalink
GH-2102 Integrate qc and vote if possible off the main thread.
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Apr 8, 2024
1 parent a9058a1 commit 6d64c27
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 49 deletions.
2 changes: 1 addition & 1 deletion libraries/chain/block_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ block_state_ptr block_state::create_if_genesis_block(const block_state_legacy& b
.validation_mroots = { validation_tree.get_root() }
};

result.validated = bsp.is_valid();
result.validated.store(bsp.is_valid());
result.pub_keys_recovered = bsp._pub_keys_recovered;
result.cached_trxs = bsp._cached_trxs;
result.action_mroot = action_mroot_svnn;
Expand Down
48 changes: 20 additions & 28 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ struct controller_impl {
chain_id( chain_id ),
read_mode( cfg.read_mode ),
thread_pool(),
my_finalizers{ .t_startup = fc::time_point::now(), .persist_file_path = cfg.finalizers_dir / "safety.dat" },
my_finalizers(fc::time_point::now(), cfg.finalizers_dir / "safety.dat"),
wasmif( conf.wasm_runtime, conf.eosvmoc_tierup, db, conf.state_dir, conf.eosvmoc_config, !conf.profile_accounts.empty() )
{
thread_pool.start( cfg.thread_pool_size, [this]( const fc::exception& e ) {
Expand Down Expand Up @@ -3373,8 +3373,8 @@ struct controller_impl {
auto start = fc::time_point::now();

const bool already_valid = bsp->is_valid();
if (!already_valid) // has not been validated (applied) before, only in forkdb, integrate and possibly vote now
integrate_qc(bsp);
if (!already_valid) // has not been validated (applied) before, only in forkdb, see if we can vote now
consider_voting(bsp);

const signed_block_ptr& b = bsp->block;
const auto& new_protocol_feature_activations = bsp->get_new_protocol_feature_activations();
Expand Down Expand Up @@ -3544,14 +3544,14 @@ struct controller_impl {
}

bool node_has_voted_if_finalizer(const block_id_type& id) const {
if (my_finalizers.finalizers.empty())
if (my_finalizers.empty())
return true;

std::optional<bool> voted = fork_db.apply_s<std::optional<bool>>([&](auto& forkdb) -> std::optional<bool> {
auto bsp = forkdb.get_block(id);
if (bsp) {
return std::ranges::all_of(my_finalizers.finalizers, [&bsp](auto& f) {
return bsp->has_voted(f.first);
return my_finalizers.all_of_public_keys([&bsp](const auto& k) {
return bsp->has_voted(k);
});
}
return false;
Expand All @@ -3560,20 +3560,15 @@ struct controller_impl {
return !voted || *voted;
}

// thread safe
void create_and_send_vote_msg(const block_state_ptr& bsp) {
if (!bsp->block->is_proper_svnn_block())
return;

// Each finalizer configured on the node which is present in the active finalizer policy
// may create and sign a vote
// TODO: as a future optimization, we could run maybe_vote on a thread (it would need a
// lock around the file access). We should document that the voted_block is emitted
// off the main thread. net_plugin is fine for this to be emitted from any thread.
// Just need to update the comment in net_plugin
// Each finalizer configured on the node which is present in the active finalizer policy may create and sign a vote.
my_finalizers.maybe_vote(
*bsp->active_finalizer_policy, bsp, bsp->strong_digest, [&](const vote_message& vote) {
// net plugin subscribed to this signal. it will broadcast the vote message
// on receiving the signal
// net plugin subscribed to this signal. it will broadcast the vote message on receiving the signal
emit(voted_block, vote);

// also aggregate our own vote into the pending_qc for this block.
Expand Down Expand Up @@ -3729,6 +3724,11 @@ struct controller_impl {
EOS_ASSERT( id == bsp->id(), block_validate_exception,
"provided id ${id} does not match block id ${bid}", ("id", id)("bid", bsp->id()) );

if constexpr (savanna_mode) {
integrate_received_qc_to_block(bsp); // Save the received QC as soon as possible, no matter whether the block itself is valid or not
consider_voting(bsp);
}

if (conf.terminate_at_block == 0 || bsp->block_num() <= conf.terminate_at_block) {
forkdb.add(bsp, mark_valid_t::no, ignore_duplicate_t::yes);
}
Expand Down Expand Up @@ -3814,7 +3814,7 @@ struct controller_impl {
return;
}

// Save the QC. This is safe as the function is called by push_block & accept_block from application thread.
// Save the QC.
dlog("setting valid qc: ${rqc} into claimed block ${bn} ${id}", ("rqc", qc_ext.qc.to_qc_claim())("bn", claimed->block_num())("id", claimed->id()));
claimed->set_valid_qc(received_qc);

Expand All @@ -3830,6 +3830,9 @@ struct controller_impl {
}
}

void consider_voting(const block_state_legacy_ptr&) {}

// thread safe
void consider_voting(const block_state_ptr& bsp) {
// 1. Get the `core.final_on_strong_qc_block_num` for the block you are considering to vote on and use that to find the actual block ID
// of the ancestor block that has that block number.
Expand All @@ -3844,20 +3847,12 @@ struct controller_impl {
}
}

template <typename BSP>
void integrate_qc(const BSP& bsp) {
if constexpr (std::is_same_v<BSP, block_state_ptr>) {
integrate_received_qc_to_block(bsp);
consider_voting(bsp);
}
}

template <class BSP>
void accept_block(const BSP& bsp) {
assert(bsp && bsp->block);

// Save the received QC as soon as possible, no matter whether the block itself is valid or not
integrate_qc(bsp);
// consider voting again as final_on_strong_qc_block may have been validated since the bsp was created in create_block_state_i
consider_voting(bsp);

auto do_accept_block = [&](auto& forkdb) {
if constexpr (std::is_same_v<BSP, typename std::decay_t<decltype(forkdb.head())>>)
Expand All @@ -3877,9 +3872,6 @@ struct controller_impl {
{
assert(bsp && bsp->block);

// Save the received QC as soon as possible, no matter whether the block itself is valid or not
integrate_qc(bsp);

controller::block_status s = controller::block_status::complete;
EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block");

Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace eosio::chain {

struct block_state_accessor {
static bool is_valid(const block_state& bs) { return bs.is_valid(); }
static void set_valid(block_state& bs, bool v) { bs.validated = v; }
static void set_valid(block_state& bs, bool v) { bs.validated.store(v); }
};

struct block_state_legacy_accessor {
Expand Down
5 changes: 3 additions & 2 deletions libraries/chain/include/eosio/chain/block_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <eosio/chain/transaction_metadata.hpp>
#include <eosio/chain/action_receipt.hpp>
#include <eosio/chain/incremental_merkle.hpp>
#include <eosio/chain/thread_utils.hpp>

namespace eosio::chain {

Expand Down Expand Up @@ -73,7 +74,7 @@ struct block_state : public block_header_state { // block_header_state provi

// ------ updated for votes, used for fork_db ordering ------------------------------
private:
bool validated = false; // We have executed the block's trxs and verified that action merkle root (block id) matches.
copyable_atomic<bool> validated{false}; // We have executed the block's trxs and verified that action merkle root (block id) matches.

// ------ data members caching information available elsewhere ----------------------
bool pub_keys_recovered = false;
Expand All @@ -82,7 +83,7 @@ struct block_state : public block_header_state { // block_header_state provi
std::optional<digest_type> base_digest; // For finality_data sent to SHiP, computed on demand in get_finality_data()

// ------ private methods -----------------------------------------------------------
bool is_valid() const { return validated; }
bool is_valid() const { return validated.load(); }
bool is_pub_keys_recovered() const { return pub_keys_recovered; }
deque<transaction_metadata_ptr> extract_trxs_metas();
void set_trxs_metas(deque<transaction_metadata_ptr>&& trxs_metas, bool keys_recovered);
Expand Down
2 changes: 2 additions & 0 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ namespace eosio::chain {
signal<void(const block_signal_params&)>& accepted_block();
signal<void(const block_signal_params&)>& irreversible_block();
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)>& applied_transaction();

// Unlike other signals, voted_block can be signaled from other threads than the main thread.
signal<void(const vote_message&)>& voted_block();

const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const;
Expand Down
42 changes: 34 additions & 8 deletions libraries/chain/include/eosio/chain/hotstuff/finalizer.hpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#pragma once
#include "eosio/chain/block_state.hpp"
#include <eosio/chain/block_state.hpp>
#include <fc/crypto/bls_utils.hpp>
#include <fc/io/cfile.hpp>
#include <compare>
#include <mutex>
#include <ranges>

// -------------------------------------------------------------------------------------------
// this file defines the classes:
Expand Down Expand Up @@ -45,6 +47,7 @@ namespace eosio::chain {
};

// ----------------------------------------------------------------------------------------
// Access is protected by my_finalizers_t mutex
struct finalizer {
enum class vote_decision { no_vote, strong_vote, weak_vote };
struct vote_result {
Expand All @@ -58,7 +61,6 @@ namespace eosio::chain {
finalizer_safety_information fsi;

vote_result decide_vote(const block_state_ptr& bsp);

std::optional<vote_message> maybe_vote(const bls_public_key& pub_key, const block_state_ptr& bsp,
const digest_type& digest);
};
Expand All @@ -68,22 +70,38 @@ namespace eosio::chain {
using fsi_t = finalizer_safety_information;
using fsi_map = std::map<bls_public_key, fsi_t>;

private:
const block_timestamp_type t_startup; // nodeos startup time, used for default safety_information
const std::filesystem::path persist_file_path; // where we save the safety data
mutable std::mutex mtx;
mutable fc::datastream<fc::cfile> persist_file; // we want to keep the file open for speed
std::map<bls_public_key, finalizer> finalizers; // the active finalizers for this node
std::map<bls_public_key, finalizer> finalizers; // the active finalizers for this node, loaded at startup, not mutated afterwards
fsi_map inactive_safety_info; // loaded at startup, not mutated afterwards
fsi_t default_fsi = fsi_t::unset_fsi(); // default provided at leap startup
mutable bool inactive_safety_info_written{false};

template<class F>
public:
my_finalizers_t(block_timestamp_type startup_time, const std::filesystem::path& persist_file_path)
: t_startup(startup_time)
, persist_file_path(persist_file_path)
{}

template<class F> // thread safe
void maybe_vote(const finalizer_policy& fin_pol,
const block_state_ptr& bsp,
const digest_type& digest,
F&& process_vote) {

if (finalizers.empty())
return;

std::vector<vote_message> votes;
votes.reserve(finalizers.size());

// Possible improvement in the future, look at locking only individual finalizers and releasing the lock for writing the file.
// Would require making sure that only the latest is ever written to the file.
std::unique_lock g(mtx);

// first accumulate all the votes
for (const auto& f : fin_pol.finalizers) {
if (auto it = finalizers.find(f.public_key); it != finalizers.end()) {
Expand All @@ -95,20 +113,28 @@ namespace eosio::chain {
// then save the safety info and, if successful, gossip the votes
if (!votes.empty()) {
save_finalizer_safety_info();
g.unlock();
for (const auto& vote : votes)
std::forward<F>(process_vote)(vote);
}
}

size_t size() const { return finalizers.size(); }
void set_keys(const std::map<std::string, std::string>& finalizer_keys);
size_t size() const { return finalizers.size(); } // doesn't change, thread safe
bool empty() const { return finalizers.empty(); } // doesn't change, thread safe

template<typename F>
bool all_of_public_keys(F&& f) const { // only access keys which do not change, thread safe
return std::ranges::all_of(std::views::keys(finalizers), std::forward<F>(f));
}

void set_keys(const std::map<std::string, std::string>& finalizer_keys); // only call on startup
void set_default_safety_information(const fsi_t& fsi);

// following two member functions could be private, but are used in testing
// following two member functions could be private, but are used in testing, not thread safe
void save_finalizer_safety_info() const;
fsi_map load_finalizer_safety_info();

// for testing purposes only
// for testing purposes only, not thread safe
const fsi_t& get_fsi(const bls_public_key& k) { return finalizers[k].fsi; }
void set_fsi(const bls_public_key& k, const fsi_t& fsi) { finalizers[k].fsi = fsi; }
};
Expand Down
2 changes: 1 addition & 1 deletion plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3995,7 +3995,7 @@ namespace eosio {
on_active_schedule(chain_plug->chain().active_producers());
}

// called from application thread
// called from other threads including net threads
void net_plugin_impl::on_voted_block(const vote_message& msg) {
fc_dlog(logger, "on voted signal: block #${bn} ${id}.., ${t}, key ${k}..",
("bn", block_header::num_from_id(msg.block_id))("id", msg.block_id.str().substr(8,16))
Expand Down
16 changes: 8 additions & 8 deletions unittests/finalizer_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ BOOST_AUTO_TEST_CASE( basic_finalizer_safety_file_io ) try {
bls_pub_priv_key_map_t local_finalizers = { { k.pubkey_str, k.privkey_str } };

{
my_finalizers_t fset{.t_startup = block_timestamp_type{}, .persist_file_path = safety_file_path};
my_finalizers_t fset{block_timestamp_type{}, safety_file_path};
fset.set_keys(local_finalizers);

fset.set_fsi(k.pubkey, fsi);
Expand All @@ -101,7 +101,7 @@ BOOST_AUTO_TEST_CASE( basic_finalizer_safety_file_io ) try {
}

{
my_finalizers_t fset{.t_startup = block_timestamp_type{}, .persist_file_path = safety_file_path};
my_finalizers_t fset{block_timestamp_type{}, safety_file_path};
fset.set_keys(local_finalizers); // that's when the finalizer safety file is read

// make sure the safety info for our finalizer that we saved above is restored correctly
Expand All @@ -123,7 +123,7 @@ BOOST_AUTO_TEST_CASE( corrupt_finalizer_safety_file ) try {
bls_pub_priv_key_map_t local_finalizers = { { k.pubkey_str, k.privkey_str } };

{
my_finalizers_t fset{.t_startup = block_timestamp_type{}, .persist_file_path = safety_file_path};
my_finalizers_t fset{block_timestamp_type{}, safety_file_path};
fset.set_keys(local_finalizers);

fset.set_fsi(k.pubkey, fsi);
Expand All @@ -140,7 +140,7 @@ BOOST_AUTO_TEST_CASE( corrupt_finalizer_safety_file ) try {
}

{
my_finalizers_t fset{.t_startup = block_timestamp_type{}, .persist_file_path = safety_file_path};
my_finalizers_t fset{block_timestamp_type{}, safety_file_path};
BOOST_REQUIRE_THROW(fset.set_keys(local_finalizers), // that's when the finalizer safety file is read
finalizer_safety_exception);

Expand All @@ -159,7 +159,7 @@ BOOST_AUTO_TEST_CASE( finalizer_safety_file_io ) try {
std::vector<bls_keys_t> keys = create_keys(10);

{
my_finalizers_t fset{.t_startup = block_timestamp_type{}, .persist_file_path = safety_file_path};
my_finalizers_t fset{block_timestamp_type{}, safety_file_path};
bls_pub_priv_key_map_t local_finalizers = create_local_finalizers<1, 3, 5, 6>(keys);
fset.set_keys(local_finalizers);

Expand All @@ -171,7 +171,7 @@ BOOST_AUTO_TEST_CASE( finalizer_safety_file_io ) try {
}

{
my_finalizers_t fset{.t_startup = block_timestamp_type{}, .persist_file_path = safety_file_path};
my_finalizers_t fset{block_timestamp_type{}, safety_file_path};
bls_pub_priv_key_map_t local_finalizers = create_local_finalizers<3>(keys);
fset.set_keys(local_finalizers);

Expand All @@ -186,7 +186,7 @@ BOOST_AUTO_TEST_CASE( finalizer_safety_file_io ) try {
}

{
my_finalizers_t fset{.t_startup = block_timestamp_type{}, .persist_file_path = safety_file_path};
my_finalizers_t fset{block_timestamp_type{}, safety_file_path};
bls_pub_priv_key_map_t local_finalizers = create_local_finalizers<3>(keys);
fset.set_keys(local_finalizers);

Expand All @@ -197,7 +197,7 @@ BOOST_AUTO_TEST_CASE( finalizer_safety_file_io ) try {
// even though we didn't activate finalizers 1, 5, or 6 in the prior test, and we wrote the safety file,
// make sure we have not lost the fsi that was set originally for these finalizers.
{
my_finalizers_t fset{.t_startup = block_timestamp_type{}, .persist_file_path = safety_file_path};
my_finalizers_t fset{block_timestamp_type{}, safety_file_path};
bls_pub_priv_key_map_t local_finalizers = create_local_finalizers<1, 5, 6>(keys);
fset.set_keys(local_finalizers);

Expand Down

0 comments on commit 6d64c27

Please sign in to comment.