diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index b0c41a35c..3bdc7573a 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -23,6 +23,7 @@ SQLite::SQLite(const string& filename, int cacheSize, bool enableFullCheckpoints whitelist(nullptr), _maxJournalSize(maxJournalSize), _insideTransaction(false), + _dbCountAtStart(0), _beginElapsed(0), _readElapsed(0), _writeElapsed(0), @@ -229,6 +230,7 @@ SQLite::SQLite(const SQLite& from) : _journalSize(from._journalSize), _maxJournalSize(from._maxJournalSize), _insideTransaction(false), + _dbCountAtStart(0), _beginElapsed(0), _readElapsed(0), _writeElapsed(0), @@ -508,6 +510,13 @@ bool SQLite::beginTransaction(bool useCache, const string& transactionName) { uint64_t before = STimeNow(); _currentTransactionAttemptCount = -1; _insideTransaction = !SQuery(_db, "starting db transaction", "BEGIN CONCURRENT"); + + // Because some other thread could commit once we've run `BEGIN CONCURRENT`, this value can be slightly behind + // where we're actually able to start such that we know we shouldn't get a conflict if this commits successfully on + // leader. However, this is perfectly safe, it just adds the possibility that threads on followers wait for an + // extra transaction to complete before starting, which is an anti-optimization, but the alternative is wrapping + // the above `BEGIN CONCURRENT` and the `getCommitCount` call in a lock, which is worse. + _dbCountAtStart = getCommitCount(); _queryCache.clear(); _transactionName = transactionName; _useCache = useCache; @@ -757,7 +766,7 @@ bool SQLite::prepare() { string query = "INSERT INTO " + _journalName + " VALUES (" + SQ(commitCount + 1) + ", " + SQ(_uncommittedQuery) + ", " + SQ(_uncommittedHash) + " )"; // These are the values we're currently operating on, until we either commit or rollback. - _sharedData->_inFlightTransactions[commitCount + 1] = make_pair(_uncommittedQuery, _uncommittedHash); + _sharedData->_inFlightTransactions[commitCount + 1] = make_tuple(_uncommittedQuery, _uncommittedHash, _dbCountAtStart); int result = SQuery(_db, "updating journal", query); _prepareElapsed += STimeNow() - before; @@ -868,6 +877,7 @@ int SQLite::commit() { _useCache = false; _queryCount = 0; _cacheHits = 0; + _dbCountAtStart = 0; } else { if (_currentTransactionAttemptCount != -1) { string logLine = SWHEREAMI + "[row-level-locking] transaction attempt:" + @@ -885,11 +895,11 @@ int SQLite::commit() { return result; } -map> SQLite::getCommittedTransactions() { +map> SQLite::getCommittedTransactions() { SQLITE_COMMIT_AUTOLOCK; - // Maps a committed transaction ID to the correct query and hash for that transaction. - map> result; + // Maps a committed transaction ID to the correct query and hash, and starting commit count for that transaction. + map> result; // If nothing's been committed, nothing to return. if (_sharedData->_committedTransactionIDs.empty()) { @@ -962,6 +972,7 @@ void SQLite::rollback() { _useCache = false; _queryCount = 0; _cacheHits = 0; + _dbCountAtStart = 0; // Reset this to the default on any completion of the transaction, successful or not. _enableCheckpointInterrupt = true; @@ -1196,6 +1207,10 @@ bool SQLite::getUpdateNoopMode() const { return _noopUpdateMode; } +uint64_t SQLite::getDBCountAtStart() const { + return _dbCountAtStart; +} + void SQLite::addCheckpointListener(SQLite::CheckpointRequiredListener& listener) { lock_guard lock(_sharedData->_checkpointListenerMutex); _sharedData->_checkpointListeners.insert(&listener); diff --git a/sqlitecluster/SQLite.h b/sqlitecluster/SQLite.h index b8821a25d..2ece78735 100644 --- a/sqlitecluster/SQLite.h +++ b/sqlitecluster/SQLite.h @@ -195,7 +195,7 @@ class SQLite { // This atomically removes and returns committed transactions from our inflight list. SQLiteNode can call this, and // it will return a map of transaction IDs to pairs of (query, hash), so that those transactions can be replicated // out to peers. - map> getCommittedTransactions(); + map> getCommittedTransactions(); // The whitelist is either nullptr, in which case the feature is disabled, or it's a map of table names to sets of // column names that are allowed for reading. Using whitelist at all put the database handle into a more @@ -221,6 +221,9 @@ class SQLite { // checkpoints to complete, thus causing an endless cycle of interrupted transactions. void disableCheckpointInterruptForNextTransaction() { _enableCheckpointInterrupt = false; } + // public read-only accessor for _dbCountAtStart. + uint64_t getDBCountAtStart() const; + private: // This structure contains all of the data that's shared between a set of SQLite objects that share the same @@ -291,7 +294,7 @@ class SQLite { // // This is a map of all currently "in flight" transactions. These are transactions for which a `prepare()` has been // called to generate a journal row, but have not yet been sent to peers. - map> _inFlightTransactions; + map> _inFlightTransactions; // This mutex prevents any thread starting a new transaction when locked. The checkpoint thread will lock it // when required to make sure it can get exclusive use of the DB. @@ -338,6 +341,11 @@ class SQLite { string _uncommittedQuery; string _uncommittedHash; + // The latest transaction ID at the start of the current transaction (note: it is allowed for this to be *higher* + // than the state inside the transaction, if another thread committed to the DB while we were in + // `beginTransaction`). + uint64_t _dbCountAtStart; + // The name of the journal table string _journalName; diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index f606163d1..0c7f11f75 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -113,30 +113,31 @@ void SQLiteNode::replicate(SQLiteNode& node, Peer* peer, SData command, SQLite& string name = node.name; SINFO("Replicate thread started: " << command.methodLine); if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) { - uint64_t currentCount = command.calcU64("NewCount") - 1; + uint64_t newCount = command.calcU64("NewCount"); + uint64_t currentCount = newCount - 1; - // We have to wait for the DB to come up-to-date for QUORUM transactions. - SINFO("Thread for commit " << currentCount); + // Transactions are either ASYNC or QUORUM. QUORUM transactions can only start when the DB is completely + // up-to-date. ASYNC transactions can start as soon as the DB is at `dbCountAtStart` (the same value that + // the DB was at when the transaction began on leader). bool quorum = !SStartsWith(command["ID"], "ASYNC"); - if (quorum) { - SINFO("Waiting on DB"); - while (true) { - SQLiteSequentialNotifier::RESULT result = node._localCommitNotifier.waitFor(currentCount); - if (result == SQLiteSequentialNotifier::RESULT::UNKNOWN) { - // This should be impossible. - SERROR("Got UNKNOWN result from waitFor, which shouldn't happen"); - } else if (result == SQLiteSequentialNotifier::RESULT::COMPLETED) { - // Success case. - break; - } else if (result == SQLiteSequentialNotifier::RESULT::CANCELED) { - SINFO("_localCommitNotifier.waitFor canceled early, returning."); - return; - } else if (result == SQLiteSequentialNotifier::RESULT::CHECKPOINT_REQUIRED) { - SINFO("Checkpoint required while waiting for DB to come up-to-date. Just waiting again."); - continue; - } else { - SERROR("Got unhandled SQLiteSequentialNotifier::RESULT value, did someone update the enum without updating this block?"); - } + uint64_t waitForCount = SStartsWith(command["ID"], "ASYNC") ? command.calc("dbCountAtStart") : currentCount; + SINFO("Thread for commit " << newCount << " waiting on DB count " << waitForCount << " (" << (quorum ? "QUORUM" : "ASYNC") << ")"); + while (true) { + SQLiteSequentialNotifier::RESULT result = node._localCommitNotifier.waitFor(waitForCount); + if (result == SQLiteSequentialNotifier::RESULT::UNKNOWN) { + // This should be impossible. + SERROR("Got UNKNOWN result from waitFor, which shouldn't happen"); + } else if (result == SQLiteSequentialNotifier::RESULT::COMPLETED) { + // Success case. + break; + } else if (result == SQLiteSequentialNotifier::RESULT::CANCELED) { + SINFO("_localCommitNotifier.waitFor canceled early, returning."); + return; + } else if (result == SQLiteSequentialNotifier::RESULT::CHECKPOINT_REQUIRED) { + SINFO("Checkpoint required while waiting for DB to come up-to-date. Just waiting again."); + continue; + } else { + SERROR("Got unhandled SQLiteSequentialNotifier::RESULT value, did someone update the enum without updating this block?"); } } @@ -147,7 +148,7 @@ void SQLiteNode::replicate(SQLiteNode& node, Peer* peer, SData command, SQLite& if (attemptCount > 1) { SINFO("Commit attempt number " << attemptCount << " for concurrent replication."); } - SINFO("BEGIN for commit " << currentCount); + SINFO("BEGIN for commit " << newCount); node.handleBeginTransaction(db, peer, command); // Now we need to wait for the DB to be up-to-date (if the transaction is QUORUM, we can @@ -351,12 +352,14 @@ void SQLiteNode::_sendOutstandingTransactions() { if (id <= _lastSentTransactionID) { continue; } - string& query = i.second.first; - string& hash = i.second.second; + string& query = get<0>(i.second); + string& hash = get<1>(i.second); + int64_t dbCountAtStart = get<2>(i.second); SData transaction("BEGIN_TRANSACTION"); transaction["NewCount"] = to_string(id); transaction["NewHash"] = hash; transaction["leaderSendTime"] = sendTime; + transaction["dbCountAtStart"] = to_string(dbCountAtStart); transaction["ID"] = "ASYNC_" + to_string(id); transaction.content = query; _sendToAllPeers(transaction, true); // subscribed only @@ -1012,10 +1015,6 @@ bool SQLiteNode::update() { // We'll send the commit count to peers. uint64_t commitCount = _db.getCommitCount(); - // If there was nothing changed, then we shouldn't have anything to commit. - // Except that this is allowed right now. - // SASSERT(!_db.getUncommittedQuery().empty()); - // There's no handling for a failed prepare. This should only happen if the DB has been corrupted or // something catastrophic like that. SASSERT(_db.prepare()); @@ -1027,8 +1026,9 @@ bool SQLiteNode::update() { transaction.set("NewCount", commitCount + 1); transaction.set("NewHash", _db.getUncommittedHash()); transaction.set("leaderSendTime", to_string(STimeNow())); + transaction.set("dbCountAtStart", to_string(_db.getDBCountAtStart())); if (_commitConsistency == ASYNC) { - transaction["ID"] = "ASYNC_" + to_string(_lastSentTransactionID + 1); + transaction.set("ID", "ASYNC_" + to_string(_lastSentTransactionID + 1)); } else { transaction.set("ID", _lastSentTransactionID + 1); } @@ -1547,6 +1547,7 @@ void SQLiteNode::_onMESSAGE(Peer* peer, const SData& message) { transaction.set("NewCount", commitCount + 1); transaction.set("NewHash", _db.getUncommittedHash()); transaction.set("leaderSendTime", to_string(STimeNow())); + transaction.set("dbCountAtStart", to_string(_db.getDBCountAtStart())); transaction.set("ID", _lastSentTransactionID + 1); transaction.content = _db.getUncommittedQuery(); _sendToPeer(peer, transaction); diff --git a/test/lib/BedrockTester.cpp b/test/lib/BedrockTester.cpp index 687d9816e..50432a77a 100644 --- a/test/lib/BedrockTester.cpp +++ b/test/lib/BedrockTester.cpp @@ -88,20 +88,21 @@ BedrockTester::BedrockTester(int threadID, const map& args, } map defaultArgs = { - {"-db", _dbName}, - {"-serverHost", _serverAddr}, - {"-nodeName", "bedrock_test"}, - {"-nodeHost", "localhost:" + to_string(_nodePort)}, - {"-controlPort", "localhost:" + to_string(_controlPort)}, - {"-priority", "200"}, - {"-plugins", "db"}, - {"-workerThreads", "8"}, - {"-mmapSizeGB", "1"}, - {"-maxJournalSize", "25000"}, - {"-v", ""}, + {"-db", _dbName}, + {"-serverHost", _serverAddr}, + {"-nodeName", "bedrock_test"}, + {"-nodeHost", "localhost:" + to_string(_nodePort)}, + {"-controlPort", "localhost:" + to_string(_controlPort)}, + {"-priority", "200"}, + {"-plugins", "db"}, + {"-workerThreads", "8"}, + {"-mmapSizeGB", "1"}, + {"-maxJournalSize", "25000"}, + {"-v", ""}, {"-quorumCheckpoint", "50"}, {"-enableMultiWrite", "true"}, - {"-cacheSize", "1000"}, + {"-cacheSize", "1000"}, + {"-parallelReplication", "true"}, }; // Set defaults.