Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocking command port when node is not caught up #2039

Merged
merged 22 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ class BedrockServer : public SQLiteServer {
void onNodeLogin(SQLitePeer* peer) override;

// You must block and unblock the command port with *identical strings*.
void blockCommandPort(const string& reason);
void unblockCommandPort(const string& reason);
void blockCommandPort(const string& reason) override;
void unblockCommandPort(const string& reason) override;

// Legacy version of above.
void suppressCommandPort(const string& reason, bool suppress, bool manualOverride = false);
Expand Down
14 changes: 13 additions & 1 deletion sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,19 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
peer->commandAddress = message["commandAddress"];
}
peer->setCommit(message.calcU64("CommitCount"), message["Hash"]);


// We check the commit difference with 12,500 commits behind because that
// represents ~30s of commits. If we're behind, let's close the command port
// so we can catch up with the cluster before processing new commands.
const int64_t currentCommitDifference = message.calcU64("NewCount") - getCommitCount();
danieldoglas marked this conversation as resolved.
Show resolved Hide resolved
const string blockReason = "COMMITS_LAGGING_BEHIND";
if (currentCommitDifference > 12'500 && !_blockedCommandPort) {
SINFO("Node is lagging behind, closing command port so it can catch up.");
danieldoglas marked this conversation as resolved.
Show resolved Hide resolved
_server.blockCommandPort(blockReason);
} else if (currentCommitDifference < 10'000 && _blockedCommandPort) {
SINFO("Node is caught up enough, unblocking command port.");
danieldoglas marked this conversation as resolved.
Show resolved Hide resolved
_server.unblockCommandPort(blockReason);
}
// Classify and process the message
if (SIEquals(message.methodLine, "LOGIN")) {
// LOGIN: This is the first message sent to and received from a new peer. It communicates the current state of
Expand Down
5 changes: 4 additions & 1 deletion sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,12 @@ class SQLiteNode : public STCPManager {
// Stopwatch to track if we're giving up on the server preventing a standdown.
SStopwatch _standDownTimeout;

// Our current State.
// Our current State.
atomic<SQLiteNodeState> _state;

// Keeps track if we have closed the command port for commits fallen behind.
bool _blockedCommandPort{false};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we block the command port for other reasons, we should put the fact that this is being set based on the node being behind on the variable name, for clarity

Suggested change
bool _blockedCommandPort{false};
bool _blockedCommandPortForBeingBehind{false};

But do we really need this variable? Is there an easy way to check the reason the command port is blocked so we can do something like

_blockedCommandPortForBeingBehind = getReason() == "COMMITS_LAGGING_BEHIND";
if (peer == _leadPeer && currentCommitDifference >= 12'500 && !_blockedCommandPort) {
    ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But do we really need this variable? Is there an easy way to check the reason the command port is blocked so we can do something like

tl;dr: Adding this as a separate variable so we don't need to add complexity to the current methods related to block reasons. We had an implementation like you're suggesting before, but that would require adding new locks/changing current lock types.

This was discussed in this thread: https://expensify.slack.com/archives/CC7NECV4L/p1734981555759429?thread_ts=1733270912.693829&cid=CC7NECV4L


// This is an integer that increments every time we change states. This is useful for responses to state changes
// (i.e., approving standup) to verify that the messages we're receiving are relevant to the current state change,
// and not stale responses to old changes.
Expand Down
4 changes: 4 additions & 0 deletions sqlitecluster/SQLiteServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ class SQLiteServer : public STCPManager {

// We call this method whenever a node changes state
virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) = 0;

// You must block and unblock the command port with *identical strings*.
virtual void blockCommandPort(const string& reason) = 0;
virtual void unblockCommandPort(const string& reason) = 0;
};
2 changes: 2 additions & 0 deletions test/tests/SQLiteNodeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class TestServer : public SQLiteServer {
virtual bool canStandDown() { return true; }
virtual void onNodeLogin(SQLitePeer* peer) { }
virtual void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) {}
virtual void blockCommandPort(const string& reason) { };
virtual void unblockCommandPort(const string& reason) { };
};

struct SQLiteNodeTest : tpunit::TestFixture {
Expand Down
Loading