From c03124a1ee7edb689c86cb33660f9822afbad29a Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 12:19:00 -0800 Subject: [PATCH 01/18] Allow first message to be either LOGIN or NODE_LOGIN to deprecate old message --- sqlitecluster/SQLiteNode.cpp | 38 +++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 79be57fed..3ea870cd7 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -56,6 +56,20 @@ // dbCountAtStart: The highest committed transaction in the DB at the start of this transaction on leader, for // optimizing replication. +// On LOGIN vs NODE_LOGIN. +// _onConnect sends a LOGIN message. +// _onConnect is called in exctly two places: +// 1. In response to a NODE_LOGIN message received on a newly connected socket on the sync port. It's expected when +// establishing a connection, a node sends this NODE_LOGIN as its first message. +// 2. Immediately following establishing a TCP connection to another node and sending a NODE_LOGIN message. In the case that +// we are the initiating node, we immediately queue three messages: +// 1. NODE_LOGIN +// 2. PING +// 3. LOGIN +// +// When we receive a NODE_LOGIN, we immediately respond with a PING followed by a LOGIN (by calling _onConnect). +// We can cobine all of these into a single login message. + #undef SLOGPREFIX #define SLOGPREFIX "{" << _name << "/" << SQLiteNode::stateName(_state) << "} " @@ -1284,6 +1298,9 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { peer->latency = max(STimeNow() - message.calc64("Timestamp"), 1ul); SINFO("Received PONG from peer '" << peer->name << "' (" << peer->latency/1000 << "ms latency)"); return; + } else if (SIEquals(message.methodLine, "NODE_LOGIN")) { + // Do nothing, this keeps this code from warning until NODE_LOGIN is deprecated. + return; } // Every other message broadcasts the current state of the node @@ -1357,6 +1374,8 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { STHROW("you're *not* supposed to be a 0-priority permafollower"); } + // Validate hash here, mark node as forked if found. + // It's an error to have to peers configured with the same priority, except 0 and -1 SASSERT(_priority == -1 || _priority == 0 || message.calc("Priority") != _priority); PINFO("Peer logged in at '" << message["State"] << "', priority #" << message["Priority"] << " commit #" @@ -1745,7 +1764,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } } } else { - STHROW("unrecognized message"); + STHROW("unrecognized message: " + message.methodLine); } } catch (const SException& e) { PWARN("Error processing message '" << message.methodLine << "' (" << e.what() << "), reconnecting."); @@ -1759,13 +1778,15 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { void SQLiteNode::_onConnect(SQLitePeer* peer) { SASSERT(peer); SASSERTWARN(!peer->loggedIn); - // Send the LOGIN - PINFO("Sending LOGIN"); SData login("LOGIN"); + login["Name"] = _name; login["Priority"] = to_string(_priority); login["State"] = stateName(_state); login["Version"] = _version; login["Permafollower"] = _originalPriority ? "false" : "true"; + PINFO("Sending " << login.serialize()); + + // NOTE: the following call adds CommitCount, Hash, and commandAddress fields. _sendToPeer(peer, login); } @@ -2565,12 +2586,14 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { int messageSize = message.deserialize(socket->recvBuffer); if (messageSize) { socket->recvBuffer.consumeFront(messageSize); - if (SIEquals(message.methodLine, "NODE_LOGIN")) { + // Allow either LOGIN or NODE_LOGIN until we deprecate NODE_LOGIN. + if (SIEquals(message.methodLine, "NODE_LOGIN") || SIEquals(message.methodLine, "LOGIN")) { SQLitePeer* peer = getPeerByName(message["Name"]); if (peer) { if (peer->setSocket(socket)) { - _sendPING(peer); _onConnect(peer); + _sendPING(peer); + _onMESSAGE(peer, message); // Connected OK, don't need in _unauthenticatedIncomingSockets anymore. socketsToRemove.push_back(socket); @@ -2589,7 +2612,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { STHROW("Unauthenticated node '" + message["Name"] + "' attempted to connected, rejecting."); } } else { - STHROW("expecting NODE_LOGIN"); + STHROW("expecting LOGIN or NODE_LOGIN"); } } else if (STimeNow() > socket->lastRecvTime + 5'000'000) { STHROW("Incoming socket didn't send a message for over 5s, closing."); @@ -2612,11 +2635,12 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { switch (result) { case SQLitePeer::PeerPostPollStatus::JUST_CONNECTED: { + // When NODE_LOGIN is deprecated, we can remove the next 3 lines. SData login("NODE_LOGIN"); login["Name"] = _name; peer->sendMessage(login.serialize()); - _sendPING(peer); _onConnect(peer); + _sendPING(peer); } break; case SQLitePeer::PeerPostPollStatus::SOCKET_ERROR: From e5659c7e4eab52a257b76f6e9d301be589948743 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 14:22:36 -0800 Subject: [PATCH 02/18] add known bad peer, refuse to talk to it --- sqlitecluster/SQLiteNode.cpp | 21 ++++++++++++++++----- sqlitecluster/SQLitePeer.cpp | 2 ++ sqlitecluster/SQLitePeer.h | 3 +++ 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 3ea870cd7..5f8e7d893 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1303,6 +1303,12 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { return; } + // We allow PING and PONG even for bad peers just to avoid them getting caught in reconnect cycles. + if (peer->knownBad) { + PINFO("Received message " << message.methodLine << " from known bad peer, ignoring."); + return; + } + // Every other message broadcasts the current state of the node if (!message.isSet("CommitCount")) { STHROW("missing CommitCount"); @@ -1374,17 +1380,22 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { STHROW("you're *not* supposed to be a 0-priority permafollower"); } - // Validate hash here, mark node as forked if found. - // It's an error to have to peers configured with the same priority, except 0 and -1 SASSERT(_priority == -1 || _priority == 0 || message.calc("Priority") != _priority); - PINFO("Peer logged in at '" << message["State"] << "', priority #" << message["Priority"] << " commit #" - << message["CommitCount"] << " (" << message["Hash"] << ")"); peer->priority = message.calc("Priority"); - peer->loggedIn = true; peer->version = message["Version"]; peer->state = stateFromName(message["State"]); + // Validate hash here, mark node as forked if found. + if (false) { + PINFO("Peer is forked, marking as bad, will ignore."); + peer->knownBad = true; + } else { + PINFO("Peer logged in at '" << message["State"] << "', priority #" << message["Priority"] << " commit #" + << message["CommitCount"] << " (" << message["Hash"] << ")"); + peer->loggedIn = true; + } + // If the peer is already standing up, go ahead and approve or deny immediately. if (peer->state == SQLiteNodeState::STANDINGUP) { _sendStandupResponse(peer, message); diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index 82c2de8a2..4daf2c785 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -26,6 +26,7 @@ SQLitePeer::SQLitePeer(const string& name_, const string& host_, const STable& p transactionResponse(Response::NONE), version(), lastPingTime(0), + knownBad(false), hash() { } @@ -79,6 +80,7 @@ void SQLitePeer::reset() { version = ""; lastPingTime = 0, setCommit(0, ""); + knownBad = false; } void SQLitePeer::shutdownSocket() { diff --git a/sqlitecluster/SQLitePeer.h b/sqlitecluster/SQLitePeer.h index 0eb984656..296a11ed8 100644 --- a/sqlitecluster/SQLitePeer.h +++ b/sqlitecluster/SQLitePeer.h @@ -91,6 +91,9 @@ class SQLitePeer { atomic version; atomic lastPingTime; + // Set to true when this peer is known to be unusable, I.e., when it has a database that is forked from us. + atomic knownBad; + private: // For initializing the permafollower value from the params list. static bool isPermafollower(const STable& params); From ccdc5cd82f1b933381600cdec0900a5e21374192 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 15:51:51 -0800 Subject: [PATCH 03/18] Stop communicating on fork. Test don't pass yet. --- sqlitecluster/SQLiteNode.cpp | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 5f8e7d893..3b95b3833 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1386,14 +1386,23 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { peer->version = message["Version"]; peer->state = stateFromName(message["State"]); - // Validate hash here, mark node as forked if found. - if (false) { - PINFO("Peer is forked, marking as bad, will ignore."); - peer->knownBad = true; - } else { + uint64_t peerCommitCount; + string peerCommitHash; + bool hashesMatch = true; + peer->getCommit(peerCommitCount, peerCommitHash); + if (!peerCommitHash.empty() && peerCommitCount <= getCommitCount()) { + string query, hash; + _db.getCommit(peerCommitCount, query, hash); + hashesMatch = (peerCommitHash == hash); + } + + if (hashesMatch) { PINFO("Peer logged in at '" << message["State"] << "', priority #" << message["Priority"] << " commit #" << message["CommitCount"] << " (" << message["Hash"] << ")"); peer->loggedIn = true; + } else { + PINFO("Peer is forked, marking as bad, will ignore."); + peer->knownBad = true; } // If the peer is already standing up, go ahead and approve or deny immediately. @@ -1775,7 +1784,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } } } else { - STHROW("unrecognized message: " + message.methodLine); + SINFO("unrecognized message: " + message.methodLine); } } catch (const SException& e) { PWARN("Error processing message '" << message.methodLine << "' (" << e.what() << "), reconnecting."); @@ -1902,6 +1911,9 @@ void SQLiteNode::_sendToPeer(SQLitePeer* peer, const SData& message) { // We can treat this whole function as atomic and thread-safe as it sends data to a peer with it's own atomic // `sendMessage` and the peer itself (assuming it's something from _peerList, which, if not, don't do that) is // const and will exist without changing until destruction. + if (peer->knownBad) { + PINFO("Skipping message " << message.methodLine << " to known bad peer."); + } peer->sendMessage(_addPeerHeaders(message).serialize()); } @@ -1910,6 +1922,9 @@ void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) { // Loop across all connected peers and send the message. _peerList is const so this is thread-safe. for (auto peer : _peerList) { + if (peer->knownBad) { + PINFO("Skipping message " << message.methodLine << " to known bad peer."); + } // This check is strictly thread-safe, as SQLitePeer::subscribed is atomic, but there's still a race condition // around checking subscribed and then sending, as subscribed could technically change. if (!subscribedOnly || peer->subscribed) { From 466725411ae8838f1e77e9451b1638cbfeb2b2c3 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 16:18:36 -0800 Subject: [PATCH 04/18] Rename variable, remove _forkedFrom list --- sqlitecluster/SQLiteNode.cpp | 53 ++++++++++++++++++++++-------------- sqlitecluster/SQLiteNode.h | 7 ----- sqlitecluster/SQLitePeer.cpp | 4 +-- sqlitecluster/SQLitePeer.h | 2 +- 4 files changed, 36 insertions(+), 30 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 3b95b3833..705e29a29 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -639,7 +639,7 @@ bool SQLiteNode::update() { // Find the freshest non-broken peer (including permafollowers). if (peer->loggedIn) { loggedInPeers.push_back(peer->name); - if (_forkedFrom.count(peer->name)) { + if (peer->forked) { SWARN("Hash mismatch. Forked from peer " << peer->name << " so not considering it." << _getLostQuorumLogMessage()); continue; } @@ -757,7 +757,7 @@ bool SQLiteNode::update() { continue; } - if (_forkedFrom.count(peer->name)) { + if (peer->forked) { // Forked nodes are treated as ineligible for leader, etc. SHMMM("Not counting forked peer " << peer->name << " for freshest, highestPriority, or currentLeader."); continue; @@ -1304,8 +1304,8 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } // We allow PING and PONG even for bad peers just to avoid them getting caught in reconnect cycles. - if (peer->knownBad) { - PINFO("Received message " << message.methodLine << " from known bad peer, ignoring."); + if (peer->forked) { + PINFO("Received message " << message.methodLine << " from forked peer, ignoring."); return; } @@ -1402,7 +1402,13 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { peer->loggedIn = true; } else { PINFO("Peer is forked, marking as bad, will ignore."); - peer->knownBad = true; + + // Send it a message before we mark it as forked, as we'll refuse afterward. + SData forked("FORKED"); + _sendToPeer(peer, forked); + + // And mark it dead until it reconnects. + peer->forked = true; } // If the peer is already standing up, go ahead and approve or deny immediately. @@ -1579,14 +1585,21 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { SQResult result; uint64_t commitNum = SToUInt64(message["hashMismatchNumber"]); _db.getCommits(commitNum, commitNum, result); - _forkedFrom.insert(peer->name); + peer->forked = true; + + size_t forkedCount = 0; + for (const auto& p : _peerList) { + if (p->forked) { + forkedCount++; + } + } SALERT("Hash mismatch. Peer " << peer->name << " and I have forked at commit " << message["hashMismatchNumber"] - << ". I have forked from " << _forkedFrom.size() << " other nodes. I am " << stateName(_state) + << ". I have forked from " << forkedCount << " other nodes. I am " << stateName(_state) << " and have hash " << result[0][0] << " for that commit. Peer has hash " << message["hashMismatchValue"] << "." << _getLostQuorumLogMessage()); - if (_forkedFrom.size() > ((_peerList.size() + 1) / 2)) { + if (forkedCount > ((_peerList.size() + 1) / 2)) { SERROR("Hash mismatch. I have forked from over half the cluster. This is unrecoverable." << _getLostQuorumLogMessage()); } @@ -1783,8 +1796,11 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { << e.what() << "', ignoring."); } } + } else if (SIEquals(message.methodLine, "FORKED")) { + peer->forked = true; + PINFO("Peer said we're forked, beleiving them."); } else { - SINFO("unrecognized message: " + message.methodLine); + PINFO("unrecognized message: " + message.methodLine); } } catch (const SException& e) { PWARN("Error processing message '" << message.methodLine << "' (" << e.what() << "), reconnecting."); @@ -1911,8 +1927,8 @@ void SQLiteNode::_sendToPeer(SQLitePeer* peer, const SData& message) { // We can treat this whole function as atomic and thread-safe as it sends data to a peer with it's own atomic // `sendMessage` and the peer itself (assuming it's something from _peerList, which, if not, don't do that) is // const and will exist without changing until destruction. - if (peer->knownBad) { - PINFO("Skipping message " << message.methodLine << " to known bad peer."); + if (peer->forked) { + PINFO("Skipping message " << message.methodLine << " to forked peer."); } peer->sendMessage(_addPeerHeaders(message).serialize()); } @@ -1922,8 +1938,8 @@ void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) { // Loop across all connected peers and send the message. _peerList is const so this is thread-safe. for (auto peer : _peerList) { - if (peer->knownBad) { - PINFO("Skipping message " << message.methodLine << " to known bad peer."); + if (peer->forked) { + PINFO("Skipping message " << message.methodLine << " to forked peer."); } // This check is strictly thread-safe, as SQLitePeer::subscribed is atomic, but there's still a race condition // around checking subscribed and then sending, as subscribed could technically change. @@ -2014,16 +2030,12 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance _leadPeer = nullptr; } - if (newState >= SQLiteNodeState::STANDINGUP) { - // Not forked from anyone. Note that this includes both LEADING and FOLLOWING. - _forkedFrom.clear(); - } - // Re-enable commits if they were disabled during a previous stand-down. if (newState != SQLiteNodeState::SEARCHING) { _db.setCommitEnabled(true); } +#if 0 // If we're going searching and have forked from at least 1 peer, sleep for a second. This is intended to prevent thousands of lines of log spam when this happens in an infinite // loop. It's entirely possible that we do this for valid reasons - it may be the peer that has the bad database and not us, and there are plenty of other reasons we could switch to // SEARCHING, but in those cases, we just wait an extra second before trying again. @@ -2031,6 +2043,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance SWARN("Going searching while forked peers present, sleeping 1 second." << _getLostQuorumLogMessage()); sleep(1); } +#endif // Additional logic for some new states if (newState == SQLiteNodeState::LEADING) { @@ -2250,7 +2263,7 @@ void SQLiteNode::_updateSyncPeer() continue; } - if (_forkedFrom.count(peer->name)) { + if (peer->forked) { SWARN("Hash mismatch. Can't choose peer " << peer->name << " due to previous hash mismatch."); continue; } @@ -2821,7 +2834,7 @@ void SQLiteNode::_sendStandupResponse(SQLitePeer* peer, const SData& message) { return; } - if (_forkedFrom.count(peer->name)) { + if (peer->forked) { PHMMM("Forked from peer, can't approve standup."); response["Response"] = "abstain"; response["Reason"] = "We are forked"; diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 7c14667d4..3826ba0bd 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -393,13 +393,6 @@ class SQLiteNode : public STCPManager { // This can be removed once we've figured out why replication falls behind. See this issue: https://github.com/Expensify/Expensify/issues/210528 atomic _concurrentReplicateTransactions = 0; - // We keep a set of strings that are the names of nodes we've forked from, in the case we ever receive a hash mismatch while trying to synchronize. - // Whenever we become LEADING or FOLLOWING this is cleared. This resets the case where one node has forked, we attempt to synchronize from it, and fail, - // but later synchronize from someone else. Once we've come up completely, we no longer "hold a grudge" against this node, which will likely get fixed - // while we're online. - // In the event that this list becomes longer than half the cluster size, the node kills itself and logs that it's in an unrecoverable state. - set _forkedFrom; - // A pointer to a SQLite instance that is passed to plugin's stateChanged function. This prevents plugins from operating on the same handle that // the sync node is when they run queries in stateChanged. SQLite* pluginDB; diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index 4daf2c785..35146de52 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -26,7 +26,7 @@ SQLitePeer::SQLitePeer(const string& name_, const string& host_, const STable& p transactionResponse(Response::NONE), version(), lastPingTime(0), - knownBad(false), + forked(false), hash() { } @@ -80,7 +80,7 @@ void SQLitePeer::reset() { version = ""; lastPingTime = 0, setCommit(0, ""); - knownBad = false; + forked = false; } void SQLitePeer::shutdownSocket() { diff --git a/sqlitecluster/SQLitePeer.h b/sqlitecluster/SQLitePeer.h index 296a11ed8..d6d0a8030 100644 --- a/sqlitecluster/SQLitePeer.h +++ b/sqlitecluster/SQLitePeer.h @@ -92,7 +92,7 @@ class SQLitePeer { atomic lastPingTime; // Set to true when this peer is known to be unusable, I.e., when it has a database that is forked from us. - atomic knownBad; + atomic forked; private: // For initializing the permafollower value from the params list. From 0689a931a2845e03f5c2ebb64a5efc2728d4e579 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 16:42:58 -0800 Subject: [PATCH 05/18] Fix fork test --- sqlitecluster/SQLiteNode.cpp | 48 +++++++++++++++++------- sqlitecluster/SQLiteNode.h | 2 + test/clustertest/tests/ForkCheckTest.cpp | 5 +++ 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 705e29a29..8478678fa 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1409,6 +1409,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // And mark it dead until it reconnects. peer->forked = true; + _dieIfForkedFromCluster(); } // If the peer is already standing up, go ahead and approve or deny immediately. @@ -1587,22 +1588,11 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { _db.getCommits(commitNum, commitNum, result); peer->forked = true; - size_t forkedCount = 0; - for (const auto& p : _peerList) { - if (p->forked) { - forkedCount++; - } - } - SALERT("Hash mismatch. Peer " << peer->name << " and I have forked at commit " << message["hashMismatchNumber"] - << ". I have forked from " << forkedCount << " other nodes. I am " << stateName(_state) - << " and have hash " << result[0][0] << " for that commit. Peer has hash " << message["hashMismatchValue"] << "." - << _getLostQuorumLogMessage()); - - if (forkedCount > ((_peerList.size() + 1) / 2)) { - SERROR("Hash mismatch. I have forked from over half the cluster. This is unrecoverable." << _getLostQuorumLogMessage()); - } + << ". I am " << stateName(_state) << " and have hash " << result[0][0] << " for that commit. Peer has hash " + << message["hashMismatchValue"] << "." << _getLostQuorumLogMessage()); + _dieIfForkedFromCluster(); STHROW("Hash mismatch"); } if (!_syncPeer) { @@ -1799,6 +1789,23 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } else if (SIEquals(message.methodLine, "FORKED")) { peer->forked = true; PINFO("Peer said we're forked, beleiving them."); + _dieIfForkedFromCluster(); + + // If leader said we're forked from it, we need a new leader. + if (peer == _leadPeer) { + _leadPeer = nullptr; + _changeState(SQLiteNodeState::SEARCHING); + } + + // If our sync peer said we're forked from it, and we're currently synchronizing, we need a new sync peer. + // However, if we're not currently syncing, then we don't need to change states, this peer could have been chosen + // hours or days ago. + if (peer == _syncPeer) { + _syncPeer = nullptr; + if (_state == SQLiteNodeState::SYNCHRONIZING) { + _changeState(SQLiteNodeState::SEARCHING); + } + } } else { PINFO("unrecognized message: " + message.methodLine); } @@ -2908,3 +2915,16 @@ void SQLiteNode::_sendStandupResponse(SQLitePeer* peer, const SData& message) { } _sendToPeer(peer, response); } + +void SQLiteNode::_dieIfForkedFromCluster() { + size_t forkedCount = 0; + for (const auto& p : _peerList) { + if (p->forked) { + forkedCount++; + } + } + + if (forkedCount > ((_peerList.size() + 1) / 2)) { + SERROR("I have forked from over half the cluster (" << forkedCount << " nodes). This is unrecoverable." << _getLostQuorumLogMessage()); + } +} \ No newline at end of file diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 3826ba0bd..354aef7c9 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -280,6 +280,8 @@ class SQLiteNode : public STCPManager { // commitCount that we do, this will return null. void _updateSyncPeer(); + void _dieIfForkedFromCluster(); + const string _commandAddress; const string _name; const vector _peerList; diff --git a/test/clustertest/tests/ForkCheckTest.cpp b/test/clustertest/tests/ForkCheckTest.cpp index 6e0c0d262..330c8735f 100644 --- a/test/clustertest/tests/ForkCheckTest.cpp +++ b/test/clustertest/tests/ForkCheckTest.cpp @@ -119,5 +119,10 @@ struct ForkCheckTest : tpunit::TestFixture { // And that signal should have been ABORT. ASSERT_EQUAL(SIGABRT, WTERMSIG(status)); + + // We call stopServer on the forked leader because it crashed, but the cluster tester doesn't realize, so shutting down + // normally will time out after a minute. Calling `stopServer` explicitly will clear the server PID, and we won't need + // to wait for this timeout. + tester.getTester(0).stopServer(); } } __ForkCheckTest; From 002a9373aa6b08cde04fbbfd2f0b0c029a2e50b9 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 16:46:01 -0800 Subject: [PATCH 06/18] remove test that tests condition that's no longer used. --- .../tests/ForkedNodeApprovalTest.cpp | 161 ------------------ 1 file changed, 161 deletions(-) delete mode 100644 test/clustertest/tests/ForkedNodeApprovalTest.cpp diff --git a/test/clustertest/tests/ForkedNodeApprovalTest.cpp b/test/clustertest/tests/ForkedNodeApprovalTest.cpp deleted file mode 100644 index 197b883cc..000000000 --- a/test/clustertest/tests/ForkedNodeApprovalTest.cpp +++ /dev/null @@ -1,161 +0,0 @@ - -#include - -#include -#include -#include -#include - -struct ForkedNodeApprovalTest : tpunit::TestFixture { - ForkedNodeApprovalTest() - : tpunit::TestFixture("ForkedNodeApproval", TEST(ForkedNodeApprovalTest::test)) {} - - pair getMaxJournalCommit(BedrockTester& tester, bool online = true) { - SQResult journals; - tester.readDB("SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'journal%';", journals, online); - uint64_t maxJournalCommit = 0; - string maxJournalTable; - for (auto& row : journals.rows) { - string maxID = tester.readDB("SELECT MAX(id) FROM " + row[0] + ";", online); - try { - uint64_t maxCommitNum = stoull(maxID); - if (maxCommitNum > maxJournalCommit) { - maxJournalCommit = maxCommitNum; - maxJournalTable = row[0]; - } - } catch (const invalid_argument& e) { - // do nothing, skip this journal with no entries. - continue; - } - } - return make_pair(maxJournalCommit, maxJournalTable); - } - - void test() { - // Create a cluster, wait for it to come up. - BedrockClusterTester tester(ClusterSize::THREE_NODE_CLUSTER); - - // We'll tell the threads to stop when they're done. - atomic stop(false); - - // We want to not spam a stopped leader. - atomic leaderIsUp(true); - - // Just use a bunch of copies of the same command. - SData spamCommand("idcollision"); - - // In a vector. - const vector commands(100, spamCommand); - - // Now create 9 threads spamming 100 commands at a time, each. 9 cause we have three nodes. - vector threads; - for (size_t i = 0; i < 9; i++) { - threads.emplace_back([&tester, i, &commands, &stop, &leaderIsUp](){ - while (!stop) { - // Pick a tester, send, don't care about the result. - size_t testerNum = i % 3; - if (testerNum == 0 && !leaderIsUp) { - // If we're looking for leader and it's down, wait a second to avoid pegging the CPU. - sleep(1); - } else { - // If we're not leader or leader is up, spam away! - tester.getTester(testerNum).executeWaitMultipleData(commands); - } - } - }); - } - - // Let them spam for a second. - sleep(1); - - // We can try and stop the leader. - leaderIsUp = false; - tester.getTester(0).stopServer(); - - // Spam a few more commands and then we can stop. - sleep(1); - stop = true; - for (auto& t : threads) { - t.join(); - } - - // Fetch the latest journal commits on leader and follower - auto result = getMaxJournalCommit(tester.getTester(0), false); - - uint64_t leaderMaxCommit = result.first; - string leaderMaxCommitJournal = result.second; - result = getMaxJournalCommit(tester.getTester(1)); - uint64_t followerMaxCommit = result.first; - - // Make sure the follower got farther than the leader. - ASSERT_GREATER_THAN(followerMaxCommit, leaderMaxCommit); - - // We need to release any DB that the tester is holding. - tester.getTester(0).freeDB(); - tester.getTester(1).freeDB(); - - // Break leader. - { - string filename = tester.getTester(0).getArg("-db"); - string query = "UPDATE " + leaderMaxCommitJournal + " SET hash = 'abcdef123456' WHERE id = " + to_string(leaderMaxCommit) + ";"; - - sqlite3* db = nullptr; - sqlite3_open_v2(filename.c_str(), &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX, NULL); - char* errMsg = nullptr; - sqlite3_exec(db, query.c_str(), 0, 0, &errMsg); - if (errMsg) { - cout << "Error updating db: " << errMsg << endl; - ASSERT_TRUE(false); - } - sqlite3_close_v2(db); - } - - // Stop the second follower. - tester.getTester(2).stopServer(); - - // Start the broken leader back up. - tester.getTester(0).startServer(false); - - // We should not get a leader, the primary leader needs to synchronize, but can't because it's forked. - // The secondary leader should go leading, but can't, because it only receives `abstain` responses to standup requests. - // It's possible for the secondary leader to go leading once, but it should quickly fall out of leading when the fork is detected and primary leader reconnects. - // After that, it should not go leading again, primary leader should abstain from participation. - auto start = chrono::steady_clock::now(); - bool abstainDetected = false; - while (true) { - if (chrono::steady_clock::now() - start > 30s) { - cout << "It's been 30 seconds." << endl; - break; - } - SData command("Status"); - auto responseJson = tester.getTester(1).executeWaitMultipleData({command}, 1, true)[0].content; - - auto json = SParseJSONObject(responseJson); - auto peers = SParseJSONArray(json["peerList"]); - for (auto& peer : peers) { - auto peerJSON = SParseJSONObject(peer); - if (peerJSON["name"] == "cluster_node_0" && peerJSON["standupResponse"] == "ABSTAIN") { - abstainDetected = true; - break; - } - } - if (abstainDetected) { - break; - } - - // try again. - usleep(50'000); - } - - ASSERT_TRUE(abstainDetected); - - // Ok, now we can start the second follower back up and secondary leader should be able to lead. - tester.getTester(2).startServer(false); - ASSERT_TRUE(tester.getTester(1).waitForState("LEADING")); - - // We call stopServer on the forked leader because it crashed, but the cluster tester doesn't realize, so shutting down - // normally will time out after a minute. Calling `stopServer` explicitly will clear the server PID, and we won't need - // to wait for this timeout. - tester.getTester(0).stopServer(); - } -} __ForkedNodeApprovalTest; From f50aec04744ff9fb9cfb44e18b5c378bff3f1e2a Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 16:54:03 -0800 Subject: [PATCH 07/18] Remove ABSTAIN --- sqlitecluster/SQLiteNode.cpp | 25 ------------------------- sqlitecluster/SQLitePeer.cpp | 3 --- sqlitecluster/SQLitePeer.h | 3 +-- 3 files changed, 1 insertion(+), 30 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 8478678fa..7374ac0f3 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -861,7 +861,6 @@ bool SQLiteNode::update() { size_t numFullPeers = 0; size_t numLoggedInFullPeers = 0; size_t approveCount = 0; - size_t abstainCount = 0; if (_isShuttingDown) { SINFO("Shutting down while standing up, setting state to SEARCHING"); _changeState(SQLiteNodeState::SEARCHING); @@ -879,9 +878,6 @@ bool SQLiteNode::update() { // Has it responded yet? if (peer->standupResponse == SQLitePeer::Response::NONE) { // This peer hasn't yet responded. We do nothing with it in this case, maybe it will have responded by the next check. - } else if (peer->standupResponse == SQLitePeer::Response::ABSTAIN) { - PHMMM("Peer abstained from participation in quorum"); - abstainCount++; } else if (peer->standupResponse == SQLitePeer::Response::DENY) { // It responeded, but didn't approve -- abort PHMMM("Refused our STANDUP, cancel and RE-SEARCH"); @@ -894,16 +890,6 @@ bool SQLiteNode::update() { } } - // If the majority of full peers responds with abstain, then re-search. - const bool majorityAbstained = abstainCount * 2 > numFullPeers; - if (majorityAbstained) { - // Majority abstained, meaning we're probably forked, - // so we go back to searching so we can go back to synchronizing and see if we're forked. - SHMMM("Majority of full peers abstained; re-SEARCHING."); - _changeState(SQLiteNodeState::SEARCHING); - return true; // Re-update - } - // If everyone's responded with approval and we form a majority, then finish standup. bool majorityConnected = numLoggedInFullPeers * 2 >= numFullPeers; bool quorumApproved = approveCount * 2 >= numFullPeers; @@ -1531,9 +1517,6 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { if (SIEquals(message["Response"], "approve")) { PINFO("Received standup approval"); peer->standupResponse = SQLitePeer::Response::APPROVE; - } else if (SIEquals(message["Response"], "abstain")) { - PINFO("Received standup abstain"); - peer->standupResponse = SQLitePeer::Response::ABSTAIN; } else { PHMMM("Received standup denial: reason='" << message["Reason"] << "'"); peer->standupResponse = SQLitePeer::Response::DENY; @@ -2841,14 +2824,6 @@ void SQLiteNode::_sendStandupResponse(SQLitePeer* peer, const SData& message) { return; } - if (peer->forked) { - PHMMM("Forked from peer, can't approve standup."); - response["Response"] = "abstain"; - response["Reason"] = "We are forked"; - _sendToPeer(peer, response); - return; - } - // What's our state if (SWITHIN(SQLiteNodeState::STANDINGUP, _state, SQLiteNodeState::STANDINGDOWN)) { // Oh crap, it's trying to stand up while we're leading. Who is higher priority? diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index 35146de52..c93589520 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -207,9 +207,6 @@ string SQLitePeer::responseName(Response response) { case Response::DENY: return "DENY"; break; - case Response::ABSTAIN: - return "ABSTAIN"; - break; default: return ""; } diff --git a/sqlitecluster/SQLitePeer.h b/sqlitecluster/SQLitePeer.h index d6d0a8030..48bc3433c 100644 --- a/sqlitecluster/SQLitePeer.h +++ b/sqlitecluster/SQLitePeer.h @@ -8,8 +8,7 @@ class SQLitePeer { enum class Response { NONE, APPROVE, - DENY, - ABSTAIN + DENY }; enum class PeerPostPollStatus { From 0e828eb21f325f189642a47678a6d3ef562f9495 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 17:01:06 -0800 Subject: [PATCH 08/18] Comment cleanup --- sqlitecluster/SQLiteNode.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 7374ac0f3..7f779d43d 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -56,6 +56,7 @@ // dbCountAtStart: The highest committed transaction in the DB at the start of this transaction on leader, for // optimizing replication. +// NOTE: This comment as well as NODE_LOGIN should be removed after https://github.com/Expensify/Bedrock/pull/1999 is deployed. // On LOGIN vs NODE_LOGIN. // _onConnect sends a LOGIN message. // _onConnect is called in exctly two places: @@ -68,7 +69,6 @@ // 3. LOGIN // // When we receive a NODE_LOGIN, we immediately respond with a PING followed by a LOGIN (by calling _onConnect). -// We can cobine all of these into a single login message. #undef SLOGPREFIX #define SLOGPREFIX "{" << _name << "/" << SQLiteNode::stateName(_state) << "} " @@ -2902,4 +2902,4 @@ void SQLiteNode::_dieIfForkedFromCluster() { if (forkedCount > ((_peerList.size() + 1) / 2)) { SERROR("I have forked from over half the cluster (" << forkedCount << " nodes). This is unrecoverable." << _getLostQuorumLogMessage()); } -} \ No newline at end of file +} From 2d6b73adc2ca09cc8fe42249a826584da282ddb5 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 17:19:39 -0800 Subject: [PATCH 09/18] Standardize use of _sendToPeer --- sqlitecluster/SQLiteNode.cpp | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 7f779d43d..d6c03bb70 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1277,7 +1277,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { SINFO("Received PING from peer '" << peer->name << "'. Sending PONG."); SData pong("PONG"); pong["Timestamp"] = message["Timestamp"]; - peer->sendMessage(pong.serialize()); + peer->sendMessage(pong); return; } else if (SIEquals(message.methodLine, "PONG")) { // Latency must be > 0 because we treat 0 as "not connected". @@ -1531,7 +1531,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { SINFO("Asked to help SYNCHRONIZE but shutting down."); SData response("SYNCHRONIZE_RESPONSE"); response["ShuttingDown"] = "true"; - peer->sendMessage(response); + _sendToPeer(peer, response); } else { _pendingSynchronizeResponses++; static atomic synchronizeCount(0); @@ -1546,13 +1546,13 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // The following two lines are copied from `_sendToPeer`. response["CommitCount"] = to_string(db.getCommitCount()); response["Hash"] = db.getCommittedHash(); - peer->sendMessage(response); + _sendToPeer(peer, response); } catch (const SException& e) { // This is the same handling as at the bottom of _onMESSAGE. PWARN("Error processing message '" << message.methodLine << "' (" << e.what() << "), reconnecting."); SData reconnect("RECONNECT"); reconnect["Reason"] = e.what(); - peer->sendMessage(reconnect.serialize()); + _sendToPeer(peer, reconnect); peer->shutdownSocket(); } @@ -1796,7 +1796,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { PWARN("Error processing message '" << message.methodLine << "' (" << e.what() << "), reconnecting."); SData reconnect("RECONNECT"); reconnect["Reason"] = e.what(); - peer->sendMessage(reconnect.serialize()); + _sendToPeer(peer, reconnect); peer->shutdownSocket(); } } @@ -1920,11 +1920,11 @@ void SQLiteNode::_sendToPeer(SQLitePeer* peer, const SData& message) { if (peer->forked) { PINFO("Skipping message " << message.methodLine << " to forked peer."); } - peer->sendMessage(_addPeerHeaders(message).serialize()); + peer->sendMessage(_addPeerHeaders(message)); } void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) { - const string serializedMessage = _addPeerHeaders(message).serialize(); + const SData messageWithHeaders = _addPeerHeaders(message); // Loop across all connected peers and send the message. _peerList is const so this is thread-safe. for (auto peer : _peerList) { @@ -1934,7 +1934,7 @@ void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) { // This check is strictly thread-safe, as SQLitePeer::subscribed is atomic, but there's still a race condition // around checking subscribed and then sending, as subscribed could technically change. if (!subscribedOnly || peer->subscribed) { - peer->sendMessage(serializedMessage); + peer->sendMessage(messageWithHeaders); } } } @@ -2667,7 +2667,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { // When NODE_LOGIN is deprecated, we can remove the next 3 lines. SData login("NODE_LOGIN"); login["Name"] = _name; - peer->sendMessage(login.serialize()); + _sendToPeer(peer, login); _onConnect(peer); _sendPING(peer); } @@ -2676,7 +2676,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { { SData reconnect("RECONNECT"); reconnect["Reason"] = "socket error"; - peer->sendMessage(reconnect.serialize()); + _sendToPeer(peer, reconnect); peer->shutdownSocket(); } break; @@ -2734,7 +2734,7 @@ void SQLiteNode::_sendPING(SQLitePeer* peer) { SASSERT(peer); SData ping("PING"); ping["Timestamp"] = SToStr(STimeNow()); - peer->sendMessage(ping.serialize()); + peer->sendMessage(ping); } SQLitePeer* SQLiteNode::getPeerByName(const string& name) const { From 859ba9f500361ef02dbb79505d0abf1f6f1333b1 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 18:37:00 -0800 Subject: [PATCH 10/18] Update sqlitecluster/SQLiteNode.cpp Co-authored-by: Carlos Alvarez --- sqlitecluster/SQLiteNode.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index d6c03bb70..38f676c10 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1930,6 +1930,7 @@ void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) { for (auto peer : _peerList) { if (peer->forked) { PINFO("Skipping message " << message.methodLine << " to forked peer."); + continue; } // This check is strictly thread-safe, as SQLitePeer::subscribed is atomic, but there's still a race condition // around checking subscribed and then sending, as subscribed could technically change. From 12cdd008b05440e883e8dbde799e9411446242f6 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Wed, 4 Dec 2024 18:37:25 -0800 Subject: [PATCH 11/18] Update sqlitecluster/SQLiteNode.cpp Co-authored-by: Carlos Alvarez --- sqlitecluster/SQLiteNode.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 38f676c10..a26c84f71 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1919,6 +1919,7 @@ void SQLiteNode::_sendToPeer(SQLitePeer* peer, const SData& message) { // const and will exist without changing until destruction. if (peer->forked) { PINFO("Skipping message " << message.methodLine << " to forked peer."); + return; } peer->sendMessage(_addPeerHeaders(message)); } From d8c495fe8bdb16dae2226d3baa0dd1229226e339 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Thu, 5 Dec 2024 09:42:34 -0800 Subject: [PATCH 12/18] Refactor test to make multiple tests easier to run --- test/clustertest/tests/ForkCheckTest.cpp | 61 ++++++++++++++---------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/test/clustertest/tests/ForkCheckTest.cpp b/test/clustertest/tests/ForkCheckTest.cpp index 330c8735f..631df5264 100644 --- a/test/clustertest/tests/ForkCheckTest.cpp +++ b/test/clustertest/tests/ForkCheckTest.cpp @@ -1,3 +1,4 @@ +#include "test/lib/BedrockTester.h" #include #include @@ -7,7 +8,9 @@ struct ForkCheckTest : tpunit::TestFixture { ForkCheckTest() - : tpunit::TestFixture("ForkCheck", TEST(ForkCheckTest::test)) {} + : tpunit::TestFixture("ForkCheck", + TEST(ForkCheckTest::forkAtShutDown), + TEST(ForkCheckTest::forkAtCrash)) {} pair getMaxJournalCommit(BedrockTester& tester, bool online = true) { SQResult journals; @@ -30,40 +33,42 @@ struct ForkCheckTest : tpunit::TestFixture { return make_pair(maxJournalCommit, maxJournalTable); } - void test() { - // Create a cluster, wait for it to come up. - BedrockClusterTester tester(ClusterSize::FIVE_NODE_CLUSTER); - - // We'll tell the threads to stop when they're done. - atomic stop(false); - - // We want to not spam a stopped leader. - atomic leaderIsUp(true); - + vector createThreads(size_t num, BedrockClusterTester& tester, atomic& stop, atomic& leaderIsUp) { // Just use a bunch of copies of the same command. - SData spamCommand("idcollision"); - - // In a vector. - const vector commands(100, spamCommand); - - // Now create 9 threads spamming 100 commands at a time, each. 9 cause we have three nodes. vector threads; - for (size_t i = 0; i < 9; i++) { - threads.emplace_back([&tester, i, &commands, &stop, &leaderIsUp](){ + for (size_t num = 0; num < 9; num++) { + threads.emplace_back([&tester, num, &stop, &leaderIsUp](){ + const vector commands(100, SData("idcollision")); while (!stop) { // Pick a tester, send, don't care about the result. - size_t testerNum = i % 3; + size_t testerNum = num % 5; if (testerNum == 0 && !leaderIsUp) { - // If we're looking for leader and it's down, wait a second to avoid pegging the CPU. - sleep(1); - } else { - // If we're not leader or leader is up, spam away! - tester.getTester(testerNum).executeWaitMultipleData(commands); + // If leader's off, don't use it. + testerNum = 1; } + tester.getTester(testerNum).executeWaitMultipleData(commands); } }); } + return threads; + } + + // This primary test here checks that a node that is forked will not be able to rejoin the cluster when reconnecting. + // This is a reasonable test for a fork that happens at shutdown. + void forkAtShutDown() { + // Create a cluster, wait for it to come up. + BedrockClusterTester tester(ClusterSize::FIVE_NODE_CLUSTER); + + // We'll tell the threads to stop when they're done. + atomic stop(false); + + // We want to not spam a stopped leader. + atomic leaderIsUp(true); + + // Now create 15 threads spamming 100 commands at a time, each. 15 cause we have five nodes. + vector threads = createThreads(15, tester, stop, leaderIsUp); + // Let them spam for a second. sleep(1); @@ -71,7 +76,7 @@ struct ForkCheckTest : tpunit::TestFixture { leaderIsUp = false; tester.getTester(0).stopServer(); - // Spam a few more commands and then we can stop. + // Spam a few more commands so thar the follower is ahead of the stopped leader, and then we can stop. sleep(1); stop = true; for (auto& t : threads) { @@ -125,4 +130,8 @@ struct ForkCheckTest : tpunit::TestFixture { // to wait for this timeout. tester.getTester(0).stopServer(); } + + void forkAtCrash() { + + } } __ForkCheckTest; From ac2aa66647d2fe6c331393bf2058ae62935bb2e9 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Thu, 5 Dec 2024 10:05:00 -0800 Subject: [PATCH 13/18] Remove second test --- test/clustertest/tests/ForkCheckTest.cpp | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/test/clustertest/tests/ForkCheckTest.cpp b/test/clustertest/tests/ForkCheckTest.cpp index 631df5264..835cc76cd 100644 --- a/test/clustertest/tests/ForkCheckTest.cpp +++ b/test/clustertest/tests/ForkCheckTest.cpp @@ -4,13 +4,13 @@ #include #include #include +#include #include struct ForkCheckTest : tpunit::TestFixture { ForkCheckTest() : tpunit::TestFixture("ForkCheck", - TEST(ForkCheckTest::forkAtShutDown), - TEST(ForkCheckTest::forkAtCrash)) {} + TEST(ForkCheckTest::forkAtShutDown)) {} pair getMaxJournalCommit(BedrockTester& tester, bool online = true) { SQResult journals; @@ -66,7 +66,7 @@ struct ForkCheckTest : tpunit::TestFixture { // We want to not spam a stopped leader. atomic leaderIsUp(true); - // Now create 15 threads spamming 100 commands at a time, each. 15 cause we have five nodes. + // Now create 15 threads spamming 100 commands at a time, each. 15 because we have five nodes. vector threads = createThreads(15, tester, stop, leaderIsUp); // Let them spam for a second. @@ -130,8 +130,4 @@ struct ForkCheckTest : tpunit::TestFixture { // to wait for this timeout. tester.getTester(0).stopServer(); } - - void forkAtCrash() { - - } } __ForkCheckTest; From e6774554997c399f2626d3e5f417de3c8d382c71 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Thu, 5 Dec 2024 13:24:01 -0800 Subject: [PATCH 14/18] Fix last instance of _forkedFrom --- sqlitecluster/SQLiteNode.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index a26c84f71..5042e2e3e 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2027,15 +2027,17 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance _db.setCommitEnabled(true); } -#if 0 // If we're going searching and have forked from at least 1 peer, sleep for a second. This is intended to prevent thousands of lines of log spam when this happens in an infinite // loop. It's entirely possible that we do this for valid reasons - it may be the peer that has the bad database and not us, and there are plenty of other reasons we could switch to // SEARCHING, but in those cases, we just wait an extra second before trying again. - if (newState == SQLiteNodeState::SEARCHING && _forkedFrom.size()) { + bool forkedPeers = false; + for (const auto p : _peerList) { + forkedPeers = forkedPeers || p->forked; + } + if (newState == SQLiteNodeState::SEARCHING && forkedPeers) { SWARN("Going searching while forked peers present, sleeping 1 second." << _getLostQuorumLogMessage()); sleep(1); } -#endif // Additional logic for some new states if (newState == SQLiteNodeState::LEADING) { From 54cdcbc1ca8e549387338781da27933e9336fab1 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Thu, 5 Dec 2024 19:44:30 -0800 Subject: [PATCH 15/18] Code review feedback --- sqlitecluster/SQLiteNode.cpp | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 2bd99cd2f..e7c1a3703 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1720,7 +1720,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } } else if (SIEquals(message.methodLine, "FORKED")) { peer->forked = true; - PINFO("Peer said we're forked, beleiving them."); + PINFO("Peer said we're forked, believing them."); _dieIfForkedFromCluster(); // If leader said we're forked from it, we need a new leader. @@ -2825,14 +2825,23 @@ void SQLiteNode::_sendStandupResponse(SQLitePeer* peer, const SData& message) { } void SQLiteNode::_dieIfForkedFromCluster() { - size_t forkedCount = 0; + size_t quorumNodeCount = 0; + size_t forkedFullPeerCount = 0; for (const auto& p : _peerList) { - if (p->forked) { - forkedCount++; + if (!p->permaFollower) { + forkedFullPeerCount++; + if (p->forked) { + forkedFullPeerCount++; + } } } - if (forkedCount > ((_peerList.size() + 1) / 2)) { - SERROR("I have forked from over half the cluster (" << forkedCount << " nodes). This is unrecoverable." << _getLostQuorumLogMessage()); + // Am *I* a permaFollower? + if (_originalPriority != 0) { + quorumNodeCount++; + } + + if (forkedFullPeerCount >= (quorumNodeCount + 1) / 2) { + SERROR("I have forked from over half the cluster (" << forkedFullPeerCount << " nodes). This is unrecoverable." << _getLostQuorumLogMessage()); } } From f4bb1f1a58c2076d75b2b3becbcd3810bfb33bac Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 6 Dec 2024 14:22:41 -0800 Subject: [PATCH 16/18] Fix wrong count being incremented --- sqlitecluster/SQLiteNode.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 29aad0bcc..e0281c1e1 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2837,7 +2837,7 @@ void SQLiteNode::_dieIfForkedFromCluster() { size_t forkedFullPeerCount = 0; for (const auto& p : _peerList) { if (!p->permaFollower) { - forkedFullPeerCount++; + quorumNodeCount++; if (p->forked) { forkedFullPeerCount++; } From 8d3fad988ae525735cef2fc54b7e0ce4ca87585b Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 6 Dec 2024 14:25:30 -0800 Subject: [PATCH 17/18] Change comment --- sqlitecluster/SQLiteNode.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index e0281c1e1..2907e7e84 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1273,7 +1273,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { return; } - // We allow PING and PONG even for bad peers just to avoid them getting caught in reconnect cycles. + // We ignore everything except PING and PONG from forked nodes, so we can return here in that case. if (peer->forked) { PINFO("Received message " << message.methodLine << " from forked peer, ignoring."); return; From c86f79953454fe2b25ac3f8ad0321fdc03f82ae5 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 6 Dec 2024 14:26:08 -0800 Subject: [PATCH 18/18] Update sqlitecluster/SQLiteNode.cpp Co-authored-by: Carlos Alvarez --- sqlitecluster/SQLiteNode.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 2907e7e84..5a75611cd 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2844,7 +2844,7 @@ void SQLiteNode::_dieIfForkedFromCluster() { } } - // Am *I* a permaFollower? + // Increase quorumNodeCount if *I* am not a permafollower if (_originalPriority != 0) { quorumNodeCount++; }