Skip to content

Commit

Permalink
Merge pull request #1577 from Expensify/tyler-maintain-sync
Browse files Browse the repository at this point in the history
Maintain cluster synchronization without quorum commands
  • Loading branch information
bondydaa authored Oct 25, 2023
2 parents 7de1ae4 + aa675f7 commit d89b94d
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 49 deletions.
15 changes: 15 additions & 0 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1796,6 +1796,7 @@ bool BedrockServer::_isControlCommand(const unique_ptr<BedrockCommand>& command)
SIEquals(command->request.methodLine, "EnableSQLTracing") ||
SIEquals(command->request.methodLine, "BlockWrites") ||
SIEquals(command->request.methodLine, "UnblockWrites") ||
SIEquals(command->request.methodLine, "SetMaxPeerFallBehind") ||
SIEquals(command->request.methodLine, "CRASH_COMMAND")
) {
return true;
Expand Down Expand Up @@ -1929,6 +1930,20 @@ void BedrockServer::_control(unique_ptr<BedrockCommand>& command) {
__quiesceThread = nullptr;
response.methodLine = "200 Unblocked";
}
} else if (SIEquals(command->request.methodLine, "SetMaxPeerFallBehind")) {
// Look up the existing value so we can report what it was.
uint64_t existingValue = SQLiteNode::MAX_PEER_FALL_BEHIND;
response["previousValue"] = to_string(existingValue);

uint64_t newValue = command->request.calcU64("value");
if (newValue < SQLiteNode::MIN_APPROVE_FREQUENCY) {
// We won't break everything on purpose. This can be used to check the existing value without changing anything by passing `0`.
response.methodLine = "400 Refusing to set peer fall behind below " + to_string(SQLiteNode::MIN_APPROVE_FREQUENCY);
} else {
// Set the new value and return 200 OK.
SQLiteNode::MAX_PEER_FALL_BEHIND = newValue;
response["previousValue"] = to_string(existingValue);
}
}
}

Expand Down
12 changes: 10 additions & 2 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,21 @@ SQLite::~SQLite() {
}

// Locks both `_sharedData.commitLock` and `_sharedData.writeLock`. The matching release is `exclusiveUnlockDB()`.
// NOTE(review): as listed here, `writeLock.lock()` appears both before and after `commitLock.lock()`. This looks like a diff rendering in which the first
// call is the pre-change line and the final two calls are its replacement - confirm against the real file that only the commitLock-then-writeLock pair is live.
void SQLite::exclusiveLockDB() {
_sharedData.writeLock.lock();
// This order is important and not intuitive. It seems like we should lock `writeLock` before `commitLock` because that's the order these occur in a typical database operation,
// however, there are two possible flows here. For a non-blocking transaction (i.e., one run in parallel with other transactions), the lock order is:
// writeLock, commitLock, but importantly, in this case, these are not locked simultaneously. writeLock is only locked for the time of the actual DB write query, and then released.
// So for a non-blocking transaction, this becomes unimportant, these are used singly.
// However, for a blocking transaction (one run by the blocking commit thread) the commit lock is acquired at the BEGIN of the transaction, and critically - held for the duration
// of the transaction. So in these instances, we lock commitLock first, and then writeLock, but the critical difference is we hold the commitLock through the entire duration of all
// writes in this case.
// So when these are both locked by the same thread at the same time, `commitLock` is always locked first, and we do it the same way here to avoid deadlocks.
_sharedData.commitLock.lock();
_sharedData.writeLock.lock();
}

// Releases `_sharedData.writeLock` and `_sharedData.commitLock`, the two locks taken by `exclusiveLockDB()`.
// NOTE(review): as listed here, `commitLock.unlock()` appears both before and after `writeLock.unlock()`. This looks like a diff rendering in which the first
// call is the pre-change line - confirm against the real file that the live order is writeLock then commitLock (the reverse of acquisition).
void SQLite::exclusiveUnlockDB() {
_sharedData.commitLock.unlock();
_sharedData.writeLock.unlock();
_sharedData.commitLock.unlock();
}

bool SQLite::beginTransaction(TRANSACTION_TYPE type) {
Expand Down
170 changes: 123 additions & 47 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,18 @@
// Initializations for static vars.
const uint64_t SQLiteNode::RECV_TIMEOUT{STIME_US_PER_S * 30};

// Setting this to 10 or lower may deadlock the server, as followers are only guaranteed to respond to every 10th message.
// If the threshold for blocking commits is less than 10, we may block, but never receive a message indicating that we should unblock.
// NOTE(review): the two sentences above disagree on whether exactly 10 is safe ("10 or lower" vs. "less than 10"). The `SetMaxPeerFallBehind`
// control command only rejects values below MIN_APPROVE_FREQUENCY, so a value of exactly 10 is accepted - confirm which bound is intended.
atomic<uint64_t> SQLiteNode::MAX_PEER_FALL_BEHIND{500};

const string SQLiteNode::CONSISTENCY_LEVEL_NAMES[] = {"ASYNC",
"ONE",
"QUORUM"};

atomic<int64_t> SQLiteNode::currentReplicateThreadID(0);

// Followers taking part in quorum reply to an ASYNC transaction whenever their commit count is a multiple of this value, so the leader
// still hears from them periodically. It is also the lower bound enforced when MAX_PEER_FALL_BEHIND is changed at runtime.
const size_t SQLiteNode::MIN_APPROVE_FREQUENCY{10};

const vector<SQLitePeer*> SQLiteNode::_initPeers(const string& peerListString) {
// Make the logging macro work in the static initializer.
auto _name = "init";
Expand Down Expand Up @@ -102,6 +108,17 @@ const vector<SQLitePeer*> SQLiteNode::_initPeers(const string& peerListString) {
return peerList;
}

size_t SQLiteNode::_initQuorumSize(const vector<SQLitePeer*>& _peerList, const int priority) {
    // Tally every peer that is not a permafollower; permafollowers never count toward quorum.
    size_t votingPeers = 0;
    for (const auto* candidate : _peerList) {
        votingPeers += candidate->permaFollower ? 0 : 1;
    }

    // This node counts as one more member unless it is itself a permafollower (priority 0).
    const size_t self = (priority != 0) ? 1 : 0;
    return self + votingPeers;
}

SQLiteNode::SQLiteNode(SQLiteServer& server, shared_ptr<SQLitePool> dbPool, const string& name,
const string& host, const string& peerList, int priority, uint64_t firstTimeout,
const string& version, const string& commandPort)
Expand All @@ -110,6 +127,7 @@ SQLiteNode::SQLiteNode(SQLiteServer& server, shared_ptr<SQLitePool> dbPool, cons
_name(name),
_peerList(_initPeers(peerList)),
_originalPriority(priority),
_quorumSize(_initQuorumSize(_peerList, _originalPriority)),
_port(host.empty() ? nullptr : openPort(host, 30)),
_version(version),
_commitState(CommitState::UNINITIALIZED),
Expand Down Expand Up @@ -1238,6 +1256,41 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
}
peer->setCommit(message.calcU64("CommitCount"), message["Hash"]);

// If we're leading, see if this peer meets the definition of "up-to-date", which is to say, it's close enough to in-sync with us.
// We can skip checking if the peer is a permafollower, because we don't care about his state.
if (!peer->permaFollower && _state == SQLiteNodeState::LEADING) {
if (peer->commitCount + MAX_PEER_FALL_BEHIND > getCommitCount()) {
_upToDatePeers.insert(peer);
} else {
_upToDatePeers.erase(peer);
}

// Example
// Quorum size 3:
// We have 1 up-to-date peer.
// 1 >= 3/2
// Integer division, so 3/2 = 1.
// 1 >= 1.
// quorumUpToDate = true
bool quorumUpToDate = _upToDatePeers.size() >= (_quorumSize / 2);

if (quorumUpToDate && _commitsBlocked) {
_commitsBlocked = false;
SWARN("[clustersync] Cluster is no longer behind by over " << MAX_PEER_FALL_BEHIND << " commits. Unblocking new commits.");
_db.exclusiveUnlockDB();
} else if (!quorumUpToDate && !_commitsBlocked && !_db.insideTransaction()) {
_commitsBlocked = true;
uint64_t myCommitCount = getCommitCount();
SWARN("[clustersync] Cluster is behind by over " << MAX_PEER_FALL_BEHIND << " commits. New commits blocked until the cluster catches up.");
uint64_t start = STimeNow();
_db.exclusiveLockDB();
SWARN("[clustersync] Took " << (STimeNow() - start) << "us to block commits. Dumping cluster commit state. I have commit: " << myCommitCount);
for (const auto& p : _peerList) {
SWARN("[clustersync] Peer " << p->name << " has commit " << p->commitCount << ", behind by: " << (myCommitCount - p->commitCount));
}
}
}

// Classify and process the message
if (SIEquals(message.methodLine, "LOGIN")) {
// LOGIN: This is the first message sent to and received from a new peer. It communicates the current state of
Expand Down Expand Up @@ -1665,51 +1718,52 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
// APPROVE_TRANSACTION: Sent to the leader by a follower when it confirms it was able to begin a transaction and
// is ready to commit. Note that this peer approves the transaction for use in the LEADING and STANDINGDOWN
// update loop.
if (!message.isSet("ID")) {
STHROW("missing ID");
}
if (!message.isSet("NewCount")) {
STHROW("missing NewCount");
}
if (!message.isSet("NewHash")) {
STHROW("missing NewHash");
}
if (_state != SQLiteNodeState::LEADING && _state != SQLiteNodeState::STANDINGDOWN) {
STHROW("not leading");
}
SQLitePeer::Response response = SIEquals(message.methodLine, "APPROVE_TRANSACTION") ? SQLitePeer::Response::APPROVE : SQLitePeer::Response::DENY;
try {
// We ignore late approvals of commits that have already been finalized. They could have been committed
// already, in which case `_lastSentTransactionID` will have incremented, or they could have been rolled
// back due to a conflict, which would cuase them to have the wrong hash (the hash of the previous attempt
// at committing the transaction with this ID).
bool hashMatch = message["NewHash"] == _db.getUncommittedHash();
if (hashMatch && to_string(_lastSentTransactionID + 1) == message["ID"]) {
if (message.calcU64("NewCount") != _db.getCommitCount() + 1) {
STHROW("commit count mismatch. Expected: " + message["NewCount"] + ", but would actually be: "
+ to_string(_db.getCommitCount() + 1));
}
if (peer->permaFollower) {
STHROW("permafollowers shouldn't approve/deny");

// If it's DENY, or AsyncNotification isn't set, this means that it's not just a simple notification that the follower has some commit number.
// It's either a real DENY, or a real APPROVE of a quorum transaction.
if (SIEquals(message.methodLine, "DENY_TRANSACTION") || !message.isSet("AsyncNotification")) {
if (!message.isSet("ID")) {
STHROW("missing ID");
}
if (!message.isSet("NewCount")) {
STHROW("missing NewCount");
}
if (!message.isSet("NewHash")) {
STHROW("missing NewHash");
}
if (_state != SQLiteNodeState::LEADING && _state != SQLiteNodeState::STANDINGDOWN) {
STHROW("not leading");
}
SQLitePeer::Response response = SIEquals(message.methodLine, "APPROVE_TRANSACTION") ? SQLitePeer::Response::APPROVE : SQLitePeer::Response::DENY;
try {
// We ignore late approvals of commits that have already been finalized. They could have been committed
// already, in which case `_lastSentTransactionID` will have incremented, or they could have been rolled
// back due to a conflict, which would cause them to have the wrong hash (the hash of the previous attempt
// at committing the transaction with this ID).
bool hashMatch = message["NewHash"] == _db.getUncommittedHash();
if (hashMatch && to_string(_lastSentTransactionID + 1) == message["ID"]) {
if (message.calcU64("NewCount") != _db.getCommitCount() + 1) {
STHROW("commit count mismatch. Expected: " + message["NewCount"] + ", but would actually be: "
+ to_string(_db.getCommitCount() + 1));
}
if (peer->permaFollower) {
STHROW("permafollowers shouldn't approve/deny");
}
PINFO("Peer " << response << " transaction #" << message["NewCount"] << " (" << message["NewHash"] << ")");
peer->transactionResponse = response;
}
PINFO("Peer " << response << " transaction #" << message["NewCount"] << " (" << message["NewHash"] << ")");
peer->transactionResponse = response;
} else {
// Old command. Nothing to do. We already sent a commit or rollback.
PINFO("Peer '" << message.methodLine << "' transaction #" << message["NewCount"]
<< " (" << message["NewHash"] << ") after " << (hashMatch ? "commit" : "rollback") << ".");
} catch (const SException& e) {
// Doesn't correspond to the outstanding transaction not necessarily fatal. This can happen if, for
// example, a command is escalated from one follower, approved by the second, but where the first follower dies
// before the second's approval is received by the leader. In this case the leader will drop the command
// when the initiating peer is lost, and thus won't have an outstanding transaction (or will be processing
// a new transaction) when the old, outdated approval is received. Furthermore, in this case we will have
// already sent a ROLLBACK, so it will already correct itself. If not, then we'll wait for the follower to
// determine it's screwed and reconnect.
SWARN("Received " << message.methodLine << " for transaction #"
<< message.calc("NewCount") << " (" << message["NewHash"] << ", " << message["ID"] << ") but '"
<< e.what() << "', ignoring.");
}
} catch (const SException& e) {
// Doesn't correspond to the outstanding transaction not necessarily fatal. This can happen if, for
// example, a command is escalated from/ one follower, approved by the second, but where the first follower dies
// before the second's approval is received by the leader. In this case the leader will drop the command
// when the initiating peer is lost, and thus won't have an outstanding transaction (or will be processing
// a new transaction) when the old, outdated approval is received. Furthermore, in this case we will have
// already sent a ROLLBACK, so it will already correct itself. If not, then we'll wait for the follower to
// determine it's screwed and reconnect.
SWARN("Received " << message.methodLine << " for transaction #"
<< message.calc("NewCount") << " (" << message["NewHash"] << ", " << message["ID"] << ") but '"
<< e.what() << "', ignoring.");
}
} else {
STHROW("unrecognized message");
Expand Down Expand Up @@ -1957,6 +2011,14 @@ void SQLiteNode::_changeState(SQLiteNodeState newState) {
_db.popCommittedTransactions();
_lastSentTransactionID = _db.getCommitCount();
}

// Mark peers that are up-to-date so we have a valid starting state.
_upToDatePeers.clear();
for (const auto& peer : _peerList) {
if (!peer->permaFollower && (peer->commitCount + MAX_PEER_FALL_BEHIND > getCommitCount())) {
_upToDatePeers.insert(peer);
}
}
} else if (newState == SQLiteNodeState::STANDINGDOWN) {
// start the timeout countdown.
_standDownTimeout.alarmDuration = STIME_US_PER_S * 30; // 30s timeout before we give up
Expand All @@ -1969,6 +2031,14 @@ void SQLiteNode::_changeState(SQLiteNodeState newState) {
_priority = _originalPriority;
}

// If we're switching from LEADING or STANDINGDOWN to anything else (aside from the case where we switch from LEADING to STANDINGDOWN), we unblock commits.
if ((_state == SQLiteNodeState::LEADING || _state == SQLiteNodeState::STANDINGDOWN) && newState != SQLiteNodeState::STANDINGDOWN) {
if (_commitsBlocked) {
_commitsBlocked = false;
_db.exclusiveUnlockDB();
}
}

// Send to everyone we're connected to, whether or not
// we're "LoggedIn" (else we might change state after sending LOGIN,
// but before we receive theirs, and they'll miss it).
Expand Down Expand Up @@ -2278,15 +2348,21 @@ void SQLiteNode::_handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const S

// Are we participating in quorum?
if (_priority) {
// If the ID is /ASYNC_\d+/, no need to respond, leader will ignore it anyway.
// If the ID is /ASYNC_\d+/, leader will keep going regardless, but we send every 10th response anyway, just so leader keeps relatively current with our commit count.
string verb = success ? "APPROVE_TRANSACTION" : "DENY_TRANSACTION";
if (!SStartsWith(message["ID"], "ASYNC_")) {
uint64_t currentCommitCount = db.getCommitCount();
bool isAsync = SStartsWith(message["ID"], "ASYNC_");
bool asyncNotification = isAsync && (currentCommitCount % MIN_APPROVE_FREQUENCY == 0);
if (!isAsync || asyncNotification) {
// Not a permafollower, approve the transaction
PINFO(verb << " #" << db.getCommitCount() + 1 << " (" << message["NewHash"] << ").");
PINFO(verb << " #" << currentCommitCount + 1 << " (" << message["NewHash"] << ").");
SData response(verb);
response["NewCount"] = SToStr(db.getCommitCount() + 1);
response["NewCount"] = SToStr(currentCommitCount + 1);
response["NewHash"] = success ? db.getUncommittedHash() : message["NewHash"];
response["ID"] = message["ID"];
if (asyncNotification) {
response["AsyncNotification"] = "true";
}
if (_leadPeer) {
_sendToPeer(_leadPeer, response);
} else {
Expand Down
13 changes: 13 additions & 0 deletions sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ class SQLiteNode : public STCPManager {
// Receive timeout for cluster messages.
static const uint64_t RECV_TIMEOUT;

// The minimum frequency of APPROVE_TRANSACTION messages we'll send when following, back to leader, to indicate our own current synchronization state.
// This is expressed as "every Nth message", where e.g., if MIN_APPROVE_FREQUENCY is 10, we will respond to at least every 10th BEGIN_TRANSACTION message.
static const size_t MIN_APPROVE_FREQUENCY;

// The maximum number of commits behind we'll allow a quorum number of peers to be before we block commits on leader.
static atomic<uint64_t> MAX_PEER_FALL_BEHIND;

// Get a SQLiteNodeState from its name.
static SQLiteNodeState stateFromName(const string& name);

Expand Down Expand Up @@ -193,6 +200,7 @@ class SQLiteNode : public STCPManager {
static atomic<int64_t> currentReplicateThreadID;

static const vector<SQLitePeer*> _initPeers(const string& peerList);
static size_t _initQuorumSize(const vector<SQLitePeer*>& _peerList, const int priority);

// Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the
// *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker
Expand Down Expand Up @@ -262,6 +270,11 @@ class SQLiteNode : public STCPManager {
// to make sure it's up-to-date. Store the configured priority here and use "-1" until we're ready to fully join the cluster.
const int _originalPriority;

// If we're leading and we're too far ahead of the rest of the cluster, we block new commits. This prevents us from forking too far ahead of everyone else.
const size_t _quorumSize;
bool _commitsBlocked{false};
set<SQLitePeer*> _upToDatePeers;

// A string representing an address (i.e., `127.0.0.1:80`) where this server accepts commands. I.e., "the command port".
const unique_ptr<Port> _port;

Expand Down

0 comments on commit d89b94d

Please sign in to comment.