Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support throttling block syncing to peers. #1540

Closed
wants to merge 53 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
e68743a
Support throttling block syncing to peers. WIP
jgiszczak Aug 22, 2023
303c3d6
Add exponential backoff to throttle. Fix wretched math.
jgiszczak Aug 23, 2023
3789d17
Merge branch 'main' into p2p-peer-throttle
jgiszczak Aug 23, 2023
70b530b
Experiment: How many tests fail if waitForObj default times out
jgiszczak Aug 24, 2023
b92d84c
Address review comments in net_plugin.
jgiszczak Aug 24, 2023
dc54d46
Further tweak the sync throttle test for machines faster than mine.
jgiszczak Aug 24, 2023
3a50864
Move block sync throttling to the correct layer in the call stack.
jgiszczak Aug 25, 2023
e1c1d42
Move block sync rate limit parsing to plugin initialize.
jgiszczak Aug 25, 2023
92e4e7c
Require IPv6 addresses to be in square bracket format.
jgiszczak Aug 30, 2023
28bb38d
Added throttle exception for configured p2p-peer-addresses.
jgiszczak Aug 30, 2023
1eb1e44
Merge branch 'main' into p2p-peer-throttle
jgiszczak Sep 1, 2023
1983d90
Merge branch 'main' into p2p-peer-throttle
jgiszczak Sep 7, 2023
e998cc6
Debug commit, doesn't build.
jgiszczak Sep 11, 2023
92e4022
Fix build error. Lambda captures by value are const.
jgiszczak Sep 11, 2023
b24f8e3
Fix bare numeric value for peer throttle.
jgiszczak Sep 11, 2023
3f67034
Update supplied_peers only once per configured peer and once per API …
jgiszczak Sep 12, 2023
e59451d
Remove encapsulation violation.
jgiszczak Sep 15, 2023
ec2d36d
Restored connection reconnect method. WIP
jgiszczak Sep 18, 2023
777c8d8
Remove stacktrace additions
heifner Sep 18, 2023
e88c259
Merge branch 'p2p-peer-throttle' of github.com:AntelopeIO/leap into p…
jgiszczak Sep 18, 2023
05be825
Update netApi connect test.
jgiszczak Sep 20, 2023
f8763ae
Merge branch 'main' into p2p-peer-throttle
jgiszczak Sep 20, 2023
24f4aad
GH-1295 Project index to default iterator
heifner Sep 20, 2023
97591fe
Use reconnect method as intended, and avoid threading issue.
jgiszczak Sep 20, 2023
ffee0df
Use std::any_of when finding supplied peers to unlimit a connection.
jgiszczak Sep 20, 2023
fb740ef
Change language in p2p_multiple_listen_test for clarity.
jgiszczak Sep 23, 2023
453bfcf
Tolerate duplicate (empty) peer addresses in connection manager.
jgiszczak Sep 23, 2023
99f02c0
Add rate limit parse unittest.
jgiszczak Sep 27, 2023
b16184a
Tolerate node running with no listen endpoints.
jgiszczak Sep 27, 2023
df6d948
Restore lock of connections mutex when connecting configured peers.
jgiszczak Sep 27, 2023
733849b
Don't pass around iterators that may be invalidated by an erase.
jgiszczak Sep 27, 2023
ed69238
Renamed method.
jgiszczak Sep 27, 2023
2f80663
Break encapsulation less.
jgiszczak Sep 27, 2023
7019b65
Thread safety.
jgiszczak Sep 27, 2023
4d136e3
Revert "Restore lock of connections mutex when connecting configured …
jgiszczak Sep 27, 2023
3708418
Restore lock of connections mutex when connecting configured peers.
jgiszczak Sep 27, 2023
4baec72
Accept suggested refactoring.
jgiszczak Sep 27, 2023
8d2c1c2
Remove some unused machine-generated variables from custom shape file.
jgiszczak Sep 27, 2023
7e37de1
Convert connections mutex to resursive_mutex and update locks.
jgiszczak Sep 28, 2023
669ed0f
Revert mutex and lock type changes.
jgiszczak Sep 29, 2023
a6f7761
Revise connection_monitor for thread safety.
jgiszczak Oct 2, 2023
2ef3a6c
Merge branch 'main' into p2p-peer-throttle
jgiszczak Oct 2, 2023
7acac0c
Misc cleanups
heifner Oct 2, 2023
ff7a8a1
Add block sync bytes received metric and use it in sync throttle test.
jgiszczak Oct 4, 2023
6b2fe63
Add requests module for test.
oschwaldp-oci Oct 4, 2023
a6ed57d
Merge branch 'main' into p2p-peer-throttle
oschwaldp-oci Oct 4, 2023
d1ad2cf
Merge branch 'main' into p2p-peer-throttle
jgiszczak Oct 5, 2023
1e5b427
Add throttling flag to Prometheus peer data and use it in sync test.
jgiszczak Oct 6, 2023
db34bbf
Revise for better repeatability.
jgiszczak Oct 6, 2023
6db4ad8
Customize plugin_config_exception handling in net_plugin.
jgiszczak Oct 6, 2023
d8610c6
Merge branch 'main' into p2p-peer-throttle
jgiszczak Oct 7, 2023
5576604
Merge branch 'main' into p2p-peer-throttle
jgiszczak Oct 9, 2023
b41dd96
Merge branch 'main' into p2p-peer-throttle
jgiszczak Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 35 additions & 34 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ namespace eosio {
std::function<void()> increment_dropped_trxs;

private:
static const std::map<std::string, int> prefix_multipliers;
heifner marked this conversation as resolved.
Show resolved Hide resolved
alignas(hardware_destructive_interference_size)
mutable fc::mutex chain_info_mtx; // protects chain_info_t
chain_info_t chain_info GUARDED_BY(chain_info_mtx);
Expand Down Expand Up @@ -530,14 +531,15 @@ namespace eosio {

constexpr static uint16_t to_protocol_version(uint16_t v);

size_t parse_connection_limit(const string& limit_str);
void plugin_initialize(const variables_map& options);
void plugin_startup();
void plugin_shutdown();
bool in_sync() const;
fc::logger& get_logger() { return logger; }

void create_session(tcp::socket&& socket, const string listen_address, const string& limit);
};
void create_session(tcp::socket&& socket, const string listen_address, size_t limit);
}; //net_plugin_impl

// peer_[x]log must be called from thread in connection strand
#define peer_dlog( PEER, FORMAT, ... ) \
Expand Down Expand Up @@ -772,7 +774,7 @@ namespace eosio {
/// @brief ctor
/// @param socket created by boost::asio in fc::listener
/// @param address identifier of listen socket which accepted this new connection
explicit connection( tcp::socket&& socket, const string& listen_address, const string& limit_str );
explicit connection( tcp::socket&& socket, const string& listen_address, size_t block_sync_rate_limit );
~connection() = default;

connection( const connection& ) = delete;
Expand All @@ -788,7 +790,6 @@ namespace eosio {
static std::string state_str(connection_state s);
const string& peer_address() const { return peer_addr; } // thread safe, const

void set_connection_limit( const string& limit_str );
void set_connection_type( const string& peer_addr );
bool is_transactions_only_connection()const { return connection_type == transactions_only; } // thread safe, atomic
bool is_blocks_only_connection()const { return connection_type == blocks_only; }
Expand All @@ -812,7 +813,6 @@ namespace eosio {

private:
static const string unknown;
static const std::map<std::string, int> prefix_multipliers;

std::atomic<uint64_t> peer_ping_time_ns = std::numeric_limits<uint64_t>::max();

Expand Down Expand Up @@ -1063,7 +1063,7 @@ namespace eosio {
}; // class connection

const string connection::unknown = "<unknown>";
const std::map<std::string, int> connection::prefix_multipliers{
const std::map<std::string, int> net_plugin_impl::prefix_multipliers{
{"",1},{"K",pow(10,3)},{"M",pow(10,6)},{"G",pow(10, 9)},{"T",pow(10, 12)},
{"Ki",pow(2,10)},{"Mi",pow(2,20)},{"Gi",pow(2,30)},{"Ti",pow(2,40)}
};
Expand Down Expand Up @@ -1197,17 +1197,17 @@ namespace eosio {
fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) );
}

connection::connection(tcp::socket&& s, const string& listen_address, const string& limit_str)
connection::connection(tcp::socket&& s, const string& listen_address, size_t block_sync_rate_limit)
: listen_address( listen_address ),
peer_addr(),
block_sync_rate_limit(block_sync_rate_limit),
strand( my_impl->thread_pool.get_executor() ),
socket( new tcp::socket( std::move(s) ) ),
connection_id( ++my_impl->current_connection_id ),
response_expected_timer( my_impl->thread_pool.get_executor() ),
last_handshake_recv(),
last_handshake_sent()
{
set_connection_limit(limit_str);
update_endpoints();
fc_dlog( logger, "new connection object created for peer ${address}:${port} from listener ${addr}", ("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", listen_address) );
}
Expand Down Expand Up @@ -1238,30 +1238,6 @@ namespace eosio {
}
}

void connection::set_connection_limit( const std::string& limit_str) {
std::istringstream in(limit_str);
fc_dlog( logger, "parsing connection endpoint with locale ${l}", ("l", std::locale("").name()));
in.imbue(std::locale(""));
double limit{0};
in >> limit;
if( limit > 0.0f ) {
std::string units;
in >> units;
std::regex units_regex{"([KMGT]?[i]?)B/s"};
std::smatch units_match;
std::regex_match(units, units_match, units_regex);
try {
block_sync_rate_limit = boost::numeric_cast<size_t>(limit * prefix_multipliers.at(units_match[1].str()));
fc_dlog( logger, "setting block_sync_rate_limit to ${limit}", ("limit", block_sync_rate_limit));
} catch (boost::numeric::bad_numeric_cast&) {
fc_wlog( logger, "listen address ${la} block sync limit specification overflowed, connection will not be throttled", ("la", listen_address));
block_sync_rate_limit = 0;
}
} else {
block_sync_rate_limit = 0;
}
}

// called from connection strand
void connection::set_connection_type( const std::string& peer_add ) {
auto [host, port, type] = split_host_port_type(peer_add);
Expand Down Expand Up @@ -2767,7 +2743,7 @@ namespace eosio {
}


void net_plugin_impl::create_session(tcp::socket&& socket, const string listen_address, const string& limit) {
void net_plugin_impl::create_session(tcp::socket&& socket, const string listen_address, size_t limit) {
uint32_t visitors = 0;
uint32_t from_addr = 0;
boost::system::error_code rec;
Expand Down Expand Up @@ -4039,6 +4015,29 @@ namespace eosio {
return fc::json::from_string(s).as<T>();
}

size_t net_plugin_impl::parse_connection_limit( const std::string& limit_str) {
heifner marked this conversation as resolved.
Show resolved Hide resolved
std::istringstream in(limit_str);
fc_dlog( logger, "parsing connection endpoint with locale ${l}", ("l", std::locale("").name()));
heifner marked this conversation as resolved.
Show resolved Hide resolved
in.imbue(std::locale(""));
double limit{0};
in >> limit;
size_t block_sync_rate_limit = 0;
if( limit > 0.0f ) {
std::string units;
in >> units;
std::regex units_regex{"([KMGT]?[i]?)B/s"};
std::smatch units_match;
std::regex_match(units, units_match, units_regex);
try {
block_sync_rate_limit = boost::numeric_cast<size_t>(limit * prefix_multipliers.at(units_match[1].str()));
heifner marked this conversation as resolved.
Show resolved Hide resolved
fc_dlog( logger, "setting block_sync_rate_limit to ${limit}", ("limit", block_sync_rate_limit));
heifner marked this conversation as resolved.
Show resolved Hide resolved
} catch (boost::numeric::bad_numeric_cast&) {
EOS_ASSERT(false, plugin_config_exception, "block sync limit specification overflowed: ${limit}", ("limit", limit_str));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use EOS_THROW instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converted to EOS_THROW.

}
}
heifner marked this conversation as resolved.
Show resolved Hide resolved
return block_sync_rate_limit;
}

void net_plugin_impl::plugin_initialize( const variables_map& options ) {
try {
fc_ilog( logger, "Initialize net plugin" );
Expand Down Expand Up @@ -4257,9 +4256,11 @@ namespace eosio {
limit = std::string(address, last_colon_location+1);
}

auto block_sync_rate_limit = my->parse_connection_limit(limit);

fc::create_listener<tcp>(
my->thread_pool.get_executor(), logger, accept_timeout, listen_addr, extra_listening_log_info,
[my = my, addr = p2p_addr, limit = limit](tcp::socket&& socket) { fc_dlog( logger, "start listening on ${addr} with peer sync throttle ${limit}", ("addr", addr)("limit", limit)); my->create_session(std::move(socket), addr, limit); });
[my = my, addr = p2p_addr, block_sync_rate_limit = block_sync_rate_limit](tcp::socket&& socket) { fc_dlog( logger, "start listening on ${addr} with peer sync throttle ${limit}", ("addr", addr)("limit", block_sync_rate_limit)); my->create_session(std::move(socket), addr, block_sync_rate_limit); });
} catch (const std::exception& e) {
fc_elog( logger, "net_plugin::plugin_startup failed to listen on ${addr}, ${what}",
("addr", address)("what", e.what()) );
Expand Down