Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update expensify_prod branch #1672

Merged
merged 8 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ void BedrockServer::syncWrapper()
break;
}
}

// Break out of `poll` in main.cpp.
_notifyDone.push(true);
SINFO("Exiting syncWrapper");
}

void BedrockServer::sync()
Expand Down Expand Up @@ -230,13 +234,17 @@ void BedrockServer::sync()
// commands, and we'll shortly run through the existing queue.
if (_shutdownState.load() == CLIENTS_RESPONDED) {
_syncNode->beginShutdown();

// This will cause us to skip the next `poll` iteration which avoids a 1 second wait.
_notifyDone.push(true);
}

// The fd_map contains a list of all file descriptors (eg, sockets, Unix pipes) that poll will wait on for
// activity. Once any of them has activity (or the timeout ends), poll will return.
fd_map fdm;

// Pre-process any sockets the sync node is managing (i.e., communication with peer nodes).
_notifyDone.prePoll(fdm);
_syncNode->prePoll(fdm);

// Add our command queues to our fd_map.
Expand All @@ -262,6 +270,7 @@ void BedrockServer::sync()
AutoTimerTime postPollTime(postPollTimer);
_syncNode->postPoll(fdm, nextActivity);
_syncNodeQueuedCommands.postPoll(fdm);
_notifyDone.postPoll(fdm);
}

// Ok, let the sync node do its updating for as many iterations as it requires. We'll update the replication
Expand Down Expand Up @@ -671,7 +680,7 @@ void BedrockServer::worker(int threadId)
});

// Get the next one.
command = commandQueue.get(1000000);
command = commandQueue.get(100000);

SAUTOPREFIX(command->request);
SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, "
Expand Down Expand Up @@ -1362,6 +1371,9 @@ bool BedrockServer::shutdownComplete() {
void BedrockServer::prePoll(fd_map& fdm) {
lock_guard<mutex> lock(_portMutex);

// This will interrupt poll when we shut down.
_notifyDone.prePoll(fdm);

// Add all our ports. There are no sockets directly managed here.
if (_commandPortPublic) {
SFDset(fdm, _commandPortPublic->s, SREADEVTS);
Expand All @@ -1378,6 +1390,8 @@ void BedrockServer::prePoll(fd_map& fdm) {
}

void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) {
_notifyDone.postPoll(fdm);

// NOTE: There are no sockets managed here, just ports.
// Open the port the first time we enter a command-processing state
SQLiteNodeState state = _replicationState.load();
Expand Down
4 changes: 4 additions & 0 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,8 @@ class BedrockServer : public SQLiteServer {

// We call this method whenever a node changes state
void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) override;

// This is just here to allow `poll` in main.cpp to get interrupted when the server shuts down, so that we don't have
// to wait up to a full second for it to return.
SSynchronizedQueue<bool> _notifyDone;
};
6 changes: 3 additions & 3 deletions libstuff/SSignal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ void _SSignal_signalHandlerThreadFunc() {
// Wait for a signal to appear.
siginfo_t siginfo = {0};
struct timespec timeout;
timeout.tv_sec = 1;
timeout.tv_nsec = 0;
timeout.tv_sec = 0;
timeout.tv_nsec = 100'000'000; // 100ms in ns.
int result = -1;
while (result == -1) {
result = sigtimedwait(&signals, &siginfo, &timeout);
Expand Down Expand Up @@ -159,7 +159,7 @@ void _SSignal_signalHandlerThreadFunc() {
void SStopSignalThread() {
_SSignal_threadStopFlag = true;
if (_SSignal_threadInitialized.test_and_set()) {
// Send ourselves a singnal to interrupt our thread.
// Send ourselves a signal to interrupt our thread.
SINFO("Joining signal thread.");
_SSignal_signalThread.join();
_SSignal_threadInitialized.clear();
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLitePeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA
}
case STCPManager::Socket::CLOSED: {
// Done; clean up and try to reconnect
uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 5);
uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 1);
if (socket->connectFailure) {
SINFO("SQLitePeer connection failed after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms");
} else {
Expand Down
30 changes: 27 additions & 3 deletions test/clustertest/BedrockClusterTester.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,15 @@ ClusterTester<T>::ClusterTester(ClusterSize size,
_cluster.emplace_back(args, queries, serverPort, nodePort, controlPort, false, processPath, &groupCommitCount);
}

auto start = STimeNow();

// Now start them all.
list<thread> threads;
for (auto it = _cluster.begin(); it != _cluster.end(); it++) {
threads.emplace_back([it](){
it->startServer();
});
usleep(100'000);
}
for (auto& i : threads) {
i.join();
Expand All @@ -176,16 +179,37 @@ ClusterTester<T>::ClusterTester(ClusterSize size,
usleep(100000); // 0.1 seconds.
}
}
auto end = STimeNow();

if ((end - start) > 5000000) {
cout << "Took " << ((end - start) / 1000) << "ms to start cluster." << endl;
}
}

template <typename T>
ClusterTester<T>::~ClusterTester()
{
// Shut them down in reverse order so they don't try and stand up as leader in the middle of everything.
for (int i = _size - 1; i >= 0; i--) {
stopNode(i);
auto start = STimeNow();

// Shut down everything but the leader first.
list<thread> threads;
for (int i = _size - 1; i > 0; i--) {
threads.emplace_back([&, i](){
stopNode(i);
});
}
for (auto& t: threads) {
t.join();
}

// Then do the leader last. This is to avoid getting in a state where nodes try to stand up while the leader shuts down.
stopNode(0);

auto end = STimeNow();

if ((end - start) > 5000000) {
cout << "Took " << ((end - start) / 1000) << "ms to stop cluster." << endl;
}
_cluster.clear();
}

Expand Down
35 changes: 33 additions & 2 deletions test/lib/tpunit++.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <string.h>
#include <iostream>
#include <regex>
#include <chrono>
using namespace tpunit;

bool tpunit::TestFixture::exitFlag = false;
Expand Down Expand Up @@ -166,11 +167,15 @@ int tpunit::TestFixture::tpunit_detail_do_run(const set<string>& include, const
}

list<TestFixture*> afterTests;
mutex testTimeLock;
multimap<chrono::milliseconds, string> testTimes;

for (int threadID = 0; threadID < threads; threadID++) {
// Capture everything by reference except threadID, because we don't want it to be incremented for the
// next thread in the loop.
thread t = thread([&, threadID]{
auto start = chrono::steady_clock::now();

threadInitFunction();
try {
// Do test.
Expand Down Expand Up @@ -258,6 +263,11 @@ int tpunit::TestFixture::tpunit_detail_do_run(const set<string>& include, const
exitFlag = true;
printf("Thread %d caught shutdown exception, exiting.\n", threadID);
}
auto end = chrono::steady_clock::now();
if (currentTestName.size()) {
lock_guard<mutex> lock(testTimeLock);
testTimes.emplace(make_pair(chrono::duration_cast<std::chrono::milliseconds>(end - start), currentTestName));
}
});
threadList.push_back(move(t));
}
Expand Down Expand Up @@ -294,6 +304,17 @@ int tpunit::TestFixture::tpunit_detail_do_run(const set<string>& include, const
}
}

cout << endl;
cout << "Slowest Test Classes: " << endl;
auto it = testTimes.rbegin();
for (size_t i = 0; i < 10; i++) {
if (it == testTimes.rend()) {
break;
}
cout << it->first << ": " << it->second << endl;
it++;
}

return tpunit_detail_stats()._failures;
}
return 1;
Expand Down Expand Up @@ -419,23 +440,33 @@ void tpunit::TestFixture::tpunit_detail_do_tests(TestFixture* f) {
f->_stats._assertions = 0;
f->_stats._exceptions = 0;
f->testOutputBuffer = "";
auto start = chrono::steady_clock::now();
tpunit_detail_do_methods(f->_befores);
tpunit_detail_do_method(t);
tpunit_detail_do_methods(f->_afters);
auto end = chrono::steady_clock::now();
stringstream timeStream;
timeStream << "(" << chrono::duration_cast<std::chrono::milliseconds>(end - start);
if (chrono::duration_cast<std::chrono::milliseconds>(end - start) > 5000ms) {
timeStream << " \xF0\x9F\x90\x8C";
}
timeStream << ")";
string timeStr = timeStream.str();
const char* time = timeStr.c_str();

// No new assertions or exceptions. This is not currently synchronized correctly. They can cause tests that
// passed to appear failed when another test failed while this test was running. They cannot cause failed
// tests to appear to have passed.
if(!f->_stats._assertions && !f->_stats._exceptions) {
lock_guard<recursive_mutex> lock(m);
printf("\xE2\x9C\x85 %s\n", t->_name);
printf("\xE2\x9C\x85 %s %s\n", t->_name, time);
tpunit_detail_stats()._passes++;
} else {
lock_guard<recursive_mutex> lock(m);

// Dump the test buffer if the test included any log lines.
f->printTestBuffer();
printf("\xE2\x9D\x8C !FAILED! \xE2\x9D\x8C %s\n", t->_name);
printf("\xE2\x9D\x8C !FAILED! \xE2\x9D\x8C %s %s\n", t->_name, time);
tpunit_detail_stats()._failures++;
tpunit_detail_stats()._failureNames.emplace(t->_name);
}
Expand Down