Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ClockWaiter and ClockConditionalVariable #2691

Open
wants to merge 2 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions rclcpp/include/rclcpp/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,17 @@ class Clock
ros_time_is_active();

/**
* Deprecated. This API is broken, as there is no way to get a deep
* copy of a clock. Therefore one can experience spurious wakeups triggered
* by some other instance of a clock.
*
* Cancels an ongoing or future sleep operation of one thread.
*
* This function can be used by one thread, to wakeup another thread that is
* blocked using any of the sleep_ or wait_ methods of this class.
*/
RCLCPP_PUBLIC
[[deprecated("Use ClockConditionalVariable")]]
void
cancel_sleep_or_wait();

Expand Down Expand Up @@ -260,6 +265,108 @@ class Clock
std::shared_ptr<Impl> impl_;
};

/**
* A synchronization primitive, equal to std::conditional_variable,
* that works with the rclcpp::Clock.
*
* For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable.
*
* Note, this class does not handle shutdowns, if you want to
* haven them handles as well, use ClockConditionalVariable.
*/
class ClockWaiter
{
private:
class ClockWaiterImpl;
std::unique_ptr<ClockWaiterImpl> impl_;

public:
RCLCPP_SMART_PTR_DEFINITIONS(ClockWaiter)

RCLCPP_PUBLIC
ClockWaiter(const rclcpp::Clock::SharedPtr &clock);

RCLCPP_PUBLIC
~ClockWaiter();

/**
* Calling this function will block the current thread, until abs_time is reached,
* or pred returns true.
* @param lock A locked lock. The lock must be locked at call time, or this method will throw.
* The lock will be atomically released and this thread will blocked.
* @param abs_time The time until which this thread shall be blocked.
* @param pred may be called in cased of spurious wakeups, but must be called every time
* notify_one() was called. During the call to pred, the given lock will be locked.
* This method will return, if pred returns true.
*/
RCLCPP_PUBLIC
bool
wait_until(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred );

/**
* Notify the blocked thread, that is should reevaluate the wakeup condition.
* E.g. the given pred function in wait_until shall be reevaluated.
*/
RCLCPP_PUBLIC
void notify_one();
};


/**
* A synchronization primitive, similar to std::conditional_variable,
* that works with the rclcpp::Clock.
*
* For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable.
*
* This primitive will wake up if the context was shut down.
*/
class ClockConditionalVariable
{
class Impl;
std::unique_ptr<Impl> impl_;

public:
RCLCPP_SMART_PTR_DEFINITIONS(ClockConditionalVariable)

RCLCPP_PUBLIC
ClockConditionalVariable(rclcpp::Clock::SharedPtr &clock, rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context());
RCLCPP_PUBLIC
~ClockConditionalVariable();

/**
* Calling this function will block the current thread, until abs_time is reached,
* or pred returns true.
* @param lock A locked lock. The lock must be locked at call time, or this method will throw.
* The lock will be atomically released and this thread will blocked.
* The given lock must be created using the mutex returned my mutex().
* @param abs_time The time until which this thread shall be blocked.
* @param pred may be called in cased of spurious wakeups, but must be called every time
* notify_one() was called. During the call to pred, the given lock will be locked.
* This method will return, if pred returns true.
*
* @return true if until was reached.
*/
RCLCPP_PUBLIC
bool wait_until(std::unique_lock<std::mutex>& lock, rclcpp::Time until, const std::function<bool ()> &pred);

/**
* Notify the blocked thread, that is should reevaluate the wakeup condition.
* E.g. the given pred function in wait_until shall be reevaluated.
*/
RCLCPP_PUBLIC
void notify_one();

/**
* Returns the internal mutex. In order to be race free with the context shutdown,
* this mutex must be used for the wait_until call.
*/
RCLCPP_PUBLIC
std::mutex &mutex();
};



} // namespace rclcpp

#endif // RCLCPP__CLOCK_HPP_
203 changes: 203 additions & 0 deletions rclcpp/src/rclcpp/clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,4 +367,207 @@ Clock::create_jump_callback(
// *INDENT-ON*
}

class ClockWaiter::ClockWaiterImpl
{
private:
std::condition_variable cv_;

rclcpp::Clock::SharedPtr clock_;
bool time_source_changed_ = false;
std::function<void (const rcl_time_jump_t &)> post_time_jump_callback;

bool
wait_until_system_time(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred )
{
auto system_time = std::chrono::system_clock::time_point(
// Cast because system clock resolution is too big for nanoseconds on some systems
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(abs_time.nanoseconds())));

return cv_.wait_until(lock, system_time, pred);
}

bool
wait_until_steady_time(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred )
{
// Synchronize because RCL steady clock epoch might differ from chrono::steady_clock epoch
const rclcpp::Time rcl_entry = clock_->now();
const std::chrono::steady_clock::time_point chrono_entry = std::chrono::steady_clock::now();
const rclcpp::Duration delta_t = abs_time - rcl_entry;
const std::chrono::steady_clock::time_point chrono_until =
chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds());

return cv_.wait_until(lock, chrono_until, pred);
}


bool
wait_until_ros_time(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred )
{
// Install jump handler for any amount of time change, for two purposes:
// - if ROS time is active, check if time reached on each new clock sample
// - Trigger via on_clock_change to detect if time source changes, to invalidate sleep
rcl_jump_threshold_t threshold;
threshold.on_clock_change = true;
// 0 is disable, so -1 and 1 are smallest possible time changes
threshold.min_backward.nanoseconds = -1;
threshold.min_forward.nanoseconds = 1;

time_source_changed_ = false;

auto clock_handler = clock_->create_jump_callback(
nullptr,
post_time_jump_callback,
threshold);

if (!clock_->ros_time_is_active()) {
auto system_time = std::chrono::system_clock::time_point(
// Cast because system clock resolution is too big for nanoseconds on some systems
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(abs_time.nanoseconds())));

return cv_.wait_until(lock, system_time, [this, &pred] () {
return time_source_changed_ || pred();
});
}


// RCL_ROS_TIME with ros_time_is_active.
// Just wait without "until" because installed
// jump callbacks wake the cv on every new sample.
cv_.wait(lock, [this, &pred, &abs_time] () {
return clock_->now() >= abs_time || time_source_changed_ || pred();
});

return clock_->now() < abs_time;
}


public:
ClockWaiterImpl(const rclcpp::Clock::SharedPtr &clock) :
clock_(clock)
{
}

bool
wait_until(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred )
{
switch(clock_->get_clock_type())
{
case RCL_CLOCK_UNINITIALIZED:
throw std::runtime_error("Error, wait on uninitialized clock called");
case RCL_ROS_TIME:
return wait_until_ros_time(lock, abs_time, pred);
break;
case RCL_STEADY_TIME:
return wait_until_steady_time(lock, abs_time, pred);
break;
case RCL_SYSTEM_TIME:
return wait_until_system_time(lock, abs_time, pred);
break;
}

return false;
}

void notify_one()
{
cv_.notify_one();
}
};

ClockWaiter::ClockWaiter(const rclcpp::Clock::SharedPtr &clock) :
impl_(std::make_unique<ClockWaiterImpl>(clock))
{
}

ClockWaiter::~ClockWaiter() = default;

bool ClockWaiter::wait_until(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred )
{
return impl_->wait_until(lock, abs_time, pred);
}

void ClockWaiter::notify_one()
{
impl_->notify_one();
}

class ClockConditionalVariable::Impl
{
std::mutex pred_mutex_;
bool shutdown_ = false;
rclcpp::Context::SharedPtr context_;
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle_;
ClockWaiter::UniquePtr clock_;
public:

Impl(rclcpp::Clock::SharedPtr &clock, rclcpp::Context::SharedPtr context) :
context_(context),
clock_(std::make_unique<ClockWaiter>(clock))
{
if (!context_ || !context_->is_valid()) {
throw std::runtime_error("context cannot be slept with because it's invalid");
}
// Wake this thread if the context is shutdown
shutdown_cb_handle_ = context_->add_on_shutdown_callback(
[this]() {
{
std::unique_lock lock(pred_mutex_);
shutdown_ = true;
}
clock_->notify_one();
});
}

bool wait_until(std::unique_lock<std::mutex>& lock, rclcpp::Time until, const std::function<bool ()> &pred)
{
if(lock.mutex() != &pred_mutex_)
{
throw std::runtime_error("ClockConditionalVariable::wait_until: Internal error, given lock does not use mutex returned by this->mutex()");
}

clock_->wait_until(lock, until, [this, &pred] () -> bool {
return shutdown_ || pred(); });
return true;
}

void notify_one()
{
clock_->notify_one();
}

std::mutex &mutex()
{
return pred_mutex_;
};
};

ClockConditionalVariable::ClockConditionalVariable(rclcpp::Clock::SharedPtr &clock, rclcpp::Context::SharedPtr context) :
impl_(std::make_unique<Impl>(clock, context))
{
}

ClockConditionalVariable::~ClockConditionalVariable() = default;

void ClockConditionalVariable::notify_one()
{
impl_->notify_one();
}

bool ClockConditionalVariable::wait_until(std::unique_lock<std::mutex>& lock, rclcpp::Time until, const std::function<bool ()> &pred)
{
return impl_->wait_until(lock, until, pred);
}

std::mutex &ClockConditionalVariable::mutex()
{
return impl_->mutex();
}

} // namespace rclcpp