Skip to content

Commit

Permalink
Merge pull request #4735 from pwojcikdev/vote-duplicate-filter
Browse files Browse the repository at this point in the history
Network filter for votes
  • Loading branch information
pwojcikdev authored Sep 25, 2024
2 parents 0cfb0a7 + c9ee536 commit 4a144c6
Show file tree
Hide file tree
Showing 28 changed files with 471 additions and 204 deletions.
18 changes: 9 additions & 9 deletions nano/core_test/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,22 +649,22 @@ TEST (active_elections, dropped_cleanup)
nano::vectorstream stream (block_bytes);
chain[0]->serialize (stream);
}
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_FALSE (node.network.filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.filter.apply (block_bytes.data (), block_bytes.size ()));

auto election = nano::test::start_election (system, node, hash);
ASSERT_NE (nullptr, election);

// Not yet removed
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.vote_router.active (hash));

// Now simulate dropping the election
ASSERT_FALSE (election->confirmed ());
node.active.erase (*chain[0]);

// The filter must have been cleared
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_FALSE (node.network.filter.apply (block_bytes.data (), block_bytes.size ()));

// An election was recently dropped
ASSERT_EQ (1, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::manual));
Expand All @@ -673,7 +673,7 @@ TEST (active_elections, dropped_cleanup)
ASSERT_FALSE (node.vote_router.active (hash));

// Repeat test for a confirmed election
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.filter.apply (block_bytes.data (), block_bytes.size ()));

election = nano::test::start_election (system, node, hash);
ASSERT_NE (nullptr, election);
Expand All @@ -682,7 +682,7 @@ TEST (active_elections, dropped_cleanup)
node.active.erase (*chain[0]);

// The filter should not have been cleared
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.filter.apply (block_bytes.data (), block_bytes.size ()));

// Not dropped
ASSERT_EQ (1, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::manual));
Expand Down Expand Up @@ -825,7 +825,7 @@ TEST (active_elections, fork_filter_cleanup)
ASSERT_TIMELY_EQ (5s, node1.ledger.block_count (), 2);

// Block is erased from the duplicate filter
ASSERT_TIMELY (5s, node1.network.publish_filter.apply (send_block_bytes.data (), send_block_bytes.size ()));
ASSERT_TIMELY (5s, node1.network.filter.apply (send_block_bytes.data (), send_block_bytes.size ()));
}

/*
Expand Down Expand Up @@ -960,7 +960,7 @@ TEST (active_elections, fork_replacement_tally)
// Process correct block
node_config.peering_port = system.get_available_port ();
auto & node2 (*system.add_node (node_config));
node1.network.publish_filter.clear ();
node1.network.filter.clear ();
node2.network.flood_block (send_last);
ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0);

Expand All @@ -974,7 +974,7 @@ TEST (active_elections, fork_replacement_tally)
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
// ensure vote arrives before the block
ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ());
node1.network.publish_filter.clear ();
node1.network.filter.clear ();
node2.network.flood_block (send_last);
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1);

Expand Down
118 changes: 108 additions & 10 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,21 @@ TEST (network, peer_max_tcp_attempts_subnetwork)
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::max_per_subnetwork, nano::stat::dir::out));
}

namespace
{
// Skip the first 8 bytes of the message header, which is the common header for all messages
std::vector<uint8_t> message_payload_to_bytes (nano::message const & message)
{
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
message.serialize (stream);
}
debug_assert (bytes.size () > nano::message_header::size);
return std::vector<uint8_t> (bytes.begin () + nano::message_header::size, bytes.end ());
}
}

// Send two publish messages and asserts that the duplication is detected.
TEST (network, duplicate_detection)
{
Expand All @@ -700,9 +715,10 @@ TEST (network, duplicate_detection)
// Publish duplicate detection through TCP
auto tcp_channel = node0.network.tcp_channels.find_node_id (node1.get_node_id ());
ASSERT_NE (nullptr, tcp_channel);

ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message));
tcp_channel->send (publish);
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 0);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 0);
tcp_channel->send (publish);
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 1);
}
Expand All @@ -714,28 +730,110 @@ TEST (network, duplicate_revert_publish)
node_config.block_processor.max_peer_queue = 0;
auto & node (*system.add_node (node_config));
nano::publish publish{ nano::dev::network_params.network, nano::dev::genesis };
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
publish.block->serialize (stream);
}
std::vector<uint8_t> bytes = message_payload_to_bytes (publish);
// Add to the blocks filter
// Should be cleared when dropping due to a full block processor, as long as the message has the optional digest attached
// Test network.duplicate_detection ensures that the digest is attached when deserializing messages
nano::uint128_t digest;
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size (), &digest));
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
ASSERT_FALSE (node.network.filter.apply (bytes.data (), bytes.size (), &digest));
ASSERT_TRUE (node.network.filter.apply (bytes.data (), bytes.size ()));
auto other_node (std::make_shared<nano::node> (system.io_ctx, system.get_available_port (), nano::unique_path (), system.work));
other_node->start ();
system.nodes.push_back (other_node);
auto channel = nano::test::establish_tcp (system, *other_node, node.network.endpoint ());
ASSERT_NE (nullptr, channel);
ASSERT_EQ (0, publish.digest);
node.network.inbound (publish, channel);
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
ASSERT_TRUE (node.network.filter.apply (bytes.data (), bytes.size ()));
publish.digest = digest;
node.network.inbound (publish, channel);
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
ASSERT_FALSE (node.network.filter.apply (bytes.data (), bytes.size ()));
}

TEST (network, duplicate_vote_detection)
{
nano::test::system system;
auto & node0 = *system.add_node ();
auto & node1 = *system.add_node ();

auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () });
nano::confirm_ack message{ nano::dev::network_params.network, vote };

ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));

// Publish duplicate detection through TCP
auto tcp_channel = node0.network.tcp_channels.find_node_id (node1.get_node_id ());
ASSERT_NE (nullptr, tcp_channel);

ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));
tcp_channel->send (message);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
tcp_channel->send (message);
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1);
}

// Ensures that the filter doesn't filter out votes that could not be queued for processing
TEST (network, duplicate_revert_vote)
{
nano::test::system system;
nano::node_config node_config = system.default_config ();
node_config.vote_processor.enable = false; // Do not drain queued votes
node_config.vote_processor.max_non_pr_queue = 1;
node_config.vote_processor.max_pr_queue = 1;
auto & node0 = *system.add_node (node_config);
auto & node1 = *system.add_node (node_config);

auto vote1 = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }, 1);
nano::confirm_ack message1{ nano::dev::network_params.network, vote1 };
auto bytes1 = message_payload_to_bytes (message1);

auto vote2 = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }, 2);
nano::confirm_ack message2{ nano::dev::network_params.network, vote2 };
auto bytes2 = message_payload_to_bytes (message2);

// Publish duplicate detection through TCP
auto tcp_channel = node0.network.tcp_channels.find_node_id (node1.get_node_id ());
ASSERT_NE (nullptr, tcp_channel);

// First vote should be processed
tcp_channel->send (message1);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
ASSERT_TIMELY (5s, node1.network.filter.check (bytes1.data (), bytes1.size ()));

// Second vote should get dropped from processor queue
tcp_channel->send (message2);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
// And the filter should not have it
WAIT (500ms); // Give the node time to process the vote
ASSERT_TIMELY (5s, !node1.network.filter.check (bytes2.data (), bytes2.size ()));
}

TEST (network, expire_duplicate_filter)
{
nano::test::system system;
nano::node_config node_config = system.default_config ();
node_config.network.duplicate_filter_cutoff = 3; // Expire after 3 seconds
auto & node0 = *system.add_node (node_config);
auto & node1 = *system.add_node (node_config);

auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () });
nano::confirm_ack message{ nano::dev::network_params.network, vote };
auto bytes = message_payload_to_bytes (message);

// Publish duplicate detection through TCP
auto tcp_channel = node0.network.tcp_channels.find_node_id (node1.get_node_id ());
ASSERT_NE (nullptr, tcp_channel);

// Send a vote
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));
tcp_channel->send (message);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
tcp_channel->send (message);
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1);

// The filter should expire the vote after some time
ASSERT_TRUE (node1.network.filter.check (bytes.data (), bytes.size ()));
ASSERT_TIMELY (10s, !node1.network.filter.check (bytes.data (), bytes.size ()));
}

// The test must be completed in less than 1 second
Expand Down
39 changes: 38 additions & 1 deletion nano/core_test/network_filter.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/network_filter.hpp>
#include <nano/lib/stream.hpp>
#include <nano/node/common.hpp>
#include <nano/secure/common.hpp>
#include <nano/secure/network_filter.hpp>
#include <nano/test_common/testutil.hpp>

#include <gtest/gtest.h>

TEST (network_filter, apply)
{
nano::network_filter filter (4);
ASSERT_FALSE (filter.check (34));
ASSERT_FALSE (filter.apply (34));
ASSERT_TRUE (filter.check (34));
ASSERT_TRUE (filter.apply (34));
filter.clear (nano::network_filter::digest_t{ 34 });
ASSERT_FALSE (filter.check (34));
ASSERT_FALSE (filter.apply (34));
}

TEST (network_filter, unit)
{
nano::network_filter filter (1);
Expand Down Expand Up @@ -92,6 +104,7 @@ TEST (network_filter, many)
// Now filter the rest of the stream
// All blocks should pass through
ASSERT_FALSE (filter.apply (bytes->data (), block->size));
ASSERT_TRUE (filter.check (bytes->data (), block->size));
ASSERT_FALSE (error);

// Make sure the stream was rewinded correctly
Expand Down Expand Up @@ -127,3 +140,27 @@ TEST (network_filter, optional_digest)
filter.clear (digest);
ASSERT_FALSE (filter.apply (bytes1.data (), bytes1.size ()));
}

TEST (network_filter, expire)
{
// Expire entries older than 2 epochs
nano::network_filter filter{ 4, 2 };

ASSERT_FALSE (filter.apply (1)); // Entry with epoch 0
filter.update (); // Bump epoch to 1
ASSERT_FALSE (filter.apply (2)); // Entry with epoch 1

// Both values should be detected as present
ASSERT_TRUE (filter.check (1));
ASSERT_TRUE (filter.check (2));

filter.update (2); // Bump epoch to 3

ASSERT_FALSE (filter.check (1)); // Entry with epoch 0 should be expired
ASSERT_TRUE (filter.check (2)); // Entry with epoch 1 should still be present

filter.update (); // Bump epoch to 4

ASSERT_FALSE (filter.check (2)); // Entry with epoch 1 should be expired
ASSERT_FALSE (filter.apply (2)); // Entry with epoch 1 should be replaced
}
2 changes: 1 addition & 1 deletion nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2220,7 +2220,7 @@ TEST (node, vote_by_hash_republish)
ASSERT_TIMELY (5s, node2.active.active (*send1));

// give block send2 to node1 and wait until the block is received and processed by node1
node1.network.publish_filter.clear ();
node1.network.filter.clear ();
node1.process_active (send2);
ASSERT_TIMELY (5s, node1.active.active (*send2));

Expand Down
2 changes: 1 addition & 1 deletion nano/fuzzer_test/fuzz_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void fuzz_message_parser (uint8_t const * Data, size_t Size)
}

fuzz_visitor visitor;
nano::message_parser parser (node0->network.publish_filter, node0->block_uniquer, node0->vote_uniquer, visitor, node0->work);
nano::message_parser parser (node0->network.filter, node0->block_uniquer, node0->vote_uniquer, visitor, node0->work);
parser.deserialize_buffer (Data, Size);
}

Expand Down
2 changes: 2 additions & 0 deletions nano/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ add_library(
logging_enums.cpp
memory.hpp
memory.cpp
network_filter.hpp
network_filter.cpp
numbers.hpp
numbers.cpp
object_stream.hpp
Expand Down
Loading

0 comments on commit 4a144c6

Please sign in to comment.