Skip to content

Commit

Permalink
Merge pull request #1667 from Expensify/tyler-improve-shutdown-speed
Browse files Browse the repository at this point in the history
Improve shutdown speed.
  • Loading branch information
johnmlee101 authored Mar 11, 2024
2 parents 42e3a53 + fdfe60b commit 5ba835c
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 8 deletions.
16 changes: 15 additions & 1 deletion BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ void BedrockServer::syncWrapper()
break;
}
}

// Break out of `poll` in main.cpp.
_notifyDone.push(true);
SINFO("Exiting syncWrapper");
}

void BedrockServer::sync()
Expand Down Expand Up @@ -230,13 +234,17 @@ void BedrockServer::sync()
// commands, and we'll shortly run through the existing queue.
if (_shutdownState.load() == CLIENTS_RESPONDED) {
_syncNode->beginShutdown();

// This will cause us to skip the next `poll` iteration which avoids a 1 second wait.
_notifyDone.push(true);
}

// The fd_map contains a list of all file descriptors (eg, sockets, Unix pipes) that poll will wait on for
// activity. Once any of them has activity (or the timeout ends), poll will return.
fd_map fdm;

// Pre-process any sockets the sync node is managing (i.e., communication with peer nodes).
_notifyDone.prePoll(fdm);
_syncNode->prePoll(fdm);

// Add our command queues to our fd_map.
Expand All @@ -262,6 +270,7 @@ void BedrockServer::sync()
AutoTimerTime postPollTime(postPollTimer);
_syncNode->postPoll(fdm, nextActivity);
_syncNodeQueuedCommands.postPoll(fdm);
_notifyDone.postPoll(fdm);
}

// Ok, let the sync node to it's updating for as many iterations as it requires. We'll update the replication
Expand Down Expand Up @@ -671,7 +680,7 @@ void BedrockServer::worker(int threadId)
});

// Get the next one.
command = commandQueue.get(1000000);
command = commandQueue.get(100000);

SAUTOPREFIX(command->request);
SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, "
Expand Down Expand Up @@ -1362,6 +1371,9 @@ bool BedrockServer::shutdownComplete() {
void BedrockServer::prePoll(fd_map& fdm) {
lock_guard<mutex> lock(_portMutex);

// This will interrupt poll when we shut down.
_notifyDone.prePoll(fdm);

// Add all our ports. There are no sockets directly managed here.
if (_commandPortPublic) {
SFDset(fdm, _commandPortPublic->s, SREADEVTS);
Expand All @@ -1378,6 +1390,8 @@ void BedrockServer::prePoll(fd_map& fdm) {
}

void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) {
_notifyDone.postPoll(fdm);

// NOTE: There are no sockets managed here, just ports.
// Open the port the first time we enter a command-processing state
SQLiteNodeState state = _replicationState.load();
Expand Down
4 changes: 4 additions & 0 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,8 @@ class BedrockServer : public SQLiteServer {

// We call this method whenever a node changes state
void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) override;

// This is just here to allow `poll` in main.cpp to get interrupted when the server shuts down.
// to wait up to a full second for them.
SSynchronizedQueue<bool> _notifyDone;
};
6 changes: 3 additions & 3 deletions libstuff/SSignal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ void _SSignal_signalHandlerThreadFunc() {
// Wait for a signal to appear.
siginfo_t siginfo = {0};
struct timespec timeout;
timeout.tv_sec = 1;
timeout.tv_nsec = 0;
timeout.tv_sec = 0;
timeout.tv_nsec = 100'000'000; // 100ms in ns.
int result = -1;
while (result == -1) {
result = sigtimedwait(&signals, &siginfo, &timeout);
Expand Down Expand Up @@ -159,7 +159,7 @@ void _SSignal_signalHandlerThreadFunc() {
void SStopSignalThread() {
_SSignal_threadStopFlag = true;
if (_SSignal_threadInitialized.test_and_set()) {
// Send ourselves a singnal to interrupt our thread.
// Send ourselves a signal to interrupt our thread.
SINFO("Joining signal thread.");
_SSignal_signalThread.join();
_SSignal_threadInitialized.clear();
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLitePeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA
}
case STCPManager::Socket::CLOSED: {
// Done; clean up and try to reconnect
uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 5);
uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 1);
if (socket->connectFailure) {
SINFO("SQLitePeer connection failed after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms");
} else {
Expand Down
30 changes: 27 additions & 3 deletions test/clustertest/BedrockClusterTester.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,15 @@ ClusterTester<T>::ClusterTester(ClusterSize size,
_cluster.emplace_back(args, queries, serverPort, nodePort, controlPort, false, processPath, &groupCommitCount);
}

auto start = STimeNow();

// Now start them all.
list<thread> threads;
for (auto it = _cluster.begin(); it != _cluster.end(); it++) {
threads.emplace_back([it](){
it->startServer();
});
usleep(100'000);
}
for (auto& i : threads) {
i.join();
Expand All @@ -176,16 +179,37 @@ ClusterTester<T>::ClusterTester(ClusterSize size,
usleep(100000); // 0.1 seconds.
}
}
auto end = STimeNow();

if ((end - start) > 5000000) {
cout << "Took " << ((end - start) / 1000) << "ms to start cluster." << endl;
}
}

template <typename T>
ClusterTester<T>::~ClusterTester()
{
// Shut them down in reverse order so they don't try and stand up as leader in the middle of everything.
for (int i = _size - 1; i >= 0; i--) {
stopNode(i);
auto start = STimeNow();

// Shut down everything but the leader first.
list<thread> threads;
for (int i = _size - 1; i > 0; i--) {
threads.emplace_back([&, i](){
stopNode(i);
});
}
for (auto& t: threads) {
t.join();
}

// Then do leader last. This is to avoid getting in a state where nodes try to stand up as leader shuts down.
stopNode(0);

auto end = STimeNow();

if ((end - start) > 5000000) {
cout << "Took " << ((end - start) / 1000) << "ms to stop cluster." << endl;
}
_cluster.clear();
}

Expand Down

0 comments on commit 5ba835c

Please sign in to comment.