From 16c390d24cdafbb85bb08f17af18058d41b12768 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 11 Dec 2024 15:06:45 -0800 Subject: [PATCH 1/3] Remove redundant NODE_LOGIN --- sqlitecluster/SQLiteNode.cpp | 28 +++------------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 9395cf6a0..1cdba752d 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -56,20 +56,6 @@ // dbCountAtStart: The highest committed transaction in the DB at the start of this transaction on leader, for // optimizing replication. -// NOTE: This comment as well as NODE_LOGIN should be removed after https://github.com/Expensify/Bedrock/pull/1999 is deployed. -// On LOGIN vs NODE_LOGIN. -// _onConnect sends a LOGIN message. -// _onConnect is called in exctly two places: -// 1. In response to a NODE_LOGIN message received on a newly connected socket on the sync port. It's expected when -// establishing a connection, a node sends this NODE_LOGIN as its first message. -// 2. Immediately following establishing a TCP connection to another node and sending a NODE_LOGIN message. In the case that -// we are the initiating node, we immediately queue three messages: -// 1. NODE_LOGIN -// 2. PING -// 3. LOGIN -// -// When we receive a NODE_LOGIN, we immediately respond with a PING followed by a LOGIN (by calling _onConnect). - #undef SLOGPREFIX #define SLOGPREFIX "{" << _name << "/" << SQLiteNode::stateName(_state) << "} " @@ -1268,9 +1254,6 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { peer->latency = max(STimeNow() - message.calc64("Timestamp"), 1ul); SINFO("Received PONG from peer '" << peer->name << "' (" << peer->latency/1000 << "ms latency)"); return; - } else if (SIEquals(message.methodLine, "NODE_LOGIN")) { - // Do nothing, this keeps this code from warning until NODE_LOGIN is deprecated. - return; } // We ignore everything except PING and PONG from forked nodes, so we can return here in that case. @@ -2553,7 +2536,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { // with peers, so we store any that we can remove in this list. list socketsToRemove; - // Check each new connection for a NODE_LOGIN message. + // Check each new connection for a LOGIN message. for (auto socket : _unauthenticatedIncomingSockets) { STCPManager::postPoll(fdm, *socket); try { @@ -2565,8 +2548,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { int messageSize = message.deserialize(socket->recvBuffer); if (messageSize) { socket->recvBuffer.consumeFront(messageSize); - // Allow either LOGIN or NODE_LOGIN until we deprecate NODE_LOGIN. - if (SIEquals(message.methodLine, "NODE_LOGIN") || SIEquals(message.methodLine, "LOGIN")) { + if (SIEquals(message.methodLine, "LOGIN")) { SQLitePeer* peer = getPeerByName(message["Name"]); if (peer) { if (peer->setSocket(socket)) { @@ -2591,7 +2573,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { STHROW("Unauthenticated node '" + message["Name"] + "' attempted to connected, rejecting."); } } else { - STHROW("expecting LOGIN or NODE_LOGIN"); + STHROW("expecting LOGIN"); } } else if (STimeNow() > socket->lastRecvTime + 5'000'000) { STHROW("Incoming socket didn't send a message for over 5s, closing."); @@ -2614,10 +2596,6 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { switch (result) { case SQLitePeer::PeerPostPollStatus::JUST_CONNECTED: { - // When NODE_LOGIN is deprecated, we can remove the next 3 lines. - SData login("NODE_LOGIN"); - login["Name"] = _name; - _sendToPeer(peer, login); _onConnect(peer); _sendPING(peer); } From 9e22d67782ed43cbde2edb41a903ad0baa80eb35 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 11 Dec 2024 16:38:11 -0800 Subject: [PATCH 2/3] Accept but do not send, NODE_LOGIN --- sqlitecluster/SQLiteNode.cpp | 11 +++++++- sqlitecluster/SQLiteNode.h | 4 +-- test/clustertest/tests/ClusterUpgradeTest.cpp | 27 +++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 1cdba752d..0cbd018b1 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1254,6 +1254,12 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { peer->latency = max(STimeNow() - message.calc64("Timestamp"), 1ul); SINFO("Received PONG from peer '" << peer->name << "' (" << peer->latency/1000 << "ms latency)"); return; + } else if (SIEquals(message.methodLine, "NODE_LOGIN")) { + // We need to return early here to ignore this deprecated message and avoid throwing: + // STHROW("not logged in"); + // Below. We can remove this check after one more deploy cycle. + // https://github.com/Expensify/Expensify/issues/450953 + return; } // We ignore everything except PING and PONG from forked nodes, so we can return here in that case. @@ -2548,7 +2554,10 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { int messageSize = message.deserialize(socket->recvBuffer); if (messageSize) { socket->recvBuffer.consumeFront(messageSize); - if (SIEquals(message.methodLine, "LOGIN")) { + // Old nodes, for one more upgrade cycle, will still send `NODE_LOGIN`. We can remove this check after this + // code is deployed. + // See: https://github.com/Expensify/Expensify/issues/450953 + if (SIEquals(message.methodLine, "NODE_LOGIN") || SIEquals(message.methodLine, "LOGIN")) { SQLitePeer* peer = getPeerByName(message["Name"]); if (peer) { if (peer->setSocket(socket)) { diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 57c5c1056..fd4ba5f6f 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -181,7 +181,7 @@ class SQLiteNode : public STCPManager { // would be a good idea for the caller to read any new commands or traffic from the network. bool update(); - // Look up the correct peer by the name it supplies in a NODE_LOGIN + // Look up the correct peer by the name it supplies in a LOGIN // message. Does not lock, but this method is const and all it does is // access _peerList and peer->name, both of which are const. So it is safe // to call from other public functions. @@ -293,7 +293,7 @@ class SQLiteNode : public STCPManager { const string _version; // These are sockets that have been accepted on the node port but have not yet been associated with a peer (because - // they need to send a NODE_LOGIN message with their name first). + // they need to send a LOGIN message with their name first). set _unauthenticatedIncomingSockets; // The write consistency requested for the current in-progress commit. diff --git a/test/clustertest/tests/ClusterUpgradeTest.cpp b/test/clustertest/tests/ClusterUpgradeTest.cpp index e1da7ef6b..b38ea6d5c 100644 --- a/test/clustertest/tests/ClusterUpgradeTest.cpp +++ b/test/clustertest/tests/ClusterUpgradeTest.cpp @@ -113,6 +113,9 @@ struct ClusterUpgradeTest : tpunit::TestFixture { // Get the versions from the cluster. auto versions = getVersions(); + for (auto s : versions) { + cout << s << endl; + } // Save the production version for later comparison. string prodVersion = versions[0]; @@ -128,6 +131,12 @@ struct ClusterUpgradeTest : tpunit::TestFixture { tester->getTester(2).startServer(); ASSERT_TRUE(tester->getTester(2).waitForState("FOLLOWING")); + cout << "Server 2 is upgraded." << endl; + versions = getVersions(); + for (auto s : versions) { + cout << s << endl; + } + // Verify the server has been upgraded and the version is different. versions = getVersions(); string devVersion = versions[2]; @@ -152,15 +161,33 @@ struct ClusterUpgradeTest : tpunit::TestFixture { // We should get the expected cluster state. ASSERT_TRUE(tester->getTester(0).waitForState("LEADING")); + cout << "Leader has been upgraded. It should receive NODE_LOGIN from old nodes." << endl; ASSERT_TRUE(tester->getTester(1).waitForState("FOLLOWING")); ASSERT_TRUE(tester->getTester(2).waitForState("FOLLOWING")); + // Now 0 and 2 are the new version, and 1 is the old version. versions = getVersions(); ASSERT_EQUAL(versions[0], devVersion); ASSERT_EQUAL(versions[1], prodVersion); ASSERT_EQUAL(versions[2], devVersion); + // Cycle the old version. We want it to come up and make an outgoing connection to a new version. It should send node_login. + for (int i =0; i < 10; i++) { + cout << "Cyling old server." << endl; + tester->getTester(1).stopServer(); + tester->getTester(1).startServer(); + tester->getTester(1).waitForState("FOLLOWING"); + } + + // Cycle the new version. We want it to come up and make an outgoing connection to a new version. It should send node_login. + for (int i =0; i < 10; i++) { + cout << "Cyling new server." << endl; + tester->getTester(2).stopServer(); + tester->getTester(2).startServer(); + tester->getTester(2).waitForState("FOLLOWING"); + } + // Now we need to send a command to node 1 to verify we can escalate old->new. cmdResult = tester->getTester(1).executeWaitMultipleData({cmd}); ASSERT_EQUAL(cmdResult[0].methodLine, "200 OK"); From 9326901473a795e1c7c59d82d0085276c858b7c9 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 11 Dec 2024 16:39:32 -0800 Subject: [PATCH 3/3] Remove test change --- test/clustertest/tests/ClusterUpgradeTest.cpp | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/test/clustertest/tests/ClusterUpgradeTest.cpp b/test/clustertest/tests/ClusterUpgradeTest.cpp index b38ea6d5c..e1da7ef6b 100644 --- a/test/clustertest/tests/ClusterUpgradeTest.cpp +++ b/test/clustertest/tests/ClusterUpgradeTest.cpp @@ -113,9 +113,6 @@ struct ClusterUpgradeTest : tpunit::TestFixture { // Get the versions from the cluster. auto versions = getVersions(); - for (auto s : versions) { - cout << s << endl; - } // Save the production version for later comparison. string prodVersion = versions[0]; @@ -131,12 +128,6 @@ struct ClusterUpgradeTest : tpunit::TestFixture { tester->getTester(2).startServer(); ASSERT_TRUE(tester->getTester(2).waitForState("FOLLOWING")); - cout << "Server 2 is upgraded." << endl; - versions = getVersions(); - for (auto s : versions) { - cout << s << endl; - } - // Verify the server has been upgraded and the version is different. versions = getVersions(); string devVersion = versions[2]; @@ -161,33 +152,15 @@ struct ClusterUpgradeTest : tpunit::TestFixture { // We should get the expected cluster state. ASSERT_TRUE(tester->getTester(0).waitForState("LEADING")); - cout << "Leader has been upgraded. It should receive NODE_LOGIN from old nodes." << endl; ASSERT_TRUE(tester->getTester(1).waitForState("FOLLOWING")); ASSERT_TRUE(tester->getTester(2).waitForState("FOLLOWING")); - // Now 0 and 2 are the new version, and 1 is the old version. versions = getVersions(); ASSERT_EQUAL(versions[0], devVersion); ASSERT_EQUAL(versions[1], prodVersion); ASSERT_EQUAL(versions[2], devVersion); - // Cycle the old version. We want it to come up and make an outgoing connection to a new version. It should send node_login. - for (int i =0; i < 10; i++) { - cout << "Cyling old server." << endl; - tester->getTester(1).stopServer(); - tester->getTester(1).startServer(); - tester->getTester(1).waitForState("FOLLOWING"); - } - - // Cycle the new version. We want it to come up and make an outgoing connection to a new version. It should send node_login. - for (int i =0; i < 10; i++) { - cout << "Cyling new server." << endl; - tester->getTester(2).stopServer(); - tester->getTester(2).startServer(); - tester->getTester(2).waitForState("FOLLOWING"); - } - // Now we need to send a command to node 1 to verify we can escalate old->new. cmdResult = tester->getTester(1).executeWaitMultipleData({cmd}); ASSERT_EQUAL(cmdResult[0].methodLine, "200 OK");