From 92f7272215c86f192fca9208e3e3f013aefce46f Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Tue, 4 Jun 2024 18:43:21 +0200 Subject: [PATCH 1/8] calculating the cancel after based on the NewCount we failed to create a thread for --- sqlitecluster/SQLiteNode.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 01cb75cd2..02a892e09 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1716,7 +1716,11 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { uint64_t threadAttemptStartTimestamp = STimeNow(); thread(&SQLiteNode::_replicate, this, peer, message, _dbPool->getIndex(false), threadAttemptStartTimestamp).detach(); } catch (const system_error& e) { - SWARN("Caught system_error starting _replicate thread with " << _replicationThreadCount.load() << " threads. e.what()=" << e.what()); + + uint64_t cancelAfter = message.calcU64("NewCount") - 1; + _localCommitNotifier.cancel(cancelAfter); + _leaderCommitNotifier.cancel(cancelAfter); + SWARN(format("Caught system_error starting _replicate thread with {} threads. cancelAfter={} e.what()={}", _replicationThreadCount.load(), cancelAfter, e.what())); STHROW("Error starting replicate thread so giving up and reconnecting."); } SDEBUG("Done spawning concurrent replicate thread: " << threadID); From 6daf2534ffc7137e60f4a3bc762d086a677fb9a0 Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Tue, 4 Jun 2024 18:45:57 +0200 Subject: [PATCH 2/8] adding comment --- sqlitecluster/SQLiteNode.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 02a892e09..316e97f4c 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1716,7 +1716,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { uint64_t threadAttemptStartTimestamp = STimeNow(); thread(&SQLiteNode::_replicate, this, peer, message, _dbPool->getIndex(false), threadAttemptStartTimestamp).detach(); } catch (const system_error& e) { - + // If the server is strugling and falling behind on replication, we might have too many threads + // causing a resource exhaustion. If that happens, all the transations that are already threaded + // and waiting for the transaction that failed will be stuck in an infinite loop. To prevent that + // we're cancelling all threads that would need to wait on the transaction that is failing. uint64_t cancelAfter = message.calcU64("NewCount") - 1; _localCommitNotifier.cancel(cancelAfter); _leaderCommitNotifier.cancel(cancelAfter); From 4dac2561cc29800fc43471c8d83a52b3f277c493 Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Tue, 4 Jun 2024 18:46:29 +0200 Subject: [PATCH 3/8] adjust comment --- sqlitecluster/SQLiteNode.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 316e97f4c..fd65588b8 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1719,7 +1719,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // If the server is strugling and falling behind on replication, we might have too many threads // causing a resource exhaustion. If that happens, all the transations that are already threaded // and waiting for the transaction that failed will be stuck in an infinite loop. To prevent that - // we're cancelling all threads that would need to wait on the transaction that is failing. + // we're cancelling all threads that would need to wait on the transaction failed to be threaded. uint64_t cancelAfter = message.calcU64("NewCount") - 1; _localCommitNotifier.cancel(cancelAfter); _leaderCommitNotifier.cancel(cancelAfter); From d2053e55cf9c6a037d3cb424cd114c5fbb3af271 Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Tue, 4 Jun 2024 18:53:26 +0200 Subject: [PATCH 4/8] removing format --- sqlitecluster/SQLiteNode.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index fd65588b8..01240f718 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1723,7 +1723,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { uint64_t cancelAfter = message.calcU64("NewCount") - 1; _localCommitNotifier.cancel(cancelAfter); _leaderCommitNotifier.cancel(cancelAfter); - SWARN(format("Caught system_error starting _replicate thread with {} threads. cancelAfter={} e.what()={}", _replicationThreadCount.load(), cancelAfter, e.what())); + SWARN("Caught system_error starting _replicate thread with" << _replicationThreadCount.load() << " threads. cancelAfter=" << cancelAfter << " e.what()=" << e.what()); STHROW("Error starting replicate thread so giving up and reconnecting."); } SDEBUG("Done spawning concurrent replicate thread: " << threadID); From a8ef77d05bdfa31d561af1f216e60a56d6e3eb0a Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Tue, 4 Jun 2024 19:04:36 +0200 Subject: [PATCH 5/8] change state to searching --- sqlitecluster/SQLiteNode.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 01240f718..68f9a1bd5 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1723,7 +1723,8 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { uint64_t cancelAfter = message.calcU64("NewCount") - 1; _localCommitNotifier.cancel(cancelAfter); _leaderCommitNotifier.cancel(cancelAfter); - SWARN("Caught system_error starting _replicate thread with" << _replicationThreadCount.load() << " threads. cancelAfter=" << cancelAfter << " e.what()=" << e.what()); + _changeState(SQLiteNodeState::SEARCHING); + SWARN("Caught system_error starting _replicate thread with" << _replicationThreadCount.load() << " threads. cancelAfter="<< cancelAfter << " e.what()=" << e.what()); STHROW("Error starting replicate thread so giving up and reconnecting."); } SDEBUG("Done spawning concurrent replicate thread: " << threadID); From a6f3e5358261bc095b092816d6e188ea670b4e42 Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Tue, 4 Jun 2024 19:30:59 +0200 Subject: [PATCH 6/8] drying the code --- sqlitecluster/SQLiteNode.cpp | 11 +++++------ sqlitecluster/SQLiteNode.h | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 68f9a1bd5..80a7b38a7 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1719,11 +1719,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // If the server is strugling and falling behind on replication, we might have too many threads // causing a resource exhaustion. If that happens, all the transations that are already threaded // and waiting for the transaction that failed will be stuck in an infinite loop. To prevent that - // we're cancelling all threads that would need to wait on the transaction failed to be threaded. + // we're changing the state to SEARCHING and sending the cancelAfter property to drop all threads + // that depend on the transaction that failed to be threaded. uint64_t cancelAfter = message.calcU64("NewCount") - 1; - _localCommitNotifier.cancel(cancelAfter); - _leaderCommitNotifier.cancel(cancelAfter); - _changeState(SQLiteNodeState::SEARCHING); + _changeState(SQLiteNodeState::SEARCHING, cancelAfter); SWARN("Caught system_error starting _replicate thread with" << _replicationThreadCount.load() << " threads. cancelAfter="<< cancelAfter << " e.what()=" << e.what()); STHROW("Error starting replicate thread so giving up and reconnecting."); } @@ -1929,7 +1928,7 @@ void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) { } } -void SQLiteNode::_changeState(SQLiteNodeState newState) { +void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCancelAfter) { SINFO("[NOTIFY] setting commit count to: " << _db.getCommitCount()); _localCommitNotifier.notifyThrough(_db.getCommitCount()); @@ -1940,7 +1939,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState) { // If we were following, and now we're not, we give up an any replications. if (_state == SQLiteNodeState::FOLLOWING) { _replicationThreadsShouldExit = true; - uint64_t cancelAfter = _leaderCommitNotifier.getValue(); + uint64_t cancelAfter = commitIDToCancelAfter > 0 ? commitIDToCancelAfter : _leaderCommitNotifier.getValue(); SINFO("Replication threads should exit, canceling commits after current leader commit " << cancelAfter); _localCommitNotifier.cancel(cancelAfter); _leaderCommitNotifier.cancel(cancelAfter); diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 44aa6b8c6..8212ae18a 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -226,7 +226,7 @@ class SQLiteNode : public STCPManager { // Add required headers for messages being sent to peers. SData _addPeerHeaders(SData message); - void _changeState(SQLiteNodeState newState); + void _changeState(SQLiteNodeState newState, uint64_t commitIDToCancelAfter = 0); // Handlers for transaction messages. void _handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SData& message, bool wasConflict); From fbb1f118a62478afab6c5147e0026c2ae64198ef Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Tue, 4 Jun 2024 19:35:39 +0200 Subject: [PATCH 7/8] fixing comment --- sqlitecluster/SQLiteNode.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 80a7b38a7..211695816 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1717,7 +1717,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { thread(&SQLiteNode::_replicate, this, peer, message, _dbPool->getIndex(false), threadAttemptStartTimestamp).detach(); } catch (const system_error& e) { // If the server is strugling and falling behind on replication, we might have too many threads - // causing a resource exhaustion. If that happens, all the transations that are already threaded + // causing a resource exhaustion. If that happens, all the transactions that are already threaded // and waiting for the transaction that failed will be stuck in an infinite loop. To prevent that // we're changing the state to SEARCHING and sending the cancelAfter property to drop all threads // that depend on the transaction that failed to be threaded. From d4fbfc9a949c57dc215dcdbb335af320ecaedbb6 Mon Sep 17 00:00:00 2001 From: Daniel Silva Date: Tue, 4 Jun 2024 19:40:07 +0200 Subject: [PATCH 8/8] addressing comments --- sqlitecluster/SQLiteNode.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 211695816..1d20e46c8 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1721,9 +1721,8 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { // and waiting for the transaction that failed will be stuck in an infinite loop. To prevent that // we're changing the state to SEARCHING and sending the cancelAfter property to drop all threads // that depend on the transaction that failed to be threaded. - uint64_t cancelAfter = message.calcU64("NewCount") - 1; - _changeState(SQLiteNodeState::SEARCHING, cancelAfter); - SWARN("Caught system_error starting _replicate thread with" << _replicationThreadCount.load() << " threads. cancelAfter="<< cancelAfter << " e.what()=" << e.what()); + _changeState(SQLiteNodeState::SEARCHING, message.calcU64("NewCount") - 1); + SWARN("Caught system_error starting _replicate thread with " << _replicationThreadCount.load() << " threads. e.what()=" << e.what()); STHROW("Error starting replicate thread so giving up and reconnecting."); } SDEBUG("Done spawning concurrent replicate thread: " << threadID); @@ -1939,7 +1938,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance // If we were following, and now we're not, we give up an any replications. if (_state == SQLiteNodeState::FOLLOWING) { _replicationThreadsShouldExit = true; - uint64_t cancelAfter = commitIDToCancelAfter > 0 ? commitIDToCancelAfter : _leaderCommitNotifier.getValue(); + uint64_t cancelAfter = commitIDToCancelAfter ? commitIDToCancelAfter : _leaderCommitNotifier.getValue(); SINFO("Replication threads should exit, canceling commits after current leader commit " << cancelAfter); _localCommitNotifier.cancel(cancelAfter); _leaderCommitNotifier.cancel(cancelAfter);