Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel class cleanup #4749

Merged
merged 5 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1024,14 +1024,13 @@ TEST (network, loopback_channel)
auto & node2 = *system.nodes[1];
nano::transport::inproc::channel channel1 (node1, node1);
ASSERT_EQ (channel1.get_type (), nano::transport::transport_type::loopback);
ASSERT_EQ (channel1.get_endpoint (), node1.network.endpoint ());
ASSERT_EQ (channel1.get_tcp_endpoint (), nano::transport::map_endpoint_to_tcp (node1.network.endpoint ()));
ASSERT_EQ (channel1.get_remote_endpoint (), node1.network.endpoint ());
ASSERT_EQ (channel1.get_network_version (), node1.network_params.network.protocol_version);
ASSERT_EQ (channel1.get_node_id (), node1.node_id.pub);
ASSERT_EQ (channel1.get_node_id_optional ().value_or (0), node1.node_id.pub);
nano::transport::inproc::channel channel2 (node2, node2);
++node1.network.port;
ASSERT_NE (channel1.get_endpoint (), node1.network.endpoint ());
ASSERT_NE (channel1.get_remote_endpoint (), node1.network.endpoint ());
}

// Ensure the network filters messages with the incorrect magic number
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2703,7 +2703,7 @@ TEST (node, peer_history_restart)
ASSERT_TIMELY (10s, !node2->network.empty ());
// Confirm that the peers match with the endpoints we are expecting
auto list (node2->network.list (2));
ASSERT_EQ (node1->network.endpoint (), list[0]->get_endpoint ());
ASSERT_EQ (node1->network.endpoint (), list[0]->get_remote_endpoint ());
ASSERT_EQ (1, node2->network.size ());
system.stop_node (*node2);
}
Expand All @@ -2726,7 +2726,7 @@ TEST (node, peer_history_restart)
ASSERT_TIMELY (10s, !node3->network.empty ());
// Confirm that the peers match with the endpoints we are expecting
auto list (node3->network.list (2));
ASSERT_EQ (node1->network.endpoint (), list[0]->get_endpoint ());
ASSERT_EQ (node1->network.endpoint (), list[0]->get_remote_endpoint ());
ASSERT_EQ (1, node3->network.size ());
system.stop_node (*node3);
}
Expand Down Expand Up @@ -2788,11 +2788,11 @@ TEST (node, bidirectional_tcp)
ASSERT_EQ (1, node2->network.size ());
auto list1 (node1->network.list (1));
ASSERT_EQ (nano::transport::transport_type::tcp, list1[0]->get_type ());
ASSERT_NE (node2->network.endpoint (), list1[0]->get_endpoint ()); // Ephemeral port
ASSERT_NE (node2->network.endpoint (), list1[0]->get_remote_endpoint ()); // Ephemeral port
ASSERT_EQ (node2->node_id.pub, list1[0]->get_node_id ());
auto list2 (node2->network.list (1));
ASSERT_EQ (nano::transport::transport_type::tcp, list2[0]->get_type ());
ASSERT_EQ (node1->network.endpoint (), list2[0]->get_endpoint ());
ASSERT_EQ (node1->network.endpoint (), list2[0]->get_remote_endpoint ());
ASSERT_EQ (node1->node_id.pub, list2[0]->get_node_id ());
// Test block propagation from node 1
nano::keypair key;
Expand Down
8 changes: 2 additions & 6 deletions nano/core_test/peer_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,12 @@ TEST (peer_container, tcp_channel_cleanup_works)
ASSERT_NE (nullptr, channel1);
// set the last packet sent for channel1 only to guarantee it contains a value.
// it won't be necessarily the same use by the cleanup cutoff time
node1.network.tcp_channels.modify (channel1, [&now] (auto channel) {
channel->set_last_packet_sent (now - std::chrono::seconds (5));
});
channel1->set_last_packet_sent (now - std::chrono::seconds (5));
auto channel2 = nano::test::establish_tcp (system, node1, outer_node2->network.endpoint ());
ASSERT_NE (nullptr, channel2);
// set the last packet sent for channel2 only to guarantee it contains a value.
// it won't be necessarily the same use by the cleanup cutoff time
node1.network.tcp_channels.modify (channel2, [&now] (auto channel) {
channel->set_last_packet_sent (now + std::chrono::seconds (1));
});
channel2->set_last_packet_sent (now + std::chrono::seconds (1));
ASSERT_EQ (2, node1.network.size ());
ASSERT_EQ (2, node1.network.tcp_channels.size ());

Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/request_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ TEST (request_aggregator, two_endpoints)

auto dummy_channel1 = std::make_shared<nano::transport::inproc::channel> (node1, node1);
auto dummy_channel2 = std::make_shared<nano::transport::inproc::channel> (node2, node2);
ASSERT_NE (nano::transport::map_endpoint_to_v6 (dummy_channel1->get_endpoint ()), nano::transport::map_endpoint_to_v6 (dummy_channel2->get_endpoint ()));
ASSERT_NE (nano::transport::map_endpoint_to_v6 (dummy_channel1->get_remote_endpoint ()), nano::transport::map_endpoint_to_v6 (dummy_channel2->get_remote_endpoint ()));

std::vector<std::pair<nano::block_hash, nano::root>> request{ { send1->hash (), send1->root () } };

Expand Down
18 changes: 9 additions & 9 deletions nano/core_test/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ TEST (telemetry, basic)
ASSERT_NE (nullptr, channel);

std::optional<nano::telemetry_data> telemetry_data;
ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));
ASSERT_EQ (node_server->get_node_id (), telemetry_data->node_id);

// Check the metrics are correct
ASSERT_TRUE (nano::test::compare_telemetry (*telemetry_data, *node_server));

// Call again straight away
auto telemetry_data_2 = node_client->telemetry.get_telemetry (channel->get_endpoint ());
auto telemetry_data_2 = node_client->telemetry.get_telemetry (channel->get_remote_endpoint ());
ASSERT_TRUE (telemetry_data_2);

// Call again straight away
auto telemetry_data_3 = node_client->telemetry.get_telemetry (channel->get_endpoint ());
auto telemetry_data_3 = node_client->telemetry.get_telemetry (channel->get_remote_endpoint ());
ASSERT_TRUE (telemetry_data_3);

// we expect at least one consecutive repeat of telemetry
Expand All @@ -89,7 +89,7 @@ TEST (telemetry, basic)
WAIT (3s);

std::optional<nano::telemetry_data> telemetry_data_4;
ASSERT_TIMELY (5s, telemetry_data_4 = node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, telemetry_data_4 = node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));
ASSERT_NE (*telemetry_data, *telemetry_data_4);
}

Expand Down Expand Up @@ -120,13 +120,13 @@ TEST (telemetry, disconnected)
ASSERT_NE (nullptr, channel);

// Ensure telemetry is available before disconnecting
ASSERT_TIMELY (5s, node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));

system.stop_node (*node_server);
ASSERT_TRUE (channel);

// Ensure telemetry from disconnected peer is removed
ASSERT_TIMELY (5s, !node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, !node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));
}

TEST (telemetry, dos_tcp)
Expand Down Expand Up @@ -185,14 +185,14 @@ TEST (telemetry, disable_metrics)

node_client->telemetry.trigger ();

ASSERT_NEVER (1s, node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_NEVER (1s, node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));

// It should still be able to receive metrics though
auto channel1 = node_server->network.find_node_id (node_client->get_node_id ());
ASSERT_NE (nullptr, channel1);

std::optional<nano::telemetry_data> telemetry_data;
ASSERT_TIMELY (5s, telemetry_data = node_server->telemetry.get_telemetry (channel1->get_endpoint ()));
ASSERT_TIMELY (5s, telemetry_data = node_server->telemetry.get_telemetry (channel1->get_remote_endpoint ()));

ASSERT_TRUE (nano::test::compare_telemetry (*telemetry_data, *node_client));
}
Expand Down Expand Up @@ -237,7 +237,7 @@ TEST (telemetry, maker_pruning)
ASSERT_NE (nullptr, channel);

std::optional<nano::telemetry_data> telemetry_data;
ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));
ASSERT_EQ (node_server->get_node_id (), telemetry_data->node_id);

// Ensure telemetry response indicates pruned node
Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ TEST (websocket, telemetry)

auto channel = node1->network.find_node_id (node2->get_node_id ());
ASSERT_NE (channel, nullptr);
ASSERT_TIMELY (5s, node1->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, node1->telemetry.get_telemetry (channel->get_remote_endpoint ()));

ASSERT_TIMELY_EQ (10s, future.wait_for (0s), std::future_status::ready);

Expand Down
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void nano::bootstrap_connections::pool_connection (std::shared_ptr<nano::bootstr
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto const & socket_l = client_a->socket;
if (!stopped && !client_a->pending_stop && !node.network.excluded_peers.check (client_a->channel->get_tcp_endpoint ()))
if (!stopped && !client_a->pending_stop && !node.network.excluded_peers.check (client_a->channel->get_remote_endpoint ()))
{
socket_l->set_timeout (node.network_params.network.idle_timeout);
// Push into idle deque
Expand Down Expand Up @@ -138,7 +138,7 @@ std::shared_ptr<nano::bootstrap_client> nano::bootstrap_connections::find_connec
std::shared_ptr<nano::bootstrap_client> result;
for (auto i (idle.begin ()), end (idle.end ()); i != end && !stopped; ++i)
{
if ((*i)->channel->get_tcp_endpoint () == endpoint_a)
if ((*i)->channel->get_remote_endpoint () == endpoint_a)
{
result = *i;
idle.erase (i);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/bootstrap/bootstrap_legacy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ bool nano::bootstrap_attempt_legacy::request_frontier (nano::unique_lock<nano::m
lock_a.lock ();
if (connection_l && !stopped)
{
endpoint_frontier_request = connection_l->channel->get_tcp_endpoint ();
endpoint_frontier_request = connection_l->channel->get_remote_endpoint ();
std::future<bool> future;
{
auto this_l = std::dynamic_pointer_cast<nano::bootstrap_attempt_legacy> (shared_from_this ());
Expand Down
6 changes: 3 additions & 3 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2991,7 +2991,7 @@ void nano::json_handler::peers ()
bool const peer_details = request.get<bool> ("peer_details", false);
auto peers_list (node.network.list (std::numeric_limits<std::size_t>::max ()));
std::sort (peers_list.begin (), peers_list.end (), [] (auto const & lhs, auto const & rhs) {
return lhs->get_endpoint () < rhs->get_endpoint ();
return lhs->get_remote_endpoint () < rhs->get_remote_endpoint ();
});
for (auto i (peers_list.begin ()), n (peers_list.end ()); i != n; ++i)
{
Expand All @@ -3003,9 +3003,9 @@ void nano::json_handler::peers ()
boost::property_tree::ptree pending_tree;
pending_tree.put ("protocol_version", std::to_string (channel->get_network_version ()));
auto node_id_l (channel->get_node_id_optional ());
if (node_id_l.is_initialized ())
if (node_id_l.has_value ())
{
pending_tree.put ("node_id", node_id_l.get ().to_node_id ());
pending_tree.put ("node_id", node_id_l.value ().to_node_id ());
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion nano/node/message_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class process_visitor : public nano::message_visitor
if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
{
// TODO: Remove this as we do not need to establish a second connection to the same peer
nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ());
nano::endpoint new_endpoint (channel->get_remote_endpoint ().address (), peer0.port ());
node.network.merge_peer (new_endpoint);

// Remember this for future forwarding to other peers
Expand Down
4 changes: 2 additions & 2 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,14 +496,14 @@ void nano::network::erase (nano::transport::channel const & channel_a)
auto const channel_type = channel_a.get_type ();
if (channel_type == nano::transport::transport_type::tcp)
{
tcp_channels.erase (channel_a.get_tcp_endpoint ());
tcp_channels.erase (channel_a.get_remote_endpoint ());
}
}

void nano::network::exclude (std::shared_ptr<nano::transport::channel> const & channel)
{
// Add to peer exclusion list
excluded_peers.add (channel->get_tcp_endpoint ());
excluded_peers.add (channel->get_remote_endpoint ());

// Disconnect
erase (*channel);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void nano::rep_crawler::validate_and_process (nano::unique_lock<nano::mutex> & l
rep.last_response = std::chrono::steady_clock::now ();

// Update if representative channel was changed
if (rep.channel->get_endpoint () != channel->get_endpoint ())
if (rep.channel->get_remote_endpoint () != channel->get_remote_endpoint ())
{
debug_assert (rep.account == vote->account);
updated = true;
Expand Down
4 changes: 2 additions & 2 deletions nano/node/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ void nano::telemetry::run_requests ()
}
}

void nano::telemetry::request (std::shared_ptr<nano::transport::channel> & channel)
void nano::telemetry::request (std::shared_ptr<nano::transport::channel> const & channel)
{
stats.inc (nano::stat::type::telemetry, nano::stat::detail::request);

Expand All @@ -228,7 +228,7 @@ void nano::telemetry::run_broadcasts ()
}
}

void nano::telemetry::broadcast (std::shared_ptr<nano::transport::channel> & channel, const nano::telemetry_data & telemetry)
void nano::telemetry::broadcast (std::shared_ptr<nano::transport::channel> const & channel, const nano::telemetry_data & telemetry)
{
stats.inc (nano::stat::type::telemetry, nano::stat::detail::broadcast);

Expand Down
6 changes: 3 additions & 3 deletions nano/node/telemetry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class telemetry

nano::endpoint endpoint () const
{
return channel->get_endpoint ();
return channel->get_remote_endpoint ();
}
};

Expand All @@ -105,8 +105,8 @@ class telemetry
void run_broadcasts ();
void cleanup ();

void request (std::shared_ptr<nano::transport::channel> &);
void broadcast (std::shared_ptr<nano::transport::channel> &, nano::telemetry_data const &);
void request (std::shared_ptr<nano::transport::channel> const &);
void broadcast (std::shared_ptr<nano::transport::channel> const &, nano::telemetry_data const &);

bool verify (nano::telemetry_ack const &, std::shared_ptr<nano::transport::channel> const &) const;
bool check_timeout (entry const &) const;
Expand Down
19 changes: 9 additions & 10 deletions nano/node/transport/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,20 @@ void nano::transport::channel::send (nano::message & message_a, std::function<vo

void nano::transport::channel::set_peering_endpoint (nano::endpoint endpoint)
{
nano::lock_guard<nano::mutex> lock{ channel_mutex };
nano::lock_guard<nano::mutex> lock{ mutex };
peering_endpoint = endpoint;
}

nano::endpoint nano::transport::channel::get_peering_endpoint () const
{
nano::unique_lock<nano::mutex> lock{ channel_mutex };
if (peering_endpoint)
{
return *peering_endpoint;
}
else
{
lock.unlock ();
return get_endpoint ();
nano::lock_guard<nano::mutex> lock{ mutex };
if (peering_endpoint)
{
return *peering_endpoint;
}
}
return get_remote_endpoint ();
}

std::shared_ptr<nano::node> nano::transport::channel::owner () const
Expand All @@ -70,7 +68,8 @@ std::shared_ptr<nano::node> nano::transport::channel::owner () const

void nano::transport::channel::operator() (nano::object_stream & obs) const
{
obs.write ("endpoint", get_endpoint ());
obs.write ("remote_endpoint", get_remote_endpoint ());
obs.write ("local_endpoint", get_local_endpoint ());
obs.write ("peering_endpoint", get_peering_endpoint ());
obs.write ("node_id", get_node_id ().to_node_id ());
}
Loading
Loading