Merge pull request #1999 from Expensify/tyler-improve-fork-detection
cead22 authored Dec 7, 2024
2 parents 566c4b2 + c86f799 commit 9b71f40
Showing 6 changed files with 178 additions and 262 deletions.
197 changes: 134 additions & 63 deletions sqlitecluster/SQLiteNode.cpp

Large diffs are not rendered by default.

9 changes: 2 additions & 7 deletions sqlitecluster/SQLiteNode.h
@@ -276,6 +276,8 @@ class SQLiteNode : public STCPManager {
// commitCount that we do, this will return null.
void _updateSyncPeer();

void _dieIfForkedFromCluster();

const string _commandAddress;
const string _name;
const vector<SQLitePeer*> _peerList;
@@ -384,13 +386,6 @@ class SQLiteNode : public STCPManager {
// This can be removed once we've figured out why replication falls behind. See this issue: https://github.com/Expensify/Expensify/issues/210528
atomic<size_t> _concurrentReplicateTransactions = 0;

// We keep a set of strings that are the names of nodes we've forked from, in the case we ever receive a hash mismatch while trying to synchronize.
// Whenever we become LEADING or FOLLOWING this is cleared. This resets the case where one node has forked, we attempt to synchronize from it, and fail,
// but later synchronize from someone else. Once we've come up completely, we no longer "hold a grudge" against this node, which will likely get fixed
// while we're online.
// In the event that this list becomes longer than half the cluster size, the node kills itself and logs that it's in an unrecoverable state.
set<string> _forkedFrom;

// A pointer to a SQLite instance that is passed to plugin's stateChanged function. This prevents plugins from operating on the same handle that
// the sync node is when they run queries in stateChanged.
SQLite* pluginDB;
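(Since the SQLiteNode.cpp diff above is not rendered, here is a minimal standalone sketch of the check that the removed `_forkedFrom` comment describes and that the new `_dieIfForkedFromCluster()` presumably performs. The struct, names, and exact threshold are assumptions for illustration, not Bedrock's actual code.)

```cpp
#include <atomic>
#include <cstdlib>
#include <iostream>
#include <vector>

// Illustrative stand-in for SQLitePeer; the real class has many more fields.
struct Peer {
    std::atomic<bool> forked{false}; // set when this peer's database has forked from ours
};

// Sketch of the policy described above: if more than half of the full cluster
// (peers plus this node) has a database that disagrees with ours, then we are
// the node with the divergent history, and we cannot safely rejoin. Aborting
// here is consistent with the SIGABRT the test below asserts on.
void dieIfForkedFromCluster(const std::vector<Peer>& peers) {
    size_t forkedCount = 0;
    for (const auto& peer : peers) {
        if (peer.forked) {
            forkedCount++;
        }
    }
    const size_t clusterSize = peers.size() + 1; // count ourselves
    if (forkedCount > clusterSize / 2) {         // exact threshold is an assumption
        std::cerr << "Forked from a majority of the cluster; unrecoverable, aborting." << std::endl;
        std::abort();
    }
}
```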
5 changes: 2 additions & 3 deletions sqlitecluster/SQLitePeer.cpp
@@ -26,6 +26,7 @@ SQLitePeer::SQLitePeer(const string& name_, const string& host_, const STable& p
transactionResponse(Response::NONE),
version(),
lastPingTime(0),
forked(false),
hash()
{ }

@@ -79,6 +80,7 @@ void SQLitePeer::reset() {
version = "";
lastPingTime = 0,
setCommit(0, "");
forked = false;
}

void SQLitePeer::shutdownSocket() {
@@ -205,9 +207,6 @@ string SQLitePeer::responseName(Response response) {
case Response::DENY:
return "DENY";
break;
case Response::ABSTAIN:
return "ABSTAIN";
break;
default:
return "";
}
6 changes: 4 additions & 2 deletions sqlitecluster/SQLitePeer.h
@@ -8,8 +8,7 @@ class SQLitePeer {
enum class Response {
NONE,
APPROVE,
DENY,
ABSTAIN
DENY
};

enum class PeerPostPollStatus {
@@ -91,6 +90,9 @@ class SQLitePeer {
atomic<string> version;
atomic<uint64_t> lastPingTime;

// Set to true when this peer is known to be unusable, i.e., when it has a database that is forked from us.
atomic<bool> forked;

private:
// For initializing the permafollower value from the params list.
static bool isPermafollower(const STable& params);
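(For context, a hypothetical call site showing how a flag like `forked` would get set during synchronization; the function, names, and logging below are illustrative assumptions, since the real logic lives in the unrendered SQLiteNode.cpp diff.)

```cpp
#include <atomic>
#include <iostream>
#include <string>

// Illustrative stand-in for SQLitePeer.
struct PeerInfo {
    std::string name;
    std::atomic<bool> forked{false}; // atomic because multiple threads read it
};

// Two nodes that both claim a given commit count must have identical hashes
// for that commit. If they differ, the databases have diverged ("forked"),
// and this peer is unusable as a synchronization source until reset() runs.
void checkPeerAgainstOurHash(PeerInfo& peer, const std::string& peerHash, const std::string& ourHash) {
    if (peerHash != ourHash) {
        peer.forked = true;
        std::cerr << "Hash mismatch with peer '" << peer.name << "', marking it forked." << std::endl;
    }
}
```

Storing the flag on the peer, rather than in the node-level `set<string> _forkedFrom` removed above, means the state is cleared wherever the peer itself is reset (see the `reset()` change in SQLitePeer.cpp) instead of needing an explicit wipe whenever the node becomes LEADING or FOLLOWING.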
62 changes: 36 additions & 26 deletions test/clustertest/tests/ForkCheckTest.cpp
@@ -1,13 +1,16 @@
#include "test/lib/BedrockTester.h"
#include <sys/wait.h>

#include <libstuff/SData.h>
#include <libstuff/SQResult.h>
#include <sqlitecluster/SQLite.h>
#include <sqlitecluster/SQLiteNode.h>
#include <test/clustertest/BedrockClusterTester.h>

struct ForkCheckTest : tpunit::TestFixture {
ForkCheckTest()
: tpunit::TestFixture("ForkCheck", TEST(ForkCheckTest::test)) {}
: tpunit::TestFixture("ForkCheck",
TEST(ForkCheckTest::forkAtShutDown)) {}

pair<uint64_t, string> getMaxJournalCommit(BedrockTester& tester, bool online = true) {
SQResult journals;
@@ -30,48 +33,50 @@ struct ForkCheckTest : tpunit::TestFixture {
return make_pair(maxJournalCommit, maxJournalTable);
}

void test() {
// Create a cluster, wait for it to come up.
BedrockClusterTester tester(ClusterSize::FIVE_NODE_CLUSTER);

// We'll tell the threads to stop when they're done.
atomic<bool> stop(false);

// We want to not spam a stopped leader.
atomic<bool> leaderIsUp(true);

vector<thread> createThreads(size_t num, BedrockClusterTester& tester, atomic<bool>& stop, atomic<bool>& leaderIsUp) {
// Just use a bunch of copies of the same command.
SData spamCommand("idcollision");

// In a vector.
const vector<SData> commands(100, spamCommand);

// Now create 9 threads spamming 100 commands at a time, each. 9 cause we have three nodes.
vector<thread> threads;
for (size_t i = 0; i < 9; i++) {
threads.emplace_back([&tester, i, &commands, &stop, &leaderIsUp](){
for (size_t num = 0; num < 9; num++) {
threads.emplace_back([&tester, num, &stop, &leaderIsUp](){
const vector<SData> commands(100, SData("idcollision"));
while (!stop) {
// Pick a tester, send, don't care about the result.
size_t testerNum = i % 3;
size_t testerNum = num % 5;
if (testerNum == 0 && !leaderIsUp) {
// If we're looking for leader and it's down, wait a second to avoid pegging the CPU.
sleep(1);
} else {
// If we're not leader or leader is up, spam away!
tester.getTester(testerNum).executeWaitMultipleData(commands);
// If leader's off, don't use it.
testerNum = 1;
}
tester.getTester(testerNum).executeWaitMultipleData(commands);
}
});
}

return threads;
}

// The primary test here checks that a forked node will not be able to rejoin the cluster when it reconnects.
// This is a reasonable test for a fork that happens at shutdown.
void forkAtShutDown() {
// Create a cluster, wait for it to come up.
BedrockClusterTester tester(ClusterSize::FIVE_NODE_CLUSTER);

// We'll tell the threads to stop when they're done.
atomic<bool> stop(false);

// We want to not spam a stopped leader.
atomic<bool> leaderIsUp(true);

// Now create 15 threads spamming 100 commands at a time, each. 15 because we have five nodes.
vector<thread> threads = createThreads(15, tester, stop, leaderIsUp);

// Let them spam for a second.
sleep(1);

// We can try and stop the leader.
leaderIsUp = false;
tester.getTester(0).stopServer();

// Spam a few more commands and then we can stop.
// Spam a few more commands so that the followers get ahead of the stopped leader, and then we can stop.
sleep(1);
stop = true;
for (auto& t : threads) {
@@ -119,5 +124,10 @@

// And that signal should have been ABORT.
ASSERT_EQUAL(SIGABRT, WTERMSIG(status));

// We call stopServer on the forked leader because it crashed, but the cluster tester doesn't realize it's gone, so shutting
// down normally would time out after a minute. Calling `stopServer` explicitly clears the server PID, so we don't need to
// wait for that timeout.
tester.getTester(0).stopServer();
}
} __ForkCheckTest;
161 changes: 0 additions & 161 deletions test/clustertest/tests/ForkedNodeApprovalTest.cpp

This file was deleted.
