Skip to content

Commit

Permalink
Merge pull request #835 from Expensify/master
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
cead22 authored Aug 5, 2020
2 parents 85e9f83 + 8cc572d commit cf0ebaf
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 48 deletions.
23 changes: 19 additions & 4 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ SQLite::SQLite(const string& filename, int cacheSize, bool enableFullCheckpoints
whitelist(nullptr),
_maxJournalSize(maxJournalSize),
_insideTransaction(false),
_dbCountAtStart(0),
_beginElapsed(0),
_readElapsed(0),
_writeElapsed(0),
Expand Down Expand Up @@ -229,6 +230,7 @@ SQLite::SQLite(const SQLite& from) :
_journalSize(from._journalSize),
_maxJournalSize(from._maxJournalSize),
_insideTransaction(false),
_dbCountAtStart(0),
_beginElapsed(0),
_readElapsed(0),
_writeElapsed(0),
Expand Down Expand Up @@ -508,6 +510,13 @@ bool SQLite::beginTransaction(bool useCache, const string& transactionName) {
uint64_t before = STimeNow();
_currentTransactionAttemptCount = -1;
_insideTransaction = !SQuery(_db, "starting db transaction", "BEGIN CONCURRENT");

// Because some other thread could commit once we've run `BEGIN CONCURRENT`, this value can be slightly behind
// where we're actually able to start such that we know we shouldn't get a conflict if this commits successfully on
// leader. However, this is perfectly safe, it just adds the possibility that threads on followers wait for an
// extra transaction to complete before starting, which is an anti-optimization, but the alternative is wrapping
// the above `BEGIN CONCURRENT` and the `getCommitCount` call in a lock, which is worse.
_dbCountAtStart = getCommitCount();
_queryCache.clear();
_transactionName = transactionName;
_useCache = useCache;
Expand Down Expand Up @@ -757,7 +766,7 @@ bool SQLite::prepare() {
string query = "INSERT INTO " + _journalName + " VALUES (" + SQ(commitCount + 1) + ", " + SQ(_uncommittedQuery) + ", " + SQ(_uncommittedHash) + " )";

// These are the values we're currently operating on, until we either commit or rollback.
_sharedData->_inFlightTransactions[commitCount + 1] = make_pair(_uncommittedQuery, _uncommittedHash);
_sharedData->_inFlightTransactions[commitCount + 1] = make_tuple(_uncommittedQuery, _uncommittedHash, _dbCountAtStart);

int result = SQuery(_db, "updating journal", query);
_prepareElapsed += STimeNow() - before;
Expand Down Expand Up @@ -868,6 +877,7 @@ int SQLite::commit() {
_useCache = false;
_queryCount = 0;
_cacheHits = 0;
_dbCountAtStart = 0;
} else {
if (_currentTransactionAttemptCount != -1) {
string logLine = SWHEREAMI + "[row-level-locking] transaction attempt:" +
Expand All @@ -885,11 +895,11 @@ int SQLite::commit() {
return result;
}

map<uint64_t, pair<string, string>> SQLite::getCommittedTransactions() {
map<uint64_t, tuple<string, string, uint64_t>> SQLite::getCommittedTransactions() {
SQLITE_COMMIT_AUTOLOCK;

// Maps a committed transaction ID to the correct query and hash for that transaction.
map<uint64_t, pair<string, string>> result;
// Maps a committed transaction ID to the correct query and hash, and starting commit count for that transaction.
map<uint64_t, tuple<string, string, uint64_t>> result;

// If nothing's been committed, nothing to return.
if (_sharedData->_committedTransactionIDs.empty()) {
Expand Down Expand Up @@ -962,6 +972,7 @@ void SQLite::rollback() {
_useCache = false;
_queryCount = 0;
_cacheHits = 0;
_dbCountAtStart = 0;

// Reset this to the default on any completion of the transaction, successful or not.
_enableCheckpointInterrupt = true;
Expand Down Expand Up @@ -1196,6 +1207,10 @@ bool SQLite::getUpdateNoopMode() const {
return _noopUpdateMode;
}

// Returns the DB commit count captured just after `BEGIN CONCURRENT` in beginTransaction()
// (see _dbCountAtStart). Reset to 0 by commit() and rollback(), so outside a transaction this
// returns 0. NOTE(review): per the comment in beginTransaction(), this value may be slightly
// *higher* than the snapshot the transaction actually sees if another thread committed between
// BEGIN CONCURRENT and the getCommitCount() call — callers should treat it as an upper bound.
uint64_t SQLite::getDBCountAtStart() const {
return _dbCountAtStart;
}

void SQLite::addCheckpointListener(SQLite::CheckpointRequiredListener& listener) {
lock_guard<mutex> lock(_sharedData->_checkpointListenerMutex);
_sharedData->_checkpointListeners.insert(&listener);
Expand Down
12 changes: 10 additions & 2 deletions sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class SQLite {
// This atomically removes and returns committed transactions from our inflight list. SQLiteNode can call this, and
it will return a map of transaction IDs to tuples of (query, hash, dbCountAtStart), so that those transactions can be replicated
// out to peers.
map<uint64_t, pair<string,string>> getCommittedTransactions();
map<uint64_t, tuple<string,string, uint64_t>> getCommittedTransactions();

// The whitelist is either nullptr, in which case the feature is disabled, or it's a map of table names to sets of
// column names that are allowed for reading. Using whitelist at all put the database handle into a more
Expand All @@ -221,6 +221,9 @@ class SQLite {
// checkpoints to complete, thus causing an endless cycle of interrupted transactions.
void disableCheckpointInterruptForNextTransaction() { _enableCheckpointInterrupt = false; }

// public read-only accessor for _dbCountAtStart.
uint64_t getDBCountAtStart() const;

private:

// This structure contains all of the data that's shared between a set of SQLite objects that share the same
Expand Down Expand Up @@ -291,7 +294,7 @@ class SQLite {
//
// This is a map of all currently "in flight" transactions. These are transactions for which a `prepare()` has been
// called to generate a journal row, but have not yet been sent to peers.
map<uint64_t, pair<string, string>> _inFlightTransactions;
map<uint64_t, tuple<string, string, uint64_t>> _inFlightTransactions;

// This mutex prevents any thread starting a new transaction when locked. The checkpoint thread will lock it
// when required to make sure it can get exclusive use of the DB.
Expand Down Expand Up @@ -338,6 +341,11 @@ class SQLite {
string _uncommittedQuery;
string _uncommittedHash;

// The DB commit count at the start of the current transaction (note: it is allowed for this to be *higher*
// than the state inside the transaction, if another thread committed to the DB while we were in
// `beginTransaction`).
uint64_t _dbCountAtStart;

// The name of the journal table
string _journalName;

Expand Down
61 changes: 31 additions & 30 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,30 +113,31 @@ void SQLiteNode::replicate(SQLiteNode& node, Peer* peer, SData command, SQLite&
string name = node.name;
SINFO("Replicate thread started: " << command.methodLine);
if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) {
uint64_t currentCount = command.calcU64("NewCount") - 1;
uint64_t newCount = command.calcU64("NewCount");
uint64_t currentCount = newCount - 1;

// We have to wait for the DB to come up-to-date for QUORUM transactions.
SINFO("Thread for commit " << currentCount);
// Transactions are either ASYNC or QUORUM. QUORUM transactions can only start when the DB is completely
// up-to-date. ASYNC transactions can start as soon as the DB is at `dbCountAtStart` (the same value that
// the DB was at when the transaction began on leader).
bool quorum = !SStartsWith(command["ID"], "ASYNC");
if (quorum) {
SINFO("Waiting on DB");
while (true) {
SQLiteSequentialNotifier::RESULT result = node._localCommitNotifier.waitFor(currentCount);
if (result == SQLiteSequentialNotifier::RESULT::UNKNOWN) {
// This should be impossible.
SERROR("Got UNKNOWN result from waitFor, which shouldn't happen");
} else if (result == SQLiteSequentialNotifier::RESULT::COMPLETED) {
// Success case.
break;
} else if (result == SQLiteSequentialNotifier::RESULT::CANCELED) {
SINFO("_localCommitNotifier.waitFor canceled early, returning.");
return;
} else if (result == SQLiteSequentialNotifier::RESULT::CHECKPOINT_REQUIRED) {
SINFO("Checkpoint required while waiting for DB to come up-to-date. Just waiting again.");
continue;
} else {
SERROR("Got unhandled SQLiteSequentialNotifier::RESULT value, did someone update the enum without updating this block?");
}
uint64_t waitForCount = SStartsWith(command["ID"], "ASYNC") ? command.calc("dbCountAtStart") : currentCount;
SINFO("Thread for commit " << newCount << " waiting on DB count " << waitForCount << " (" << (quorum ? "QUORUM" : "ASYNC") << ")");
while (true) {
SQLiteSequentialNotifier::RESULT result = node._localCommitNotifier.waitFor(waitForCount);
if (result == SQLiteSequentialNotifier::RESULT::UNKNOWN) {
// This should be impossible.
SERROR("Got UNKNOWN result from waitFor, which shouldn't happen");
} else if (result == SQLiteSequentialNotifier::RESULT::COMPLETED) {
// Success case.
break;
} else if (result == SQLiteSequentialNotifier::RESULT::CANCELED) {
SINFO("_localCommitNotifier.waitFor canceled early, returning.");
return;
} else if (result == SQLiteSequentialNotifier::RESULT::CHECKPOINT_REQUIRED) {
SINFO("Checkpoint required while waiting for DB to come up-to-date. Just waiting again.");
continue;
} else {
SERROR("Got unhandled SQLiteSequentialNotifier::RESULT value, did someone update the enum without updating this block?");
}
}

Expand All @@ -147,7 +148,7 @@ void SQLiteNode::replicate(SQLiteNode& node, Peer* peer, SData command, SQLite&
if (attemptCount > 1) {
SINFO("Commit attempt number " << attemptCount << " for concurrent replication.");
}
SINFO("BEGIN for commit " << currentCount);
SINFO("BEGIN for commit " << newCount);
node.handleBeginTransaction(db, peer, command);

// Now we need to wait for the DB to be up-to-date (if the transaction is QUORUM, we can
Expand Down Expand Up @@ -351,12 +352,14 @@ void SQLiteNode::_sendOutstandingTransactions() {
if (id <= _lastSentTransactionID) {
continue;
}
string& query = i.second.first;
string& hash = i.second.second;
string& query = get<0>(i.second);
string& hash = get<1>(i.second);
int64_t dbCountAtStart = get<2>(i.second);
SData transaction("BEGIN_TRANSACTION");
transaction["NewCount"] = to_string(id);
transaction["NewHash"] = hash;
transaction["leaderSendTime"] = sendTime;
transaction["dbCountAtStart"] = to_string(dbCountAtStart);
transaction["ID"] = "ASYNC_" + to_string(id);
transaction.content = query;
_sendToAllPeers(transaction, true); // subscribed only
Expand Down Expand Up @@ -1012,10 +1015,6 @@ bool SQLiteNode::update() {
// We'll send the commit count to peers.
uint64_t commitCount = _db.getCommitCount();

// If there was nothing changed, then we shouldn't have anything to commit.
// Except that this is allowed right now.
// SASSERT(!_db.getUncommittedQuery().empty());

// There's no handling for a failed prepare. This should only happen if the DB has been corrupted or
// something catastrophic like that.
SASSERT(_db.prepare());
Expand All @@ -1027,8 +1026,9 @@ bool SQLiteNode::update() {
transaction.set("NewCount", commitCount + 1);
transaction.set("NewHash", _db.getUncommittedHash());
transaction.set("leaderSendTime", to_string(STimeNow()));
transaction.set("dbCountAtStart", to_string(_db.getDBCountAtStart()));
if (_commitConsistency == ASYNC) {
transaction["ID"] = "ASYNC_" + to_string(_lastSentTransactionID + 1);
transaction.set("ID", "ASYNC_" + to_string(_lastSentTransactionID + 1));
} else {
transaction.set("ID", _lastSentTransactionID + 1);
}
Expand Down Expand Up @@ -1547,6 +1547,7 @@ void SQLiteNode::_onMESSAGE(Peer* peer, const SData& message) {
transaction.set("NewCount", commitCount + 1);
transaction.set("NewHash", _db.getUncommittedHash());
transaction.set("leaderSendTime", to_string(STimeNow()));
transaction.set("dbCountAtStart", to_string(_db.getDBCountAtStart()));
transaction.set("ID", _lastSentTransactionID + 1);
transaction.content = _db.getUncommittedQuery();
_sendToPeer(peer, transaction);
Expand Down
25 changes: 13 additions & 12 deletions test/lib/BedrockTester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,21 @@ BedrockTester::BedrockTester(int threadID, const map<string, string>& args,
}

map <string, string> defaultArgs = {
{"-db", _dbName},
{"-serverHost", _serverAddr},
{"-nodeName", "bedrock_test"},
{"-nodeHost", "localhost:" + to_string(_nodePort)},
{"-controlPort", "localhost:" + to_string(_controlPort)},
{"-priority", "200"},
{"-plugins", "db"},
{"-workerThreads", "8"},
{"-mmapSizeGB", "1"},
{"-maxJournalSize", "25000"},
{"-v", ""},
{"-db", _dbName},
{"-serverHost", _serverAddr},
{"-nodeName", "bedrock_test"},
{"-nodeHost", "localhost:" + to_string(_nodePort)},
{"-controlPort", "localhost:" + to_string(_controlPort)},
{"-priority", "200"},
{"-plugins", "db"},
{"-workerThreads", "8"},
{"-mmapSizeGB", "1"},
{"-maxJournalSize", "25000"},
{"-v", ""},
{"-quorumCheckpoint", "50"},
{"-enableMultiWrite", "true"},
{"-cacheSize", "1000"},
{"-cacheSize", "1000"},
{"-parallelReplication", "true"},
};

// Set defaults.
Expand Down

0 comments on commit cf0ebaf

Please sign in to comment.