Skip to content

Commit

Permalink
Merge pull request #2039 from Expensify/dsilva_blockingCommandPortWhe…
Browse files Browse the repository at this point in the history
…nNotCaughtUp

Blocking command port when node is not caught up
  • Loading branch information
tylerkaraszewski authored Dec 26, 2024
2 parents 05b6d65 + 9da7bf2 commit e218e46
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 4 deletions.
4 changes: 2 additions & 2 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ class BedrockServer : public SQLiteServer {
void onNodeLogin(SQLitePeer* peer) override;

// You must block and unblock the command port with *identical strings*.
void blockCommandPort(const string& reason);
void unblockCommandPort(const string& reason);
void blockCommandPort(const string& reason) override;
void unblockCommandPort(const string& reason) override;

// Legacy version of above.
void suppressCommandPort(const string& reason, bool suppress, bool manualOverride = false);
Expand Down
18 changes: 17 additions & 1 deletion sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,23 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
peer->commandAddress = message["commandAddress"];
}
peer->setCommit(message.calcU64("CommitCount"), message["Hash"]);


// We treat this node as lagging once it is 12,500 or more commits behind the
// lead peer, because that represents ~30s of commits. If we're behind, let's
// close the command port so we can catch up with the cluster before
// processing new commands.
const string blockReason = "COMMITS_LAGGING_BEHIND";
const int64_t currentCommitDifference = peer->commitCount - getCommitCount();
if (peer == _leadPeer && currentCommitDifference >= 12'500 && !_blockedCommandPortForBeingBehind) {
SINFO("Node is behind by " + SToStr(currentCommitDifference) + " commits, closing command port so it can catch up.");
_server.blockCommandPort(blockReason);
_blockedCommandPortForBeingBehind = true;
} else if (currentCommitDifference < 1'000 && _blockedCommandPortForBeingBehind) {
// We'll open the command port again once we're fewer than 1k commits
// behind, which translates to ~2s of commits.
SINFO("Node is caught up enough (behind by " + SToStr(currentCommitDifference) + " commits), re-opening command port.");
_server.unblockCommandPort(blockReason);
_blockedCommandPortForBeingBehind = false;
}
// Classify and process the message
if (SIEquals(message.methodLine, "LOGIN")) {
// LOGIN: This is the first message sent to and received from a new peer. It communicates the current state of
Expand Down
5 changes: 4 additions & 1 deletion sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,12 @@ class SQLiteNode : public STCPManager {
// Stopwatch to track if we're giving up on the server preventing a standdown.
SStopwatch _standDownTimeout;

// Our current State.
// Our current State.
atomic<SQLiteNodeState> _state;

// Tracks whether we have closed the command port because we fell too far behind on commits.
bool _blockedCommandPortForBeingBehind{false};

// This is an integer that increments every time we change states. This is useful for responses to state changes
// (i.e., approving standup) to verify that the messages we're receiving are relevant to the current state change,
// and not stale responses to old changes.
Expand Down
4 changes: 4 additions & 0 deletions sqlitecluster/SQLiteServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ class SQLiteServer : public STCPManager {

// We call this method whenever a node changes state
virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) = 0;

// You must block and unblock the command port with *identical strings*.
virtual void blockCommandPort(const string& reason) = 0;
virtual void unblockCommandPort(const string& reason) = 0;
};
2 changes: 2 additions & 0 deletions test/tests/SQLiteNodeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class TestServer : public SQLiteServer {
virtual bool canStandDown() { return true; }
virtual void onNodeLogin(SQLitePeer* peer) { }
virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) {}
// Test stub: blocking the command port is a no-op for this test server.
virtual void blockCommandPort(const string& reason) { }
// Test stub: unblocking the command port is a no-op for this test server.
virtual void unblockCommandPort(const string& reason) { }
};

struct SQLiteNodeTest : tpunit::TestFixture {
Expand Down

0 comments on commit e218e46

Please sign in to comment.