Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IF: Populated fork db ASAP; Vote processing off the main thread #2385

Merged
merged 32 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ad64fc7
GH-2102 Use vector of atomic bools to avoid mutex lock for duplicate …
heifner Apr 2, 2024
3887b2c
GH-2102 Add block to fork db as soon as header is validated so it is …
heifner Apr 2, 2024
bd43d61
GH-2102 May already be in forkdb since we are adding as soon as heade…
heifner Apr 3, 2024
c84bab1
GH-2102 Add logging on app().quit()
heifner Apr 3, 2024
cf50336
GH-2102 Integrate qc and vote when switching forks if first time bloc…
heifner Apr 3, 2024
053fe72
GH-2102 Log irreversible even when head not updated
heifner Apr 3, 2024
4066109
GH-2102 Check for validated block to avoid extra processing
heifner Apr 4, 2024
9411c05
GH-2102 Init processed on reflection
heifner Apr 4, 2024
0c82e7a
GH-2102 Handle corner case of trx locally applied but not in a block yet
heifner Apr 4, 2024
b9deb58
GH-2102 Only report fork switch on actual fork switch. maybe_switch_f…
heifner Apr 4, 2024
8a7b973
GH-2102 Check for terminate_at_block during apply of fork db blocks
heifner Apr 4, 2024
274e44a
GH-2102 If forked out again then could still be in local state
heifner Apr 4, 2024
2e49794
GH-2102 Allow ctrl-c shutdown during sync of a large number of blocks…
heifner Apr 4, 2024
4c3535f
GH-2102 Fix sync issue with receiving a current block while syncing
heifner Apr 4, 2024
112bb69
GH-2102 On startup pending_head forkdb blocks are processed. If asked…
heifner Apr 4, 2024
914f218
GH-2102 Fix comparison in waitForBlock
heifner Apr 5, 2024
5e9af0c
GH-2102 Better error reporting and a bit more tolerance for trxs in b…
heifner Apr 5, 2024
2449830
GH-2102 Add additional logging for applied blocks
heifner Apr 5, 2024
e464aee
Revert "GH-2102 Fix comparison in waitForBlock"
heifner Apr 5, 2024
a74450c
GH-2102 Improve logging during sync
heifner Apr 5, 2024
cafd0b6
GH-2102 Add a large_atomic wrapper around mutex
heifner Apr 5, 2024
302b957
GH-2102 Use large_atomic for if_irreversible_block_id
heifner Apr 5, 2024
be77ca6
GH-2102 No need to recalculate finality digest
heifner Apr 5, 2024
23945ff
GH-2102 Move valid_qc into pending_qc and make thread safe
heifner Apr 5, 2024
ba5006e
GH-2102 Improve test conditions
heifner Apr 5, 2024
9de11ff
GH-2102 Move produced/received block logging into controller so it lo…
heifner Apr 6, 2024
8e98c18
GH-2102 Fix log_applied to not be called during replay
heifner Apr 6, 2024
a9058a1
GH-2102 Add a copyable atomic type
heifner Apr 8, 2024
6d64c27
GH-2102 Integrate qc and vote if possible off the main thread.
heifner Apr 8, 2024
6bd8128
Merge remote-tracking branch 'origin/hotstuff_integration' into GH-21…
heifner Apr 8, 2024
33210c0
GH-2102 Small cleanup from PR review
heifner Apr 9, 2024
5cc1b87
GH-2102 Move Produced block log into controller_impl commit_block
heifner Apr 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 1 addition & 29 deletions libraries/chain/block_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ block_state_ptr block_state::create_if_genesis_block(const block_state_legacy& b
// TODO: https://github.com/AntelopeIO/leap/issues/2057
// TODO: Do not aggregate votes on blocks created from block_state_legacy. This can be removed when #2057 complete.
result.pending_qc = pending_quorum_certificate{result.active_finalizer_policy->finalizers.size(), result.active_finalizer_policy->threshold, result.active_finalizer_policy->max_weak_sum_before_weak_final()};
result.valid_qc = {}; // best qc received from the network inside block extension, empty until first savanna proper IF block
linh2931 marked this conversation as resolved.
Show resolved Hide resolved

// build leaf_node and validation_tree
valid_t::finality_leaf_node_t leaf_node {
Expand All @@ -101,7 +100,7 @@ block_state_ptr block_state::create_if_genesis_block(const block_state_legacy& b
.validation_mroots = { validation_tree.get_root() }
};

result.validated = bsp.is_valid();
result.validated.store(bsp.is_valid());
result.pub_keys_recovered = bsp._pub_keys_recovered;
result.cached_trxs = bsp._cached_trxs;
result.action_mroot = *bsp.action_mroot_savanna;
Expand Down Expand Up @@ -248,33 +247,6 @@ void block_state::verify_qc(const valid_quorum_certificate& qc) const {
invalid_qc_claim, "signature validation failed" );
}

std::optional<quorum_certificate> block_state::get_best_qc() const {
// if pending_qc does not have a valid QC, consider valid_qc only
if( !pending_qc.is_quorum_met() ) {
if( valid_qc ) {
return quorum_certificate{ block_num(), *valid_qc };
} else {
return std::nullopt;
}
}

// extract valid QC from pending_qc
valid_quorum_certificate valid_qc_from_pending = pending_qc.to_valid_quorum_certificate();

// if valid_qc does not have value, consider valid_qc_from_pending only
if( !valid_qc ) {
return quorum_certificate{ block_num(), valid_qc_from_pending };
}

// Both valid_qc and valid_qc_from_pending have value. Compare them and select a better one.
// Strong beats weak. Tie break by valid_qc.
const auto& best_qc =
valid_qc->is_strong() == valid_qc_from_pending.is_strong() ?
*valid_qc : // tie broke by valid_qc
valid_qc->is_strong() ? *valid_qc : valid_qc_from_pending; // strong beats weak
return quorum_certificate{ block_num(), best_qc };
}

valid_t block_state::new_valid(const block_header_state& next_bhs, const digest_type& action_mroot, const digest_type& strong_digest) const {
assert(valid);
assert(next_bhs.core.last_final_block_num() >= core.last_final_block_num());
Expand Down
281 changes: 183 additions & 98 deletions libraries/chain/controller.cpp

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace eosio::chain {

struct block_state_accessor {
static bool is_valid(const block_state& bs) { return bs.is_valid(); }
static void set_valid(block_state& bs, bool v) { bs.validated = v; }
static void set_valid(block_state& bs, bool v) { bs.validated.store(v); }
};

struct block_state_legacy_accessor {
Expand Down Expand Up @@ -128,6 +128,7 @@ namespace eosio::chain {

bsp_t get_block_impl( const block_id_type& id, include_root_t include_root = include_root_t::no ) const;
bool block_exists_impl( const block_id_type& id ) const;
bool validated_block_exists_impl( const block_id_type& id ) const;
void reset_root_impl( const bsp_t& root_bs );
void rollback_head_to_root_impl();
void advance_root_impl( const block_id_type& id );
Expand Down Expand Up @@ -664,7 +665,19 @@ namespace eosio::chain {
return index.find( id ) != index.end();
}

// ------------------ fork_database -------------------------
template<class BSP>
bool fork_database_t<BSP>::validated_block_exists(const block_id_type& id) const {
std::lock_guard g( my->mtx );
return my->validated_block_exists_impl(id);
}

template<class BSP>
bool fork_database_impl<BSP>::validated_block_exists_impl(const block_id_type& id) const {
auto itr = index.find( id );
return itr != index.end() && bs_accessor_t::is_valid(*(*itr));
}

// ------------------ fork_database -------------------------

fork_database::fork_database(const std::filesystem::path& data_dir)
: data_dir(data_dir)
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/hotstuff/finalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ std::optional<vote_message> finalizer::maybe_vote(const bls_public_key& pub_key,
} else {
sig = priv_key.sign({(uint8_t*)digest.data(), (uint8_t*)digest.data() + digest.data_size()});
}
return vote_message{ bsp->id(), decision == vote_decision::strong_vote, pub_key, sig };
return std::optional{vote_message{ bsp->id(), decision == vote_decision::strong_vote, pub_key, sig }};
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
}
return {};
}
Expand Down
108 changes: 75 additions & 33 deletions libraries/chain/hotstuff/hotstuff.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,51 @@ inline std::vector<uint32_t> bitset_to_vector(const hs_bitset& bs) {
}

bool pending_quorum_certificate::has_voted(size_t index) const {
std::lock_guard g(*_mtx);
return _strong_votes._bitset.at(index) || _weak_votes._bitset.at(index);
return _strong_votes.has_voted(index) || _weak_votes.has_voted(index);
linh2931 marked this conversation as resolved.
Show resolved Hide resolved
}

bool pending_quorum_certificate::has_voted_no_lock(bool strong, size_t index) const {
if (strong) {
return _strong_votes._bitset[index];
return _strong_votes.has_voted(index);
}
return _weak_votes.has_voted(index);
}

void pending_quorum_certificate::votes_t::reflector_init() {
_processed = std::vector<std::atomic<bool>>(_bitset.size());
for (size_t i = 0; i < _bitset.size(); ++i) {
if (_bitset[i]) {
_processed[i].store(true, std::memory_order_relaxed);
}
}
return _weak_votes._bitset[index];
}

bool pending_quorum_certificate::votes_t::has_voted(size_t index) const {
assert(index < _processed.size());
return _processed[index].load(std::memory_order_relaxed);
}


vote_status pending_quorum_certificate::votes_t::add_vote(size_t index, const bls_signature& sig) {
if (_bitset[index]) { // check here as could have come in while unlocked
return vote_status::duplicate; // shouldn't be already present
}
_processed[index].store(true, std::memory_order_relaxed);
_bitset.set(index);
_sig.aggregate(sig); // works even if _sig is default initialized (fp2::zero())
return vote_status::success;
}

void pending_quorum_certificate::votes_t::reset(size_t num_finalizers) {
if (num_finalizers != _bitset.size())
_bitset.resize(num_finalizers);
_bitset.reset();
_sig = bls_aggregate_signature();
}

pending_quorum_certificate::pending_quorum_certificate()
: _mtx(std::make_unique<std::mutex>()) {
}

pending_quorum_certificate::pending_quorum_certificate(size_t num_finalizers, uint64_t quorum, uint64_t max_weak_sum_before_weak_final)
: _mtx(std::make_unique<std::mutex>())
, _quorum(quorum)
, _max_weak_sum_before_weak_final(max_weak_sum_before_weak_final) {
_weak_votes.resize(num_finalizers);
_strong_votes.resize(num_finalizers);
, _max_weak_sum_before_weak_final(max_weak_sum_before_weak_final)
, _weak_votes(num_finalizers)
, _strong_votes(num_finalizers) {
}

bool pending_quorum_certificate::is_quorum_met() const {
Expand Down Expand Up @@ -133,34 +141,30 @@ vote_status pending_quorum_certificate::add_vote(block_num_type block_num, bool
const bls_public_key& pubkey, const bls_signature& sig, uint64_t weight) {
vote_status s = vote_status::success;

std::unique_lock g(*_mtx);
state_t pre_state = _state;
state_t post_state = pre_state;
if (has_voted_no_lock(strong, index)) {
s = vote_status::duplicate;
} else {
g.unlock();
if (!fc::crypto::blslib::verify(pubkey, proposal_digest, sig)) {
wlog( "signature from finalizer ${i} cannot be verified", ("i", index) );
s = vote_status::invalid_signature;
} else {
g.lock();
s = strong ? add_strong_vote(index, sig, weight)
: add_weak_vote(index, sig, weight);
post_state = _state;
g.unlock();
}
dlog("block_num: ${bn}, vote strong: ${sv}, duplicate", ("bn", block_num)("sv", strong));
return vote_status::duplicate;
}

if (!fc::crypto::blslib::verify(pubkey, proposal_digest, sig)) {
wlog( "signature from finalizer ${i} cannot be verified", ("i", index) );
return vote_status::invalid_signature;
}

std::unique_lock g(*_mtx);
state_t pre_state = _state;
s = strong ? add_strong_vote(index, sig, weight)
: add_weak_vote(index, sig, weight);
state_t post_state = _state;
g.unlock();
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved

dlog("block_num: ${bn}, vote strong: ${sv}, status: ${s}, pre-state: ${pre}, post-state: ${state}, quorum_met: ${q}",
("bn", block_num)("sv", strong)("s", s)("pre", pre_state)("state", post_state)("q", is_quorum_met(post_state)));
return s;
}

// thread safe
// called by get_best_qc which acquires a mutex
valid_quorum_certificate pending_quorum_certificate::to_valid_quorum_certificate() const {
linh2931 marked this conversation as resolved.
Show resolved Hide resolved
std::lock_guard g(*_mtx);

valid_quorum_certificate valid_qc;

if( _state == state_t::strong ) {
Expand All @@ -177,6 +181,44 @@ valid_quorum_certificate pending_quorum_certificate::to_valid_quorum_certificate
return valid_qc;
}

std::optional<quorum_certificate> pending_quorum_certificate::get_best_qc(block_num_type block_num) const {
std::lock_guard g(*_mtx);
// if pending_qc does not have a valid QC, consider valid_qc only
if( !is_quorum_met_no_lock() ) {
if( _valid_qc ) {
return std::optional{quorum_certificate{ block_num, *_valid_qc }};
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
} else {
return std::nullopt;
}
}

// extract valid QC from pending_qc
valid_quorum_certificate valid_qc_from_pending = to_valid_quorum_certificate();

// if valid_qc does not have value, consider valid_qc_from_pending only
if( !_valid_qc ) {
return std::optional{quorum_certificate{ block_num, valid_qc_from_pending }};
}

// Both valid_qc and valid_qc_from_pending have value. Compare them and select a better one.
// Strong beats weak. Tie break by valid_qc.
const auto& best_qc =
_valid_qc->is_strong() == valid_qc_from_pending.is_strong() ?
*_valid_qc : // tie broken by valid_qc
_valid_qc->is_strong() ? *_valid_qc : valid_qc_from_pending; // strong beats weak
return std::optional{quorum_certificate{ block_num, best_qc }};
}

void pending_quorum_certificate::set_valid_qc(const valid_quorum_certificate& qc) {
std::lock_guard g(*_mtx);
_valid_qc = qc;
}

bool pending_quorum_certificate::valid_qc_is_strong() const {
std::lock_guard g(*_mtx);
return _valid_qc && _valid_qc->is_strong();
}

bool pending_quorum_certificate::is_quorum_met_no_lock() const {
return is_quorum_met(_state);
}
Expand Down
12 changes: 7 additions & 5 deletions libraries/chain/include/eosio/chain/block_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <eosio/chain/transaction_metadata.hpp>
#include <eosio/chain/action_receipt.hpp>
#include <eosio/chain/incremental_merkle.hpp>
#include <eosio/chain/thread_utils.hpp>

namespace eosio::chain {

Expand Down Expand Up @@ -69,12 +70,11 @@ struct block_state : public block_header_state { // block_header_state provi
digest_type strong_digest; // finalizer_digest (strong, cached so we can quickly validate votes)
weak_digest_t weak_digest; // finalizer_digest (weak, cached so we can quickly validate votes)
pending_quorum_certificate pending_qc; // where we accumulate votes we receive
std::optional<valid_quorum_certificate> valid_qc; // best qc received from the network inside block extension
std::optional<valid_t> valid;

// ------ updated for votes, used for fork_db ordering ------------------------------
private:
bool validated = false; // We have executed the block's trxs and verified that action merkle root (block id) matches.
copyable_atomic<bool> validated{false}; // We have executed the block's trxs and verified that action merkle root (block id) matches.

// ------ data members caching information available elsewhere ----------------------
bool pub_keys_recovered = false;
Expand All @@ -83,7 +83,7 @@ struct block_state : public block_header_state { // block_header_state provi
std::optional<digest_type> base_digest; // For finality_data sent to SHiP, computed on demand in get_finality_data()

// ------ private methods -----------------------------------------------------------
bool is_valid() const { return validated; }
bool is_valid() const { return validated.load(); }
bool is_pub_keys_recovered() const { return pub_keys_recovered; }
deque<transaction_metadata_ptr> extract_trxs_metas();
void set_trxs_metas(deque<transaction_metadata_ptr>&& trxs_metas, bool keys_recovered);
Expand All @@ -103,7 +103,9 @@ struct block_state : public block_header_state { // block_header_state provi
const extensions_type& header_extensions() const { return block_header_state::header.header_extensions; }
uint32_t irreversible_blocknum() const { return core.last_final_block_num(); } // backwards compatibility
uint32_t last_final_block_num() const { return core.last_final_block_num(); }
std::optional<quorum_certificate> get_best_qc() const;
std::optional<quorum_certificate> get_best_qc() const { return pending_qc.get_best_qc(block_num()); } // thread safe
bool valid_qc_is_strong() const { return pending_qc.valid_qc_is_strong(); } // thread safe
void set_valid_qc(const valid_quorum_certificate& qc) { pending_qc.set_valid_qc(qc); }

protocol_feature_activation_set_ptr get_activated_protocol_features() const { return block_header_state::activated_protocol_features; }
uint32_t last_qc_block_num() const { return core.latest_qc_claim().block_num; }
Expand Down Expand Up @@ -164,4 +166,4 @@ using block_state_pair = std::pair<std::shared_ptr<block_state_legacy>, blo
FC_REFLECT( eosio::chain::valid_t::finality_leaf_node_t, (major_version)(minor_version)(block_num)(finality_digest)(action_mroot) )
FC_REFLECT( eosio::chain::valid_t, (validation_tree)(validation_mroots))
FC_REFLECT( eosio::chain::finality_data_t, (major_version)(minor_version)(active_finalizer_policy_generation)(action_mroot)(base_digest))
FC_REFLECT_DERIVED( eosio::chain::block_state, (eosio::chain::block_header_state), (block)(strong_digest)(weak_digest)(pending_qc)(valid_qc)(valid)(validated) )
FC_REFLECT_DERIVED( eosio::chain::block_state, (eosio::chain::block_header_state), (block)(strong_digest)(weak_digest)(pending_qc)(valid)(validated) )
7 changes: 5 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ namespace eosio::chain {

void assemble_and_complete_block( block_report& br, const signer_callback_type& signer_callback );
void sign_block( const signer_callback_type& signer_callback );
void commit_block();
void commit_block(block_report& br);
void maybe_switch_forks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup);

// thread-safe
Expand Down Expand Up @@ -276,7 +276,8 @@ namespace eosio::chain {
// thread-safe
signed_block_ptr fetch_block_by_id( const block_id_type& id )const;
// thread-safe
bool block_exists( const block_id_type& id)const;
bool block_exists(const block_id_type& id) const;
bool validated_block_exists(const block_id_type& id) const;
// thread-safe
std::optional<signed_block_header> fetch_block_header_by_number( uint32_t block_num )const;
// thread-safe
Expand Down Expand Up @@ -372,6 +373,8 @@ namespace eosio::chain {
signal<void(const block_signal_params&)>& accepted_block();
signal<void(const block_signal_params&)>& irreversible_block();
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)>& applied_transaction();

// Unlike other signals, voted_block can be signaled from other threads than the main thread.
signal<void(const vote_message&)>& voted_block();

const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const;
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/fork_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace eosio::chain {

bsp_t get_block( const block_id_type& id, include_root_t include_root = include_root_t::no ) const;
bool block_exists( const block_id_type& id ) const;
bool validated_block_exists( const block_id_type& id ) const;

/**
* Purges any existing blocks from the fork database and resets the root block_header_state to the provided value.
Expand Down
Loading
Loading