Skip to content

Commit

Permalink
fix(Core/Database): Gracefully close database workers (azerothcore#20936
Browse files Browse the repository at this point in the history
)

* Gracefully close database workers

* Change init order. Such a silly compiler flag

* Fix hang if db connection failed to open
  • Loading branch information
Takenbacon authored Dec 18, 2024
1 parent c8734af commit a23b13d
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 55 deletions.
25 changes: 14 additions & 11 deletions src/common/Threading/PCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ class ProducerConsumerQueue
std::mutex _queueLock;
std::queue<T> _queue;
std::condition_variable _condition;
std::atomic<bool> _cancel;
std::atomic<bool> _shutdown;

public:
ProducerConsumerQueue() : _shutdown(false) { }
ProducerConsumerQueue() : _cancel(false), _shutdown(false) { }

void Push(const T& value)
{
Expand All @@ -57,10 +58,8 @@ class ProducerConsumerQueue
{
std::lock_guard<std::mutex> lock(_queueLock);

if (_queue.empty() || _shutdown)
{
if (_queue.empty() || _cancel)
return false;
}

value = _queue.front();

Expand All @@ -75,21 +74,18 @@ class ProducerConsumerQueue

// we could be using .wait(lock, predicate) overload here but it is broken
// https://connect.microsoft.com/VisualStudio/feedback/details/1098841
while (_queue.empty() && !_shutdown)
{
while (_queue.empty() && !_cancel && !_shutdown)
_condition.wait(lock);
}

if (_queue.empty() || _shutdown)
{
if (_queue.empty() || _cancel)
return;
}

value = _queue.front();

_queue.pop();
}

// Clears the queue and will immediately stop any consumers
void Cancel()
{
std::unique_lock<std::mutex> lock(_queueLock);
Expand All @@ -103,8 +99,15 @@ class ProducerConsumerQueue
_queue.pop();
}

_shutdown = true;
_cancel = true;

_condition.notify_all();
}

// Graceful stop, will wait for queue to become empty before stopping consumers
void Shutdown()
{
_shutdown = true;
_condition.notify_all();
}

Expand Down
7 changes: 1 addition & 6 deletions src/server/database/Database/DatabaseWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@ DatabaseWorker::DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, M
{
_connection = connection;
_queue = newQueue;
_cancelationToken = false;
_workerThread = std::thread(&DatabaseWorker::WorkerThread, this);
}

DatabaseWorker::~DatabaseWorker()
{
_cancelationToken = true;

_queue->Cancel();

_workerThread.join();
}

Expand All @@ -47,7 +42,7 @@ void DatabaseWorker::WorkerThread()

_queue->WaitAndPop(operation);

if (_cancelationToken || !operation)
if (!operation)
return;

operation->SetConnection(_connection);
Expand Down
2 changes: 0 additions & 2 deletions src/server/database/Database/DatabaseWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class AC_DATABASE_API DatabaseWorker
void WorkerThread();
std::thread _workerThread;

std::atomic<bool> _cancelationToken;

DatabaseWorker(DatabaseWorker const& right) = delete;
DatabaseWorker& operator=(DatabaseWorker const& right) = delete;
};
Expand Down
7 changes: 6 additions & 1 deletion src/server/database/Database/DatabaseWorkerPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ uint32 DatabaseWorkerPool<T>::Open()
template <class T>
void DatabaseWorkerPool<T>::Close()
{
LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());
LOG_INFO("sql.driver", "Closing down DatabasePool '{}'. Waiting for {} queries to finish...", GetDatabaseName(), _queue->Size());

// Gracefully close async query queue, worker threads will block when the destructor
// is called from the .clear() functions below until the queue is empty
_queue->Shutdown();

//! Closes the actualy MySQL connection.
_connections[IDX_ASYNC].clear();
Expand Down Expand Up @@ -432,6 +436,7 @@ uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConne
if (uint32 error = connection->Open())
{
// Failed to open a connection or invalid version, abort and cleanup
_queue->Cancel();
_connections[type].clear();
return error;
}
Expand Down
35 changes: 0 additions & 35 deletions src/server/game/World/World.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@
#include <boost/asio/ip/address.hpp>
#include <cmath>

namespace
{
TaskScheduler playersSaveScheduler;
}

std::atomic_long World::_stopEvent = false;
uint8 World::_exitCode = SHUTDOWN_EXIT_CODE;
uint32 World::m_worldLoopCounter = 0;
Expand Down Expand Up @@ -2497,11 +2492,6 @@ void World::Update(uint32 diff)
sScriptMgr->OnWorldUpdate(diff);
}

{
METRIC_TIMER("world_update_time", METRIC_TAG("type", "Update playersSaveScheduler"));
playersSaveScheduler.Update(diff);
}

{
METRIC_TIMER("world_update_time", METRIC_TAG("type", "Update metrics"));
// Stats logger update
Expand Down Expand Up @@ -2691,31 +2681,6 @@ void World::ShutdownServ(uint32 time, uint32 options, uint8 exitcode, const std:
_shutdownMask = options;
_exitCode = exitcode;

auto const& playersOnline = GetActiveSessionCount();

if (time < 5 && playersOnline)
{
// Set time to 5s for save all players
time = 5;
}

playersSaveScheduler.CancelAll();

if (time >= 5)
{
playersSaveScheduler.Schedule(Seconds(time - 5), [this](TaskContext /*context*/)
{
if (!GetActiveSessionCount())
{
LOG_INFO("server", "> No players online. Skip save before shutdown");
return;
}

LOG_INFO("server", "> Save players before shutdown server");
ObjectAccessor::SaveAllPlayers();
});
}

LOG_WARN("server", "Time left until shutdown/restart: {}", time);

///- If the shutdown time is 0, set m_stopEvent (except if shutdown is 'idle' with remaining sessions)
Expand Down

0 comments on commit a23b13d

Please sign in to comment.