Skip to content

Commit

Permalink
GH-3 Move vote processing off net threads into a dedicated thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Apr 10, 2024
1 parent f9786e1 commit 7fc69ac
Show file tree
Hide file tree
Showing 12 changed files with 338 additions and 71 deletions.
33 changes: 13 additions & 20 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <eosio/chain/hotstuff/finalizer.hpp>
#include <eosio/chain/hotstuff/finalizer_policy.hpp>
#include <eosio/chain/hotstuff/hotstuff.hpp>
#include <eosio/chain/vote_processor.hpp>

#include <chainbase/chainbase.hpp>
#include <eosio/vm/allocator.hpp>
Expand Down Expand Up @@ -946,7 +947,9 @@ struct controller_impl {
signal<void(const block_signal_params&)> accepted_block;
signal<void(const block_signal_params&)> irreversible_block;
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)> applied_transaction;
signal<void(const vote_message&)> voted_block;
signal<void(const vote_signal_params&)> voted_block;

vote_processor_t vote_processor{fork_db, voted_block};

int64_t set_proposed_producers( vector<producer_authority> producers );
int64_t set_proposed_producers_legacy( vector<producer_authority> producers );
Expand Down Expand Up @@ -1195,6 +1198,7 @@ struct controller_impl {
elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
if( shutdown ) shutdown();
} );
vote_processor.start(4);

set_activation_handler<builtin_protocol_feature_t::preactivate_feature>();
set_activation_handler<builtin_protocol_feature_t::replace_deferred>();
Expand All @@ -1214,6 +1218,7 @@ struct controller_impl {
irreversible_block.connect([this](const block_signal_params& t) {
const auto& [ block, id] = t;
wasmif.current_lib(block->block_num());
vote_processor.notify_lib(block->block_num());
});


Expand Down Expand Up @@ -3552,19 +3557,8 @@ struct controller_impl {


// called from net threads and controller's thread pool
vote_status process_vote_message( const vote_message& vote ) {
// only aggregate votes on proper if blocks
auto aggregate_vote = [&vote](auto& forkdb) -> vote_status {
auto bsp = forkdb.get_block(vote.block_id);
if (bsp && bsp->block->is_proper_svnn_block()) {
return bsp->aggregate_vote(vote);
}
return vote_status::unknown_block;
};
auto aggregate_vote_legacy = [](auto&) -> vote_status {
return vote_status::unknown_block;
};
return fork_db.apply<vote_status>(aggregate_vote_legacy, aggregate_vote);
void process_vote_message( uint32_t connection_id, const vote_message& vote ) {
vote_processor.process_vote_message(connection_id, vote);
}

bool node_has_voted_if_finalizer(const block_id_type& id) const {
Expand Down Expand Up @@ -3593,11 +3587,10 @@ struct controller_impl {
my_finalizers.maybe_vote(
*bsp->active_finalizer_policy, bsp, bsp->strong_digest, [&](const vote_message& vote) {
// net plugin subscribed to this signal. it will broadcast the vote message on receiving the signal
emit(voted_block, vote);
emit(voted_block, std::tuple{uint32_t{0}, vote_status::success, std::cref(vote)});

// also aggregate our own vote into the pending_qc for this block.
boost::asio::post(thread_pool.get_executor(),
[control = this, vote]() { control->process_vote_message(vote); });
process_vote_message(0, vote);
});
}

Expand Down Expand Up @@ -5254,8 +5247,8 @@ void controller::set_proposed_finalizers( finalizer_policy&& fin_pol ) {
}

// called from net threads
vote_status controller::process_vote_message( const vote_message& vote ) {
return my->process_vote_message( vote );
void controller::process_vote_message( uint32_t connection_id, const vote_message& vote ) {
my->process_vote_message( connection_id, vote );
};

bool controller::node_has_voted_if_finalizer(const block_id_type& id) const {
Expand Down Expand Up @@ -5538,7 +5531,7 @@ signal<void(const block_signal_params&)>& controller::accepted_block_header() {
signal<void(const block_signal_params&)>& controller::accepted_block() { return my->accepted_block; }
signal<void(const block_signal_params&)>& controller::irreversible_block() { return my->irreversible_block; }
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)>& controller::applied_transaction() { return my->applied_transaction; }
signal<void(const vote_message&)>& controller::voted_block() { return my->voted_block; }
signal<void(const vote_signal_params&)>& controller::voted_block() { return my->voted_block; }

chain_id_type controller::extract_chain_id(snapshot_reader& snapshot) {
chain_snapshot_header header;
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/include/eosio/chain/block_header_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct block_header_state {
digest_type compute_finality_digest() const;

// Returns true if the block is a Proper Savanna Block
bool is_proper_svnn_block() const;
bool is_proper_svnn_block() const { return header.is_proper_svnn_block(); }

// block descending from this need the provided qc in the block extension
bool is_needed(const qc_claim_t& qc_claim) const {
Expand Down
8 changes: 4 additions & 4 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ namespace eosio::chain {
using trx_meta_cache_lookup = std::function<transaction_metadata_ptr( const transaction_id_type&)>;

using block_signal_params = std::tuple<const signed_block_ptr&, const block_id_type&>;
using vote_signal_params = std::tuple<uint32_t, vote_status, const vote_message&>;

enum class db_read_mode {
HEAD,
Expand Down Expand Up @@ -326,7 +327,7 @@ namespace eosio::chain {
// called by host function set_finalizers
void set_proposed_finalizers( finalizer_policy&& fin_pol );
// called from net threads
vote_status process_vote_message( const vote_message& msg );
void process_vote_message( uint32_t connection_id, const vote_message& msg );
// thread safe, for testing
bool node_has_voted_if_finalizer(const block_id_type& id) const;

Expand Down Expand Up @@ -373,9 +374,8 @@ namespace eosio::chain {
signal<void(const block_signal_params&)>& accepted_block();
signal<void(const block_signal_params&)>& irreversible_block();
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)>& applied_transaction();

// Unlike other signals, voted_block can be signaled from other threads than the main thread.
signal<void(const vote_message&)>& voted_block();
// Unlike other signals, voted_block is signaled from other threads than the main thread.
signal<void(const vote_signal_params&)>& voted_block();

const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const;
wasm_interface& get_wasm_interface();
Expand Down
14 changes: 9 additions & 5 deletions libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@ namespace eosio::chain {
bool strong{false};
bls_public_key finalizer_key;
bls_signature sig;

auto operator<=>(const vote_message&) const = default;
bool operator==(const vote_message&) const = default;
};

enum class vote_status {
success,
duplicate,
unknown_public_key,
invalid_signature,
unknown_block
duplicate, // duplicate vote, expected as votes arrive on multiple connections
unknown_public_key, // public key is invalid, indicates invalid vote
invalid_signature, // signature is invalid, indicates invalid vote
unknown_block, // block not available, possibly less than LIB, or too far in the future
max_exceeded // received too many votes for a connection
};

using bls_public_key = fc::crypto::blslib::bls_public_key;
Expand Down Expand Up @@ -159,7 +163,7 @@ namespace eosio::chain {


FC_REFLECT(eosio::chain::vote_message, (block_id)(strong)(finalizer_key)(sig));
FC_REFLECT_ENUM(eosio::chain::vote_status, (success)(duplicate)(unknown_public_key)(invalid_signature)(unknown_block))
FC_REFLECT_ENUM(eosio::chain::vote_status, (success)(duplicate)(unknown_public_key)(invalid_signature)(unknown_block)(max_exceeded))
FC_REFLECT(eosio::chain::valid_quorum_certificate, (_strong_votes)(_weak_votes)(_sig));
FC_REFLECT(eosio::chain::pending_quorum_certificate, (_valid_qc)(_quorum)(_max_weak_sum_before_weak_final)(_state)(_strong_sum)(_weak_sum)(_weak_votes)(_strong_votes));
FC_REFLECT_ENUM(eosio::chain::pending_quorum_certificate::state_t, (unrestricted)(restricted)(weak_achieved)(weak_final)(strong));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <eosio/chain/transaction_metadata.hpp>
#include <eosio/chain/trace.hpp>
#include <eosio/chain/block_state_legacy.hpp>
#include <eosio/chain/exceptions.hpp>

#include <boost/multi_index_container.hpp>
Expand Down
216 changes: 216 additions & 0 deletions libraries/chain/include/eosio/chain/vote_processor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#pragma once

#include <eosio/chain/hotstuff/hotstuff.hpp>

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/composite_key.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/ordered_index.hpp>

namespace eosio { namespace chain {

/**
* Process votes in a dedicated thread pool.
*/
class vote_processor_t {
static constexpr size_t max_votes_per_connection = 2500; // 3000 is less than 1MB per connection
static constexpr std::chrono::milliseconds block_wait_time{10};

struct by_block_num;
struct by_connection;
struct by_vote;

struct vote {
uint32_t connection_id;
vote_message msg;

const block_id_type& id() const { return msg.block_id; }
block_num_type block_num() const { return block_header::num_from_id(msg.block_id); }
};

using vote_ptr = std::shared_ptr<vote>;
using vote_signal_type = decltype(controller({},chain_id_type::empty_chain_id()).voted_block());

typedef multi_index_container< vote_ptr,
indexed_by<
ordered_non_unique<tag<by_block_num>,
composite_key<vote,
const_mem_fun<vote, block_num_type, &vote::block_num>,
const_mem_fun<vote, const block_id_type&, &vote::id>
>, composite_key_compare< std::greater<>, sha256_less > // greater for block_num
>,
ordered_non_unique< tag<by_connection>, member<vote, uint32_t, &vote::connection_id> >,
ordered_unique< tag<by_vote>, member<vote, vote_message, &vote::msg> >
>
> vote_index_type;

fork_database& fork_db;
std::mutex mtx;
std::condition_variable cv;
vote_index_type index;
// connection, count of messages
std::map<uint32_t, uint16_t> num_messages;
std::atomic<block_num_type> lib{0};
std::atomic<bool> stopped{false};
vote_signal_type& vote_signal;
named_thread_pool<vote> thread_pool;

private:
template<typename Signal, typename Arg>
void emit( const Signal& s, Arg&& a ) {
try {
s(std::forward<Arg>(a));
} catch (std::bad_alloc& e) {
wlog( "std::bad_alloc: ${w}", ("w", e.what()) );
throw e;
} catch (boost::interprocess::bad_alloc& e) {
wlog( "boost::interprocess::bad alloc: ${w}", ("w", e.what()) );
throw e;
} catch ( controller_emit_signal_exception& e ) {
wlog( "controller_emit_signal_exception: ${details}", ("details", e.to_detail_string()) );
throw e;
} catch ( fc::exception& e ) {
wlog( "fc::exception: ${details}", ("details", e.to_detail_string()) );
} catch ( std::exception& e ) {
wlog( "std::exception: ${details}", ("details", e.what()) );
} catch ( ... ) {
wlog( "signal handler threw exception" );
}
}

void emit(uint32_t connection_id, vote_status status, const vote_message& msg) {
if (connection_id != 0) { // this nodes vote was already signaled
emit( vote_signal, std::tuple{connection_id, status, std::cref(msg)} );
}
}

void remove_connection(uint32_t connection_id) {
auto& idx = index.get<by_connection>();
idx.erase(idx.lower_bound(connection_id), idx.upper_bound(connection_id));
}

void remove_before_lib() {
auto& idx = index.get<by_block_num>();
idx.erase(idx.lower_bound(lib.load()), idx.end()); // descending
}

bool remove_all_for_block(auto& idx, auto& it, const block_id_type& id) {
while (it != idx.end() && (*it)->id() == id) {
it = idx.erase(it);
}
return it == idx.end();
}

bool skip_all_for_block(auto& idx, auto& it, const block_id_type& id) {
while (it != idx.end() && (*it)->id() == id) {
++it;
}
return it == idx.end();
}

public:
explicit vote_processor_t(fork_database& forkdb, vote_signal_type& vote_signal)
: fork_db(forkdb)
, vote_signal(vote_signal) {}

~vote_processor_t() {
stopped = true;
std::lock_guard g(mtx);
cv.notify_one();
}

void start(size_t num_threads) {
assert(num_threads > 1); // need at least two as one is used for coordinatation
thread_pool.start( num_threads, []( const fc::exception& e ) {
elog( "Exception in vote processor thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
} );

// one coordinator thread
boost::asio::post(thread_pool.get_executor(), [&]() {
block_id_type not_in_forkdb_id{};
while (!stopped) {
std::unique_lock g(mtx);
cv.wait(g, [&]() {
if (!index.empty() || stopped)
return true;
return false;
});
if (stopped)
break;
remove_before_lib();
if (index.empty())
continue;
auto& idx = index.get<by_block_num>();
if (auto i = idx.begin(); i != idx.end() && not_in_forkdb_id == (*i)->id()) { // same block as last while loop
g.unlock();
std::this_thread::sleep_for(block_wait_time);
g.lock();
}
for (auto i = idx.begin(); i != idx.end();) {
auto& vt = *i;
block_state_ptr bsp = fork_db.apply_s<block_state_ptr>([&](const auto& forkdb) {
return forkdb.get_block(vt->id());
});
if (bsp) {
if (!bsp->is_proper_svnn_block()) {
if (remove_all_for_block(idx, i, bsp->id()))
break;
continue;
}
auto iter_of_bsp = i;
std::vector<vote_ptr> to_process;
to_process.reserve(std::min<size_t>(21u, idx.size())); // increase if we increase # of finalizers from 21
for(; i != idx.end() && bsp->id() == (*i)->id(); ++i) {
// although it is the highest contention on block state pending mutex posting all of the same bsp,
// the highest priority is processing votes for this block state.
to_process.push_back(*i);
}
bool should_break = remove_all_for_block(idx, iter_of_bsp, bsp->id());
g.unlock(); // do not hold lock when posting
for (auto& vptr : to_process) {
boost::asio::post(thread_pool.get_executor(), [this, bsp, vptr=std::move(vptr)]() {
vote_status s = bsp->aggregate_vote(vptr->msg);
emit(vptr->connection_id, s, vptr->msg);
});
}
if (should_break)
break;
} else {
not_in_forkdb_id = vt->id();
if (skip_all_for_block(idx, i, (*i)->id()))
break;
}
}
}
dlog("Exiting vote processor coordinator thread");
});
}

void notify_lib(block_num_type block_num) {
lib = block_num;
}

void process_vote_message(uint32_t connection_id, const vote_message& msg) {
vote_ptr vptr = std::make_shared<vote>(vote{.connection_id = connection_id, .msg = msg});
boost::asio::post(thread_pool.get_executor(), [this, connection_id, msg] {
std::unique_lock g(mtx);
if (++num_messages[connection_id] > max_votes_per_connection) {
// consider the connection invalid, remove all votes
remove_connection(connection_id);
g.unlock();

elog("Exceeded max votes per connection for ${c}", ("c", connection_id));
emit(connection_id, vote_status::max_exceeded, msg);
} else if (block_header::num_from_id(msg.block_id) < lib.load(std::memory_order_relaxed)) {
// ignore
} else {
index.insert(std::make_shared<vote>(vote{.connection_id = connection_id, .msg = msg}));
cv.notify_one();
}
});
}

};

} } //eosio::chain
7 changes: 7 additions & 0 deletions libraries/libfc/include/fc/crypto/bls_signature.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ namespace fc::crypto::blslib {
return _jacobian_montgomery_le.equal(sig._jacobian_montgomery_le);
}

auto operator<=>(const bls_signature& rhs) const {
return _affine_non_montgomery_le <=> rhs._affine_non_montgomery_le;
}
auto operator==(const bls_signature& rhs) const {
return _affine_non_montgomery_le == rhs._affine_non_montgomery_le;
}

template<typename T>
friend T& operator<<(T& ds, const bls_signature& sig) {
// Serialization as variable length array when it is stored as a fixed length array. This makes for easier deserialization by external tools
Expand Down
Loading

0 comments on commit 7fc69ac

Please sign in to comment.