Skip to content

Commit

Permalink
Merge pull request #1842 from arcaneframework/dev/gg-begin-support-to…
Browse files Browse the repository at this point in the history
…-concurrent-usage-of-runqueue

Begin support to allow concurrent usage of RunQueue for RunCommand creation
  • Loading branch information
grospelliergilles authored Dec 13, 2024
2 parents cd59698 + 86de69b commit 2be15c0
Show file tree
Hide file tree
Showing 12 changed files with 335 additions and 43 deletions.
3 changes: 3 additions & 0 deletions arcane/src/arcane/accelerator/core/RunCommand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ RunCommand::
RunCommand(const RunQueue& run_queue)
: m_p(run_queue._getCommandImpl())
{
m_p->m_has_living_run_command = true;
}

/*---------------------------------------------------------------------------*/
Expand All @@ -39,6 +40,8 @@ RunCommand(const RunQueue& run_queue)
RunCommand::
~RunCommand()
{
m_p->m_has_living_run_command = false;
m_p->_notifyDestroyRunCommand();
}

/*---------------------------------------------------------------------------*/
Expand Down
3 changes: 2 additions & 1 deletion arcane/src/arcane/accelerator/core/RunCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ class ARCANE_ACCELERATOR_CORE_EXPORT RunCommand

public:

RunCommand(RunCommand&& command) = delete;
RunCommand(const RunCommand&) = delete;
RunCommand& operator=(const RunCommand&) = delete;
RunCommand& operator=(RunCommand&&) = delete;

public:

Expand Down Expand Up @@ -132,7 +134,6 @@ class ARCANE_ACCELERATOR_CORE_EXPORT RunCommand

private:

//RunQueue m_run_queue;
impl::RunCommandImpl* m_p;
};

Expand Down
17 changes: 17 additions & 0 deletions arcane/src/arcane/accelerator/core/RunCommandImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ notifyEndLaunchKernel()
// TODO: utiliser la bonne stream en séquentiel
m_stop_event->recordQueue(stream);
stream->notifyEndLaunchKernel(*this);
m_queue->_addRunningCommand(this);
}

/*---------------------------------------------------------------------------*/
Expand Down Expand Up @@ -221,6 +222,8 @@ _reset()
m_loop_one_exec_stat.reset();
m_loop_one_exec_stat_ptr = nullptr;
m_has_been_launched = false;
m_has_living_run_command = false;
m_may_be_put_in_pool = false;
}

/*---------------------------------------------------------------------------*/
Expand Down Expand Up @@ -288,6 +291,20 @@ _getOrCreateReduceMemoryImpl()
return new ReduceMemoryImpl(this);
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/
/*!
* \brief Méthode appelée quand l'instance RunCommand associée est détruite.
*/
void RunCommandImpl::
_notifyDestroyRunCommand()
{
// Si la commande n'a pas été lancé, il faut la remettre dans le pool
// des commandes de la file (sinon on aura une fuite mémoire)
if (!m_has_been_launched || m_may_be_put_in_pool)
m_queue->_putInCommandPool(this);
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

Expand Down
23 changes: 23 additions & 0 deletions arcane/src/arcane/accelerator/core/RunQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,29 @@ memoryRessource() const
/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

void RunQueue::
setConcurrentCommandCreation(bool v)
{
_checkNotNull();
if (isAcceleratorPolicy())
ARCANE_FATAL("setting concurrent command creation is not supported for RunQueue running on accelerator");
m_p->setConcurrentCommandCreation(v);
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

bool RunQueue::
isConcurrentCommandCreation() const
{
if (m_p)
return m_p->isConcurrentCommandCreation();
return false;
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

extern "C++" ePointerAccessibility
getPointerAccessibility(RunQueue* queue, const void* ptr, PointerAttribute* ptr_attr)
{
Expand Down
21 changes: 20 additions & 1 deletion arcane/src/arcane/accelerator/core/RunQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,22 @@ class ARCANE_ACCELERATOR_CORE_EXPORT RunQueue
eMemoryRessource memoryRessource() const;
//!@}

public:

/*!
* \brief Indique si on autorise la création de RunCommand pour cette instance
* depuis plusieurs threads.
*
* Cela nécessite d'utiliser un verrou (comme std::mutex) et peut dégrader les
* performances. Le défaut est \a false.
*
* Cette méthode n'est pas supportée pour les files qui sont associées
* à des accélérateurs (isAcceleratorPolicy()==true)
*/
void setConcurrentCommandCreation(bool v);
//! Indique si la création concurrente de plusieurs RunCommand est autorisée
bool isConcurrentCommandCreation() const;

public:

/*!
Expand All @@ -209,6 +225,10 @@ class ARCANE_ACCELERATOR_CORE_EXPORT RunQueue
ARCANE_DEPRECATED_REASON("Y2024: Use toCudaNativeStream(), toHipNativeStream() or toSyclNativeStream() instead")
void* platformStream() const;

public:

impl::RunQueueImpl* _internalImpl() const;

private:

// Les méthodes de création sont réservée à Runner.
Expand All @@ -224,7 +244,6 @@ class ARCANE_ACCELERATOR_CORE_EXPORT RunQueue
impl::IRunnerRuntime* _internalRuntime() const;
impl::IRunQueueStream* _internalStream() const;
impl::RunCommandImpl* _getCommandImpl() const;
impl::RunQueueImpl* _internalImpl() const;
void _checkNotNull() const;

// Pour VariableViewBase
Expand Down
175 changes: 161 additions & 14 deletions arcane/src/arcane/accelerator/core/RunQueueImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "arcane/utils/FatalErrorException.h"
#include "arcane/utils/MemoryUtils.h"
#include "arcane/utils/SmallArray.h"

#include "arcane/accelerator/core/internal/IRunnerRuntime.h"
#include "arcane/accelerator/core/internal/IRunQueueStream.h"
Expand All @@ -26,6 +27,8 @@
#include "arcane/accelerator/core/DeviceId.h"
#include "arcane/accelerator/core/RunQueueEvent.h"

#include <unordered_set>

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

Expand All @@ -35,6 +38,36 @@ namespace Arcane::Accelerator::impl
/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

//! Verrou pour le pool de RunCommand en multi-thread.
class RunQueueImpl::Lock
{
public:

explicit Lock(RunQueueImpl* p)
{
if (p->m_use_pool_mutex) {
m_mutex = p->m_pool_mutex.get();
if (m_mutex) {
m_mutex->lock();
}
}
}
~Lock()
{
if (m_mutex)
m_mutex->unlock();
}
Lock(const Lock&) = delete;
Lock& operator=(const Lock&) = delete;

private:

std::mutex* m_mutex = nullptr;
};

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

RunQueueImpl::
RunQueueImpl(RunnerImpl* runner_impl, Int32 id, const RunQueueBuildInfo& bi)
: m_runner_impl(runner_impl)
Expand All @@ -51,11 +84,40 @@ RunQueueImpl(RunnerImpl* runner_impl, Int32 id, const RunQueueBuildInfo& bi)
RunQueueImpl::
~RunQueueImpl()
{
delete m_queue_stream;
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

void RunQueueImpl::
_freeCommandsInPool()
{
bool is_check = arcaneIsCheck();
std::unordered_set<RunCommandImpl*> command_set;
while (!m_run_command_pool.empty()) {
RunCommand::_internalDestroyImpl(m_run_command_pool.top());
RunCommandImpl* c = m_run_command_pool.top();
if (is_check) {
if (command_set.find(c) != command_set.end())
std::cerr << "Command is present several times in the command pool\n";
command_set.insert(c);
}
RunCommand::_internalDestroyImpl(c);
m_run_command_pool.pop();
}
delete m_queue_stream;
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

void RunQueueImpl::
_destroy(RunQueueImpl* q)
{
q->_freeCommandsInPool();
delete q;
}

/*---------------------------------------------------------------------------*/
Expand Down Expand Up @@ -180,19 +242,19 @@ create(RunnerImpl* r, const RunQueueBuildInfo& bi)
RunCommandImpl* RunQueueImpl::
_internalCreateOrGetRunCommandImpl()
{
auto& pool = m_run_command_pool;
RunCommandImpl* p = nullptr;

// TODO: rendre thread-safe
if (!pool.empty()) {
p = pool.top();
pool.pop();
{
auto& pool = m_run_command_pool;
Lock my_lock(this);
if (!pool.empty()) {
p = pool.top();
pool.pop();
}
}
else {
if (!p)
p = RunCommand::_internalCreateImpl(this);
}
p->_reset();
m_active_run_command_list.add(p);
return p;
}

Expand All @@ -207,11 +269,75 @@ _internalCreateOrGetRunCommandImpl()
void RunQueueImpl::
_internalFreeRunningCommands()
{
for (RunCommandImpl* p : m_active_run_command_list) {
p->notifyEndExecuteKernel();
m_run_command_pool.push(p);
SmallArray<RunCommandImpl*> command_list;
if (m_use_pool_mutex) {
// Recopie les commandes dans un tableau local car m_active_run_command_list
// peut être modifié par un autre thread.
{
Lock my_lock(this);
for (RunCommandImpl* p : m_active_run_command_list) {
command_list.add(p);
}
m_active_run_command_list.clear();
}
for (RunCommandImpl* p : command_list) {
p->notifyEndExecuteKernel();
}
{
Lock my_lock(this);
for (RunCommandImpl* p : command_list) {
_checkPutCommandInPoolNoLock(p);
}
}
}
m_active_run_command_list.clear();
else {
for (RunCommandImpl* p : m_active_run_command_list) {
p->notifyEndExecuteKernel();
_checkPutCommandInPoolNoLock(p);
}
m_active_run_command_list.clear();
}
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/
/*!
* \brief Remet la commande dans le pool si possible.
*
* On ne remet pas la commande dans le pool tant qu'il y a une RunCommand
* qui y fait référence. Dans ce cas la commande sera remise dans le pool
* lors de l'appel au destructeur de RunCommand. Cela est nécessaire pour
* gérer le cas où une RunCommand est créée mais n'est jamais utilisée car
* dans ce cas elle ne sera jamais dans m_active_run_command_list et ne
* sera pas traitée lors de l'appel à _internalFreeRunningCommands().
*/
void RunQueueImpl::
_checkPutCommandInPoolNoLock(RunCommandImpl* p)
{
if (p->m_has_living_run_command)
p->m_may_be_put_in_pool = true;
else
m_run_command_pool.push(p);
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

void RunQueueImpl::
_addRunningCommand(RunCommandImpl* p)
{
Lock my_lock(this);
m_active_run_command_list.add(p);
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

void RunQueueImpl::
_putInCommandPool(RunCommandImpl* p)
{
Lock my_lock(this);
m_run_command_pool.push(p);
}

/*---------------------------------------------------------------------------*/
Expand Down Expand Up @@ -246,6 +372,27 @@ _reset(RunQueueImpl* p)
/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

void RunQueueImpl::
setConcurrentCommandCreation(bool v)
{
m_use_pool_mutex = v;
if (!m_pool_mutex.get())
m_pool_mutex = std::make_unique<std::mutex>();
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

void RunQueueImpl::
dumpStats(std::ostream& ostr) const
{
ostr << "nb_pool=" << m_run_command_pool.size()
<< " nb_active=" << m_active_run_command_list.size() << "\n";
}

/*---------------------------------------------------------------------------*/
/*---------------------------------------------------------------------------*/

} // namespace Arcane::Accelerator::impl

/*---------------------------------------------------------------------------*/
Expand Down
3 changes: 2 additions & 1 deletion arcane/src/arcane/accelerator/core/Runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ _freePool()
{
RunQueueImplStack& s = m_run_queue_pool;
while (!s.empty()) {
delete s.top();
RunQueueImpl* q = s.top();
RunQueueImpl::_destroy(q);
s.pop();
}
}
Expand Down
Loading

0 comments on commit 2be15c0

Please sign in to comment.