Skip to content

Commit

Permalink
Merge pull request dealii#17320 from bangerth/try-emplace-task
Browse files Browse the repository at this point in the history
Add TaskResult::try_emplace_task().
  • Loading branch information
kronbichler authored Jul 21, 2024
2 parents 4e3ab59 + 944f5c8 commit 2bc7e48
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 5 deletions.
102 changes: 97 additions & 5 deletions include/deal.II/base/task_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,72 @@ namespace Threads
* result are not put into the current object, it still means that that task
* is working on data that is changing as it is working, with obviously
* unpredictable results.)
*
* @note As mentioned above, it is a considered a bug to assign a task to
* a TaskResult object that already has a task running. That means that
* you will get in trouble if multiple threads (or multiple tasks running
* concurrently) call this operator at the same time: One of these
* threads will set a task, and the other threads will try the same but
* because the background task is likely still running will encounter
* an error. As a consequence, you cannot easily use this operator
* from multiple threads. Use try_emplace_task() in that case.
*/
void
operator=(const Task<T> &t);

/**
* This function is similar to `operator=()` in that it associates a
* task with the current object if one has not been associated so far,
* but does not do so if a task is already assigned. For this to work,
* the object provided as argument to this function must be a "callable"
* (i.e., a object `creator` that can be called on a separate task via
* its `operator()`), rather than a Task object itself.
*
* As a consequence, code such as the following will work:
* @code
* class LazyInt
* {
* public:
* LazyInt () {} // no task assigned to task_result
*
* int get () {
* task_result.try_emplace_task( []() { return 42; } );
* return task_result.value();
* }
*
* private:
* TaskResult<int> task_result;
* }
* @endcode
* In this context, the `LazyInt::get()` function is thread-safe, i.e., it
* can be called more than once from multiple threads. One of these threads
* -- namely, the first one to get into `try_emplace_task()` -- will create
* a task that calls the lambda function that returns `42` whereas all of
* the others will simply proceed to the call to `value()` that waits for
* the task to finish.
*
* On the other hand, implementing the `get()` function as
* @code
* int get () {
* task_result = Threads::new_task( []() { return 42; } );
* return task_result.value();
* }
* @endcode
* would have led to the errors mentioned above because `operator=` called
* from separate threads would have emplaced a task while another task
* is (possibly) still running.
*
* @note There is nothing that prevents you from concurrently calling
* this function with *different* callables as arguments, i.e., with
* functions that create non-identical objects. That is obviously
* not the intent here since you can't control which callable will
* eventually be turned into a task.
*/
template <typename Callable>
void
try_emplace_task(const Callable &creator) const
DEAL_II_CXX20_REQUIRES((std::is_invocable_r_v<T, Callable>));

/**
* Reset the current object to a state as if it had been
* default-constructed. For the same reasons as outlined
Expand Down Expand Up @@ -357,6 +419,35 @@ namespace Threads
}


template <typename T>
template <typename Callable>
void
TaskResult<T>::try_emplace_task(const Callable &creator) const
DEAL_II_CXX20_REQUIRES((std::is_invocable_r_v<T, Callable>))
{
// If the result is already available, simply return.
if (result_is_available)
return;

// If the result was not available above, we need to go under a lock
// to check that perhaps it has appeared in the meantime. We again use
// the double-checking pattern:
{
std::lock_guard<std::mutex> lock(mutex);
if (result_is_available)
return;
else
// If there is no result, but there is a task, some other thread has
// emplaced it in the meantime and we can simply return
if (task.has_value())
return;
else
// If there is no task object, emplace one:
task = Threads::new_task(creator);
}
}



template <typename T>
inline void
Expand Down Expand Up @@ -401,11 +492,12 @@ namespace Threads
// If we have waited before, then return immediately:
if (result_is_available)
return;
else // If we have not waited, wait now. We need to use the double-checking
// pattern to ensure that if two threads get to this place at the same
// time, one returns right away while the other does the work. Note
// that this happens under the lock, so only one thread gets to be in
// this code block at the same time:
else
// If we have not waited, wait now. We need to use the double-checking
// pattern to ensure that if two threads get to this place at the same
// time, one returns right away while the other does the work. Note
// that this happens under the lock, so only one thread gets to be in
// this code block at the same time:
{
std::lock_guard<std::mutex> lock(mutex);
if (result_is_available)
Expand Down
69 changes: 69 additions & 0 deletions tests/multithreading/task_result_03.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// ------------------------------------------------------------------------
//
// SPDX-License-Identifier: LGPL-2.1-or-later
// Copyright (C) 2009 - 2024 by the deal.II authors
//
// This file is part of the deal.II library.
//
// Part of the source code is dual licensed under Apache-2.0 WITH
// LLVM-exception OR LGPL-2.1-or-later. Detailed license information
// governing the source code and code contributions can be found in
// LICENSE.md and CONTRIBUTING.md at the top level directory of deal.II.
//
// ------------------------------------------------------------------------


// Test TaskResult::try_emplace_task()

#include <deal.II/base/task_result.h>

#include "../tests.h"



int
main()
{
initlog();

// Create a default-constructed TaskResult object not associated with a task:
Threads::TaskResult<int> task_result;

// Next run 10 threads in parallel that all try to set a task
// object. Calling TaskResult::operator=() will not work because
// some thread will try to set a task while there is already a
// running task, and that is considered an error.
//
// On the other hand, exactly one of the threads will succeed in
// emplacing a task with try_emplace_task().
//
// To make things a bit more interesting, and make sure that there
// really is contention, we let all the threads spin until a
// condition variable changes state. In other words, they will all
// try *at more or less the same time* to call try_emplace_task().
std::atomic<bool> condition(false);

const unsigned int n_threads = 10;
std::list<std::thread> threads;
for (unsigned int t = 0; t < n_threads; ++t)
threads.emplace_back(std::thread([&]() {
while (condition.load() == false)
/* keep checking */;

task_result.try_emplace_task([]() { return 42; });
}));

// Now let all of these threads go to work:
condition = true;

// Wait for all of these threads to finish trying to emplace a task:
for (auto &t : threads)
t.join();

// Wait for that background task to finish, then get its
// value:
deallog << task_result.value() << std::endl;

// Ensure that we can continue to ask for the return value:
deallog << task_result.value() << std::endl;
}
3 changes: 3 additions & 0 deletions tests/multithreading/task_result_03.output
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

DEAL::42
DEAL::42

0 comments on commit 2bc7e48

Please sign in to comment.