Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocking command port when node is not caught up #2039

Merged
merged 22 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ class BedrockServer : public SQLiteServer {
void onNodeLogin(SQLitePeer* peer) override;

// You must block and unblock the command port with *identical strings*.
void blockCommandPort(const string& reason);
void unblockCommandPort(const string& reason);
void blockCommandPort(const string& reason) override;
void unblockCommandPort(const string& reason) override;

// Legacy version of above.
void suppressCommandPort(const string& reason, bool suppress, bool manualOverride = false);
Expand Down
18 changes: 17 additions & 1 deletion sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,23 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
peer->commandAddress = message["commandAddress"];
}
peer->setCommit(message.calcU64("CommitCount"), message["Hash"]);


// Gate the command port on how far this node is behind the peer that just sent us a message.
// `currentCommitDifference` is the peer's commit count minus our own, so a positive value means
// we are behind that peer.
//
// We block once we are 12,500 or more commits behind the leader because that represents ~30s of
// commits. If we're that far behind, let's close the command port so we can catch up with the
// cluster before processing new commands.
const string blockReason = "COMMITS_LAGGING_BEHIND";
const int64_t currentCommitDifference = peer->commitCount - getCommitCount();
if (peer == _leadPeer && currentCommitDifference >= 12'500 && !_blockedCommandPortForBeingBehind) {
SINFO("Node is behind by " + SToStr(currentCommitDifference) + " commits, closing command port so it can catch up.");
_server.blockCommandPort(blockReason);
_blockedCommandPortForBeingBehind = true;
} else if (currentCommitDifference < 1'000 && _blockedCommandPortForBeingBehind) {
// We'll open the command port again once we're fewer than 1k commits behind, which
// translates to ~2s of commits. The port is unblocked with the same reason string it was
// blocked with.
// NOTE(review): unlike the blocking branch above, this branch does not require
// `peer == _leadPeer`, so a message from any peer whose commit count is within 1,000 of ours
// re-opens the port — confirm this is intended.
SINFO("Node is caught up enough (behind by " + SToStr(currentCommitDifference) + " commits), re-opening command port.");
_server.unblockCommandPort(blockReason);
_blockedCommandPortForBeingBehind = false;
}
// Classify and process the message
if (SIEquals(message.methodLine, "LOGIN")) {
// LOGIN: This is the first message sent to and received from a new peer. It communicates the current state of
Expand Down
5 changes: 4 additions & 1 deletion sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,12 @@ class SQLiteNode : public STCPManager {
// Stopwatch to track if we're giving up on the server preventing a standdown.
SStopwatch _standDownTimeout;

// Our current State.
// Our current State.
atomic<SQLiteNodeState> _state;

// Tracks whether we have closed the command port because this node has fallen behind on commits.
bool _blockedCommandPortForBeingBehind{false};

// This is an integer that increments every time we change states. This is useful for responses to state changes
// (i.e., approving standup) to verify that the messages we're receiving are relevant to the current state change,
// and not stale responses to old changes.
Expand Down
4 changes: 4 additions & 0 deletions sqlitecluster/SQLiteServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ class SQLiteServer : public STCPManager {

// We call this method whenever a node changes state
virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) = 0;

// You must block and unblock the command port with *identical strings*.
virtual void blockCommandPort(const string& reason) = 0;
virtual void unblockCommandPort(const string& reason) = 0;
};
2 changes: 2 additions & 0 deletions test/tests/SQLiteNodeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class TestServer : public SQLiteServer {
virtual bool canStandDown() { return true; }
virtual void onNodeLogin(SQLitePeer* peer) { }
virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) {}
// Test stub: blocking the command port does nothing here.
virtual void blockCommandPort(const string& reason) { };
// Test stub: unblocking the command port does nothing here.
virtual void unblockCommandPort(const string& reason) { };
};

struct SQLiteNodeTest : tpunit::TestFixture {
Expand Down