diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 68f9a1bd5..80a7b38a7 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1719,11 +1719,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // If the server is strugling and falling behind on replication, we might have too many threads // causing a resource exhaustion. If that happens, all the transations that are already threaded // and waiting for the transaction that failed will be stuck in an infinite loop. To prevent that - // we're cancelling all threads that would need to wait on the transaction failed to be threaded. + // we're changing the state to SEARCHING and sending the cancelAfter property to drop all threads + // that depend on the transaction that failed to be threaded. uint64_t cancelAfter = message.calcU64("NewCount") - 1; - _localCommitNotifier.cancel(cancelAfter); - _leaderCommitNotifier.cancel(cancelAfter); - _changeState(SQLiteNodeState::SEARCHING); + _changeState(SQLiteNodeState::SEARCHING, cancelAfter); SWARN("Caught system_error starting _replicate thread with" << _replicationThreadCount.load() << " threads. cancelAfter="<< cancelAfter << " e.what()=" << e.what()); STHROW("Error starting replicate thread so giving up and reconnecting."); } @@ -1929,7 +1928,7 @@ void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) { } } -void SQLiteNode::_changeState(SQLiteNodeState newState) { +void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCancelAfter) { SINFO("[NOTIFY] setting commit count to: " << _db.getCommitCount()); _localCommitNotifier.notifyThrough(_db.getCommitCount()); @@ -1940,7 +1939,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState) { // If we were following, and now we're not, we give up an any replications. if (_state == SQLiteNodeState::FOLLOWING) { _replicationThreadsShouldExit = true; - uint64_t cancelAfter = _leaderCommitNotifier.getValue(); + uint64_t cancelAfter = commitIDToCancelAfter > 0 ? commitIDToCancelAfter : _leaderCommitNotifier.getValue(); SINFO("Replication threads should exit, canceling commits after current leader commit " << cancelAfter); _localCommitNotifier.cancel(cancelAfter); _leaderCommitNotifier.cancel(cancelAfter); diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 44aa6b8c6..8212ae18a 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -226,7 +226,7 @@ class SQLiteNode : public STCPManager { // Add required headers for messages being sent to peers. SData _addPeerHeaders(SData message); - void _changeState(SQLiteNodeState newState); + void _changeState(SQLiteNodeState newState, uint64_t commitIDToCancelAfter = 0); // Handlers for transaction messages. void _handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SData& message, bool wasConflict);