diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 21a958594..2c184c523 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1278,7 +1278,19 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { peer->commandAddress = message["commandAddress"]; } peer->setCommit(message.calcU64("CommitCount"), message["Hash"]); - + + // If we are more than 12,500 commits behind (which represents ~30s of commits), + // close the command port so we can catch up with the cluster before processing + // new commands. Reopen it once the difference drops below 10,000 commits. + const int64_t currentCommitDifference = message.calcU64("NewCount") - getCommitCount(); + const string blockReason = "COMMITS_LAGGING_BEHIND"; + if (currentCommitDifference > 12'500 && !_blockedCommandPort) { + SINFO("Node is lagging behind, closing command port so it can catch up."); + _server.blockCommandPort(blockReason); + } else if (currentCommitDifference < 10'000 && _blockedCommandPort) { + SINFO("Node is caught up enough, unblocking command port."); + _server.unblockCommandPort(blockReason); + } // Classify and process the message if (SIEquals(message.methodLine, "LOGIN")) { // LOGIN: This is the first message sent to and received from a new peer. It communicates the current state of diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index dbdd17879..793b8c287 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -367,7 +367,9 @@ class SQLiteNode : public STCPManager { // Our current State. atomic _state; - atomic _blockedCommandPort{false}; + // Tracks whether we have closed the command port because our commits have fallen behind. + bool _blockedCommandPort{false}; + // This is an integer that increments every time we change states. 
This is useful for responses to state changes // (i.e., approving standup) to verify that the messages we're receiving are relevant to the current state change, // and not stale responses to old changes.