Skip to content

Commit

Permalink
Merge pull request #827 from Expensify/master
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
rafecolton authored Jul 29, 2020
2 parents 20a7f5b + 1387e1d commit ef1b3e2
Show file tree
Hide file tree
Showing 18 changed files with 867 additions and 418 deletions.
30 changes: 19 additions & 11 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include "BedrockCore.h"
#include <iomanip>

#include <sys/time.h>
#include <sys/resource.h>

set<string>BedrockServer::_blacklistedParallelCommands;
shared_timed_mutex BedrockServer::_blacklistedParallelCommandMutex;

Expand Down Expand Up @@ -176,16 +179,21 @@ void BedrockServer::sync(const SData& args,

// Initialize the DB.
int64_t mmapSizeGB = args.isSet("-mmapSizeGB") ? stoll(args["-mmapSizeGB"]) : 0;
SQLite db(args["-db"], args.calc("-cacheSize"), true, args.calc("-maxJournalSize"), -1, workerThreads - 1, args["-synchronous"], mmapSizeGB, args.test("-pageLogging"));

// And the command processor.
// We use fewer FDs on test machines that have other resource restrictions in place.
int fdLimit = args.isSet("-live") ? 25'000 : 250;
SINFO("Setting dbPool size to: " << fdLimit);
SQLitePool dbPool(fdLimit, args["-db"], args.calc("-cacheSize"), true, args.calc("-maxJournalSize"), workerThreads, args["-synchronous"], mmapSizeGB, args.test("-pageLogging"));
SQLite& db = dbPool.getBase();

// Initialize the command processor.
BedrockCore core(db, server);

// And the sync node.
uint64_t firstTimeout = STIME_US_PER_M * 2 + SRandom::rand64() % STIME_US_PER_S * 30;

// Initialize the shared pointer to our sync node object.
server._syncNode = make_shared<SQLiteNode>(server, db, args["-nodeName"], args["-nodeHost"],
server._syncNode = make_shared<SQLiteNode>(server, dbPool, args["-nodeName"], args["-nodeHost"],
args["-peerList"], args.calc("-priority"), firstTimeout,
server._version);

Expand All @@ -201,15 +209,14 @@ void BedrockServer::sync(const SData& args,
list<thread> workerThreadList;
for (int threadId = 0; threadId < workerThreads; threadId++) {
workerThreadList.emplace_back(worker,
ref(args),
ref(dbPool),
ref(replicationState),
ref(upgradeInProgress),
ref(leaderVersion),
ref(syncNodeQueuedCommands),
ref(server._completedCommands),
ref(server),
threadId,
workerThreads);
threadId);
}

// Now we jump into our main command processing loop.
Expand Down Expand Up @@ -755,20 +762,21 @@ void BedrockServer::sync(const SData& args,
server._syncThreadComplete.store(true);
}

void BedrockServer::worker(const SData& args,
void BedrockServer::worker(SQLitePool& dbPool,
atomic<SQLiteNode::State>& replicationState,
atomic<bool>& upgradeInProgress,
atomic<string>& leaderVersion,
BedrockTimeoutCommandQueue& syncNodeQueuedCommands,
BedrockTimeoutCommandQueue& syncNodeCompletedCommands,
BedrockServer& server,
int threadId,
int threadCount)
int threadId)
{
// Worker 0 is the "blockingCommit" thread.
SInitialize(threadId ? "worker" + to_string(threadId) : "blockingCommit");
int64_t mmapSizeGB = args.isSet("-mmapSizeGB") ? stoll(args["-mmapSizeGB"]) : 0;
SQLite db(args["-db"], args.calc("-cacheSize"), false, args.calc("-maxJournalSize"), threadId, threadCount - 1, args["-synchronous"], mmapSizeGB, args.test("-pageLogging"));

// Get a DB handle to work on. This will automatically be returned when dbScope goes out of scope.
SQLite& db = dbPool.get();
SQLiteScopedHandle dbScope(dbPool, db);
BedrockCore core(db, server);

// Command to work on. This default command is replaced when we find work to do.
Expand Down
5 changes: 2 additions & 3 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,14 @@ class BedrockServer : public SQLiteServer {
BedrockServer& server);

// Each worker thread runs this function. It gets the same data as the sync thread, plus its individual thread ID.
static void worker(const SData& args,
static void worker(SQLitePool& dbPool,
atomic<SQLiteNode::State>& _replicationState,
atomic<bool>& upgradeInProgress,
atomic<string>& leaderVersion,
BedrockTimeoutCommandQueue& syncNodeQueuedCommands,
BedrockTimeoutCommandQueue& syncNodeCompletedCommands,
BedrockServer& server,
int threadId,
int threadCount);
int threadId);

// Send a reply for a completed command back to the initiating client. If the `originator` of the command is set,
// then this is an error, as the command should have been sent back to a peer.
Expand Down
151 changes: 0 additions & 151 deletions libstuff/SLockTimer.h

This file was deleted.

1 change: 0 additions & 1 deletion libstuff/libstuff.h
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,6 @@ struct STestTimer {
// Other libstuff headers.
#include "SRandom.h"
#include "SPerformanceTimer.h"
#include "SLockTimer.h"
#include "SSynchronizedQueue.h"

#endif // LIBSTUFF_H
15 changes: 12 additions & 3 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "plugins/MySQL.h"
#include <sys/stat.h> // for umask()
#include <dlfcn.h>
#include <sys/resource.h>

/////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -294,6 +295,17 @@ int main(int argc, char* argv[]) {
SASSERT(SFileExists(args["-db"]));
}

// Set our soft limit to the same as our hard limit to allow for more file handles.
struct rlimit limits;
if (!getrlimit(RLIMIT_NOFILE, &limits)) {
limits.rlim_cur = limits.rlim_max;
if (setrlimit(RLIMIT_NOFILE, &limits)) {
SERROR("Couldn't set FD limit");
}
} else {
SERROR("Couldn't get FD limit");
}

// Log stack traces if we have unhandled exceptions.
set_terminate(STerminateHandler);

Expand Down Expand Up @@ -352,9 +364,6 @@ int main(int argc, char* argv[]) {
}
}

// Log how much time we spent in our main mutex.
SQLite::g_commitLock.log();

// All done
SINFO("Graceful process shutdown complete");
return 0;
Expand Down
Loading

0 comments on commit ef1b3e2

Please sign in to comment.