Skip to content

Commit

Permalink
fixing locks issues
Browse files Browse the repository at this point in the history
  • Loading branch information
danieldoglas committed Dec 23, 2024
1 parent 5962cdc commit 88c2980
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 24 deletions.
31 changes: 13 additions & 18 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,7 @@ void BedrockServer::sync()

// Process any activity in our plugins.
AutoTimerTime postPollTime(postPollTimer);
_syncNode->postPoll(fdm, nextActivity, [&](const int64_t currentCommitCountDiff){
// If the command port is already closed, we don't need to check anything and can early return.
if (_isCommandPortLikelyBlocked) {
return;
}
});
_syncNode->postPoll(fdm, nextActivity);
_syncNodeQueuedCommands.postPoll(fdm);
_notifyDoneSync.postPoll(fdm);
}
Expand Down Expand Up @@ -1206,7 +1201,7 @@ bool BedrockServer::_wouldCrash(const unique_ptr<BedrockCommand>& command) {
}

void BedrockServer::_resetServer() {
lock_guard<mutex> lock(_portMutex);
lock_guard<decltype(_portMutex)> lock(_portMutex);

_requestCount = 0;
_upgradeInProgress = false;
Expand Down Expand Up @@ -1319,7 +1314,7 @@ BedrockServer::BedrockServer(const SData& args_)
// Allow sending control commands when the server's not LEADING/FOLLOWING.
SINFO("Opening control port on '" << args["-controlPort"] << "'");
{
lock_guard<mutex> lock(_portMutex);
lock_guard<decltype(_portMutex)> lock(_portMutex);
_controlPort = openPort(args["-controlPort"]);
}

Expand Down Expand Up @@ -1375,7 +1370,7 @@ bool BedrockServer::shutdownComplete() {
}

void BedrockServer::prePoll(fd_map& fdm) {
lock_guard<mutex> lock(_portMutex);
lock_guard<decltype(_portMutex)> lock(_portMutex);

// This will interrupt poll when we shut down.
_notifyDone.prePoll(fdm);
Expand All @@ -1401,7 +1396,7 @@ void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) {
// NOTE: There are no sockets managed here, just ports.
// Open the port the first time we enter a command-processing state
{
lock_guard<mutex> lock(_portMutex);
lock_guard<decltype(_portMutex)> lock(_portMutex);
if (_commandPortBlockReasons.empty() && (getState() == SQLiteNodeState::LEADING || getState() == SQLiteNodeState::FOLLOWING) && _shutdownState.load() == RUNNING) {

// Open the port
Expand Down Expand Up @@ -1570,7 +1565,7 @@ void BedrockServer::_reply(unique_ptr<BedrockCommand>& command) {


void BedrockServer::blockCommandPort(const string& reason) {
lock_guard<mutex> lock(_portMutex);
lock_guard<decltype(_portMutex)> lock(_portMutex);
_commandPortBlockReasons.insert(reason);
_isCommandPortLikelyBlocked = true;
if (_commandPortBlockReasons.size() == 1) {
Expand All @@ -1581,7 +1576,7 @@ void BedrockServer::blockCommandPort(const string& reason) {
}

void BedrockServer::unblockCommandPort(const string& reason) {
lock_guard<mutex> lock(_portMutex);
lock_guard<decltype(_portMutex)> lock(_portMutex);
auto it = _commandPortBlockReasons.find(reason);
if (it == _commandPortBlockReasons.end()) {
SWARN("Tried to remove command port block because: " << reason << ", but it wasn't blocked for that reason!");
Expand All @@ -1594,12 +1589,12 @@ void BedrockServer::unblockCommandPort(const string& reason) {
}
}

virtual bool isCommandPortClosed(const string& reason) {
if (!strlen(reason)) {
bool BedrockServer::isCommandPortClosed(const string& reason) {
if (reason.empty()) {
return _isCommandPortLikelyBlocked;
}
// Get the shared mutex so we don't execute read operations while changing the set
shared_mutex<mutex> lock(_portMutex);
shared_lock<decltype(_portMutex)> lock(_portMutex);
return _commandPortBlockReasons.find(reason) != _commandPortBlockReasons.end();
}

Expand Down Expand Up @@ -1741,7 +1736,7 @@ void BedrockServer::_status(unique_ptr<BedrockCommand>& command) {
}

{
lock_guard<mutex> lock(_portMutex);
lock_guard<decltype(_portMutex)> lock(_portMutex);
content["commandPortBlockReasons"] = SComposeJSONArray(_commandPortBlockReasons);
}

Expand Down Expand Up @@ -2015,7 +2010,7 @@ void BedrockServer::_beginShutdown(const string& reason, bool detach) {
// down, so otherwise there's a race condition where that happens just after we close them but before we
// change the state.
{
lock_guard<mutex> lock(_portMutex);
lock_guard<decltype(_portMutex)> lock(_portMutex);
_commandPortPublic = nullptr;
_commandPortPrivate = nullptr;
if (!_detach) {
Expand Down Expand Up @@ -2094,7 +2089,7 @@ void BedrockServer::_acceptSockets() {

// Lock _portMutex so suppressing the port does not cause it to be null
// in the middle of this function.
lock_guard<mutex> lock(_portMutex);
lock_guard<decltype(_portMutex)> lock(_portMutex);

for (auto& p : _portPluginMap) {
portList.push_back(reference_wrapper<const unique_ptr<Port>>(p.first));
Expand Down
3 changes: 2 additions & 1 deletion BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ class BedrockServer : public SQLiteServer {
// You must block and unblock the command port with *identical strings*.
void blockCommandPort(const string& reason) override;
void unblockCommandPort(const string& reason) override;
bool isCommandPortClosed(const string& reason) override;

// Legacy version of above.
void suppressCommandPort(const string& reason, bool suppress, bool manualOverride = false);
Expand Down Expand Up @@ -377,7 +378,7 @@ class BedrockServer : public SQLiteServer {
atomic<bool> _detach;

// Pointers to the ports on which we accept commands.
mutex _portMutex;
shared_mutex _portMutex;

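// The "control port" is intended to be open to privileged clients (i.e., localhost and other nodes in the Bedrock
// cluster); it can be used to run any command including commands meant for cluster operations, changing server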
// cluster) it can be used to run any command including commands meant for cluster operations, changing server
Expand Down
10 changes: 5 additions & 5 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1657,12 +1657,12 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
}
const int64_t currentCommitDifference = message.calcU64("NewCount") - getCommitCount();
const string blockReason = "COMMITS_LAGGING_BEHIND";
if (currentCommitCountDiff > 50'000 && !isCommandPortClosed(blockReason)) {
if (currentCommitDifference > 50'000 && !_server.isCommandPortClosed(blockReason)) {
SINFO("Node is lagging behind, closing command port so it can catch up.");
blockCommandPort(blockReason);
} else if (isCommandPortClosed(blockReason) && currentCommitCountDiff < 10'000) {
_server.blockCommandPort(blockReason);
} else if (currentCommitDifference < 10'000 && _server.isCommandPortClosed(blockReason)) {
SINFO("Node is caught up enough, unblocking command port.");
unblockCommandPort(blockReason);
_server.unblockCommandPort(blockReason);
}
} catch (const system_error& e) {
// If the server is struggling and falling behind on replication, we might have too many threads
Expand Down Expand Up @@ -2644,7 +2644,7 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
size_t messagesDeqeued = 0;
while (true) {
SData message = peer->popMessage();
_onMESSAGE(peer, messagem);
_onMESSAGE(peer, message);
messagesDeqeued++;
if (messagesDeqeued >= 100) {
// We should run again immediately, we have more to do.
Expand Down

0 comments on commit 88c2980

Please sign in to comment.