From b1a1e946812216501894baf3890e3e74374ac0ed Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 17:26:07 +0000 Subject: [PATCH 01/22] receiving and calling new callback --- sqlitecluster/SQLiteNode.cpp | 10 +++++++++- sqlitecluster/SQLiteNode.h | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 21a958594..21adf634f 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1240,7 +1240,7 @@ bool SQLiteNode::update() { // Messages // Here are the messages that can be received, and how a cluster node will respond to each based on its state: -void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { +void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message, function commandPortCallback) { try { SASSERT(peer); SASSERTWARN(!message.empty()); @@ -1655,6 +1655,14 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } } } + // This will notify the BedrockServer's callback to block or unblock the command port based on + // how many commits we are behind. + { + SQLiteScopedHandle dbScope(*_dbPool, _dbPool->getIndex(false)); + SQLite& db = dbScope.db(); + const int64_t currentCommitDifference = message.calcU64("NewCount") - db.getCommitCount(); + commandPortCallback(currentCommitDifference); + } } catch (const system_error& e) { // If the server is strugling and falling behind on replication, we might have too many threads // causing a resource exhaustion. If that happens, all the transactions that are already threaded diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 817a4e9b9..75d66a86a 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -243,7 +243,7 @@ class SQLiteNode : public STCPManager { void _onDisconnect(SQLitePeer* peer); // Called when the peer sends us a message; throw an SException to reconnect. - void _onMESSAGE(SQLitePeer* peer, const SData& message); + void _onMESSAGE(SQLitePeer* peer, const SData& message, function commandPortCallback = nullptr); void _reconnectAll(); void _reconnectPeer(SQLitePeer* peer); void _recvSynchronize(SQLitePeer* peer, const SData& message); From ef6dc048c47e4d790e9bc2b8589c3ab38cbd7cb6 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 17:37:44 +0000 Subject: [PATCH 02/22] sending function on post poll --- sqlitecluster/SQLiteNode.cpp | 6 +++--- sqlitecluster/SQLiteNode.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 21adf634f..5e6bc53c7 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1657,7 +1657,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message, functiongetIndex(false)); SQLite& db = dbScope.db(); const int64_t currentCommitDifference = message.calcU64("NewCount") - db.getCommitCount(); @@ -2536,7 +2536,7 @@ STCPManager::Socket* SQLiteNode::_acceptSocket() { return socket; } -void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { +void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity, function commandPortCallback) { unique_lock uniqueLock(_stateMutex); // Accept any new peers @@ -2643,7 +2643,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { size_t messagesDeqeued = 0; while (true) { SData message = peer->popMessage(); - _onMESSAGE(peer, message); + _onMESSAGE(peer, messagem, commandPortCallback); messagesDeqeued++; if (messagesDeqeued >= 100) { // We should run again immediately, we have more to do. diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 75d66a86a..dbedbdd86 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -165,7 +165,7 @@ class SQLiteNode : public STCPManager { void kill(); // Handle any read/write events that occurred. - void postPoll(fd_map& fdm, uint64_t& nextActivity); + void postPoll(fd_map& fdm, uint64_t& nextActivity, function commandPortCallback = nullptr); // Constructor/Destructor SQLiteNode(SQLiteServer& server, shared_ptr dbPool, const string& name, const string& host, From 8603f433781d24dcd24af183689b9de414f8636a Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 17:38:32 +0000 Subject: [PATCH 03/22] adding callback function for blocking the port --- BedrockServer.cpp | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index c0cad65f3..d7d132491 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -253,7 +253,24 @@ void BedrockServer::sync() // Process any activity in our plugins. AutoTimerTime postPollTime(postPollTimer); - _syncNode->postPoll(fdm, nextActivity); + _syncNode->postPoll(fdm, nextActivity, [&](const int64_t currentCommitCountDiff){ + // If the command port is already closed, we don't need to check anything and can early return. + if (_isCommandPortLikelyBlocked) { + return; + } + const string blockReason = "COMMITS_LAGGING_BEHIND"; + // If + if (currentCommitCountDiff > 50'000) { + SINFO("Node is lagging behind, blocking command port so it can catch up."); + blockCommandPort(blockReason); + } else if (currentCommitCountDiff < 10'000 && _commandPortBlockReasons.find(blockReason) == _commandPortBlockReasons.end()) { + // We verify if we have the block reason we expected before unblocking so we don't call unblock every time, which will + // generate a warning if we don't have the block reason. + SINFO("Node is caught up enough, unblocking command port."); + unblockCommandPort(blockReason); + } + + }); _syncNodeQueuedCommands.postPoll(fdm); _notifyDoneSync.postPoll(fdm); } From 257b07c9fe641f8f374cd71d9dba20ffbca458bc Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 17:40:25 +0000 Subject: [PATCH 04/22] function should be logically correct now --- BedrockServer.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index d7d132491..13e86ba93 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -260,16 +260,15 @@ void BedrockServer::sync() } const string blockReason = "COMMITS_LAGGING_BEHIND"; // If - if (currentCommitCountDiff > 50'000) { + if (!_isCommandPortLikelyBlocked && currentCommitCountDiff > 50'000) { SINFO("Node is lagging behind, blocking command port so it can catch up."); blockCommandPort(blockReason); - } else if (currentCommitCountDiff < 10'000 && _commandPortBlockReasons.find(blockReason) == _commandPortBlockReasons.end()) { - // We verify if we have the block reason we expected before unblocking so we don't call unblock every time, which will - // generate a warning if we don't have the block reason. + } else if (_isCommandPortLikelyBlocked && currentCommitCountDiff < 10'000 && _commandPortBlockReasons.find(blockReason) == _commandPortBlockReasons.end()) { + // We verify if we have the block reason we expected before calling unblock. Unblock would generate a warning, and we don't + // want to do that if don't really need to. SINFO("Node is caught up enough, unblocking command port."); unblockCommandPort(blockReason); } - }); _syncNodeQueuedCommands.postPoll(fdm); _notifyDoneSync.postPoll(fdm); From 8d887836034a0a50c90c53ff9b76f647663d4921 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 17:48:15 +0000 Subject: [PATCH 05/22] adding new methods to the SQLiteServer interface --- sqlitecluster/SQLiteServer.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sqlitecluster/SQLiteServer.h b/sqlitecluster/SQLiteServer.h index 0d8804f0d..245892a66 100644 --- a/sqlitecluster/SQLiteServer.h +++ b/sqlitecluster/SQLiteServer.h @@ -13,4 +13,8 @@ class SQLiteServer : public STCPManager { // We call this method whenever a node changes state virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) = 0; + + // You must block and unblock the command port with *identical strings*. + virtual void blockCommandPort(const string& reason); + virtual void unblockCommandPort(const string& reason); }; From 82a1e21986280f25e2742e9e88e85c29ef886979 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 17:48:41 +0000 Subject: [PATCH 06/22] adding override into the methods --- BedrockServer.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/BedrockServer.h b/BedrockServer.h index 4a6175d27..a23bcc886 100644 --- a/BedrockServer.h +++ b/BedrockServer.h @@ -182,8 +182,8 @@ class BedrockServer : public SQLiteServer { void onNodeLogin(SQLitePeer* peer) override; // You must block and unblock the command port with *identical strings*. - void blockCommandPort(const string& reason); - void unblockCommandPort(const string& reason); + void blockCommandPort(const string& reason) override; + void unblockCommandPort(const string& reason) override; // Legacy version of above. void suppressCommandPort(const string& reason, bool suppress, bool manualOverride = false); From d4d19c624fa6373e41c4a74a82a977f4394ea9e9 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 17:51:12 +0000 Subject: [PATCH 07/22] fixing test plugin --- test/tests/SQLiteNodeTest.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/tests/SQLiteNodeTest.cpp b/test/tests/SQLiteNodeTest.cpp index 4bdb882e3..046c5e48a 100644 --- a/test/tests/SQLiteNodeTest.cpp +++ b/test/tests/SQLiteNodeTest.cpp @@ -26,6 +26,8 @@ class TestServer : public SQLiteServer { virtual bool canStandDown() { return true; } virtual void onNodeLogin(SQLitePeer* peer) { } virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) {} + virtual void blockCommandPort(const string& reason) { }; + virtual void unblockCommandPort(const string& reason) { }; }; struct SQLiteNodeTest : tpunit::TestFixture { From d5384e3d7cc5e78619dc3e65f871d37254628722 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 17:53:32 +0000 Subject: [PATCH 08/22] removing new function, using the current port block functions --- sqlitecluster/SQLiteNode.cpp | 24 ++++++++++++++---------- sqlitecluster/SQLiteNode.h | 4 ++-- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 5e6bc53c7..8dbc73692 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1240,7 +1240,7 @@ bool SQLiteNode::update() { // Messages // Here are the messages that can be received, and how a cluster node will respond to each based on its state: -void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message, function commandPortCallback) { +void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { try { SASSERT(peer); SASSERTWARN(!message.empty()); @@ -1655,13 +1655,17 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message, functiongetIndex(false)); - SQLite& db = dbScope.db(); - const int64_t currentCommitDifference = message.calcU64("NewCount") - db.getCommitCount(); - commandPortCallback(currentCommitDifference); + const int64_t currentCommitDifference = message.calcU64("NewCount") - getCommitCount(); + const string blockReason = "COMMITS_LAGGING_BEHIND"; + // If + if (!_isCommandPortLikelyBlocked && currentCommitCountDiff > 50'000) { + SINFO("Node is lagging behind, blocking command port so it can catch up."); + blockCommandPort(blockReason); + } else if (_isCommandPortLikelyBlocked && currentCommitCountDiff < 10'000 && _commandPortBlockReasons.find(blockReason) == _commandPortBlockReasons.end()) { + // We verify if we have the block reason we expected before calling unblock. Unblock would generate a warning, and we don't + // want to do that if don't really need to. + SINFO("Node is caught up enough, unblocking command port."); + unblockCommandPort(blockReason); } } catch (const system_error& e) { // If the server is strugling and falling behind on replication, we might have too many threads @@ -2536,7 +2540,7 @@ STCPManager::Socket* SQLiteNode::_acceptSocket() { return socket; } -void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity, function commandPortCallback) { +void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { unique_lock uniqueLock(_stateMutex); // Accept any new peers @@ -2643,7 +2647,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity, functionpopMessage(); - _onMESSAGE(peer, messagem, commandPortCallback); + _onMESSAGE(peer, messagem); messagesDeqeued++; if (messagesDeqeued >= 100) { // We should run again immediately, we have more to do. diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index dbedbdd86..817a4e9b9 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -165,7 +165,7 @@ class SQLiteNode : public STCPManager { void kill(); // Handle any read/write events that occurred. - void postPoll(fd_map& fdm, uint64_t& nextActivity, function commandPortCallback = nullptr); + void postPoll(fd_map& fdm, uint64_t& nextActivity); // Constructor/Destructor SQLiteNode(SQLiteServer& server, shared_ptr dbPool, const string& name, const string& host, @@ -243,7 +243,7 @@ class SQLiteNode : public STCPManager { void _onDisconnect(SQLitePeer* peer); // Called when the peer sends us a message; throw an SException to reconnect. - void _onMESSAGE(SQLitePeer* peer, const SData& message, function commandPortCallback = nullptr); + void _onMESSAGE(SQLitePeer* peer, const SData& message); void _reconnectAll(); void _reconnectPeer(SQLitePeer* peer); void _recvSynchronize(SQLitePeer* peer, const SData& message); From a4bf7d181e99db21278653508cd15cc4382a9eb1 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 18:11:32 +0000 Subject: [PATCH 09/22] adding new method to check if port is closed or not --- BedrockServer.cpp | 18 +++++++----------- sqlitecluster/SQLiteServer.h | 1 + 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 13e86ba93..47a66934a 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -258,17 +258,6 @@ void BedrockServer::sync() if (_isCommandPortLikelyBlocked) { return; } - const string blockReason = "COMMITS_LAGGING_BEHIND"; - // If - if (!_isCommandPortLikelyBlocked && currentCommitCountDiff > 50'000) { - SINFO("Node is lagging behind, blocking command port so it can catch up."); - blockCommandPort(blockReason); - } else if (_isCommandPortLikelyBlocked && currentCommitCountDiff < 10'000 && _commandPortBlockReasons.find(blockReason) == _commandPortBlockReasons.end()) { - // We verify if we have the block reason we expected before calling unblock. Unblock would generate a warning, and we don't - // want to do that if don't really need to. - SINFO("Node is caught up enough, unblocking command port."); - unblockCommandPort(blockReason); - } }); _syncNodeQueuedCommands.postPoll(fdm); _notifyDoneSync.postPoll(fdm); @@ -1605,6 +1594,13 @@ void BedrockServer::unblockCommandPort(const string& reason) { } } +virtual void isCommandPortClosed(const string& reason) { + if (!strlen(reason)) { + return _isCommandPortLikelyBlocked; + } + return _commandPortBlockReasons.find(reason) != _commandPortBlockReasons.end(); +} + void BedrockServer::suppressCommandPort(const string& reason, bool suppress, bool manualOverride) { if (suppress) { blockCommandPort("LEGACY_" + reason); diff --git a/sqlitecluster/SQLiteServer.h b/sqlitecluster/SQLiteServer.h index 245892a66..2a0f8f4f5 100644 --- a/sqlitecluster/SQLiteServer.h +++ b/sqlitecluster/SQLiteServer.h @@ -17,4 +17,5 @@ class SQLiteServer : public STCPManager { // You must block and unblock the command port with *identical strings*. virtual void blockCommandPort(const string& reason); virtual void unblockCommandPort(const string& reason); + virtual void isCommandPortClosed(const string& reason); }; From fb549a89284e6d4e13a327f9e3fae14311346522 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 18:16:28 +0000 Subject: [PATCH 10/22] small changes --- BedrockServer.cpp | 2 ++ sqlitecluster/SQLiteNode.cpp | 9 +++------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 47a66934a..66e6bf99d 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -1598,6 +1598,8 @@ virtual void isCommandPortClosed(const string& reason) { if (!strlen(reason)) { return _isCommandPortLikelyBlocked; } + // Get the shared mutex so we don't execute read operations while changing the set + shared_mutex lock(_portMutex); return _commandPortBlockReasons.find(reason) != _commandPortBlockReasons.end(); } diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 8dbc73692..405fd90a0 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1657,13 +1657,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } const int64_t currentCommitDifference = message.calcU64("NewCount") - getCommitCount(); const string blockReason = "COMMITS_LAGGING_BEHIND"; - // If - if (!_isCommandPortLikelyBlocked && currentCommitCountDiff > 50'000) { - SINFO("Node is lagging behind, blocking command port so it can catch up."); + if (currentCommitCountDiff > 50'000 && !isCommandPortClosed(blockReason)) { + SINFO("Node is lagging behind, closing command port so it can catch up."); blockCommandPort(blockReason); - } else if (_isCommandPortLikelyBlocked && currentCommitCountDiff < 10'000 && _commandPortBlockReasons.find(blockReason) == _commandPortBlockReasons.end()) { - // We verify if we have the block reason we expected before calling unblock. Unblock would generate a warning, and we don't - // want to do that if don't really need to. + } else if (isCommandPortClosed(blockReason) && currentCommitCountDiff < 10'000) { SINFO("Node is caught up enough, unblocking command port."); unblockCommandPort(blockReason); } From 5962cdcdd3d4c5252c6484e1fd8141be1c87e763 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 18:18:40 +0000 Subject: [PATCH 11/22] fixing return type and adding to test plugin --- BedrockServer.cpp | 2 +- sqlitecluster/SQLiteServer.h | 2 +- test/tests/SQLiteNodeTest.cpp | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 66e6bf99d..5a6c2aec1 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -1594,7 +1594,7 @@ void BedrockServer::unblockCommandPort(const string& reason) { } } -virtual void isCommandPortClosed(const string& reason) { +virtual bool isCommandPortClosed(const string& reason) { if (!strlen(reason)) { return _isCommandPortLikelyBlocked; } diff --git a/sqlitecluster/SQLiteServer.h b/sqlitecluster/SQLiteServer.h index 2a0f8f4f5..c4e0e2913 100644 --- a/sqlitecluster/SQLiteServer.h +++ b/sqlitecluster/SQLiteServer.h @@ -17,5 +17,5 @@ class SQLiteServer : public STCPManager { // You must block and unblock the command port with *identical strings*. virtual void blockCommandPort(const string& reason); virtual void unblockCommandPort(const string& reason); - virtual void isCommandPortClosed(const string& reason); + virtual bool isCommandPortClosed(const string& reason); }; diff --git a/test/tests/SQLiteNodeTest.cpp b/test/tests/SQLiteNodeTest.cpp index 046c5e48a..3ef16ad7c 100644 --- a/test/tests/SQLiteNodeTest.cpp +++ b/test/tests/SQLiteNodeTest.cpp @@ -28,6 +28,7 @@ class TestServer : public SQLiteServer { virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) {} virtual void blockCommandPort(const string& reason) { }; virtual void unblockCommandPort(const string& reason) { }; + virtual bool isCommandPortClosed(const string& reason) { return false; }; }; struct SQLiteNodeTest : tpunit::TestFixture { From 88c2980bff5391104726e1266065b51bf280d1b7 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 18:37:08 +0000 Subject: [PATCH 12/22] fixing locks issues --- BedrockServer.cpp | 31 +++++++++++++------------------ BedrockServer.h | 3 ++- sqlitecluster/SQLiteNode.cpp | 10 +++++----- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 5a6c2aec1..5e6a546b6 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -253,12 +253,7 @@ void BedrockServer::sync() // Process any activity in our plugins. AutoTimerTime postPollTime(postPollTimer); - _syncNode->postPoll(fdm, nextActivity, [&](const int64_t currentCommitCountDiff){ - // If the command port is already closed, we don't need to check anything and can early return. - if (_isCommandPortLikelyBlocked) { - return; - } - }); + _syncNode->postPoll(fdm, nextActivity); _syncNodeQueuedCommands.postPoll(fdm); _notifyDoneSync.postPoll(fdm); } @@ -1206,7 +1201,7 @@ bool BedrockServer::_wouldCrash(const unique_ptr& command) { } void BedrockServer::_resetServer() { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); _requestCount = 0; _upgradeInProgress = false; @@ -1319,7 +1314,7 @@ BedrockServer::BedrockServer(const SData& args_) // Allow sending control commands when the server's not LEADING/FOLLOWING. SINFO("Opening control port on '" << args["-controlPort"] << "'"); { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); _controlPort = openPort(args["-controlPort"]); } @@ -1375,7 +1370,7 @@ bool BedrockServer::shutdownComplete() { } void BedrockServer::prePoll(fd_map& fdm) { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); // This will interrupt poll when we shut down. _notifyDone.prePoll(fdm); @@ -1401,7 +1396,7 @@ void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) { // NOTE: There are no sockets managed here, just ports. // Open the port the first time we enter a command-processing state { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); if (_commandPortBlockReasons.empty() && (getState() == SQLiteNodeState::LEADING || getState() == SQLiteNodeState::FOLLOWING) && _shutdownState.load() == RUNNING) { // Open the port @@ -1570,7 +1565,7 @@ void BedrockServer::_reply(unique_ptr& command) { void BedrockServer::blockCommandPort(const string& reason) { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); _commandPortBlockReasons.insert(reason); _isCommandPortLikelyBlocked = true; if (_commandPortBlockReasons.size() == 1) { @@ -1581,7 +1576,7 @@ void BedrockServer::blockCommandPort(const string& reason) { } void BedrockServer::unblockCommandPort(const string& reason) { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); auto it = _commandPortBlockReasons.find(reason); if (it == _commandPortBlockReasons.end()) { SWARN("Tried to remove command port block because: " << reason << ", but it wasn't blocked for that reason!"); @@ -1594,12 +1589,12 @@ void BedrockServer::unblockCommandPort(const string& reason) { } } -virtual bool isCommandPortClosed(const string& reason) { - if (!strlen(reason)) { +bool BedrockServer::isCommandPortClosed(const string& reason) { + if (reason.empty()) { return _isCommandPortLikelyBlocked; } // Get the shared mutex so we don't execute read operations while changing the set - shared_mutex lock(_portMutex); + shared_lock lock(_portMutex); return _commandPortBlockReasons.find(reason) != _commandPortBlockReasons.end(); } @@ -1741,7 +1736,7 @@ void BedrockServer::_status(unique_ptr& command) { } { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); content["commandPortBlockReasons"] = SComposeJSONArray(_commandPortBlockReasons); } @@ -2015,7 +2010,7 @@ void BedrockServer::_beginShutdown(const string& reason, bool detach) { // down, so otherwise there's a race condition where that happens just after we close them but before we // change the state. { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); _commandPortPublic = nullptr; _commandPortPrivate = nullptr; if (!_detach) { @@ -2094,7 +2089,7 @@ void BedrockServer::_acceptSockets() { // Lock _portMutex so suppressing the port does not cause it to be null // in the middle of this function. - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); for (auto& p : _portPluginMap) { portList.push_back(reference_wrapper>(p.first)); diff --git a/BedrockServer.h b/BedrockServer.h index a23bcc886..77982ce5e 100644 --- a/BedrockServer.h +++ b/BedrockServer.h @@ -184,6 +184,7 @@ class BedrockServer : public SQLiteServer { // You must block and unblock the command port with *identical strings*. void blockCommandPort(const string& reason) override; void unblockCommandPort(const string& reason) override; + bool isCommandPortClosed(const string& reason) override; // Legacy version of above. void suppressCommandPort(const string& reason, bool suppress, bool manualOverride = false); @@ -377,7 +378,7 @@ class BedrockServer : public SQLiteServer { atomic _detach; // Pointers to the ports on which we accept commands. - mutex _portMutex; + shared_mutex _portMutex; // The "control port" is intended to be open to privileged clients (i.e., localhost and other nodes in the Bedrock // cluster) it can be used to run any command including commands meant for cluster operations, changing server diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 405fd90a0..59843c842 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1657,12 +1657,12 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } const int64_t currentCommitDifference = message.calcU64("NewCount") - getCommitCount(); const string blockReason = "COMMITS_LAGGING_BEHIND"; - if (currentCommitCountDiff > 50'000 && !isCommandPortClosed(blockReason)) { + if (currentCommitDifference > 50'000 && !_server.isCommandPortClosed(blockReason)) { SINFO("Node is lagging behind, closing command port so it can catch up."); - blockCommandPort(blockReason); - } else if (isCommandPortClosed(blockReason) && currentCommitCountDiff < 10'000) { + _server.blockCommandPort(blockReason); + } else if (currentCommitDifference < 10'000 && _server.isCommandPortClosed(blockReason)) { SINFO("Node is caught up enough, unblocking command port."); - unblockCommandPort(blockReason); + _server.unblockCommandPort(blockReason); } } catch (const system_error& e) { // If the server is strugling and falling behind on replication, we might have too many threads @@ -2644,7 +2644,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { size_t messagesDeqeued = 0; while (true) { SData message = peer->popMessage(); - _onMESSAGE(peer, messagem); + _onMESSAGE(peer, message); messagesDeqeued++; if (messagesDeqeued >= 100) { // We should run again immediately, we have more to do. From d6f3d80b4dc38659517e4ceb74cbf51cf63b7790 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 18:49:19 +0000 Subject: [PATCH 13/22] fixing methods to be true virtual --- sqlitecluster/SQLiteServer.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sqlitecluster/SQLiteServer.h b/sqlitecluster/SQLiteServer.h index c4e0e2913..c5b82d4c0 100644 --- a/sqlitecluster/SQLiteServer.h +++ b/sqlitecluster/SQLiteServer.h @@ -15,7 +15,7 @@ class SQLiteServer : public STCPManager { virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) = 0; // You must block and unblock the command port with *identical strings*. - virtual void blockCommandPort(const string& reason); - virtual void unblockCommandPort(const string& reason); - virtual bool isCommandPortClosed(const string& reason); + virtual void blockCommandPort(const string& reason) = 0; + virtual void unblockCommandPort(const string& reason) = 0; + virtual bool isCommandPortClosed(const string& reason) = 0; }; From a7dffc08f483b9f1e8bafade4a67cf69a7390f00 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 19:28:17 +0000 Subject: [PATCH 14/22] addressing comments --- BedrockServer.cpp | 27 +++++++++------------------ BedrockServer.h | 3 +-- sqlitecluster/SQLiteNode.cpp | 9 --------- sqlitecluster/SQLiteNode.h | 3 ++- sqlitecluster/SQLiteServer.h | 1 - test/tests/SQLiteNodeTest.cpp | 1 - 6 files changed, 12 insertions(+), 32 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 5e6a546b6..c0cad65f3 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -1201,7 +1201,7 @@ bool BedrockServer::_wouldCrash(const unique_ptr& command) { } void BedrockServer::_resetServer() { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); _requestCount = 0; _upgradeInProgress = false; @@ -1314,7 +1314,7 @@ BedrockServer::BedrockServer(const SData& args_) // Allow sending control commands when the server's not LEADING/FOLLOWING. SINFO("Opening control port on '" << args["-controlPort"] << "'"); { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); _controlPort = openPort(args["-controlPort"]); } @@ -1370,7 +1370,7 @@ bool BedrockServer::shutdownComplete() { } void BedrockServer::prePoll(fd_map& fdm) { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); // This will interrupt poll when we shut down. _notifyDone.prePoll(fdm); @@ -1396,7 +1396,7 @@ void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) { // NOTE: There are no sockets managed here, just ports. // Open the port the first time we enter a command-processing state { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); if (_commandPortBlockReasons.empty() && (getState() == SQLiteNodeState::LEADING || getState() == SQLiteNodeState::FOLLOWING) && _shutdownState.load() == RUNNING) { // Open the port @@ -1565,7 +1565,7 @@ void BedrockServer::_reply(unique_ptr& command) { void BedrockServer::blockCommandPort(const string& reason) { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); _commandPortBlockReasons.insert(reason); _isCommandPortLikelyBlocked = true; if (_commandPortBlockReasons.size() == 1) { @@ -1576,7 +1576,7 @@ void BedrockServer::blockCommandPort(const string& reason) { } void BedrockServer::unblockCommandPort(const string& reason) { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); auto it = _commandPortBlockReasons.find(reason); if (it == _commandPortBlockReasons.end()) { SWARN("Tried to remove command port block because: " << reason << ", but it wasn't blocked for that reason!"); @@ -1589,15 +1589,6 @@ void BedrockServer::unblockCommandPort(const string& reason) { } } -bool BedrockServer::isCommandPortClosed(const string& reason) { - if (reason.empty()) { - return _isCommandPortLikelyBlocked; - } - // Get the shared mutex so we don't execute read operations while changing the set - shared_lock lock(_portMutex); - return _commandPortBlockReasons.find(reason) != _commandPortBlockReasons.end(); -} - void BedrockServer::suppressCommandPort(const string& reason, bool suppress, bool manualOverride) { if (suppress) { blockCommandPort("LEGACY_" + reason); @@ -1736,7 +1727,7 @@ void BedrockServer::_status(unique_ptr& command) { } { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); content["commandPortBlockReasons"] = SComposeJSONArray(_commandPortBlockReasons); } @@ -2010,7 +2001,7 @@ void BedrockServer::_beginShutdown(const string& reason, bool detach) { // down, so otherwise there's a race condition where that happens just after we close them but before we // change the state. { - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); _commandPortPublic = nullptr; _commandPortPrivate = nullptr; if (!_detach) { @@ -2089,7 +2080,7 @@ void BedrockServer::_acceptSockets() { // Lock _portMutex so suppressing the port does not cause it to be null // in the middle of this function. - lock_guard lock(_portMutex); + lock_guard lock(_portMutex); for (auto& p : _portPluginMap) { portList.push_back(reference_wrapper>(p.first)); diff --git a/BedrockServer.h b/BedrockServer.h index 77982ce5e..a23bcc886 100644 --- a/BedrockServer.h +++ b/BedrockServer.h @@ -184,7 +184,6 @@ class BedrockServer : public SQLiteServer { // You must block and unblock the command port with *identical strings*. void blockCommandPort(const string& reason) override; void unblockCommandPort(const string& reason) override; - bool isCommandPortClosed(const string& reason) override; // Legacy version of above. void suppressCommandPort(const string& reason, bool suppress, bool manualOverride = false); @@ -378,7 +377,7 @@ class BedrockServer : public SQLiteServer { atomic _detach; // Pointers to the ports on which we accept commands. - shared_mutex _portMutex; + mutex _portMutex; // The "control port" is intended to be open to privileged clients (i.e., localhost and other nodes in the Bedrock // cluster) it can be used to run any command including commands meant for cluster operations, changing server diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 59843c842..21a958594 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1655,15 +1655,6 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } } } - const int64_t currentCommitDifference = message.calcU64("NewCount") - getCommitCount(); - const string blockReason = "COMMITS_LAGGING_BEHIND"; - if (currentCommitDifference > 50'000 && !_server.isCommandPortClosed(blockReason)) { - SINFO("Node is lagging behind, closing command port so it can catch up."); - _server.blockCommandPort(blockReason); - } else if (currentCommitDifference < 10'000 && _server.isCommandPortClosed(blockReason)) { - SINFO("Node is caught up enough, unblocking command port."); - _server.unblockCommandPort(blockReason); - } } catch (const system_error& e) { // If the server is strugling and falling behind on replication, we might have too many threads // causing a resource exhaustion. If that happens, all the transactions that are already threaded diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 817a4e9b9..dbdd17879 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -364,9 +364,10 @@ class SQLiteNode : public STCPManager { // Stopwatch to track if we're giving up on the server preventing a standdown. SStopwatch _standDownTimeout; - // Our current State. + // Our current State. atomic _state; + atomic _blockedCommandPort{false}; // This is an integer that increments every time we change states. This is useful for responses to state changes // (i.e., approving standup) to verify that the messages we're receiving are relevant to the current state change, // and not stale responses to old changes. diff --git a/sqlitecluster/SQLiteServer.h b/sqlitecluster/SQLiteServer.h index c5b82d4c0..9b10e3835 100644 --- a/sqlitecluster/SQLiteServer.h +++ b/sqlitecluster/SQLiteServer.h @@ -17,5 +17,4 @@ class SQLiteServer : public STCPManager { // You must block and unblock the command port with *identical strings*. virtual void blockCommandPort(const string& reason) = 0; virtual void unblockCommandPort(const string& reason) = 0; - virtual bool isCommandPortClosed(const string& reason) = 0; }; diff --git a/test/tests/SQLiteNodeTest.cpp b/test/tests/SQLiteNodeTest.cpp index 3ef16ad7c..046c5e48a 100644 --- a/test/tests/SQLiteNodeTest.cpp +++ b/test/tests/SQLiteNodeTest.cpp @@ -28,7 +28,6 @@ class TestServer : public SQLiteServer { virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) {} virtual void blockCommandPort(const string& reason) { }; virtual void unblockCommandPort(const string& reason) { }; - virtual bool isCommandPortClosed(const string& reason) { return false; }; }; struct SQLiteNodeTest : tpunit::TestFixture { From 0434c8b14f76a4fa584f090080f82b0335aee405 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 19:32:28 +0000 Subject: [PATCH 15/22] addressing comment --- sqlitecluster/SQLiteNode.cpp | 14 +++++++++++++- sqlitecluster/SQLiteNode.h | 4 +++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 21a958594..2c184c523 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1278,7 +1278,19 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { peer->commandAddress = message["commandAddress"]; } peer->setCommit(message.calcU64("CommitCount"), message["Hash"]); - + + // We check the commit difference with 12,500 commits behind because that + // represents ~30s of commits. If we're behind, let's close the command port + // so we can catch up with the cluster before processing new commands. + const int64_t currentCommitDifference = message.calcU64("NewCount") - getCommitCount(); + const string blockReason = "COMMITS_LAGGING_BEHIND"; + if (currentCommitDifference > 12'500 && !_blockedCommandPort) { + SINFO("Node is lagging behind, closing command port so it can catch up."); + _server.blockCommandPort(blockReason); + } else if (currentCommitDifference < 10'000 && _blockedCommandPort) { + SINFO("Node is caught up enough, unblocking command port."); + _server.unblockCommandPort(blockReason); + } // Classify and process the message if (SIEquals(message.methodLine, "LOGIN")) { // LOGIN: This is the first message sent to and received from a new peer. It communicates the current state of diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index dbdd17879..793b8c287 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -367,7 +367,9 @@ class SQLiteNode : public STCPManager { // Our current State. atomic _state; - atomic _blockedCommandPort{false}; + // Keeps track if we have closed the command port for commits fallen behind. + bool _blockedCommandPort{false}; + // This is an integer that increments every time we change states. This is useful for responses to state changes // (i.e., approving standup) to verify that the messages we're receiving are relevant to the current state change, // and not stale responses to old changes. From 6f5596f37cb172620918e8a828b8924ad4da9893 Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Mon, 23 Dec 2024 20:56:42 +0000 Subject: [PATCH 16/22] last change --- sqlitecluster/SQLiteNode.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 2c184c523..d40372b92 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1282,12 +1282,12 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // We check the commit difference with 12,500 commits behind because that // represents ~30s of commits. If we're behind, let's close the command port // so we can catch up with the cluster before processing new commands. - const int64_t currentCommitDifference = message.calcU64("NewCount") - getCommitCount(); const string blockReason = "COMMITS_LAGGING_BEHIND"; - if (currentCommitDifference > 12'500 && !_blockedCommandPort) { + const int64_t currentCommitDifference = getCommitCount() - peer->commitCount; + if (peer == _leadPeer && currentCommitDifference >= 12'500 && !_blockedCommandPort) { SINFO("Node is lagging behind, closing command port so it can catch up."); _server.blockCommandPort(blockReason); - } else if (currentCommitDifference < 10'000 && _blockedCommandPort) { + } else if (currentCommitDifference < 1'000 && _blockedCommandPort) { SINFO("Node is caught up enough, unblocking command port."); _server.unblockCommandPort(blockReason); } From 2d023066835673dd411882602ef676b5fd90e65d Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Mon, 23 Dec 2024 18:57:44 -0300 Subject: [PATCH 17/22] Update SQLiteNode.cpp --- sqlitecluster/SQLiteNode.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index d40372b92..74339d29f 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1283,7 +1283,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // represents ~30s of commits. If we're behind, let's close the command port // so we can catch up with the cluster before processing new commands. const string blockReason = "COMMITS_LAGGING_BEHIND"; - const int64_t currentCommitDifference = getCommitCount() - peer->commitCount; + const int64_t currentCommitDifference = peer->commitCount - getCommitCount(); if (peer == _leadPeer && currentCommitDifference >= 12'500 && !_blockedCommandPort) { SINFO("Node is lagging behind, closing command port so it can catch up."); _server.blockCommandPort(blockReason); From bd8d47076c02c501bc789c9f5284577d50a20ed7 Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Mon, 23 Dec 2024 18:59:15 -0300 Subject: [PATCH 18/22] Update SQLiteNode.cpp --- sqlitecluster/SQLiteNode.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 74339d29f..d22f55ea8 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1287,9 +1287,11 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { if (peer == _leadPeer && currentCommitDifference >= 12'500 && !_blockedCommandPort) { SINFO("Node is lagging behind, closing command port so it can catch up."); _server.blockCommandPort(blockReason); + _blockedCommandPort = true; } else if (currentCommitDifference < 1'000 && _blockedCommandPort) { SINFO("Node is caught up enough, unblocking command port."); _server.unblockCommandPort(blockReason); + _blockedCommandPort = false; } // Classify and process the message if (SIEquals(message.methodLine, "LOGIN")) { From ff4dacd864cedffd2d3efe38ddd531262b4b6fe2 Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Tue, 24 Dec 2024 16:05:26 -0300 Subject: [PATCH 19/22] Update sqlitecluster/SQLiteNode.cpp Co-authored-by: Carlos Alvarez --- sqlitecluster/SQLiteNode.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index d22f55ea8..b86d51422 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1285,7 +1285,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { const string blockReason = "COMMITS_LAGGING_BEHIND"; const int64_t currentCommitDifference = peer->commitCount - getCommitCount(); if (peer == _leadPeer && currentCommitDifference >= 12'500 && !_blockedCommandPort) { - SINFO("Node is lagging behind, closing command port so it can catch up."); + SINFO("Node is behind by " + SToStr(currentCommitDifference) + " commits, closing command port so it can catch up."); _server.blockCommandPort(blockReason); _blockedCommandPort = true; } else if (currentCommitDifference < 1'000 && _blockedCommandPort) { From 2d25cab4aec6fca76dc38bae251f723306c3e543 Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Tue, 24 Dec 2024 16:06:56 -0300 Subject: [PATCH 20/22] Update sqlitecluster/SQLiteNode.cpp Co-authored-by: Carlos Alvarez --- sqlitecluster/SQLiteNode.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index b86d51422..a5ce3992e 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1289,7 +1289,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { _server.blockCommandPort(blockReason); _blockedCommandPort = true; } else if (currentCommitDifference < 1'000 && _blockedCommandPort) { - SINFO("Node is caught up enough, unblocking command port."); + SINFO("Node is caught up enough (behind by " + SToStr(currentCommitDifference) + " commits), re-opening command port."); _server.unblockCommandPort(blockReason); _blockedCommandPort = false; } From a47789745b3a4cb5f06c63be79e3c85f00520c3c Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Tue, 24 Dec 2024 19:10:29 +0000 Subject: [PATCH 21/22] addressing comments --- sqlitecluster/SQLiteNode.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index a5ce3992e..c8e27e19a 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1289,6 +1289,8 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { _server.blockCommandPort(blockReason); _blockedCommandPort = true; } else if (currentCommitDifference < 1'000 && _blockedCommandPort) { + // We'll open the command port again if we're 1k commits behind, which + // translates to ~2s of commits. SINFO("Node is caught up enough (behind by " + SToStr(currentCommitDifference) + " commits), re-opening command port."); _server.unblockCommandPort(blockReason); _blockedCommandPort = false; From 9da7bf279f58d91ee36a46e26cbd2524f9cb000f Mon Sep 17 00:00:00 2001 From: danieldoglas Date: Tue, 24 Dec 2024 19:11:18 +0000 Subject: [PATCH 22/22] changing variable name --- sqlitecluster/SQLiteNode.cpp | 8 ++++---- sqlitecluster/SQLiteNode.h | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index c8e27e19a..4db1beca1 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1284,16 +1284,16 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // so we can catch up with the cluster before processing new commands. const string blockReason = "COMMITS_LAGGING_BEHIND"; const int64_t currentCommitDifference = peer->commitCount - getCommitCount(); - if (peer == _leadPeer && currentCommitDifference >= 12'500 && !_blockedCommandPort) { + if (peer == _leadPeer && currentCommitDifference >= 12'500 && !_blockedCommandPortForBeingBehind) { SINFO("Node is behind by " + SToStr(currentCommitDifference) + " commits, closing command port so it can catch up."); _server.blockCommandPort(blockReason); - _blockedCommandPort = true; - } else if (currentCommitDifference < 1'000 && _blockedCommandPort) { + _blockedCommandPortForBeingBehind = true; + } else if (currentCommitDifference < 1'000 && _blockedCommandPortForBeingBehind) { // We'll open the command port again if we're 1k commits behind, which // translates to ~2s of commits. SINFO("Node is caught up enough (behind by " + SToStr(currentCommitDifference) + " commits), re-opening command port."); _server.unblockCommandPort(blockReason); - _blockedCommandPort = false; + _blockedCommandPortForBeingBehind = false; } // Classify and process the message if (SIEquals(message.methodLine, "LOGIN")) { diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 793b8c287..6377c9511 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -368,7 +368,7 @@ class SQLiteNode : public STCPManager { atomic _state; // Keeps track if we have closed the command port for commits fallen behind. - bool _blockedCommandPort{false}; + bool _blockedCommandPortForBeingBehind{false}; // This is an integer that increments every time we change states. This is useful for responses to state changes // (i.e., approving standup) to verify that the messages we're receiving are relevant to the current state change,