diff --git a/libstuff/SLog.cpp b/libstuff/SLog.cpp index eb644d28b..ff94e5a9b 100644 --- a/libstuff/SLog.cpp +++ b/libstuff/SLog.cpp @@ -49,6 +49,9 @@ static set PARAMS_WHITELIST = { "indexName", "isUnique", "logParam", + "message", + "peer", + "reason", "requestID", "status", "userID", diff --git a/libstuff/sqlite3.c b/libstuff/sqlite3.c index e95e639a2..b3c5eebf7 100644 --- a/libstuff/sqlite3.c +++ b/libstuff/sqlite3.c @@ -18,7 +18,7 @@ ** separate file. This file contains only code for the core SQLite library. ** ** The content in this amalgamation comes from Fossil check-in -** 5f9f6764e9dffef60213bbc9604940ddfc71. +** 65b753735b8e8fb70d2b522d527426f1eb5c. */ #define SQLITE_CORE 1 #define SQLITE_AMALGAMATION 1 @@ -465,7 +465,7 @@ extern "C" { */ #define SQLITE_VERSION "3.47.0" #define SQLITE_VERSION_NUMBER 3047000 -#define SQLITE_SOURCE_ID "2024-12-05 19:45:14 5f9f6764e9dffef60213bbc9604940ddfc713436333c3f62ed8a090697fcbb1e" +#define SQLITE_SOURCE_ID "2024-12-06 17:52:38 65b753735b8e8fb70d2b522d527426f1eb5c09339fb4b15cf69cbd2e595b160f" /* ** CAPI3REF: Run-Time Library Version Numbers @@ -18234,21 +18234,22 @@ struct sqlite3 { #define SCHEMA_TIME_AFTER_STAT1 12 #define SCHEMA_TIME_AFTER_DEFAULTS 13 -#define SCHEMA_TIME_AFTER_STAT4_Q1 14 -#define SCHEMA_TIME_AFTER_STAT4_Q2 15 -#define SCHEMA_TIME_AFTER_STAT4 16 +#define SCHEMA_TIME_STAT4_Q1_BODY 14 +#define SCHEMA_TIME_AFTER_STAT4_Q1 15 +#define SCHEMA_TIME_AFTER_STAT4_Q2 16 +#define SCHEMA_TIME_AFTER_STAT4 17 -#define SCHEMA_TIME_END_ANALYZE_LOAD 17 -#define SCHEMA_TIME_FINISH 18 +#define SCHEMA_TIME_END_ANALYZE_LOAD 18 +#define SCHEMA_TIME_FINISH 19 -#define SCHEMA_TIME_N 19 -#define SCHEMA_TIME_TIMEOUT (2 * 1000 * 1000) +#define SCHEMA_TIME_N 20 +#define SCHEMA_TIME_TIMEOUT (0 * 1000 * 1000) #define sqlite3PrepareTimeSet(x,y) sqlite3CommitTimeSet(x,y) SQLITE_PRIVATE void sqlite3PrepareTimeLog(const char *zSql, int nSql, u64 *aPrepareTime); -SQLITE_PRIVATE void sqlite3SchemaTimeLog(u64 *aSchemaTime); +SQLITE_PRIVATE void sqlite3SchemaTimeLog(u64 *aSchemaTime, const char *zFile); #define PREPARE_TIME_TIMEOUT (2 * 1000 * 1000) /* 2 second timeout */ @@ -67566,6 +67567,7 @@ static void walCleanupHash(Wal *pWal){ ** 8, and p must be aligned to an 8-byte boundary. */ static void zero64(void *p, int n){ +#if defined(__x86_64__) size_t c = n / sizeof(u64); void *d = p; @@ -67578,6 +67580,9 @@ static void zero64(void *p, int n){ : "a" (0) : "memory" ); +#else + memset(p, 0, n); +#endif } /* @@ -93469,7 +93474,7 @@ SQLITE_PRIVATE void sqlite3CommitTimeLog(u64 *aCommit){ } zStr = sqlite3_mprintf("%z%s%s%d%s", zStr, (zStr?", ":""),zHash,iVal,zU); } - sqlite3_log(SQLITE_WARNING, "slow commit (v=15): (%s)", zStr); + sqlite3_log(SQLITE_WARNING, "slow commit (v=16): (%s)", zStr); sqlite3_free(zStr); } } @@ -93497,12 +93502,12 @@ SQLITE_PRIVATE void sqlite3PrepareTimeLog(const char *zSql, int nSql, u64 *aPrep } if( nByte<0 ){ nByte = sqlite3Strlen30(zSql); } sqlite3_log(SQLITE_WARNING, - "slow prepare (v=15): (%s) [%.*s]", zStr, nByte, zSql + "slow prepare (v=16): (%s) [%.*s]", zStr, nByte, zSql ); sqlite3_free(zStr); } } -SQLITE_PRIVATE void sqlite3SchemaTimeLog(u64 *aSchema){ +SQLITE_PRIVATE void sqlite3SchemaTimeLog(u64 *aSchema, const char *zFile){ u64 i1 = aSchema[SCHEMA_TIME_START]; assert( SCHEMA_TIME_START==0 && SCHEMA_TIME_FINISH==SCHEMA_TIME_N-1 ); if( aSchema[SCHEMA_TIME_FINISH]>(i1+SCHEMA_TIME_TIMEOUT) ){ @@ -93513,7 +93518,7 @@ SQLITE_PRIVATE void sqlite3SchemaTimeLog(u64 *aSchema){ (aSchema[ii]==0 ? 0 : (int)(aSchema[ii] - i1)) ); } - sqlite3_log(SQLITE_WARNING, "slow schema (v=15): (%s)", zStr); + sqlite3_log(SQLITE_WARNING, "slow schema (%s) (v=16): (%s)", zFile, zStr); sqlite3_free(zStr); } } @@ -123796,6 +123801,8 @@ static int loadStatTbl( tRowcnt *pSpace; /* Available allocated memory space */ u8 *pPtr; /* Available memory as a u8 for easier manipulation */ + u64 t = sqlite3STimeNow(); + zIndex = (char *)sqlite3_column_text(pStmt, 0); if( zIndex==0 ) continue; nSample = sqlite3_column_int(pStmt, 1); @@ -123835,6 +123842,9 @@ static int loadStatTbl( pIdx->aSample[i].anDLt = pSpace; pSpace += nIdxCol; } assert( ((u8*)pSpace)-nByte==(u8*)(pIdx->aSample) ); + if( db->aSchemaTime ){ + db->aSchemaTime[SCHEMA_TIME_STAT4_Q1_BODY] += (sqlite3STimeNow() - t); + } } rc = sqlite3_finalize(pStmt); if( rc ) return rc; @@ -145940,7 +145950,10 @@ SQLITE_PRIVATE int sqlite3InitOne(sqlite3 *db, int iDb, char **pzErrMsg, u32 mFl error_out: db->aSchemaTime = 0; sqlite3PrepareTimeSet(aSchemaTime, SCHEMA_TIME_FINISH); - sqlite3SchemaTimeLog(aSchemaTime); + if( rc==SQLITE_OK && iDb==0 ){ + const char *zFile = sqlite3BtreeGetFilename(pDb->pBt); + sqlite3SchemaTimeLog(aSchemaTime, zFile); + } if( rc ){ if( rc==SQLITE_NOMEM || rc==SQLITE_IOERR_NOMEM ){ sqlite3OomFault(db); @@ -257904,7 +257917,7 @@ static void fts5SourceIdFunc( ){ assert( nArg==0 ); UNUSED_PARAM2(nArg, apUnused); - sqlite3_result_text(pCtx, "fts5: 2024-12-05 19:45:14 5f9f6764e9dffef60213bbc9604940ddfc713436333c3f62ed8a090697fcbb1e", -1, SQLITE_TRANSIENT); + sqlite3_result_text(pCtx, "fts5: 2024-12-06 17:52:38 65b753735b8e8fb70d2b522d527426f1eb5c09339fb4b15cf69cbd2e595b160f", -1, SQLITE_TRANSIENT); } /* diff --git a/libstuff/sqlite3.h b/libstuff/sqlite3.h index 6287d400b..ff4f7b3f8 100644 --- a/libstuff/sqlite3.h +++ b/libstuff/sqlite3.h @@ -148,7 +148,7 @@ extern "C" { */ #define SQLITE_VERSION "3.47.0" #define SQLITE_VERSION_NUMBER 3047000 -#define SQLITE_SOURCE_ID "2024-12-05 19:45:14 5f9f6764e9dffef60213bbc9604940ddfc713436333c3f62ed8a090697fcbb1e" +#define SQLITE_SOURCE_ID "2024-12-06 17:52:38 65b753735b8e8fb70d2b522d527426f1eb5c09339fb4b15cf69cbd2e595b160f" /* ** CAPI3REF: Run-Time Library Version Numbers diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 41b6d06df..5a75611cd 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -56,6 +56,20 @@ // dbCountAtStart: The highest committed transaction in the DB at the start of this transaction on leader, for // optimizing replication. +// NOTE: This comment as well as NODE_LOGIN should be removed after https://github.com/Expensify/Bedrock/pull/1999 is deployed. +// On LOGIN vs NODE_LOGIN. +// _onConnect sends a LOGIN message. +// _onConnect is called in exctly two places: +// 1. In response to a NODE_LOGIN message received on a newly connected socket on the sync port. It's expected when +// establishing a connection, a node sends this NODE_LOGIN as its first message. +// 2. Immediately following establishing a TCP connection to another node and sending a NODE_LOGIN message. In the case that +// we are the initiating node, we immediately queue three messages: +// 1. NODE_LOGIN +// 2. PING +// 3. LOGIN +// +// When we receive a NODE_LOGIN, we immediately respond with a PING followed by a LOGIN (by calling _onConnect). + #undef SLOGPREFIX #define SLOGPREFIX "{" << _name << "/" << SQLiteNode::stateName(_state) << "} " @@ -609,7 +623,7 @@ bool SQLiteNode::update() { // Find the freshest non-broken peer (including permafollowers). if (peer->loggedIn) { loggedInPeers.push_back(peer->name); - if (_forkedFrom.count(peer->name)) { + if (peer->forked) { SWARN("Hash mismatch. Forked from peer " << peer->name << " so not considering it." << _getLostQuorumLogMessage()); continue; } @@ -727,7 +741,7 @@ bool SQLiteNode::update() { continue; } - if (_forkedFrom.count(peer->name)) { + if (peer->forked) { // Forked nodes are treated as ineligible for leader, etc. SHMMM("Not counting forked peer " << peer->name << " for freshest, highestPriority, or currentLeader."); continue; @@ -831,7 +845,6 @@ bool SQLiteNode::update() { size_t numFullPeers = 0; size_t numLoggedInFullPeers = 0; size_t approveCount = 0; - size_t abstainCount = 0; if (_isShuttingDown) { SINFO("Shutting down while standing up, setting state to SEARCHING"); _changeState(SQLiteNodeState::SEARCHING); @@ -849,9 +862,6 @@ bool SQLiteNode::update() { // Has it responded yet? if (peer->standupResponse == SQLitePeer::Response::NONE) { // This peer hasn't yet responded. We do nothing with it in this case, maybe it will have responded by the next check. - } else if (peer->standupResponse == SQLitePeer::Response::ABSTAIN) { - PHMMM("Peer abstained from participation in quorum"); - abstainCount++; } else if (peer->standupResponse == SQLitePeer::Response::DENY) { // It responeded, but didn't approve -- abort PHMMM("Refused our STANDUP, cancel and RE-SEARCH"); @@ -864,16 +874,6 @@ bool SQLiteNode::update() { } } - // If the majority of full peers responds with abstain, then re-search. - const bool majorityAbstained = abstainCount * 2 > numFullPeers; - if (majorityAbstained) { - // Majority abstained, meaning we're probably forked, - // so we go back to searching so we can go back to synchronizing and see if we're forked. - SHMMM("Majority of full peers abstained; re-SEARCHING."); - _changeState(SQLiteNodeState::SEARCHING); - return true; // Re-update - } - // If everyone's responded with approval and we form a majority, then finish standup. bool majorityConnected = numLoggedInFullPeers * 2 >= numFullPeers; bool quorumApproved = approveCount * 2 >= numFullPeers; @@ -1261,13 +1261,22 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { SINFO("Received PING from peer '" << peer->name << "'. Sending PONG."); SData pong("PONG"); pong["Timestamp"] = message["Timestamp"]; - peer->sendMessage(pong.serialize()); + peer->sendMessage(pong); return; } else if (SIEquals(message.methodLine, "PONG")) { // Latency must be > 0 because we treat 0 as "not connected". peer->latency = max(STimeNow() - message.calc64("Timestamp"), 1ul); SINFO("Received PONG from peer '" << peer->name << "' (" << peer->latency/1000 << "ms latency)"); return; + } else if (SIEquals(message.methodLine, "NODE_LOGIN")) { + // Do nothing, this keeps this code from warning until NODE_LOGIN is deprecated. + return; + } + + // We ignore everything except PING and PONG from forked nodes, so we can return here in that case. + if (peer->forked) { + PINFO("Received message " << message.methodLine << " from forked peer, ignoring."); + return; } // Every other message broadcasts the current state of the node @@ -1308,13 +1317,36 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // It's an error to have to peers configured with the same priority, except 0 and -1 SASSERT(_priority == -1 || _priority == 0 || message.calc("Priority") != _priority); - PINFO("Peer logged in at '" << message["State"] << "', priority #" << message["Priority"] << " commit #" - << message["CommitCount"] << " (" << message["Hash"] << ")"); peer->priority = message.calc("Priority"); - peer->loggedIn = true; peer->version = message["Version"]; peer->state = stateFromName(message["State"]); + uint64_t peerCommitCount; + string peerCommitHash; + bool hashesMatch = true; + peer->getCommit(peerCommitCount, peerCommitHash); + if (!peerCommitHash.empty() && peerCommitCount <= getCommitCount()) { + string query, hash; + _db.getCommit(peerCommitCount, query, hash); + hashesMatch = (peerCommitHash == hash); + } + + if (hashesMatch) { + PINFO("Peer logged in at '" << message["State"] << "', priority #" << message["Priority"] << " commit #" + << message["CommitCount"] << " (" << message["Hash"] << ")"); + peer->loggedIn = true; + } else { + PINFO("Peer is forked, marking as bad, will ignore."); + + // Send it a message before we mark it as forked, as we'll refuse afterward. + SData forked("FORKED"); + _sendToPeer(peer, forked); + + // And mark it dead until it reconnects. + peer->forked = true; + _dieIfForkedFromCluster(); + } + // If the peer is already standing up, go ahead and approve or deny immediately. if (peer->state == SQLiteNodeState::STANDINGUP) { _sendStandupResponse(peer, message); @@ -1434,9 +1466,6 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { if (SIEquals(message["Response"], "approve")) { PINFO("Received standup approval"); peer->standupResponse = SQLitePeer::Response::APPROVE; - } else if (SIEquals(message["Response"], "abstain")) { - PINFO("Received standup abstain"); - peer->standupResponse = SQLitePeer::Response::ABSTAIN; } else { PHMMM("Received standup denial: reason='" << message["Reason"] << "'"); peer->standupResponse = SQLitePeer::Response::DENY; @@ -1451,7 +1480,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { SINFO("Asked to help SYNCHRONIZE but shutting down."); SData response("SYNCHRONIZE_RESPONSE"); response["ShuttingDown"] = "true"; - peer->sendMessage(response); + _sendToPeer(peer, response); } else { _pendingSynchronizeResponses++; static atomic synchronizeCount(0); @@ -1466,13 +1495,17 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // The following two lines are copied from `_sendToPeer`. response["CommitCount"] = to_string(db.getCommitCount()); response["Hash"] = db.getCommittedHash(); - peer->sendMessage(response); + _sendToPeer(peer, response); } catch (const SException& e) { // This is the same handling as at the bottom of _onMESSAGE. - PWARN("Error processing message '" << message.methodLine << "' (" << e.what() << "), reconnecting."); + SWARN("Error processing message, reconnecting", { + {"peer", peer ? peer->name : "unknown"}, + {"message", !message.empty()? message.methodLine : ""}, + {"reason", e.what()} + }); SData reconnect("RECONNECT"); reconnect["Reason"] = e.what(); - peer->sendMessage(reconnect.serialize()); + _sendToPeer(peer, reconnect); peer->shutdownSocket(); } @@ -1489,17 +1522,13 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { SQResult result; uint64_t commitNum = SToUInt64(message["hashMismatchNumber"]); _db.getCommits(commitNum, commitNum, result); - _forkedFrom.insert(peer->name); + peer->forked = true; SALERT("Hash mismatch. Peer " << peer->name << " and I have forked at commit " << message["hashMismatchNumber"] - << ". I have forked from " << _forkedFrom.size() << " other nodes. I am " << stateName(_state) - << " and have hash " << result[0][0] << " for that commit. Peer has hash " << message["hashMismatchValue"] << "." - << _getLostQuorumLogMessage()); - - if (_forkedFrom.size() > ((_peerList.size() + 1) / 2)) { - SERROR("Hash mismatch. I have forked from over half the cluster. This is unrecoverable." << _getLostQuorumLogMessage()); - } + << ". I am " << stateName(_state) << " and have hash " << result[0][0] << " for that commit. Peer has hash " + << message["hashMismatchValue"] << "." << _getLostQuorumLogMessage()); + _dieIfForkedFromCluster(); STHROW("Hash mismatch"); } if (!_syncPeer) { @@ -1693,14 +1722,38 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { << e.what() << "', ignoring."); } } + } else if (SIEquals(message.methodLine, "FORKED")) { + peer->forked = true; + PINFO("Peer said we're forked, believing them."); + _dieIfForkedFromCluster(); + + // If leader said we're forked from it, we need a new leader. + if (peer == _leadPeer) { + _leadPeer = nullptr; + _changeState(SQLiteNodeState::SEARCHING); + } + + // If our sync peer said we're forked from it, and we're currently synchronizing, we need a new sync peer. + // However, if we're not currently syncing, then we don't need to change states, this peer could have been chosen + // hours or days ago. + if (peer == _syncPeer) { + _syncPeer = nullptr; + if (_state == SQLiteNodeState::SYNCHRONIZING) { + _changeState(SQLiteNodeState::SEARCHING); + } + } } else { - STHROW("unrecognized message"); + PINFO("unrecognized message: " + message.methodLine); } } catch (const SException& e) { - PWARN("Error processing message '" << message.methodLine << "' (" << e.what() << "), reconnecting."); + SWARN("Error processing message, reconnecting", { + {"peer", peer ? peer->name : "unknown"}, + {"message", !message.empty() ? message.methodLine : ""}, + {"reason", e.what()} + }); SData reconnect("RECONNECT"); reconnect["Reason"] = e.what(); - peer->sendMessage(reconnect.serialize()); + _sendToPeer(peer, reconnect); peer->shutdownSocket(); } } @@ -1708,13 +1761,15 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { void SQLiteNode::_onConnect(SQLitePeer* peer) { SASSERT(peer); SASSERTWARN(!peer->loggedIn); - // Send the LOGIN - PINFO("Sending LOGIN"); SData login("LOGIN"); + login["Name"] = _name; login["Priority"] = to_string(_priority); login["State"] = stateName(_state); login["Version"] = _version; login["Permafollower"] = _originalPriority ? "false" : "true"; + PINFO("Sending " << login.serialize()); + + // NOTE: the following call adds CommitCount, Hash, and commandAddress fields. _sendToPeer(peer, login); } @@ -1819,18 +1874,26 @@ void SQLiteNode::_sendToPeer(SQLitePeer* peer, const SData& message) { // We can treat this whole function as atomic and thread-safe as it sends data to a peer with it's own atomic // `sendMessage` and the peer itself (assuming it's something from _peerList, which, if not, don't do that) is // const and will exist without changing until destruction. - peer->sendMessage(_addPeerHeaders(message).serialize()); + if (peer->forked) { + PINFO("Skipping message " << message.methodLine << " to forked peer."); + return; + } + peer->sendMessage(_addPeerHeaders(message)); } void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) { - const string serializedMessage = _addPeerHeaders(message).serialize(); + const SData messageWithHeaders = _addPeerHeaders(message); // Loop across all connected peers and send the message. _peerList is const so this is thread-safe. for (auto peer : _peerList) { + if (peer->forked) { + PINFO("Skipping message " << message.methodLine << " to forked peer."); + continue; + } // This check is strictly thread-safe, as SQLitePeer::subscribed is atomic, but there's still a race condition // around checking subscribed and then sending, as subscribed could technically change. if (!subscribedOnly || peer->subscribed) { - peer->sendMessage(serializedMessage); + peer->sendMessage(messageWithHeaders); } } } @@ -1916,11 +1979,6 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance _leadPeer = nullptr; } - if (newState >= SQLiteNodeState::STANDINGUP) { - // Not forked from anyone. Note that this includes both LEADING and FOLLOWING. - _forkedFrom.clear(); - } - // Re-enable commits if they were disabled during a previous stand-down. if (newState != SQLiteNodeState::SEARCHING) { _db.setCommitEnabled(true); @@ -1929,7 +1987,11 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance // If we're going searching and have forked from at least 1 peer, sleep for a second. This is intended to prevent thousands of lines of log spam when this happens in an infinite // loop. It's entirely possible that we do this for valid reasons - it may be the peer that has the bad database and not us, and there are plenty of other reasons we could switch to // SEARCHING, but in those cases, we just wait an extra second before trying again. - if (newState == SQLiteNodeState::SEARCHING && _forkedFrom.size()) { + bool forkedPeers = false; + for (const auto p : _peerList) { + forkedPeers = forkedPeers || p->forked; + } + if (newState == SQLiteNodeState::SEARCHING && forkedPeers) { SWARN("Going searching while forked peers present, sleeping 1 second." << _getLostQuorumLogMessage()); sleep(1); } @@ -2132,7 +2194,7 @@ void SQLiteNode::_updateSyncPeer() continue; } - if (_forkedFrom.count(peer->name)) { + if (peer->forked) { SWARN("Hash mismatch. Can't choose peer " << peer->name << " due to previous hash mismatch."); continue; } @@ -2494,12 +2556,14 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { int messageSize = message.deserialize(socket->recvBuffer); if (messageSize) { socket->recvBuffer.consumeFront(messageSize); - if (SIEquals(message.methodLine, "NODE_LOGIN")) { + // Allow either LOGIN or NODE_LOGIN until we deprecate NODE_LOGIN. + if (SIEquals(message.methodLine, "NODE_LOGIN") || SIEquals(message.methodLine, "LOGIN")) { SQLitePeer* peer = getPeerByName(message["Name"]); if (peer) { if (peer->setSocket(socket)) { - _sendPING(peer); _onConnect(peer); + _sendPING(peer); + _onMESSAGE(peer, message); // Connected OK, don't need in _unauthenticatedIncomingSockets anymore. socketsToRemove.push_back(socket); @@ -2518,7 +2582,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { STHROW("Unauthenticated node '" + message["Name"] + "' attempted to connected, rejecting."); } } else { - STHROW("expecting NODE_LOGIN"); + STHROW("expecting LOGIN or NODE_LOGIN"); } } else if (STimeNow() > socket->lastRecvTime + 5'000'000) { STHROW("Incoming socket didn't send a message for over 5s, closing."); @@ -2541,18 +2605,19 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { switch (result) { case SQLitePeer::PeerPostPollStatus::JUST_CONNECTED: { + // When NODE_LOGIN is deprecated, we can remove the next 3 lines. SData login("NODE_LOGIN"); login["Name"] = _name; - peer->sendMessage(login.serialize()); - _sendPING(peer); + _sendToPeer(peer, login); _onConnect(peer); + _sendPING(peer); } break; case SQLitePeer::PeerPostPollStatus::SOCKET_ERROR: { SData reconnect("RECONNECT"); reconnect["Reason"] = "socket error"; - peer->sendMessage(reconnect.serialize()); + _sendToPeer(peer, reconnect); peer->shutdownSocket(); } break; @@ -2610,7 +2675,7 @@ void SQLiteNode::_sendPING(SQLitePeer* peer) { SASSERT(peer); SData ping("PING"); ping["Timestamp"] = SToStr(STimeNow()); - peer->sendMessage(ping.serialize()); + peer->sendMessage(ping); } SQLitePeer* SQLiteNode::getPeerByName(const string& name) const { @@ -2700,14 +2765,6 @@ void SQLiteNode::_sendStandupResponse(SQLitePeer* peer, const SData& message) { return; } - if (_forkedFrom.count(peer->name)) { - PHMMM("Forked from peer, can't approve standup."); - response["Response"] = "abstain"; - response["Reason"] = "We are forked"; - _sendToPeer(peer, response); - return; - } - // What's our state if (SWITHIN(SQLiteNodeState::STANDINGUP, _state, SQLiteNodeState::STANDINGDOWN)) { // Oh crap, it's trying to stand up while we're leading. Who is higher priority? @@ -2774,3 +2831,25 @@ void SQLiteNode::_sendStandupResponse(SQLitePeer* peer, const SData& message) { } _sendToPeer(peer, response); } + +void SQLiteNode::_dieIfForkedFromCluster() { + size_t quorumNodeCount = 0; + size_t forkedFullPeerCount = 0; + for (const auto& p : _peerList) { + if (!p->permaFollower) { + quorumNodeCount++; + if (p->forked) { + forkedFullPeerCount++; + } + } + } + + // Increase quorumNodeCount if *I* am not a permafollower + if (_originalPriority != 0) { + quorumNodeCount++; + } + + if (forkedFullPeerCount >= (quorumNodeCount + 1) / 2) { + SERROR("I have forked from over half the cluster (" << forkedFullPeerCount << " nodes). This is unrecoverable." << _getLostQuorumLogMessage()); + } +} diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 652f157f0..57c5c1056 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -276,6 +276,8 @@ class SQLiteNode : public STCPManager { // commitCount that we do, this will return null. void _updateSyncPeer(); + void _dieIfForkedFromCluster(); + const string _commandAddress; const string _name; const vector _peerList; @@ -384,13 +386,6 @@ class SQLiteNode : public STCPManager { // This can be removed once we've figured out why replication falls behind. See this issue: https://github.com/Expensify/Expensify/issues/210528 atomic _concurrentReplicateTransactions = 0; - // We keep a set of strings that are the names of nodes we've forked from, in the case we ever receive a hash mismatch while trying to synchronize. - // Whenever we become LEADING or FOLLOWING this is cleared. This resets the case where one node has forked, we attempt to synchronize from it, and fail, - // but later synchronize from someone else. Once we've come up completely, we no longer "hold a grudge" against this node, which will likely get fixed - // while we're online. - // In the event that this list becomes longer than half the cluster size, the node kills itself and logs that it's in an unrecoverable state. - set _forkedFrom; - // A pointer to a SQLite instance that is passed to plugin's stateChanged function. This prevents plugins from operating on the same handle that // the sync node is when they run queries in stateChanged. SQLite* pluginDB; diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index 82c2de8a2..c93589520 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -26,6 +26,7 @@ SQLitePeer::SQLitePeer(const string& name_, const string& host_, const STable& p transactionResponse(Response::NONE), version(), lastPingTime(0), + forked(false), hash() { } @@ -79,6 +80,7 @@ void SQLitePeer::reset() { version = ""; lastPingTime = 0, setCommit(0, ""); + forked = false; } void SQLitePeer::shutdownSocket() { @@ -205,9 +207,6 @@ string SQLitePeer::responseName(Response response) { case Response::DENY: return "DENY"; break; - case Response::ABSTAIN: - return "ABSTAIN"; - break; default: return ""; } diff --git a/sqlitecluster/SQLitePeer.h b/sqlitecluster/SQLitePeer.h index 0eb984656..48bc3433c 100644 --- a/sqlitecluster/SQLitePeer.h +++ b/sqlitecluster/SQLitePeer.h @@ -8,8 +8,7 @@ class SQLitePeer { enum class Response { NONE, APPROVE, - DENY, - ABSTAIN + DENY }; enum class PeerPostPollStatus { @@ -91,6 +90,9 @@ class SQLitePeer { atomic version; atomic lastPingTime; + // Set to true when this peer is known to be unusable, I.e., when it has a database that is forked from us. + atomic forked; + private: // For initializing the permafollower value from the params list. static bool isPermafollower(const STable& params); diff --git a/test/clustertest/tests/ForkCheckTest.cpp b/test/clustertest/tests/ForkCheckTest.cpp index 6e0c0d262..835cc76cd 100644 --- a/test/clustertest/tests/ForkCheckTest.cpp +++ b/test/clustertest/tests/ForkCheckTest.cpp @@ -1,13 +1,16 @@ +#include "test/lib/BedrockTester.h" #include #include #include #include +#include #include struct ForkCheckTest : tpunit::TestFixture { ForkCheckTest() - : tpunit::TestFixture("ForkCheck", TEST(ForkCheckTest::test)) {} + : tpunit::TestFixture("ForkCheck", + TEST(ForkCheckTest::forkAtShutDown)) {} pair getMaxJournalCommit(BedrockTester& tester, bool online = true) { SQResult journals; @@ -30,40 +33,42 @@ struct ForkCheckTest : tpunit::TestFixture { return make_pair(maxJournalCommit, maxJournalTable); } - void test() { - // Create a cluster, wait for it to come up. - BedrockClusterTester tester(ClusterSize::FIVE_NODE_CLUSTER); - - // We'll tell the threads to stop when they're done. - atomic stop(false); - - // We want to not spam a stopped leader. - atomic leaderIsUp(true); - + vector createThreads(size_t num, BedrockClusterTester& tester, atomic& stop, atomic& leaderIsUp) { // Just use a bunch of copies of the same command. - SData spamCommand("idcollision"); - - // In a vector. - const vector commands(100, spamCommand); - - // Now create 9 threads spamming 100 commands at a time, each. 9 cause we have three nodes. vector threads; - for (size_t i = 0; i < 9; i++) { - threads.emplace_back([&tester, i, &commands, &stop, &leaderIsUp](){ + for (size_t num = 0; num < 9; num++) { + threads.emplace_back([&tester, num, &stop, &leaderIsUp](){ + const vector commands(100, SData("idcollision")); while (!stop) { // Pick a tester, send, don't care about the result. - size_t testerNum = i % 3; + size_t testerNum = num % 5; if (testerNum == 0 && !leaderIsUp) { - // If we're looking for leader and it's down, wait a second to avoid pegging the CPU. - sleep(1); - } else { - // If we're not leader or leader is up, spam away! - tester.getTester(testerNum).executeWaitMultipleData(commands); + // If leader's off, don't use it. + testerNum = 1; } + tester.getTester(testerNum).executeWaitMultipleData(commands); } }); } + return threads; + } + + // This primary test here checks that a node that is forked will not be able to rejoin the cluster when reconnecting. + // This is a reasonable test for a fork that happens at shutdown. + void forkAtShutDown() { + // Create a cluster, wait for it to come up. + BedrockClusterTester tester(ClusterSize::FIVE_NODE_CLUSTER); + + // We'll tell the threads to stop when they're done. + atomic stop(false); + + // We want to not spam a stopped leader. + atomic leaderIsUp(true); + + // Now create 15 threads spamming 100 commands at a time, each. 15 because we have five nodes. + vector threads = createThreads(15, tester, stop, leaderIsUp); + // Let them spam for a second. sleep(1); @@ -71,7 +76,7 @@ struct ForkCheckTest : tpunit::TestFixture { leaderIsUp = false; tester.getTester(0).stopServer(); - // Spam a few more commands and then we can stop. + // Spam a few more commands so thar the follower is ahead of the stopped leader, and then we can stop. sleep(1); stop = true; for (auto& t : threads) { @@ -119,5 +124,10 @@ struct ForkCheckTest : tpunit::TestFixture { // And that signal should have been ABORT. ASSERT_EQUAL(SIGABRT, WTERMSIG(status)); + + // We call stopServer on the forked leader because it crashed, but the cluster tester doesn't realize, so shutting down + // normally will time out after a minute. Calling `stopServer` explicitly will clear the server PID, and we won't need + // to wait for this timeout. + tester.getTester(0).stopServer(); } } __ForkCheckTest; diff --git a/test/clustertest/tests/ForkedNodeApprovalTest.cpp b/test/clustertest/tests/ForkedNodeApprovalTest.cpp deleted file mode 100644 index 197b883cc..000000000 --- a/test/clustertest/tests/ForkedNodeApprovalTest.cpp +++ /dev/null @@ -1,161 +0,0 @@ - -#include - -#include -#include -#include -#include - -struct ForkedNodeApprovalTest : tpunit::TestFixture { - ForkedNodeApprovalTest() - : tpunit::TestFixture("ForkedNodeApproval", TEST(ForkedNodeApprovalTest::test)) {} - - pair getMaxJournalCommit(BedrockTester& tester, bool online = true) { - SQResult journals; - tester.readDB("SELECT name FROM sqlite_schema WHERE type ='table' AND name LIKE 'journal%';", journals, online); - uint64_t maxJournalCommit = 0; - string maxJournalTable; - for (auto& row : journals.rows) { - string maxID = tester.readDB("SELECT MAX(id) FROM " + row[0] + ";", online); - try { - uint64_t maxCommitNum = stoull(maxID); - if (maxCommitNum > maxJournalCommit) { - maxJournalCommit = maxCommitNum; - maxJournalTable = row[0]; - } - } catch (const invalid_argument& e) { - // do nothing, skip this journal with no entries. - continue; - } - } - return make_pair(maxJournalCommit, maxJournalTable); - } - - void test() { - // Create a cluster, wait for it to come up. - BedrockClusterTester tester(ClusterSize::THREE_NODE_CLUSTER); - - // We'll tell the threads to stop when they're done. - atomic stop(false); - - // We want to not spam a stopped leader. - atomic leaderIsUp(true); - - // Just use a bunch of copies of the same command. - SData spamCommand("idcollision"); - - // In a vector. - const vector commands(100, spamCommand); - - // Now create 9 threads spamming 100 commands at a time, each. 9 cause we have three nodes. - vector threads; - for (size_t i = 0; i < 9; i++) { - threads.emplace_back([&tester, i, &commands, &stop, &leaderIsUp](){ - while (!stop) { - // Pick a tester, send, don't care about the result. - size_t testerNum = i % 3; - if (testerNum == 0 && !leaderIsUp) { - // If we're looking for leader and it's down, wait a second to avoid pegging the CPU. - sleep(1); - } else { - // If we're not leader or leader is up, spam away! - tester.getTester(testerNum).executeWaitMultipleData(commands); - } - } - }); - } - - // Let them spam for a second. - sleep(1); - - // We can try and stop the leader. - leaderIsUp = false; - tester.getTester(0).stopServer(); - - // Spam a few more commands and then we can stop. - sleep(1); - stop = true; - for (auto& t : threads) { - t.join(); - } - - // Fetch the latest journal commits on leader and follower - auto result = getMaxJournalCommit(tester.getTester(0), false); - - uint64_t leaderMaxCommit = result.first; - string leaderMaxCommitJournal = result.second; - result = getMaxJournalCommit(tester.getTester(1)); - uint64_t followerMaxCommit = result.first; - - // Make sure the follower got farther than the leader. - ASSERT_GREATER_THAN(followerMaxCommit, leaderMaxCommit); - - // We need to release any DB that the tester is holding. - tester.getTester(0).freeDB(); - tester.getTester(1).freeDB(); - - // Break leader. - { - string filename = tester.getTester(0).getArg("-db"); - string query = "UPDATE " + leaderMaxCommitJournal + " SET hash = 'abcdef123456' WHERE id = " + to_string(leaderMaxCommit) + ";"; - - sqlite3* db = nullptr; - sqlite3_open_v2(filename.c_str(), &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX, NULL); - char* errMsg = nullptr; - sqlite3_exec(db, query.c_str(), 0, 0, &errMsg); - if (errMsg) { - cout << "Error updating db: " << errMsg << endl; - ASSERT_TRUE(false); - } - sqlite3_close_v2(db); - } - - // Stop the second follower. - tester.getTester(2).stopServer(); - - // Start the broken leader back up. - tester.getTester(0).startServer(false); - - // We should not get a leader, the primary leader needs to synchronize, but can't because it's forked. - // The secondary leader should go leading, but can't, because it only receives `abstain` responses to standup requests. - // It's possible for the secondary leader to go leading once, but it should quickly fall out of leading when the fork is detected and primary leader reconnects. - // After that, it should not go leading again, primary leader should abstain from participation. - auto start = chrono::steady_clock::now(); - bool abstainDetected = false; - while (true) { - if (chrono::steady_clock::now() - start > 30s) { - cout << "It's been 30 seconds." << endl; - break; - } - SData command("Status"); - auto responseJson = tester.getTester(1).executeWaitMultipleData({command}, 1, true)[0].content; - - auto json = SParseJSONObject(responseJson); - auto peers = SParseJSONArray(json["peerList"]); - for (auto& peer : peers) { - auto peerJSON = SParseJSONObject(peer); - if (peerJSON["name"] == "cluster_node_0" && peerJSON["standupResponse"] == "ABSTAIN") { - abstainDetected = true; - break; - } - } - if (abstainDetected) { - break; - } - - // try again. - usleep(50'000); - } - - ASSERT_TRUE(abstainDetected); - - // Ok, now we can start the second follower back up and secondary leader should be able to lead. - tester.getTester(2).startServer(false); - ASSERT_TRUE(tester.getTester(1).waitForState("LEADING")); - - // We call stopServer on the forked leader because it crashed, but the cluster tester doesn't realize, so shutting down - // normally will time out after a minute. Calling `stopServer` explicitly will clear the server PID, and we won't need - // to wait for this timeout. - tester.getTester(0).stopServer(); - } -} __ForkedNodeApprovalTest;