From 435487c3f2309053a582a18a113315fe944f7c49 Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Thu, 28 Mar 2024 22:21:57 -0500 Subject: [PATCH] Utilize rclcpp::WaitSet as part of the executors (#2142) * Deprecate callback_group call taking context Signed-off-by: Michael Carroll * Add base executor objects that can be used by implementors Signed-off-by: Michael Carroll * Template common operations Signed-off-by: Michael Carroll * Address reviewer feedback: * Add callback to EntitiesCollector constructor * Make function to check automatically added callback groups take a list Signed-off-by: Michael Carroll * Lint Signed-off-by: Michael Carroll * Address reviewer feedback and fix templates Signed-off-by: Michael Carroll * Lint and docs Signed-off-by: Michael Carroll * Make executor own the notify waitable Signed-off-by: Michael Carroll * Add pending queue to collector, remove from waitable Also change node's get_guard_condition to return shared_ptr Signed-off-by: Michael Carroll * Change interrupt guard condition to shared_ptr Check if guard condition is valid before adding it to the waitable Signed-off-by: Michael Carroll * Lint and docs Signed-off-by: Michael Carroll * Utilize rclcpp::WaitSet as part of the executors Signed-off-by: Michael Carroll * Don't exchange atomic twice Signed-off-by: Michael Carroll * Fix add_node and add more tests Signed-off-by: Michael Carroll * Make get_notify_guard_condition follow API tick-tock Signed-off-by: Michael Carroll * Improve callback group tick-tocking Signed-off-by: Michael Carroll * Don't lock twice Signed-off-by: Michael Carroll * Address reviewer feedback Signed-off-by: Michael Carroll * Add thread safety annotations and make locks consistent Signed-off-by: Michael Carroll * @wip Signed-off-by: Michael Carroll * Reset callback groups for multithreaded executor Signed-off-by: Michael Carroll * Avoid many small function calls when building executables Signed-off-by: Michael Carroll * Re-trigger guard condition if buffer has data Signed-off-by: Michael Carroll * Address reviewer feedback Signed-off-by: Michael Carroll * Trace points Signed-off-by: Michael Carroll * Remove tracepoints Signed-off-by: Michael Carroll * Reducing diff Signed-off-by: Michael Carroll * Reduce diff Signed-off-by: Michael Carroll * Uncrustify Signed-off-by: Michael Carroll * Restore tests Signed-off-by: Michael Carroll * Back to weak_ptr and reduce test time Signed-off-by: Michael Carroll * reduce diff and lint Signed-off-by: Michael Carroll * Restore static single threaded tests that weren't working before Signed-off-by: Michael Carroll * Restore more tests Signed-off-by: Michael Carroll * Fix multithreaded test Signed-off-by: Michael Carroll * Fix assert Signed-off-by: Michael Carroll * Fix constructor test Signed-off-by: Michael Carroll * Change ready_executables signature back Signed-off-by: Michael Carroll * Don't enforce removing callback groups before nodes Signed-off-by: Michael Carroll * Remove the "add_valid_node" API Signed-off-by: Michael Carroll * Only notify if the trigger condition is valid Signed-off-by: Michael Carroll * Only trigger if valid and needed Signed-off-by: Michael Carroll * Fix spin_some/spin_all implementation Signed-off-by: Michael Carroll * Restore single threaded executor Signed-off-by: Michael Carroll * Picking ABI-incompatible executor changes Signed-off-by: Michael Carroll * Add PIMPL Signed-off-by: Michael Carroll * Additional waitset prune Signed-off-by: Michael Carroll * Fix bad merge Signed-off-by: Michael Carroll * Expand test timeout Signed-off-by: Michael Carroll * Introduce method to clear expired entities from a collection Signed-off-by: Michael Carroll * Make sure to call remove_expired_entities(). Signed-off-by: Chris Lalancette * Prune queued work when callback group is removed Signed-off-by: Michael Carroll * Prune subscriptions from dynamic storage Signed-off-by: Michael Carroll * Styles fixes. Signed-off-by: Chris Lalancette * Re-trigger guard conditions Signed-off-by: Michael Carroll * Condense to just use watiable.take_data Signed-off-by: Michael Carroll * Lint Signed-off-by: Michael Carroll * Address reviewer comments (nits) Signed-off-by: Michael Carroll * Lock mutex when copying Signed-off-by: Michael Carroll * Refactors to static single threaded based on reviewers Signed-off-by: Michael Carroll * More small refactoring Signed-off-by: Michael Carroll * Lint Signed-off-by: Michael Carroll * Lint Signed-off-by: Michael Carroll * Add ready executable accessors to WaitResult Signed-off-by: Michael Carroll * Make use of accessors from wait_set Signed-off-by: Michael Carroll * Fix tests Signed-off-by: Michael Carroll * Fix more tests Signed-off-by: Michael Carroll * Tidy up single threaded executor implementation Signed-off-by: Michael Carroll * Don't null out timer, rely on call Signed-off-by: Michael Carroll * change how timers are checked from wait result in executors Signed-off-by: William Woodall * peak -> peek Signed-off-by: William Woodall * fix bug in next_waitable logic Signed-off-by: William Woodall * fix bug in StaticSTE that broke the add callback groups to executor tests Signed-off-by: William Woodall * style Signed-off-by: William Woodall --------- Signed-off-by: Michael Carroll Signed-off-by: Michael Carroll Signed-off-by: Chris Lalancette Signed-off-by: William Woodall Co-authored-by: Chris Lalancette Co-authored-by: William Woodall --- rclcpp/CMakeLists.txt | 1 - rclcpp/include/rclcpp/any_executable.hpp | 6 +- rclcpp/include/rclcpp/callback_group.hpp | 27 +- rclcpp/include/rclcpp/executor.hpp | 180 +--- .../executor_entities_collection.hpp | 6 + .../executors/executor_notify_waitable.hpp | 4 +- .../static_executor_entities_collector.hpp | 357 -------- .../static_single_threaded_executor.hpp | 111 +-- .../subscription_intra_process_buffer.hpp | 10 + rclcpp/include/rclcpp/wait_result.hpp | 170 +++- rclcpp/include/rclcpp/wait_result_kind.hpp | 1 + .../detail/storage_policy_common.hpp | 78 +- .../wait_set_policies/dynamic_storage.hpp | 75 +- .../sequential_synchronization.hpp | 2 +- .../wait_set_policies/static_storage.hpp | 55 ++ rclcpp/include/rclcpp/wait_set_template.hpp | 19 + rclcpp/src/rclcpp/any_executable.cpp | 1 + rclcpp/src/rclcpp/callback_group.cpp | 1 + rclcpp/src/rclcpp/executor.cpp | 783 +++++++----------- .../executor_entities_collection.cpp | 29 +- .../executors/executor_entities_collector.cpp | 28 +- .../executors/executor_notify_waitable.cpp | 30 +- .../executors/multi_threaded_executor.cpp | 14 +- .../executors/single_threaded_executor.cpp | 5 + .../static_single_threaded_executor.cpp | 265 ++---- rclcpp/test/benchmark/benchmark_executor.cpp | 39 - rclcpp/test/rclcpp/CMakeLists.txt | 9 - .../test/rclcpp/executors/test_executors.cpp | 33 +- .../test_static_single_threaded_executor.cpp | 11 +- .../test_add_callback_groups_to_executor.cpp | 22 +- rclcpp/test/rclcpp/test_executor.cpp | 141 +--- 31 files changed, 976 insertions(+), 1537 deletions(-) delete mode 100644 rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index bd705be06d..319d6217be 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -64,7 +64,6 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/executors/executor_notify_waitable.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp - src/rclcpp/executors/static_executor_entities_collector.cpp src/rclcpp/executors/static_single_threaded_executor.cpp src/rclcpp/expand_topic_or_service_name.cpp src/rclcpp/experimental/executors/events_executor/events_executor.cpp diff --git a/rclcpp/include/rclcpp/any_executable.hpp b/rclcpp/include/rclcpp/any_executable.hpp index 5d4064f452..e4e9eaecb0 100644 --- a/rclcpp/include/rclcpp/any_executable.hpp +++ b/rclcpp/include/rclcpp/any_executable.hpp @@ -45,9 +45,9 @@ struct AnyExecutable rclcpp::ClientBase::SharedPtr client; rclcpp::Waitable::SharedPtr waitable; // These are used to keep the scope on the containing items - rclcpp::CallbackGroup::SharedPtr callback_group; - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base; - std::shared_ptr data; + rclcpp::CallbackGroup::SharedPtr callback_group {nullptr}; + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base {nullptr}; + std::shared_ptr data {nullptr}; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 43c7daa888..dcb38d29f6 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -185,18 +185,41 @@ class CallbackGroup * \return the number of entities in the callback group. */ RCLCPP_PUBLIC - size_t size() const; + size_t + size() const; + /// Return a reference to the 'can be taken' atomic boolean. + /** + * The resulting bool will be true in the case that no executor is currently + * using an executable entity from this group. + * The resulting bool will be false in the case that an executor is currently + * using an executable entity from this group, and the group policy doesn't + * allow a second take (eg mutual exclusion) + * \return a reference to the flag + */ RCLCPP_PUBLIC std::atomic_bool & can_be_taken_from(); + /// Get the group type. + /** + * \return the group type + */ RCLCPP_PUBLIC const CallbackGroupType & type() const; + /// Collect all of the entity pointers contained in this callback group. + /** + * \param[in] sub_func Function to execute for each subscription + * \param[in] service_func Function to execute for each service + * \param[in] client_func Function to execute for each client + * \param[in] timer_func Function to execute for each timer + * \param[in] waitable_fuinc Function to execute for each waitable + */ RCLCPP_PUBLIC - void collect_all_ptrs( + void + collect_all_ptrs( std::function sub_func, std::function service_func, std::function client_func, diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 2d5ca2149a..97f2db2673 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -29,26 +29,24 @@ #include "rcl/guard_condition.h" #include "rcl/wait.h" +#include "rclcpp/executors/executor_notify_waitable.hpp" #include "rcpputils/scope_exit.hpp" #include "rclcpp/context.hpp" #include "rclcpp/contexts/default_context.hpp" #include "rclcpp/guard_condition.hpp" #include "rclcpp/executor_options.hpp" +#include "rclcpp/executors/executor_entities_collection.hpp" +#include "rclcpp/executors/executor_entities_collector.hpp" #include "rclcpp/future_return_code.hpp" -#include "rclcpp/memory_strategies.hpp" -#include "rclcpp/memory_strategy.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" +#include "rclcpp/wait_set.hpp" namespace rclcpp { -typedef std::map> WeakCallbackGroupsToNodesMap; - // Forward declaration is used in convenience method signature. class Node; class ExecutorImplementation; @@ -403,17 +401,6 @@ class Executor void cancel(); - /// Support dynamic switching of the memory strategy. - /** - * Switching the memory strategy while the executor is spinning in another threading could have - * unintended consequences. - * \param[in] memory_strategy Shared pointer to the memory strategy to set. - * \throws std::runtime_error if memory_strategy is null - */ - RCLCPP_PUBLIC - void - set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy); - /// Returns true if the executor is currently spinning. /** * This function can be called asynchronously from any thread. @@ -498,6 +485,11 @@ class Executor static void execute_client(rclcpp::ClientBase::SharedPtr client); + /// Gather all of the waitable entities from associated nodes and callback groups. + RCLCPP_PUBLIC + void + collect_entities(); + /// Block until more work becomes avilable or timeout is reached. /** * Builds a set of waitable entities, which are passed to the middleware. @@ -509,62 +501,6 @@ class Executor void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - /// Find node associated with a callback group - /** - * \param[in] weak_groups_to_nodes map of callback groups to nodes - * \param[in] group callback group to find assocatiated node - * \return Pointer to associated node if found, else nullptr - */ - RCLCPP_PUBLIC - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr - get_node_by_group( - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - rclcpp::CallbackGroup::SharedPtr group); - - /// Return true if the node has been added to this executor. - /** - * \param[in] node_ptr a shared pointer that points to a node base interface - * \param[in] weak_groups_to_nodes map to nodes to lookup - * \return true if the node is associated with the executor, otherwise false - */ - RCLCPP_PUBLIC - bool - has_node( - const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; - - /// Find the callback group associated with a timer - /** - * \param[in] timer Timer to find associated callback group - * \return Pointer to callback group node if found, else nullptr - */ - RCLCPP_PUBLIC - rclcpp::CallbackGroup::SharedPtr - get_group_by_timer(rclcpp::TimerBase::SharedPtr timer); - - /// Add a callback group to an executor - /** - * \see rclcpp::Executor::add_callback_group - */ - RCLCPP_PUBLIC - virtual void - add_callback_group_to_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_); - - /// Remove a callback group from the executor. - /** - * \see rclcpp::Executor::remove_callback_group - */ - RCLCPP_PUBLIC - virtual void - remove_callback_group_from_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_); - /// Check for executable in ready state and populate union structure. /** * \param[out] any_executable populated union structure of ready executable @@ -575,33 +511,6 @@ class Executor bool get_next_ready_executable(AnyExecutable & any_executable); - /// Check for executable in ready state and populate union structure. - /** - * This is the implementation of get_next_ready_executable that takes into - * account the current state of callback groups' association with nodes and - * executors. - * - * This checks in a particular order for available work: - * * Timers - * * Subscriptions - * * Services - * * Clients - * * Waitable - * - * If the next executable is not associated with this executor/node pair, - * then this method will return false. - * - * \param[out] any_executable populated union structure of ready executable - * \param[in] weak_groups_to_nodes mapping of callback groups to nodes - * \return true if an executable was ready and any_executable was populated, - * otherwise false - */ - RCLCPP_PUBLIC - bool - get_next_ready_executable_from_map( - AnyExecutable & any_executable, - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); - /// Wait for executable in ready state and populate union structure. /** * If an executable is ready, it will return immediately, otherwise @@ -619,21 +528,6 @@ class Executor AnyExecutable & any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - /// Add all callback groups that can be automatically added from associated nodes. - /** - * The executor, before collecting entities, verifies if any callback group from - * nodes associated with the executor, which is not already associated to an executor, - * can be automatically added to this executor. - * This takes care of any callback group that has been added to a node but not explicitly added - * to the executor. - * It is important to note that in order for the callback groups to be automatically added to an - * executor through this function, the node of the callback groups needs to have been added - * through the `add_node` method. - */ - RCLCPP_PUBLIC - virtual void - add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_); - /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; @@ -643,16 +537,8 @@ class Executor /// Guard condition for signaling the rmw layer to wake up for system shutdown. std::shared_ptr shutdown_guard_condition_; - /// Wait set for managing entities that the rmw layer waits on. - rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set(); - - // Mutex to protect the subsequent memory_strategy_. mutable std::mutex mutex_; - /// The memory strategy: an interface for handling user-defined memory allocation strategies. - memory_strategy::MemoryStrategy::SharedPtr - memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_); - /// The context associated with this executor. std::shared_ptr context_; @@ -662,39 +548,31 @@ class Executor virtual void spin_once_impl(std::chrono::nanoseconds timeout); - typedef std::map> - WeakNodesToGuardConditionsMap; - - typedef std::map> - WeakCallbackGroupsToGuardConditionsMap; - - /// maps nodes to guard conditions - WeakNodesToGuardConditionsMap - weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + /// Waitable containing guard conditions controlling the executor flow. + /** + * This waitable contains the interrupt and shutdown guard condition, as well + * as the guard condition associated with each node and callback group. + * By default, if any change is detected in the monitored entities, the notify + * waitable will awake the executor and rebuild the collections. + */ + std::shared_ptr notify_waitable_; - /// maps callback groups to guard conditions - WeakCallbackGroupsToGuardConditionsMap - weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::atomic_bool entities_need_rebuild_; - /// maps callback groups associated to nodes - WeakCallbackGroupsToNodesMap - weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + /// Collector used to associate executable entities from nodes and guard conditions + rclcpp::executors::ExecutorEntitiesCollector collector_; - /// maps callback groups to nodes associated with executor - WeakCallbackGroupsToNodesMap - weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + /// WaitSet to be waited on. + rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::optional> wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - /// maps all callback groups to nodes - WeakCallbackGroupsToNodesMap - weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + /// Hold the current state of the collection being waited on by the waitset + rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY( + mutex_); - /// nodes that are associated with the executor - std::list - weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + /// Hold the current state of the notify waitable being waited on by the waitset + std::shared_ptr current_notify_waitable_ + RCPPUTILS_TSA_GUARDED_BY(mutex_); /// shutdown callback handle registered to Context rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_; diff --git a/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp b/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp index 166bb99119..517894a2a2 100644 --- a/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp +++ b/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp @@ -178,6 +178,12 @@ struct ExecutorEntitiesCollection /// Clear the entities collection void clear(); + + /// Remove entities that have expired weak ownership + /** + * \return The total number of removed entities + */ + size_t remove_expired_entities(); }; /// Build an entities collection from callback groups diff --git a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp index eee8e59793..2b43fecca1 100644 --- a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp +++ b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp @@ -48,11 +48,11 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable ~ExecutorNotifyWaitable() override = default; RCLCPP_PUBLIC - ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other); + ExecutorNotifyWaitable(ExecutorNotifyWaitable & other); RCLCPP_PUBLIC - ExecutorNotifyWaitable & operator=(const ExecutorNotifyWaitable & other); + ExecutorNotifyWaitable & operator=(ExecutorNotifyWaitable & other); /// Add conditions to the wait set /** diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp deleted file mode 100644 index f9fd2ff672..0000000000 --- a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp +++ /dev/null @@ -1,357 +0,0 @@ -// Copyright 2020 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ -#define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ - -#include -#include -#include -#include -#include - -#include "rcl/guard_condition.h" -#include "rcl/wait.h" - -#include "rclcpp/experimental/executable_list.hpp" -#include "rclcpp/macros.hpp" -#include "rclcpp/memory_strategy.hpp" -#include "rclcpp/visibility_control.hpp" -#include "rclcpp/waitable.hpp" - -namespace rclcpp -{ -namespace executors -{ -typedef std::map> WeakCallbackGroupsToNodesMap; - -class StaticExecutorEntitiesCollector final - : public rclcpp::Waitable, - public std::enable_shared_from_this -{ -public: - RCLCPP_SMART_PTR_DEFINITIONS(StaticExecutorEntitiesCollector) - - // Constructor - RCLCPP_PUBLIC - StaticExecutorEntitiesCollector() = default; - - // Destructor - RCLCPP_PUBLIC - ~StaticExecutorEntitiesCollector(); - - /// Initialize StaticExecutorEntitiesCollector - /** - * \param p_wait_set A reference to the wait set to be used in the executor - * \param memory_strategy Shared pointer to the memory strategy to set. - * \throws std::runtime_error if memory strategy is null - */ - RCLCPP_PUBLIC - void - init( - rcl_wait_set_t * p_wait_set, - rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy); - - /// Finalize StaticExecutorEntitiesCollector to clear resources - RCLCPP_PUBLIC - bool - is_init() {return initialized_;} - - RCLCPP_PUBLIC - void - fini(); - - /// Execute the waitable. - RCLCPP_PUBLIC - void - execute(std::shared_ptr & data) override; - - /// Take the data so that it can be consumed with `execute`. - /** - * For `StaticExecutorEntitiesCollector`, this always return `nullptr`. - * \sa rclcpp::Waitable::take_data() - */ - RCLCPP_PUBLIC - std::shared_ptr - take_data() override; - - /// Function to add_handles_to_wait_set and wait for work and - /** - * block until the wait set is ready or until the timeout has been exceeded. - * \throws std::runtime_error if wait set couldn't be cleared or filled. - * \throws any rcl errors from rcl_wait, \see rclcpp::exceptions::throw_from_rcl_error() - */ - RCLCPP_PUBLIC - void - refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - - /** - * \throws std::runtime_error if it couldn't add guard condition to wait set - */ - RCLCPP_PUBLIC - void - add_to_wait_set(rcl_wait_set_t * wait_set) override; - - RCLCPP_PUBLIC - size_t - get_number_of_ready_guard_conditions() override; - - /// Add a callback group to an executor. - /** - * \see rclcpp::Executor::add_callback_group - */ - RCLCPP_PUBLIC - bool - add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); - - /// Add a callback group to an executor. - /** - * \see rclcpp::Executor::add_callback_group - * \return boolean whether the node from the callback group is new - */ - RCLCPP_PUBLIC - bool - add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); - - /// Remove a callback group from the executor. - /** - * \see rclcpp::Executor::remove_callback_group - */ - RCLCPP_PUBLIC - bool - remove_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr); - - /// Remove a callback group from the executor. - /** - * \see rclcpp::Executor::remove_callback_group_from_map - */ - RCLCPP_PUBLIC - bool - remove_callback_group_from_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); - - /** - * \see rclcpp::Executor::add_node() - * \throw std::runtime_error if node was already added - */ - RCLCPP_PUBLIC - bool - add_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); - - /** - * \see rclcpp::Executor::remove_node() - * \throw std::runtime_error if no guard condition is associated with node. - */ - RCLCPP_PUBLIC - bool - remove_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); - - RCLCPP_PUBLIC - std::vector - get_all_callback_groups(); - - /// Get callback groups that belong to executor. - /** - * \see rclcpp::Executor::get_manually_added_callback_groups() - */ - RCLCPP_PUBLIC - std::vector - get_manually_added_callback_groups(); - - /// Get callback groups that belong to executor. - /** - * \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() - */ - RCLCPP_PUBLIC - std::vector - get_automatically_added_callback_groups_from_nodes(); - - /// Complete all available queued work without blocking. - /** - * This function checks if after the guard condition was triggered - * (or a spurious wakeup happened) we are really ready to execute - * i.e. re-collect entities - */ - RCLCPP_PUBLIC - bool - is_ready(rcl_wait_set_t * wait_set) override; - - /// Return number of timers - /** - * \return number of timers - */ - RCLCPP_PUBLIC - size_t - get_number_of_timers() {return exec_list_.number_of_timers;} - - /// Return number of subscriptions - /** - * \return number of subscriptions - */ - RCLCPP_PUBLIC - size_t - get_number_of_subscriptions() {return exec_list_.number_of_subscriptions;} - - /// Return number of services - /** - * \return number of services - */ - RCLCPP_PUBLIC - size_t - get_number_of_services() {return exec_list_.number_of_services;} - - /// Return number of clients - /** - * \return number of clients - */ - RCLCPP_PUBLIC - size_t - get_number_of_clients() {return exec_list_.number_of_clients;} - - /// Return number of waitables - /** - * \return number of waitables - */ - RCLCPP_PUBLIC - size_t - get_number_of_waitables() {return exec_list_.number_of_waitables;} - - /** Return a SubscritionBase Sharedptr by index. - * \param[in] i The index of the SubscritionBase - * \return a SubscritionBase shared pointer - * \throws std::out_of_range if the argument is higher than the size of the structrue. - */ - RCLCPP_PUBLIC - rclcpp::SubscriptionBase::SharedPtr - get_subscription(size_t i) {return exec_list_.subscription[i];} - - /** Return a TimerBase Sharedptr by index. - * \param[in] i The index of the TimerBase - * \return a TimerBase shared pointer - * \throws std::out_of_range if the argument is higher than the size. - */ - RCLCPP_PUBLIC - rclcpp::TimerBase::SharedPtr - get_timer(size_t i) {return exec_list_.timer[i];} - - /** Return a ServiceBase Sharedptr by index. - * \param[in] i The index of the ServiceBase - * \return a ServiceBase shared pointer - * \throws std::out_of_range if the argument is higher than the size. - */ - RCLCPP_PUBLIC - rclcpp::ServiceBase::SharedPtr - get_service(size_t i) {return exec_list_.service[i];} - - /** Return a ClientBase Sharedptr by index - * \param[in] i The index of the ClientBase - * \return a ClientBase shared pointer - * \throws std::out_of_range if the argument is higher than the size. - */ - RCLCPP_PUBLIC - rclcpp::ClientBase::SharedPtr - get_client(size_t i) {return exec_list_.client[i];} - - /** Return a Waitable Sharedptr by index - * \param[in] i The index of the Waitable - * \return a Waitable shared pointer - * \throws std::out_of_range if the argument is higher than the size. - */ - RCLCPP_PUBLIC - rclcpp::Waitable::SharedPtr - get_waitable(size_t i) {return exec_list_.waitable[i];} - -private: - /// Function to reallocate space for entities in the wait set. - /** - * \throws std::runtime_error if wait set couldn't be cleared or resized. - */ - void - prepare_wait_set(); - - void - fill_executable_list(); - - void - fill_memory_strategy(); - - /// Return true if the node belongs to the collector - /** - * \param[in] node_ptr a node base interface shared pointer - * \param[in] weak_groups_to_nodes map to nodes to lookup - * \return boolean whether a node belongs the collector - */ - bool - has_node( - const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; - - /// Add all callback groups that can be automatically added by any executor - /// and is not already associated with an executor from nodes - /// that are associated with executor - /** - * \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor() - */ - void - add_callback_groups_from_nodes_associated_to_executor(); - - void - fill_executable_list_from_map(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); - - /// Memory strategy: an interface for handling user-defined memory allocation strategies. - rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; - - // maps callback groups to nodes. - WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_; - // maps callback groups to nodes. - WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_; - - typedef std::map> - WeakNodesToGuardConditionsMap; - WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; - - /// List of weak nodes registered in the static executor - std::list weak_nodes_; - - // Mutex to protect vector of new nodes. - std::mutex new_nodes_mutex_; - std::vector new_nodes_; - - /// Wait set for managing entities that the rmw layer waits on. - rcl_wait_set_t * p_wait_set_ = nullptr; - - /// Executable list: timers, subscribers, clients, services and waitables - rclcpp::experimental::ExecutableList exec_list_; - - /// Bool to check if the entities collector has been initialized - bool initialized_ = false; -}; - -} // namespace executors -} // namespace rclcpp - -#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index 5294605eaf..6f22909caf 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -15,24 +15,13 @@ #ifndef RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ #define RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ +#include #include -#include -#include #include -#include -#include - -#include "rmw/rmw.h" #include "rclcpp/executor.hpp" -#include "rclcpp/executors/static_executor_entities_collector.hpp" -#include "rclcpp/experimental/executable_list.hpp" -#include "rclcpp/macros.hpp" -#include "rclcpp/memory_strategies.hpp" -#include "rclcpp/node.hpp" -#include "rclcpp/rate.hpp" -#include "rclcpp/utilities.hpp" -#include "rclcpp/visibility_control.hpp" +#include "rclcpp/executors/executor_entities_collection.hpp" +#include "rclcpp/executors/single_threaded_executor.hpp" namespace rclcpp { @@ -65,7 +54,7 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor explicit StaticSingleThreadedExecutor( const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); - /// Default destrcutor. + /// Default destructor. RCLCPP_PUBLIC virtual ~StaticSingleThreadedExecutor(); @@ -116,105 +105,31 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor void spin_all(std::chrono::nanoseconds max_duration) override; - /// Add a callback group to an executor. - /** - * \sa rclcpp::Executor::add_callback_group - */ - RCLCPP_PUBLIC - void - add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify = true) override; - - /// Remove callback group from the executor - /** - * \sa rclcpp::Executor::remove_callback_group - */ - RCLCPP_PUBLIC - void - remove_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - bool notify = true) override; - - /// Add a node to the executor. - /** - * \sa rclcpp::Executor::add_node - */ - RCLCPP_PUBLIC - void - add_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify = true) override; - - /// Convenience function which takes Node and forwards NodeBaseInterface. - /** - * \sa rclcpp::StaticSingleThreadedExecutor::add_node - */ - RCLCPP_PUBLIC - void - add_node(std::shared_ptr node_ptr, bool notify = true) override; - - /// Remove a node from the executor. - /** - * \sa rclcpp::Executor::remove_node - */ - RCLCPP_PUBLIC - void - remove_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify = true) override; - - /// Convenience function which takes Node and forwards NodeBaseInterface. - /** - * \sa rclcpp::Executor::remove_node - */ - RCLCPP_PUBLIC - void - remove_node(std::shared_ptr node_ptr, bool notify = true) override; - - RCLCPP_PUBLIC - std::vector - get_all_callback_groups() override; - - /// Get callback groups that belong to executor. - /** - * \sa rclcpp::Executor::get_manually_added_callback_groups() - */ - RCLCPP_PUBLIC - std::vector - get_manually_added_callback_groups() override; - - /// Get callback groups that belong to executor. - /** - * \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() - */ - RCLCPP_PUBLIC - std::vector - get_automatically_added_callback_groups_from_nodes() override; - protected: /** * @brief Executes ready executables from wait set. + * @param collection entities to evaluate for ready executables. + * @param wait_result result to check for ready executables. * @param spin_once if true executes only the first ready executable. * @return true if any executable was ready. */ - RCLCPP_PUBLIC bool - execute_ready_executables(bool spin_once = false); + execute_ready_executables( + const rclcpp::executors::ExecutorEntitiesCollection & collection, + rclcpp::WaitResult & wait_result, + bool spin_once); - RCLCPP_PUBLIC void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive); - RCLCPP_PUBLIC void spin_once_impl(std::chrono::nanoseconds timeout) override; + std::optional> + collect_and_wait(std::chrono::nanoseconds timeout); + private: RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor) - - StaticExecutorEntitiesCollector::SharedPtr entities_collector_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index 30debed83a..c588efb5b6 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -30,6 +30,7 @@ #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/type_support_decl.hpp" +#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" #include "tracetools/tracetools.h" @@ -99,6 +100,15 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff static_cast(this)); } + void + add_to_wait_set(rcl_wait_set_t * wait_set) override + { + if (this->buffer_->has_data()) { + this->trigger_guard_condition(); + } + detail::add_guard_condition_to_rcl_wait_set(*wait_set, this->gc_); + } + bool is_ready(rcl_wait_set_t * wait_set) override { diff --git a/rclcpp/include/rclcpp/wait_result.hpp b/rclcpp/include/rclcpp/wait_result.hpp index e879043d04..429eb1dd25 100644 --- a/rclcpp/include/rclcpp/wait_result.hpp +++ b/rclcpp/include/rclcpp/wait_result.hpp @@ -17,13 +17,21 @@ #include #include +#include +#include #include +#include #include "rcl/wait.h" #include "rclcpp/macros.hpp" #include "rclcpp/wait_result_kind.hpp" +#include "rclcpp/client.hpp" +#include "rclcpp/service.hpp" +#include "rclcpp/subscription_base.hpp" +#include "rclcpp/timer.hpp" + namespace rclcpp { @@ -134,6 +142,151 @@ class WaitResult final } } + /// Get the next ready timer and its index in the wait result, but do not clear it. + /** + * The returned timer is not cleared automatically, as it the case with the + * other next_ready_*()-like functions. + * Instead, this function returns the timer and the index that identifies it + * in the wait result, so that it can be cleared (marked as taken or used) + * in a separate step with clear_timer_with_index(). + * This is necessary in some multi-threaded executor implementations. + * + * If the timer is not cleared using the index, subsequent calls to this + * function will return the same timer. + * + * If there is no ready timer, then nullptr will be returned and the index + * will be invalid and should not be used. + * + * \param[in] start_index index at which to start searching for the next ready + * timer in the wait result. If the start_index is out of bounds for the + * list of timers in the wait result, then {nullptr, start_index} will be + * returned. Defaults to 0. + * \return next ready timer pointer and its index in the wait result, or + * {nullptr, start_index} if none was found. + */ + std::pair, size_t> + peek_next_ready_timer(size_t start_index = 0) + { + check_wait_result_dirty(); + auto ret = std::shared_ptr{nullptr}; + size_t ii = start_index; + if (this->kind() == WaitResultKind::Ready) { + auto & wait_set = this->get_wait_set(); + auto & rcl_wait_set = wait_set.storage_get_rcl_wait_set(); + for (; ii < wait_set.size_of_timers(); ++ii) { + if (rcl_wait_set.timers[ii] != nullptr) { + ret = wait_set.timers(ii); + break; + } + } + } + return {ret, ii}; + } + + /// Clear the timer at the given index. + /** + * Clearing a timer from the wait result prevents it from being returned by + * the peek_next_ready_timer() on subsequent calls. + * + * The index should come from the peek_next_ready_timer() function, and + * should only be used with this function if the timer pointer was valid. + * + * \throws std::out_of_range if the given index is out of range + */ + void + clear_timer_with_index(size_t index) + { + auto & wait_set = this->get_wait_set(); + auto & rcl_wait_set = wait_set.storage_get_rcl_wait_set(); + if (index >= wait_set.size_of_timers()) { + throw std::out_of_range("given timer index is out of range"); + } + rcl_wait_set.timers[index] = nullptr; + } + + /// Get the next ready subscription, clearing it from the wait result. + std::shared_ptr + next_ready_subscription() + { + check_wait_result_dirty(); + auto ret = std::shared_ptr{nullptr}; + if (this->kind() == WaitResultKind::Ready) { + auto & wait_set = this->get_wait_set(); + auto & rcl_wait_set = wait_set.storage_get_rcl_wait_set(); + for (size_t ii = 0; ii < wait_set.size_of_subscriptions(); ++ii) { + if (rcl_wait_set.subscriptions[ii] != nullptr) { + ret = wait_set.subscriptions(ii); + rcl_wait_set.subscriptions[ii] = nullptr; + break; + } + } + } + return ret; + } + + /// Get the next ready service, clearing it from the wait result. + std::shared_ptr + next_ready_service() + { + check_wait_result_dirty(); + auto ret = std::shared_ptr{nullptr}; + if (this->kind() == WaitResultKind::Ready) { + auto & wait_set = this->get_wait_set(); + auto & rcl_wait_set = wait_set.storage_get_rcl_wait_set(); + for (size_t ii = 0; ii < wait_set.size_of_services(); ++ii) { + if (rcl_wait_set.services[ii] != nullptr) { + ret = wait_set.services(ii); + rcl_wait_set.services[ii] = nullptr; + break; + } + } + } + return ret; + } + + /// Get the next ready client, clearing it from the wait result. + std::shared_ptr + next_ready_client() + { + check_wait_result_dirty(); + auto ret = std::shared_ptr{nullptr}; + if (this->kind() == WaitResultKind::Ready) { + auto & wait_set = this->get_wait_set(); + auto & rcl_wait_set = wait_set.storage_get_rcl_wait_set(); + for (size_t ii = 0; ii < wait_set.size_of_clients(); ++ii) { + if (rcl_wait_set.clients[ii] != nullptr) { + ret = wait_set.clients(ii); + rcl_wait_set.clients[ii] = nullptr; + break; + } + } + } + return ret; + } + + /// Get the next ready waitable, clearing it from the wait result. + std::shared_ptr + next_ready_waitable() + { + check_wait_result_dirty(); + auto waitable = std::shared_ptr{nullptr}; + auto data = std::shared_ptr{nullptr}; + + if (this->kind() == WaitResultKind::Ready) { + auto & wait_set = this->get_wait_set(); + auto rcl_wait_set = wait_set.get_rcl_wait_set(); + while (next_waitable_index_ < wait_set.size_of_waitables()) { + auto cur_waitable = wait_set.waitables(next_waitable_index_++); + if (cur_waitable != nullptr && cur_waitable->is_ready(&rcl_wait_set)) { + waitable = cur_waitable; + break; + } + } + } + + return waitable; + } + private: RCLCPP_DISABLE_COPY(WaitResult) @@ -151,12 +304,25 @@ class WaitResult final // Should be enforced by the static factory methods on this class. assert(WaitResultKind::Ready == wait_result_kind); // Secure thread-safety (if provided) and shared ownership (if needed). - wait_set_pointer_->wait_result_acquire(); + this->get_wait_set().wait_result_acquire(); } - const WaitResultKind wait_result_kind_; + /// Check if the wait result is invalid because the wait set was modified. + void + check_wait_result_dirty() + { + // In the case that the wait set was modified while the result was out, + // we must mark the wait result as no longer valid + if (wait_set_pointer_ && this->get_wait_set().wait_result_dirty_) { + this->wait_result_kind_ = WaitResultKind::Invalid; + } + } + + WaitResultKind wait_result_kind_; WaitSetT * wait_set_pointer_ = nullptr; + + size_t next_waitable_index_ = 0; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/wait_result_kind.hpp b/rclcpp/include/rclcpp/wait_result_kind.hpp index 3ce65bf4f3..7980d1d127 100644 --- a/rclcpp/include/rclcpp/wait_result_kind.hpp +++ b/rclcpp/include/rclcpp/wait_result_kind.hpp @@ -26,6 +26,7 @@ enum RCLCPP_PUBLIC WaitResultKind Ready, //get_subscription_handle().get(), + subscription_entry.subscription->get_subscription_handle().get(), nullptr); if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret); + rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set"); } } @@ -271,8 +269,7 @@ class StoragePolicyCommon [this](const auto & inner_guard_conditions) { for (const auto & guard_condition : inner_guard_conditions) { - auto guard_condition_ptr_pair = get_raw_pointer_from_smart_pointer(guard_condition); - if (nullptr == guard_condition_ptr_pair.second) { + if (!guard_condition) { // In this case it was probably stored as a weak_ptr, but is now locking to nullptr. if (HasStrongOwnership) { // This will not happen in fixed sized storage, as it holds @@ -285,10 +282,10 @@ class StoragePolicyCommon } rcl_ret_t ret = rcl_wait_set_add_guard_condition( &rcl_wait_set_, - &guard_condition_ptr_pair.second->get_rcl_guard_condition(), + &guard_condition->get_rcl_guard_condition(), nullptr); if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret); + rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set"); } } }; @@ -301,8 +298,7 @@ class StoragePolicyCommon // Add timers. for (const auto & timer : timers) { - auto timer_ptr_pair = get_raw_pointer_from_smart_pointer(timer); - if (nullptr == timer_ptr_pair.second) { + if (!timer) { // In this case it was probably stored as a weak_ptr, but is now locking to nullptr. if (HasStrongOwnership) { // This will not happen in fixed sized storage, as it holds @@ -315,17 +311,16 @@ class StoragePolicyCommon } rcl_ret_t ret = rcl_wait_set_add_timer( &rcl_wait_set_, - timer_ptr_pair.second->get_timer_handle().get(), + timer->get_timer_handle().get(), nullptr); if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret); + rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set"); } } // Add clients. for (const auto & client : clients) { - auto client_ptr_pair = get_raw_pointer_from_smart_pointer(client); - if (nullptr == client_ptr_pair.second) { + if (!client) { // In this case it was probably stored as a weak_ptr, but is now locking to nullptr. if (HasStrongOwnership) { // This will not happen in fixed sized storage, as it holds @@ -338,7 +333,7 @@ class StoragePolicyCommon } rcl_ret_t ret = rcl_wait_set_add_client( &rcl_wait_set_, - client_ptr_pair.second->get_client_handle().get(), + client->get_client_handle().get(), nullptr); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); @@ -347,8 +342,7 @@ class StoragePolicyCommon // Add services. for (const auto & service : services) { - auto service_ptr_pair = get_raw_pointer_from_smart_pointer(service); - if (nullptr == service_ptr_pair.second) { + if (!service) { // In this case it was probably stored as a weak_ptr, but is now locking to nullptr. if (HasStrongOwnership) { // This will not happen in fixed sized storage, as it holds @@ -361,17 +355,16 @@ class StoragePolicyCommon } rcl_ret_t ret = rcl_wait_set_add_service( &rcl_wait_set_, - service_ptr_pair.second->get_service_handle().get(), + service->get_service_handle().get(), nullptr); if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret); + rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set"); } } // Add waitables. for (auto & waitable_entry : waitables) { - auto waitable_ptr_pair = get_raw_pointer_from_smart_pointer(waitable_entry.waitable); - if (nullptr == waitable_ptr_pair.second) { + if (!waitable_entry.waitable) { // In this case it was probably stored as a weak_ptr, but is now locking to nullptr. if (HasStrongOwnership) { // This will not happen in fixed sized storage, as it holds @@ -382,8 +375,7 @@ class StoragePolicyCommon needs_pruning_ = true; continue; } - rclcpp::Waitable & waitable = *waitable_ptr_pair.second; - waitable.add_to_wait_set(&rcl_wait_set_); + waitable_entry.waitable->add_to_wait_set(&rcl_wait_set_); } } @@ -405,6 +397,32 @@ class StoragePolicyCommon needs_resize_ = true; } + size_t size_of_subscriptions() const {return 0;} + size_t size_of_timers() const {return 0;} + size_t size_of_clients() const {return 0;} + size_t size_of_services() const {return 0;} + size_t size_of_waitables() const {return 0;} + + template + typename SubscriptionsIterable::value_type + subscriptions(size_t) const {return nullptr;} + + template + typename TimersIterable::value_type + timers(size_t) const {return nullptr;} + + template + typename ClientsIterable::value_type + clients(size_t) const {return nullptr;} + + template + typename ServicesIterable::value_type + services(size_t) const {return nullptr;} + + template + typename WaitablesIterable::value_type + waitables(size_t) const {return nullptr;} + rcl_wait_set_t rcl_wait_set_; rclcpp::Context::SharedPtr context_; diff --git a/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp b/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp index 4cec85f39a..8f97596218 100644 --- a/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp +++ b/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp @@ -204,15 +204,19 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo void storage_rebuild_rcl_wait_set(const ArrayOfExtraGuardConditions & extra_guard_conditions) { + this->storage_acquire_ownerships(); + this->storage_rebuild_rcl_wait_set_with_sets( - subscriptions_, - guard_conditions_, + shared_subscriptions_, + shared_guard_conditions_, extra_guard_conditions, - timers_, - clients_, - services_, - waitables_ + shared_timers_, + shared_clients_, + shared_services_, + shared_waitables_ ); + + this->storage_release_ownerships(); } template @@ -382,6 +386,8 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo return weak_ptr.expired(); }; // remove guard conditions which have been deleted + subscriptions_.erase( + std::remove_if(subscriptions_.begin(), subscriptions_.end(), p), subscriptions_.end()); guard_conditions_.erase( std::remove_if(guard_conditions_.begin(), guard_conditions_.end(), p), guard_conditions_.end()); @@ -407,6 +413,7 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo } }; // Lock all the weak pointers and hold them until released. + lock_all(subscriptions_, shared_subscriptions_); lock_all(guard_conditions_, shared_guard_conditions_); lock_all(timers_, shared_timers_); lock_all(clients_, shared_clients_); @@ -438,6 +445,7 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo shared_ptr.reset(); } }; + reset_all(shared_subscriptions_); reset_all(shared_guard_conditions_); reset_all(shared_timers_); reset_all(shared_clients_); @@ -445,6 +453,61 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo reset_all(shared_waitables_); } + size_t size_of_subscriptions() const + { + return shared_subscriptions_.size(); + } + + size_t size_of_timers() const + { + return shared_timers_.size(); + } + + size_t size_of_clients() const + { + return shared_clients_.size(); + } + + size_t size_of_services() const + { + return shared_services_.size(); + } + + size_t size_of_waitables() const + { + return shared_waitables_.size(); + } + + std::shared_ptr + subscriptions(size_t ii) const + { + return shared_subscriptions_[ii].subscription; + } + + std::shared_ptr + timers(size_t ii) const + { + return shared_timers_[ii]; + } + + std::shared_ptr + clients(size_t ii) const + { + return shared_clients_[ii]; + } + + std::shared_ptr + services(size_t ii) const + { + return shared_services_[ii]; + } + + std::shared_ptr + waitables(size_t ii) const + { + return shared_waitables_[ii].waitable; + } + size_t ownership_reference_counter_ = 0; SequenceOfWeakSubscriptions subscriptions_; diff --git a/rclcpp/include/rclcpp/wait_set_policies/sequential_synchronization.hpp b/rclcpp/include/rclcpp/wait_set_policies/sequential_synchronization.hpp index be2e569c40..4afc2a1b27 100644 --- a/rclcpp/include/rclcpp/wait_set_policies/sequential_synchronization.hpp +++ b/rclcpp/include/rclcpp/wait_set_policies/sequential_synchronization.hpp @@ -290,7 +290,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon return create_wait_result(WaitResultKind::Empty); } else { // Some other error case, throw. - rclcpp::exceptions::throw_from_rcl_error(ret); + rclcpp::exceptions::throw_from_rcl_error(ret, "rcl_wait() failed"); } } while (should_loop()); diff --git a/rclcpp/include/rclcpp/wait_set_policies/static_storage.hpp b/rclcpp/include/rclcpp/wait_set_policies/static_storage.hpp index 434947c19f..7f5cad74ad 100644 --- a/rclcpp/include/rclcpp/wait_set_policies/static_storage.hpp +++ b/rclcpp/include/rclcpp/wait_set_policies/static_storage.hpp @@ -188,6 +188,61 @@ class StaticStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCom // Explicitly do nothing. } + size_t size_of_subscriptions() const + { + return subscriptions_.size(); + } + + size_t size_of_timers() const + { + return timers_.size(); + } + + size_t size_of_clients() const + { + return clients_.size(); + } + + size_t size_of_services() const + { + return services_.size(); + } + + size_t size_of_waitables() const + { + return waitables_.size(); + } + + typename ArrayOfSubscriptions::value_type + subscriptions(size_t ii) const + { + return subscriptions_[ii]; + } + + typename ArrayOfTimers::value_type + timers(size_t ii) const + { + return timers_[ii]; + } + + typename ArrayOfClients::value_type + clients(size_t ii) const + { + return clients_[ii]; + } + + typename ArrayOfServices::value_type + services(size_t ii) const + { + return services_[ii]; + } + + typename ArrayOfWaitables::value_type + waitables(size_t ii) const + { + return waitables_[ii]; + } + const ArrayOfSubscriptions subscriptions_; const ArrayOfGuardConditions guard_conditions_; const ArrayOfTimers timers_; diff --git a/rclcpp/include/rclcpp/wait_set_template.hpp b/rclcpp/include/rclcpp/wait_set_template.hpp index b3f41f8ed5..ce69da6bf2 100644 --- a/rclcpp/include/rclcpp/wait_set_template.hpp +++ b/rclcpp/include/rclcpp/wait_set_template.hpp @@ -153,6 +153,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli throw std::runtime_error("subscription already associated with a wait set"); } this->storage_add_subscription(std::move(local_subscription)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } if (mask.include_events) { for (auto key_event_pair : inner_subscription->get_event_handlers()) { @@ -164,6 +165,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli throw std::runtime_error("subscription event already associated with a wait set"); } this->storage_add_waitable(std::move(event), std::move(local_subscription)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } } if (mask.include_intra_process_waitable) { @@ -180,6 +182,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli this->storage_add_waitable( std::move(inner_subscription->get_intra_process_waitable()), std::move(local_subscription)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } } }); @@ -224,6 +227,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli auto local_subscription = inner_subscription; local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), false); this->storage_remove_subscription(std::move(local_subscription)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } if (mask.include_events) { for (auto key_event_pair : inner_subscription->get_event_handlers()) { @@ -231,6 +235,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli auto local_subscription = inner_subscription; local_subscription->exchange_in_use_by_wait_set_state(event.get(), false); this->storage_remove_waitable(std::move(event)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } } if (mask.include_intra_process_waitable) { @@ -239,6 +244,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // This is the case when intra process is enabled for the subscription. inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false); this->storage_remove_waitable(std::move(local_waitable)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} } } }); @@ -289,6 +295,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // fixed sized storage policies. // It will throw if the guard condition has already been added. this->storage_add_guard_condition(std::move(inner_guard_condition)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } @@ -326,6 +333,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // fixed sized storage policies. // It will throw if the guard condition is not in the wait set. this->storage_remove_guard_condition(std::move(inner_guard_condition)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } @@ -357,6 +365,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // fixed sized storage policies. // It will throw if the timer has already been added. this->storage_add_timer(std::move(inner_timer)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } @@ -384,6 +393,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // fixed sized storage policies. // It will throw if the timer is not in the wait set. this->storage_remove_timer(std::move(inner_timer)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } @@ -415,6 +425,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // fixed sized storage policies. // It will throw if the client has already been added. this->storage_add_client(std::move(inner_client)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } @@ -442,6 +453,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // fixed sized storage policies. // It will throw if the client is not in the wait set. this->storage_remove_client(std::move(inner_client)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } @@ -473,6 +485,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // fixed sized storage policies. // It will throw if the service has already been added. this->storage_add_service(std::move(inner_service)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } @@ -500,6 +513,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // fixed sized storage policies. // It will throw if the service is not in the wait set. this->storage_remove_service(std::move(inner_service)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } @@ -551,6 +565,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // fixed sized storage policies. // It will throw if the waitable has already been added. this->storage_add_waitable(std::move(inner_waitable), std::move(associated_entity)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } @@ -578,6 +593,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli // fixed sized storage policies. // It will throw if the waitable is not in the wait set. this->storage_remove_waitable(std::move(inner_waitable)); + if (this->wait_result_holding_) {this->wait_result_dirty_ = true;} }); } @@ -715,6 +731,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli throw std::runtime_error("wait_result_acquire() called while already holding"); } wait_result_holding_ = true; + wait_result_dirty_ = false; // this method comes from the SynchronizationPolicy this->sync_wait_result_acquire(); // this method comes from the StoragePolicy @@ -734,6 +751,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli throw std::runtime_error("wait_result_release() called while not holding"); } wait_result_holding_ = false; + wait_result_dirty_ = false; // this method comes from the StoragePolicy this->storage_release_ownerships(); // this method comes from the SynchronizationPolicy @@ -741,6 +759,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli } bool wait_result_holding_ = false; + bool wait_result_dirty_ = false; }; } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/any_executable.cpp b/rclcpp/src/rclcpp/any_executable.cpp index 769deacb11..2449cbe1f7 100644 --- a/rclcpp/src/rclcpp/any_executable.cpp +++ b/rclcpp/src/rclcpp/any_executable.cpp @@ -21,6 +21,7 @@ AnyExecutable::AnyExecutable() timer(nullptr), service(nullptr), client(nullptr), + waitable(nullptr), callback_group(nullptr), node_base(nullptr) {} diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp index 77f6c87bd9..4811c7da51 100644 --- a/rclcpp/src/rclcpp/callback_group.cpp +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -66,6 +66,7 @@ CallbackGroup::size() const timer_ptrs_.size() + waitable_ptrs_.size(); } + void CallbackGroup::collect_all_ptrs( std::function sub_func, std::function service_func, diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 84695ef304..b263e9b64d 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -13,6 +13,8 @@ // limitations under the License. #include +#include +#include #include #include #include @@ -22,13 +24,14 @@ #include "rcl/allocator.h" #include "rcl/error_handling.h" +#include "rclcpp/executors/executor_notify_waitable.hpp" +#include "rclcpp/subscription_wait_set_mask.hpp" #include "rcpputils/scope_exit.hpp" #include "rclcpp/dynamic_typesupport/dynamic_message.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/executor.hpp" #include "rclcpp/guard_condition.hpp" -#include "rclcpp/memory_strategy.hpp" #include "rclcpp/node.hpp" #include "rclcpp/utilities.hpp" @@ -38,21 +41,29 @@ using namespace std::chrono_literals; -using rclcpp::exceptions::throw_from_rcl_error; using rclcpp::Executor; +/// Mask to indicate to the waitset to only add the subscription. +/// The events and intraprocess waitable are already added via the callback group. +static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {true, false, false}; + class rclcpp::ExecutorImplementation {}; Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), interrupt_guard_condition_(std::make_shared(options.context)), shutdown_guard_condition_(std::make_shared(options.context)), - memory_strategy_(options.memory_strategy), + context_(options.context), + notify_waitable_(std::make_shared( + [this]() { + this->entities_need_rebuild_.store(true); + })), + entities_need_rebuild_(true), + collector_(notify_waitable_), + wait_set_({}, {}, {}, {}, {}, {}, options.context), + current_notify_waitable_(notify_waitable_), impl_(std::make_unique()) { - // Store the context for later use. - context_ = options.context; - shutdown_callback_handle_ = context_->add_on_shutdown_callback( [weak_gc = std::weak_ptr{shutdown_guard_condition_}]() { auto strong_gc = weak_gc.lock(); @@ -61,74 +72,46 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) } }); - // The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond, - // and one for the executor's guard cond (interrupt_guard_condition_) - memory_strategy_->add_guard_condition(*shutdown_guard_condition_.get()); - - // Put the executor's guard condition in - memory_strategy_->add_guard_condition(*interrupt_guard_condition_.get()); - rcl_allocator_t allocator = memory_strategy_->get_allocator(); + notify_waitable_->set_on_ready_callback( + [this](auto, auto) { + this->entities_need_rebuild_.store(true); + }); - rcl_ret_t ret = rcl_wait_set_init( - &wait_set_, - 0, 2, 0, 0, 0, 0, - context_->get_rcl_context().get(), - allocator); - if (RCL_RET_OK != ret) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "failed to create wait set: %s", rcl_get_error_string().str); - rcl_reset_error(); - throw_from_rcl_error(ret, "Failed to create wait set in Executor constructor"); - } + notify_waitable_->add_guard_condition(interrupt_guard_condition_); + notify_waitable_->add_guard_condition(shutdown_guard_condition_); } Executor::~Executor() { - // Disassociate all callback groups. - for (auto & pair : weak_groups_to_nodes_) { - auto group = pair.first.lock(); - if (group) { - std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); - has_executor.store(false); - } - } - // Disassociate all nodes. - std::for_each( - weak_nodes_.begin(), weak_nodes_.end(), [] - (rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr) { - auto shared_node_ptr = weak_node_ptr.lock(); - if (shared_node_ptr) { - std::atomic_bool & has_executor = shared_node_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); - } + std::lock_guard guard(mutex_); + + notify_waitable_->remove_guard_condition(interrupt_guard_condition_); + notify_waitable_->remove_guard_condition(shutdown_guard_condition_); + current_collection_.timers.update( + {}, {}, + [this](auto timer) {wait_set_.remove_timer(timer);}); + + current_collection_.subscriptions.update( + {}, {}, + [this](auto subscription) { + wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask); }); - weak_nodes_.clear(); - weak_groups_associated_with_executor_to_nodes_.clear(); - weak_groups_to_nodes_associated_with_executor_.clear(); - weak_groups_to_nodes_.clear(); - for (const auto & pair : weak_groups_to_guard_conditions_) { - auto guard_condition = pair.second; - memory_strategy_->remove_guard_condition(guard_condition); - } - weak_groups_to_guard_conditions_.clear(); - for (const auto & pair : weak_nodes_to_guard_conditions_) { - auto guard_condition = pair.second; - memory_strategy_->remove_guard_condition(guard_condition); - } - weak_nodes_to_guard_conditions_.clear(); + current_collection_.clients.update( + {}, {}, + [this](auto client) {wait_set_.remove_client(client);}); - // Finalize the wait set. - if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "failed to destroy wait set: %s", rcl_get_error_string().str); - rcl_reset_error(); - } - // Remove and release the sigint guard condition - memory_strategy_->remove_guard_condition(shutdown_guard_condition_.get()); - memory_strategy_->remove_guard_condition(interrupt_guard_condition_.get()); + current_collection_.services.update( + {}, {}, + [this](auto service) {wait_set_.remove_service(service);}); + + current_collection_.guard_conditions.update( + {}, {}, + [this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);}); + + current_collection_.waitables.update( + {}, {}, + [this](auto waitable) {wait_set_.remove_waitable(waitable);}); // Remove shutdown callback handle registered to Context if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) { @@ -142,95 +125,39 @@ Executor::~Executor() std::vector Executor::get_all_callback_groups() { - std::vector groups; - std::lock_guard guard{mutex_}; - for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { - groups.push_back(group_node_ptr.first); - } - for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { - groups.push_back(group_node_ptr.first); - } - return groups; + this->collector_.update_collections(); + return this->collector_.get_all_callback_groups(); } std::vector Executor::get_manually_added_callback_groups() { - std::vector groups; - std::lock_guard guard{mutex_}; - for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { - groups.push_back(group_node_ptr.first); - } - return groups; + this->collector_.update_collections(); + return this->collector_.get_manually_added_callback_groups(); } std::vector Executor::get_automatically_added_callback_groups_from_nodes() { - std::vector groups; - std::lock_guard guard{mutex_}; - for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { - groups.push_back(group_node_ptr.first); - } - return groups; + this->collector_.update_collections(); + return this->collector_.get_automatically_added_callback_groups(); } void -Executor::add_callback_groups_from_nodes_associated_to_executor() -{ - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (node) { - node->for_each_callback_group( - [this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr) - { - if ( - shared_group_ptr->automatically_add_to_executor_with_node() && - !shared_group_ptr->get_associated_with_executor_atomic().load()) - { - this->add_callback_group_to_map( - shared_group_ptr, - node, - weak_groups_to_nodes_associated_with_executor_, - true); - } - }); - } - } -} - -void -Executor::add_callback_group_to_map( +Executor::add_callback_group( rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, bool notify) { - // If the callback_group already has an executor - std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); - if (has_executor.exchange(true)) { - throw std::runtime_error("Callback group has already been added to an executor."); - } - - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; - auto insert_info = - weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr)); - bool was_inserted = insert_info.second; - if (!was_inserted) { - throw std::runtime_error("Callback group was already added to executor."); - } - // Also add to the map that contains all callback groups - weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr)); + (void) node_ptr; + this->collector_.add_callback_group(group_ptr); - if (node_ptr->get_context()->is_valid()) { - auto callback_group_guard_condition = group_ptr->get_notify_guard_condition(); - weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get(); - // Add the callback_group's notify condition to the guard condition handles - memory_strategy_->add_guard_condition(*callback_group_guard_condition); + if (!spinning.load()) { + std::lock_guard guard(mutex_); + this->collect_entities(); } if (notify) { - // Interrupt waiting to handle new node try { interrupt_guard_condition_->trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { @@ -241,91 +168,23 @@ Executor::add_callback_group_to_map( } } -void -Executor::add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify) -{ - std::lock_guard guard{mutex_}; - this->add_callback_group_to_map( - group_ptr, - node_ptr, - weak_groups_associated_with_executor_to_nodes_, - notify); -} - void Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { - // If the node already has an executor - std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); - if (has_executor.exchange(true)) { - throw std::runtime_error( - std::string("Node '") + node_ptr->get_fully_qualified_name() + - "' has already been added to an executor."); - } - std::lock_guard guard{mutex_}; - node_ptr->for_each_callback_group( - [this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - if (!group_ptr->get_associated_with_executor_atomic().load() && - group_ptr->automatically_add_to_executor_with_node()) - { - this->add_callback_group_to_map( - group_ptr, - node_ptr, - weak_groups_to_nodes_associated_with_executor_, - notify); - } - }); - - const auto gc = node_ptr->get_shared_notify_guard_condition(); - weak_nodes_to_guard_conditions_[node_ptr] = gc.get(); - // Add the node's notify condition to the guard condition handles - memory_strategy_->add_guard_condition(*gc); - weak_nodes_.push_back(node_ptr); -} + this->collector_.add_node(node_ptr); -void -Executor::remove_callback_group_from_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify) -{ - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr; - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; - auto iter = weak_groups_to_nodes.find(weak_group_ptr); - if (iter != weak_groups_to_nodes.end()) { - node_ptr = iter->second.lock(); - if (node_ptr == nullptr) { - throw std::runtime_error("Node must not be deleted before its callback group(s)."); - } - weak_groups_to_nodes.erase(iter); - weak_groups_to_nodes_.erase(group_ptr); - std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); - } else { - throw std::runtime_error("Callback group needs to be associated with executor."); + if (!spinning.load()) { + std::lock_guard guard(mutex_); + this->collect_entities(); } - // If the node was matched and removed, interrupt waiting. - if (!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) && - !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_)) - { - auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr); - if (iter != weak_groups_to_guard_conditions_.end()) { - memory_strategy_->remove_guard_condition(iter->second); - } - weak_groups_to_guard_conditions_.erase(weak_group_ptr); - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group remove: ") + ex.what()); - } + + if (notify) { + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on node add: ") + ex.what()); } } } @@ -335,11 +194,21 @@ Executor::remove_callback_group( rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) { - std::lock_guard guard{mutex_}; - this->remove_callback_group_from_map( - group_ptr, - weak_groups_associated_with_executor_to_nodes_, - notify); + this->collector_.remove_callback_group(group_ptr); + + if (!spinning.load()) { + std::lock_guard guard(mutex_); + this->collect_entities(); + } + if (notify) { + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group remove: ") + ex.what()); + } + } } void @@ -351,48 +220,22 @@ Executor::add_node(std::shared_ptr node_ptr, bool notify) void Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { - if (!node_ptr->get_associated_with_executor_atomic().load()) { - throw std::runtime_error("Node needs to be associated with an executor."); - } - - std::lock_guard guard{mutex_}; - bool found_node = false; - auto node_it = weak_nodes_.begin(); - while (node_it != weak_nodes_.end()) { - bool matched = (node_it->lock() == node_ptr); - if (matched) { - found_node = true; - node_it = weak_nodes_.erase(node_it); - } else { - ++node_it; - } - } - if (!found_node) { - throw std::runtime_error("Node needs to be associated with this executor."); + this->collector_.remove_node(node_ptr); + + if (!spinning.load()) { + std::lock_guard guard(mutex_); + this->collect_entities(); } - for (auto it = weak_groups_to_nodes_associated_with_executor_.begin(); - it != weak_groups_to_nodes_associated_with_executor_.end(); ) - { - auto weak_node_ptr = it->second; - auto shared_node_ptr = weak_node_ptr.lock(); - auto group_ptr = it->first.lock(); - - // Increment iterator before removing in case it's invalidated - it++; - if (shared_node_ptr == node_ptr) { - remove_callback_group_from_map( - group_ptr, - weak_groups_to_nodes_associated_with_executor_, - notify); + if (notify) { + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on node remove: ") + ex.what()); } } - - memory_strategy_->remove_guard_condition(node_ptr->get_shared_notify_guard_condition().get()); - weak_nodes_to_guard_conditions_.erase(node_ptr); - - std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); } void @@ -459,20 +302,25 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) throw std::runtime_error("spin_some() called while already spinning"); } RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - bool work_available = false; + while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { - AnyExecutable any_exec; - if (!work_available) { - wait_for_work(std::chrono::milliseconds::zero()); + if (!wait_result_.has_value()) { + wait_for_work(std::chrono::milliseconds(0)); } + + AnyExecutable any_exec; if (get_next_ready_executable(any_exec)) { execute_any_executable(any_exec); - work_available = true; } else { - if (!work_available || !exhaustive) { - break; - } - work_available = false; + // If nothing is ready, reset the result to signal we are + // ready to wait again + wait_result_.reset(); + } + + if (!wait_result_.has_value() && !exhaustive) { + // In the case of spin some, then we can exit + // In the case of spin all, then we will allow ourselves to wait again. + break; } } } @@ -508,22 +356,13 @@ Executor::cancel() } } -void -Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy) -{ - if (memory_strategy == nullptr) { - throw std::runtime_error("Received NULL memory strategy in executor."); - } - std::lock_guard guard{mutex_}; - memory_strategy_ = memory_strategy; -} - void Executor::execute_any_executable(AnyExecutable & any_exec) { if (!spinning.load()) { return; } + if (any_exec.timer) { TRACEPOINT( rclcpp_executor_execute, @@ -545,16 +384,10 @@ Executor::execute_any_executable(AnyExecutable & any_exec) if (any_exec.waitable) { any_exec.waitable->execute(any_exec.data); } + // Reset the callback_group, regardless of type - any_exec.callback_group->can_be_taken_from().store(true); - // Wake the wait, because it may need to be recalculated or work that - // was previously blocked is now available. - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition from execute_any_executable: ") + ex.what()); + if (any_exec.callback_group) { + any_exec.callback_group->can_be_taken_from().store(true); } } @@ -694,7 +527,6 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription) throw std::runtime_error("Delivered message kind is not supported"); } } - return; } void @@ -716,8 +548,7 @@ Executor::execute_service(rclcpp::ServiceBase::SharedPtr service) } void -Executor::execute_client( - rclcpp::ClientBase::SharedPtr client) +Executor::execute_client(rclcpp::ClientBase::SharedPtr client) { auto request_header = client->create_request_header(); std::shared_ptr response = client->create_response(); @@ -729,227 +560,213 @@ Executor::execute_client( } void -Executor::wait_for_work(std::chrono::nanoseconds timeout) -{ - TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count()); - { - std::lock_guard guard(mutex_); - - // Check weak_nodes_ to find any callback group that is not owned - // by an executor and add it to the list of callbackgroups for - // collect entities. Also exchange to false so it is not - // allowed to add to another executor - add_callback_groups_from_nodes_associated_to_executor(); - - // Collect the subscriptions and timers to be waited on - memory_strategy_->clear_handles(); - bool has_invalid_weak_groups_or_nodes = - memory_strategy_->collect_entities(weak_groups_to_nodes_); - - if (has_invalid_weak_groups_or_nodes) { - std::vector invalid_group_ptrs; - for (auto pair : weak_groups_to_nodes_) { - auto weak_group_ptr = pair.first; - auto weak_node_ptr = pair.second; - if (weak_group_ptr.expired() || weak_node_ptr.expired()) { - invalid_group_ptrs.push_back(weak_group_ptr); - auto node_guard_pair = weak_nodes_to_guard_conditions_.find(weak_node_ptr); - if (node_guard_pair != weak_nodes_to_guard_conditions_.end()) { - auto guard_condition = node_guard_pair->second; - weak_nodes_to_guard_conditions_.erase(weak_node_ptr); - memory_strategy_->remove_guard_condition(guard_condition); - } - } - } - std::for_each( - invalid_group_ptrs.begin(), invalid_group_ptrs.end(), - [this](rclcpp::CallbackGroup::WeakPtr group_ptr) { - if (weak_groups_to_nodes_associated_with_executor_.find(group_ptr) != - weak_groups_to_nodes_associated_with_executor_.end()) - { - weak_groups_to_nodes_associated_with_executor_.erase(group_ptr); - } - if (weak_groups_associated_with_executor_to_nodes_.find(group_ptr) != - weak_groups_associated_with_executor_to_nodes_.end()) - { - weak_groups_associated_with_executor_to_nodes_.erase(group_ptr); - } - auto callback_guard_pair = weak_groups_to_guard_conditions_.find(group_ptr); - if (callback_guard_pair != weak_groups_to_guard_conditions_.end()) { - auto guard_condition = callback_guard_pair->second; - weak_groups_to_guard_conditions_.erase(group_ptr); - memory_strategy_->remove_guard_condition(guard_condition); - } - weak_groups_to_nodes_.erase(group_ptr); - }); - } +Executor::collect_entities() +{ + // Updating the entity collection and waitset expires any active result + this->wait_result_.reset(); + + // Get the current list of available waitables from the collector. + rclcpp::executors::ExecutorEntitiesCollection collection; + this->collector_.update_collections(); + auto callback_groups = this->collector_.get_all_callback_groups(); + rclcpp::executors::build_entities_collection(callback_groups, collection); + + // Make a copy of notify waitable so we can continue to mutate the original + // one outside of the execute loop. + // This prevents the collection of guard conditions in the waitable from changing + // while we are waiting on it. + if (notify_waitable_) { + current_notify_waitable_ = std::make_shared( + *notify_waitable_); + auto notify_waitable = std::static_pointer_cast(current_notify_waitable_); + collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}}); + } + + // We must remove expired entities here, so that we don't continue to use older entities. + // See https://github.com/ros2/rclcpp/issues/2180 for more information. + current_collection_.remove_expired_entities(); + + // Update each of the groups of entities in the current collection, adding or removing + // from the wait set as necessary. + current_collection_.timers.update( + collection.timers, + [this](auto timer) {wait_set_.add_timer(timer);}, + [this](auto timer) {wait_set_.remove_timer(timer);}); + + current_collection_.subscriptions.update( + collection.subscriptions, + [this](auto subscription) { + wait_set_.add_subscription(subscription, kDefaultSubscriptionMask); + }, + [this](auto subscription) { + wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask); + }); - // clear wait set - rcl_ret_t ret = rcl_wait_set_clear(&wait_set_); - if (ret != RCL_RET_OK) { - throw_from_rcl_error(ret, "Couldn't clear wait set"); - } + current_collection_.clients.update( + collection.clients, + [this](auto client) {wait_set_.add_client(client);}, + [this](auto client) {wait_set_.remove_client(client);}); - // The size of waitables are accounted for in size of the other entities - ret = rcl_wait_set_resize( - &wait_set_, memory_strategy_->number_of_ready_subscriptions(), - memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), - memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), - memory_strategy_->number_of_ready_events()); - if (RCL_RET_OK != ret) { - throw_from_rcl_error(ret, "Couldn't resize the wait set"); - } + current_collection_.services.update( + collection.services, + [this](auto service) {wait_set_.add_service(service);}, + [this](auto service) {wait_set_.remove_service(service);}); - if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) { - throw std::runtime_error("Couldn't fill wait set"); - } - } + current_collection_.guard_conditions.update( + collection.guard_conditions, + [this](auto guard_condition) {wait_set_.add_guard_condition(guard_condition);}, + [this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);}); - rcl_ret_t status = - rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count()); - if (status == RCL_RET_WAIT_SET_EMPTY) { - RCUTILS_LOG_WARN_NAMED( - "rclcpp", - "empty wait set received in rcl_wait(). This should never happen."); - } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { - using rclcpp::exceptions::throw_from_rcl_error; - throw_from_rcl_error(status, "rcl_wait() failed"); - } + current_collection_.waitables.update( + collection.waitables, + [this](auto waitable) {wait_set_.add_waitable(waitable);}, + [this](auto waitable) {wait_set_.remove_waitable(waitable);}); - // check the null handles in the wait set and remove them from the handles in memory strategy - // for callback-based entities - std::lock_guard guard(mutex_); - memory_strategy_->remove_null_handles(&wait_set_); + // In the case that an entity already has an expired weak pointer + // before being removed from the waitset, additionally prune the waitset. + this->wait_set_.prune_deleted_entities(); + this->entities_need_rebuild_.store(false); } -rclcpp::node_interfaces::NodeBaseInterface::SharedPtr -Executor::get_node_by_group( - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes, - rclcpp::CallbackGroup::SharedPtr group) +void +Executor::wait_for_work(std::chrono::nanoseconds timeout) { - if (!group) { - return nullptr; - } - rclcpp::CallbackGroup::WeakPtr weak_group_ptr(group); - const auto finder = weak_groups_to_nodes.find(weak_group_ptr); - if (finder != weak_groups_to_nodes.end()) { - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr = finder->second.lock(); - return node_ptr; - } - return nullptr; -} + TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count()); -rclcpp::CallbackGroup::SharedPtr -Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) -{ - std::lock_guard guard{mutex_}; - for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { - auto group = pair.first.lock(); - if (!group) { - continue; - } - auto timer_ref = group->find_timer_ptrs_if( - [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { - return timer_ptr == timer; - }); - if (timer_ref) { - return group; - } - } + // Clear any previous wait result + this->wait_result_.reset(); - for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { - auto group = pair.first.lock(); - if (!group) { - continue; - } - auto timer_ref = group->find_timer_ptrs_if( - [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { - return timer_ptr == timer; - }); - if (timer_ref) { - return group; + { + std::lock_guard guard(mutex_); + if (current_collection_.empty() || this->entities_need_rebuild_.load()) { + this->collect_entities(); } } - return nullptr; + this->wait_result_.emplace(wait_set_.wait(timeout)); + if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in wait(). This should never happen."); + } } bool Executor::get_next_ready_executable(AnyExecutable & any_executable) -{ - bool success = get_next_ready_executable_from_map(any_executable, weak_groups_to_nodes_); - return success; -} - -bool -Executor::get_next_ready_executable_from_map( - AnyExecutable & any_executable, - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes) { TRACEPOINT(rclcpp_executor_get_next_ready); - bool success = false; - std::lock_guard guard{mutex_}; - // Check the timers to see if there are any that are ready - memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes); - if (any_executable.timer) { - success = true; + + bool valid_executable = false; + + if (!wait_result_.has_value() || wait_result_->kind() != rclcpp::WaitResultKind::Ready) { + return false; } - if (!success) { - // Check the subscriptions to see if there are any that are ready - memory_strategy_->get_next_subscription(any_executable, weak_groups_to_nodes); - if (any_executable.subscription) { - success = true; + + if (!valid_executable) { + size_t current_timer_index = 0; + while (true) { + auto [timer, timer_index] = wait_result_->peek_next_ready_timer(current_timer_index); + if (nullptr == timer) { + break; + } + current_timer_index = timer_index; + auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get()); + if (entity_iter != current_collection_.timers.end()) { + auto callback_group = entity_iter->second.callback_group.lock(); + if (callback_group && !callback_group->can_be_taken_from()) { + continue; + } + // At this point the timer is either ready for execution or was perhaps + // it was canceled, based on the result of call(), but either way it + // should not be checked again from peek_next_ready_timer(), so clear + // it from the wait result. + wait_result_->clear_timer_with_index(current_timer_index); + // Check that the timer should be called still, i.e. it wasn't canceled. + if (!timer->call()) { + continue; + } + any_executable.timer = timer; + any_executable.callback_group = callback_group; + valid_executable = true; + break; + } } } - if (!success) { - // Check the services to see if there are any that are ready - memory_strategy_->get_next_service(any_executable, weak_groups_to_nodes); - if (any_executable.service) { - success = true; + + if (!valid_executable) { + while (auto subscription = wait_result_->next_ready_subscription()) { + auto entity_iter = current_collection_.subscriptions.find( + subscription->get_subscription_handle().get()); + if (entity_iter != current_collection_.subscriptions.end()) { + auto callback_group = entity_iter->second.callback_group.lock(); + if (callback_group && !callback_group->can_be_taken_from()) { + continue; + } + any_executable.subscription = subscription; + any_executable.callback_group = callback_group; + valid_executable = true; + break; + } } } - if (!success) { - // Check the clients to see if there are any that are ready - memory_strategy_->get_next_client(any_executable, weak_groups_to_nodes); - if (any_executable.client) { - success = true; + + if (!valid_executable) { + while (auto service = wait_result_->next_ready_service()) { + auto entity_iter = current_collection_.services.find(service->get_service_handle().get()); + if (entity_iter != current_collection_.services.end()) { + auto callback_group = entity_iter->second.callback_group.lock(); + if (callback_group && !callback_group->can_be_taken_from()) { + continue; + } + any_executable.service = service; + any_executable.callback_group = callback_group; + valid_executable = true; + break; + } } } - if (!success) { - // Check the waitables to see if there are any that are ready - memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes); - if (any_executable.waitable) { - any_executable.data = any_executable.waitable->take_data(); - success = true; + + if (!valid_executable) { + while (auto client = wait_result_->next_ready_client()) { + auto entity_iter = current_collection_.clients.find(client->get_client_handle().get()); + if (entity_iter != current_collection_.clients.end()) { + auto callback_group = entity_iter->second.callback_group.lock(); + if (callback_group && !callback_group->can_be_taken_from()) { + continue; + } + any_executable.client = client; + any_executable.callback_group = callback_group; + valid_executable = true; + break; + } } } - // At this point any_executable should be valid with either a valid subscription - // or a valid timer, or it should be a null shared_ptr - if (success) { - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = any_executable.callback_group; - auto iter = weak_groups_to_nodes.find(weak_group_ptr); - if (iter == weak_groups_to_nodes.end()) { - success = false; + + if (!valid_executable) { + while (auto waitable = wait_result_->next_ready_waitable()) { + auto entity_iter = current_collection_.waitables.find(waitable.get()); + if (entity_iter != current_collection_.waitables.end()) { + auto callback_group = entity_iter->second.callback_group.lock(); + if (callback_group && !callback_group->can_be_taken_from()) { + continue; + } + any_executable.waitable = waitable; + any_executable.callback_group = callback_group; + any_executable.data = waitable->take_data(); + valid_executable = true; + break; + } } } - if (success) { - // If it is valid, check to see if the group is mutually exclusive or - // not, then mark it accordingly ..Check if the callback_group belongs to this executor - if (any_executable.callback_group && any_executable.callback_group->type() == \ - CallbackGroupType::MutuallyExclusive) - { - // It should not have been taken otherwise + if (any_executable.callback_group) { + if (any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) { assert(any_executable.callback_group->can_be_taken_from().load()); - // Set to false to indicate something is being run from this group - // This is reset to true either when the any_executable is executed or when the - // any_executable is destructued any_executable.callback_group->can_be_taken_from().store(false); } } - // If there is no ready executable, return false - return success; + + + return valid_executable; } bool @@ -972,22 +789,6 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos return success; } -// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry. -bool -Executor::has_node( - const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes) const -{ - return std::find_if( - weak_groups_to_nodes.begin(), - weak_groups_to_nodes.end(), - [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool { - auto other_ptr = other.second.lock(); - return other_ptr == node_ptr; - }) != weak_groups_to_nodes.end(); -} - bool Executor::is_spinning() { diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp index 88a878824a..b6e030d340 100644 --- a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp +++ b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp @@ -39,6 +39,30 @@ void ExecutorEntitiesCollection::clear() waitables.clear(); } +size_t ExecutorEntitiesCollection::remove_expired_entities() +{ + auto remove_entities = [](auto & collection) -> size_t { + size_t removed = 0; + for (auto it = collection.begin(); it != collection.end(); ) { + if (it->second.entity.expired()) { + ++removed; + it = collection.erase(it); + } else { + ++it; + } + } + return removed; + }; + + return + remove_entities(subscriptions) + + remove_entities(timers) + + remove_entities(guard_conditions) + + remove_entities(clients) + + remove_entities(services) + + remove_entities(waitables); +} + void build_entities_collection( const std::vector & callback_groups, @@ -203,7 +227,7 @@ ready_executables( } } - for (auto & [handle, entry] : collection.waitables) { + for (const auto & [handle, entry] : collection.waitables) { auto waitable = entry.entity.lock(); if (!waitable) { continue; @@ -218,13 +242,10 @@ ready_executables( rclcpp::AnyExecutable exec; exec.waitable = waitable; exec.callback_group = group_info; - exec.data = waitable->take_data(); executables.push_back(exec); added++; } - return added; } - } // namespace executors } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp index 84ada64925..702716a758 100644 --- a/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp @@ -61,6 +61,12 @@ ExecutorEntitiesCollector::~ExecutorEntitiesCollector() if (group_ptr) { group_ptr->get_associated_with_executor_atomic().store(false); } + // Disassociate the guard condition from the executor notify waitable + auto guard_condition_it = weak_groups_to_guard_conditions_.find(weak_group_ptr); + if (guard_condition_it != weak_groups_to_guard_conditions_.end()) { + this->notify_waitable_->remove_guard_condition(guard_condition_it->second); + weak_groups_to_guard_conditions_.erase(guard_condition_it); + } } pending_manually_added_groups_.clear(); pending_manually_removed_groups_.clear(); @@ -105,7 +111,8 @@ void ExecutorEntitiesCollector::remove_node( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) { - if (!node_ptr->get_associated_with_executor_atomic().load()) { + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (!has_executor.exchange(false)) { throw std::runtime_error( std::string("Node '") + node_ptr->get_fully_qualified_name() + "' needs to be associated with an executor."); @@ -143,6 +150,11 @@ ExecutorEntitiesCollector::add_callback_group(rclcpp::CallbackGroup::SharedPtr g } this->pending_manually_added_groups_.insert(group_ptr); + + // Store callback group notify guard condition in map and add it to the notify waitable + auto group_guard_condition = group_ptr->get_notify_guard_condition(); + weak_groups_to_guard_conditions_.insert({group_ptr, group_guard_condition}); + this->notify_waitable_->add_guard_condition(group_guard_condition); } void @@ -161,7 +173,6 @@ ExecutorEntitiesCollector::remove_callback_group(rclcpp::CallbackGroup::SharedPt throw std::runtime_error("Node must not be deleted before its callback group(s)."); } */ - auto weak_group_ptr = rclcpp::CallbackGroup::WeakPtr(group_ptr); std::lock_guard lock(mutex_); bool associated = manually_added_groups_.count(group_ptr) != 0; @@ -314,7 +325,11 @@ ExecutorEntitiesCollector::process_queues() if (node_it != weak_nodes_.end()) { remove_weak_node(node_it); } else { - throw std::runtime_error("Node needs to be associated with this executor."); + // The node may have been destroyed and removed from the colletion before + // we processed the queues. Don't throw if the pointer is already expired. + if (!weak_node_ptr.expired()) { + throw std::runtime_error("Node needs to be associated with this executor."); + } } auto node_ptr = weak_node_ptr.lock(); @@ -337,6 +352,13 @@ ExecutorEntitiesCollector::process_queues() auto group_ptr = weak_group_ptr.lock(); if (group_ptr) { this->add_callback_group_to_collection(group_ptr, manually_added_groups_); + } else { + // Disassociate the guard condition from the executor notify waitable + auto guard_condition_it = weak_groups_to_guard_conditions_.find(weak_group_ptr); + if (guard_condition_it != weak_groups_to_guard_conditions_.end()) { + this->notify_waitable_->remove_guard_condition(guard_condition_it->second); + weak_groups_to_guard_conditions_.erase(guard_condition_it); + } } } pending_manually_added_groups_.clear(); diff --git a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp index 15a31cd60d..85bedcead1 100644 --- a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp +++ b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp @@ -27,15 +27,17 @@ ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function on_exec { } -ExecutorNotifyWaitable::ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other) -: ExecutorNotifyWaitable(other.execute_callback_) +ExecutorNotifyWaitable::ExecutorNotifyWaitable(ExecutorNotifyWaitable & other) { + std::lock_guard lock(other.guard_condition_mutex_); + this->execute_callback_ = other.execute_callback_; this->notify_guard_conditions_ = other.notify_guard_conditions_; } -ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(const ExecutorNotifyWaitable & other) +ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitable & other) { if (this != &other) { + std::lock_guard lock(other.guard_condition_mutex_); this->execute_callback_ = other.execute_callback_; this->notify_guard_conditions_ = other.notify_guard_conditions_; } @@ -46,20 +48,17 @@ void ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set) { std::lock_guard lock(guard_condition_mutex_); - for (auto weak_guard_condition : this->notify_guard_conditions_) { auto guard_condition = weak_guard_condition.lock(); - if (guard_condition) { - auto rcl_guard_condition = &guard_condition->get_rcl_guard_condition(); + if (!guard_condition) {continue;} - rcl_ret_t ret = rcl_wait_set_add_guard_condition( - wait_set, - rcl_guard_condition, NULL); + rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition(); - if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error( - ret, "failed to add guard condition to wait set"); - } + rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, cond, NULL); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to add guard condition to wait set"); } } } @@ -71,15 +70,16 @@ ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set) bool any_ready = false; for (size_t ii = 0; ii < wait_set->size_of_guard_conditions; ++ii) { - auto rcl_guard_condition = wait_set->guard_conditions[ii]; + const auto * rcl_guard_condition = wait_set->guard_conditions[ii]; if (nullptr == rcl_guard_condition) { continue; } - for (auto weak_guard_condition : this->notify_guard_conditions_) { + for (const auto & weak_guard_condition : this->notify_guard_conditions_) { auto guard_condition = weak_guard_condition.lock(); if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) { any_ready = true; + break; } } } diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 507d47f913..2d5c76e817 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -55,7 +55,7 @@ MultiThreadedExecutor::spin() if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);); std::vector threads; size_t thread_id = 0; { @@ -99,6 +99,18 @@ MultiThreadedExecutor::run(size_t this_thread_number) execute_any_executable(any_exec); + if (any_exec.callback_group && + any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive) + { + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group change: ") + ex.what()); + } + } + // Clear the callback_group to prevent the AnyExecutable destructor from // resetting the callback group `can_be_taken_from` any_exec.callback_group.reset(); diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index e7f311c147..975733b497 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -31,6 +31,11 @@ SingleThreadedExecutor::spin() throw std::runtime_error("spin() called while already spinning"); } RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + // Clear any previous result and rebuild the waitset + this->wait_result_.reset(); + this->entities_need_rebuild_ = true; + while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_executable; if (get_next_executable(any_executable)) { diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 3c14b37b45..3348f422f9 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -12,31 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "rclcpp/executors/static_single_threaded_executor.hpp" - -#include -#include -#include -#include - +#include "rclcpp/executors/executor_entities_collection.hpp" #include "rcpputils/scope_exit.hpp" +#include "rclcpp/executors/static_single_threaded_executor.hpp" +#include "rclcpp/any_executable.hpp" + using rclcpp::executors::StaticSingleThreadedExecutor; -using rclcpp::experimental::ExecutableList; -StaticSingleThreadedExecutor::StaticSingleThreadedExecutor( - const rclcpp::ExecutorOptions & options) +StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(const rclcpp::ExecutorOptions & options) : rclcpp::Executor(options) { - entities_collector_ = std::make_shared(); } -StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() -{ - if (entities_collector_->is_init()) { - entities_collector_->fini(); - } -} +StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {} void StaticSingleThreadedExecutor::spin() @@ -46,14 +35,11 @@ StaticSingleThreadedExecutor::spin() } RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - // Set memory_strategy_ and exec_list_ based on weak_nodes_ - // Prepare wait_set_ based on memory_strategy_ - entities_collector_->init(&wait_set_, memory_strategy_); - + // This is essentially the contents of the rclcpp::Executor::wait_for_work method, + // except we need to keep the wait result to reproduce the StaticSingleThreadedExecutor + // behavior. while (rclcpp::ok(this->context_) && spinning.load()) { - // Refresh wait set and wait for work - entities_collector_->refresh_wait_set(); - execute_ready_executables(); + this->spin_once_impl(std::chrono::nanoseconds(-1)); } } @@ -64,7 +50,6 @@ StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration) if (std::chrono::nanoseconds(0) == max_duration) { max_duration = std::chrono::nanoseconds::max(); } - return this->spin_some_impl(max_duration, false); } @@ -80,36 +65,32 @@ StaticSingleThreadedExecutor::spin_all(std::chrono::nanoseconds max_duration) void StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) { - // Make sure the entities collector has been initialized - if (!entities_collector_->is_init()) { - entities_collector_->init(&wait_set_, memory_strategy_); - } - auto start = std::chrono::steady_clock::now(); auto max_duration_not_elapsed = [max_duration, start]() { - if (std::chrono::nanoseconds(0) == max_duration) { - // told to spin forever if need be - return true; - } else if (std::chrono::steady_clock::now() - start < max_duration) { - // told to spin only for some maximum amount of time - return true; - } - // spun too long - return false; + const auto spin_forever = std::chrono::nanoseconds(0) == max_duration; + const auto cur_duration = std::chrono::steady_clock::now() - start; + return spin_forever || (cur_duration < max_duration); }; if (spinning.exchange(true)) { throw std::runtime_error("spin_some() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);); while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { // Get executables that are ready now - entities_collector_->refresh_wait_set(std::chrono::milliseconds::zero()); - // Execute ready executables - bool work_available = execute_ready_executables(); - if (!work_available || !exhaustive) { - break; + std::lock_guard guard(mutex_); + + auto wait_result = this->collect_and_wait(std::chrono::nanoseconds(0)); + if (wait_result.has_value()) { + // Execute ready executables + bool work_available = this->execute_ready_executables( + current_collection_, + wait_result.value(), + false); + if (!work_available || !exhaustive) { + break; + } } } } @@ -117,163 +98,95 @@ StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_durati void StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout) { - // Make sure the entities collector has been initialized - if (!entities_collector_->is_init()) { - entities_collector_->init(&wait_set_, memory_strategy_); - } - if (rclcpp::ok(context_) && spinning.load()) { - // Wait until we have a ready entity or timeout expired - entities_collector_->refresh_wait_set(timeout); - // Execute ready executables - execute_ready_executables(true); + std::lock_guard guard(mutex_); + auto wait_result = this->collect_and_wait(timeout); + if (wait_result.has_value()) { + this->execute_ready_executables(current_collection_, wait_result.value(), true); + } } } -void -StaticSingleThreadedExecutor::add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify) +std::optional> +StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout) { - bool is_new_node = entities_collector_->add_callback_group(group_ptr, node_ptr); - if (is_new_node && notify) { - // Interrupt waiting to handle new node - interrupt_guard_condition_->trigger(); + if (current_collection_.empty() || this->entities_need_rebuild_.load()) { + this->collect_entities(); } -} - -void -StaticSingleThreadedExecutor::add_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) -{ - bool is_new_node = entities_collector_->add_node(node_ptr); - if (is_new_node && notify) { - // Interrupt waiting to handle new node - interrupt_guard_condition_->trigger(); + auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout)); + if (wait_result.kind() == WaitResultKind::Empty) { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in wait(). This should never happen."); + return {}; } + return wait_result; } -void -StaticSingleThreadedExecutor::add_node(std::shared_ptr node_ptr, bool notify) +// This preserves the "scheduling semantics" of the StaticSingleThreadedExecutor +// from the original implementation. +bool StaticSingleThreadedExecutor::execute_ready_executables( + const rclcpp::executors::ExecutorEntitiesCollection & collection, + rclcpp::WaitResult & wait_result, + bool spin_once) { - this->add_node(node_ptr->get_node_base_interface(), notify); -} - -void -StaticSingleThreadedExecutor::remove_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) -{ - bool node_removed = entities_collector_->remove_callback_group(group_ptr); - // If the node was matched and removed, interrupt waiting - if (node_removed && notify) { - interrupt_guard_condition_->trigger(); + bool any_ready_executable = false; + if (wait_result.kind() != rclcpp::WaitResultKind::Ready) { + return any_ready_executable; } -} -void -StaticSingleThreadedExecutor::remove_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) -{ - bool node_removed = entities_collector_->remove_node(node_ptr); - if (!node_removed) { - throw std::runtime_error("Node needs to be associated with this executor."); - } - // If the node was matched and removed, interrupt waiting - if (notify) { - interrupt_guard_condition_->trigger(); + while (auto subscription = wait_result.next_ready_subscription()) { + auto entity_iter = collection.subscriptions.find(subscription->get_subscription_handle().get()); + if (entity_iter != collection.subscriptions.end()) { + execute_subscription(subscription); + any_ready_executable = true; + if (spin_once) {return any_ready_executable;} + } } -} - -std::vector -StaticSingleThreadedExecutor::get_all_callback_groups() -{ - return entities_collector_->get_all_callback_groups(); -} - -std::vector -StaticSingleThreadedExecutor::get_manually_added_callback_groups() -{ - return entities_collector_->get_manually_added_callback_groups(); -} -std::vector -StaticSingleThreadedExecutor::get_automatically_added_callback_groups_from_nodes() -{ - return entities_collector_->get_automatically_added_callback_groups_from_nodes(); -} - -void -StaticSingleThreadedExecutor::remove_node(std::shared_ptr node_ptr, bool notify) -{ - this->remove_node(node_ptr->get_node_base_interface(), notify); -} - -bool -StaticSingleThreadedExecutor::execute_ready_executables(bool spin_once) -{ - bool any_ready_executable = false; - - // Execute all the ready subscriptions - for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) { - if (i < entities_collector_->get_number_of_subscriptions()) { - if (wait_set_.subscriptions[i]) { - execute_subscription(entities_collector_->get_subscription(i)); - if (spin_once) { - return true; - } - any_ready_executable = true; - } + size_t current_timer_index = 0; + while (true) { + auto [timer, timer_index] = wait_result.peek_next_ready_timer(current_timer_index); + if (nullptr == timer) { + break; } - } - // Execute all the ready timers - for (size_t i = 0; i < wait_set_.size_of_timers; ++i) { - if (i < entities_collector_->get_number_of_timers()) { - if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) { - auto timer = entities_collector_->get_timer(i); - timer->call(); - execute_timer(std::move(timer)); - if (spin_once) { - return true; - } + current_timer_index = timer_index; + auto entity_iter = collection.timers.find(timer->get_timer_handle().get()); + if (entity_iter != collection.timers.end()) { + wait_result.clear_timer_with_index(current_timer_index); + if (timer->call()) { + execute_timer(timer); any_ready_executable = true; + if (spin_once) {return any_ready_executable;} } } } - // Execute all the ready services - for (size_t i = 0; i < wait_set_.size_of_services; ++i) { - if (i < entities_collector_->get_number_of_services()) { - if (wait_set_.services[i]) { - execute_service(entities_collector_->get_service(i)); - if (spin_once) { - return true; - } - any_ready_executable = true; - } + + while (auto client = wait_result.next_ready_client()) { + auto entity_iter = collection.clients.find(client->get_client_handle().get()); + if (entity_iter != collection.clients.end()) { + execute_client(client); + any_ready_executable = true; + if (spin_once) {return any_ready_executable;} } } - // Execute all the ready clients - for (size_t i = 0; i < wait_set_.size_of_clients; ++i) { - if (i < entities_collector_->get_number_of_clients()) { - if (wait_set_.clients[i]) { - execute_client(entities_collector_->get_client(i)); - if (spin_once) { - return true; - } - any_ready_executable = true; - } + + while (auto service = wait_result.next_ready_service()) { + auto entity_iter = collection.services.find(service->get_service_handle().get()); + if (entity_iter != collection.services.end()) { + execute_service(service); + any_ready_executable = true; + if (spin_once) {return any_ready_executable;} } } - // Execute all the ready waitables - for (size_t i = 0; i < entities_collector_->get_number_of_waitables(); ++i) { - auto waitable = entities_collector_->get_waitable(i); - if (waitable->is_ready(&wait_set_)) { + + while (auto waitable = wait_result.next_ready_waitable()) { + auto entity_iter = collection.waitables.find(waitable.get()); + if (entity_iter != collection.waitables.end()) { auto data = waitable->take_data(); waitable->execute(data); - if (spin_once) { - return true; - } any_ready_executable = true; + if (spin_once) {return any_ready_executable;} } } return any_ready_executable; diff --git a/rclcpp/test/benchmark/benchmark_executor.cpp b/rclcpp/test/benchmark/benchmark_executor.cpp index 652007b589..65bb3a1007 100644 --- a/rclcpp/test/benchmark/benchmark_executor.cpp +++ b/rclcpp/test/benchmark/benchmark_executor.cpp @@ -362,42 +362,3 @@ BENCHMARK_F(PerformanceTestExecutorSimple, spin_until_future_complete)(benchmark } } } - -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_executor_entities_collector_execute)(benchmark::State & st) -{ - rclcpp::executors::StaticExecutorEntitiesCollector::SharedPtr entities_collector_ = - std::make_shared(); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - rcl_ret_t ret = rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator); - if (ret != RCL_RET_OK) { - st.SkipWithError(rcutils_get_error_string().str); - } - RCPPUTILS_SCOPE_EXIT( - { - rcl_ret_t ret = rcl_wait_set_fini(&wait_set); - if (ret != RCL_RET_OK) { - st.SkipWithError(rcutils_get_error_string().str); - } - }); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - - reset_heap_counters(); - - for (auto _ : st) { - (void)_; - std::shared_ptr data = entities_collector_->take_data(); - entities_collector_->execute(data); - } -} diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 8c31a95415..47d041cb56 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -699,15 +699,6 @@ if(TARGET test_multi_threaded_executor) target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME}) endif() -ament_add_gtest(test_static_executor_entities_collector executors/test_static_executor_entities_collector.cpp - APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120) -if(TARGET test_static_executor_entities_collector) - ament_target_dependencies(test_static_executor_entities_collector - "rcl" - "test_msgs") - target_link_libraries(test_static_executor_entities_collector ${PROJECT_NAME} mimick) -endif() - ament_add_gtest(test_entities_collector executors/test_entities_collector.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120) if(TARGET test_entities_collector) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 68138eb82b..8ce97ecbe1 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -79,8 +79,6 @@ class TestExecutors : public ::testing::Test int callback_count; }; -// spin_all and spin_some are not implemented correctly in StaticSingleThreadedExecutor, see: -// https://github.com/ros2/rclcpp/issues/1219 for tracking template class TestExecutorsStable : public TestExecutors {}; @@ -154,10 +152,7 @@ TYPED_TEST(TestExecutors, detachOnDestruction) } // Make sure that the executor can automatically remove expired nodes correctly -// Currently fails for StaticSingleThreadedExecutor so it is being skipped, see: -// https://github.com/ros2/rclcpp/issues/1231 -TYPED_TEST(TestExecutorsStable, addTemporaryNode) -{ +TYPED_TEST(TestExecutors, addTemporaryNode) { using ExecutorType = TypeParam; // rmw_connextdds doesn't support events-executor if ( @@ -265,16 +260,19 @@ TYPED_TEST(TestExecutors, spinWhileAlreadySpinning) } ExecutorType executor; - executor.add_node(this->node); - bool timer_completed = false; - auto timer = this->node->create_wall_timer(1ms, [&]() {timer_completed = true;}); + std::atomic_bool timer_completed = false; + auto timer = this->node->create_wall_timer( + 1ms, [&]() { + timer_completed.store(true); + }); + executor.add_node(this->node); std::thread spinner([&]() {executor.spin();}); - // Sleep for a short time to verify executor.spin() is going, and didn't throw. + // Sleep for a short time to verify executor.spin() is going, and didn't throw. auto start = std::chrono::steady_clock::now(); - while (!timer_completed && (std::chrono::steady_clock::now() - start) < 10s) { + while (!timer_completed.load() && (std::chrono::steady_clock::now() - start) < 10s) { std::this_thread::sleep_for(1ms); } @@ -461,11 +459,18 @@ class TestWaitable : public rclcpp::Waitable void add_to_wait_set(rcl_wait_set_t * wait_set) override { + if (trigger_count_ > 0) { + // Keep the gc triggered until the trigger count is reduced back to zero. + // This is necessary if trigger() results in the wait set waking, but not + // executing this waitable, in which case it needs to be re-triggered. + gc_.trigger(); + } rclcpp::detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); } void trigger() { + trigger_count_++; gc_.trigger(); } @@ -493,6 +498,7 @@ class TestWaitable : public rclcpp::Waitable execute(std::shared_ptr & data) override { (void) data; + trigger_count_--; count_++; std::this_thread::sleep_for(3ms); } @@ -516,13 +522,14 @@ class TestWaitable : public rclcpp::Waitable get_number_of_ready_guard_conditions() override {return 1;} size_t - get_count() + get_count() const { return count_; } private: - size_t count_ = 0; + std::atomic trigger_count_ = 0; + std::atomic count_ = 0; rclcpp::GuardCondition gc_; }; diff --git a/rclcpp/test/rclcpp/executors/test_static_single_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_static_single_threaded_executor.cpp index 5ca6c1c25a..1a0f3f88c4 100644 --- a/rclcpp/test/rclcpp/executors/test_static_single_threaded_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_static_single_threaded_executor.cpp @@ -56,7 +56,7 @@ TEST_F(TestStaticSingleThreadedExecutor, add_callback_group_trigger_guard_failed "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); RCLCPP_EXPECT_THROW_EQ( executor.add_callback_group(cb_group, node->get_node_base_interface(), true), - std::runtime_error("error not set")); + std::runtime_error("Failed to trigger guard condition on callback group add: error not set")); } } @@ -69,7 +69,7 @@ TEST_F(TestStaticSingleThreadedExecutor, add_node_trigger_guard_failed) { "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); RCLCPP_EXPECT_THROW_EQ( executor.add_node(node), - std::runtime_error("error not set")); + std::runtime_error("Failed to trigger guard condition on node add: error not set")); } } @@ -86,7 +86,8 @@ TEST_F(TestStaticSingleThreadedExecutor, remove_callback_group_trigger_guard_fai "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); RCLCPP_EXPECT_THROW_EQ( executor.remove_callback_group(cb_group, true), - std::runtime_error("error not set")); + std::runtime_error( + "Failed to trigger guard condition on callback group remove: error not set")); } } @@ -99,7 +100,7 @@ TEST_F(TestStaticSingleThreadedExecutor, remove_node_failed) { "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); RCLCPP_EXPECT_THROW_EQ( executor.remove_node(node, true), - std::runtime_error("Node needs to be associated with this executor.")); + std::runtime_error("Node '/ns/node' needs to be associated with an executor.")); } } @@ -114,7 +115,7 @@ TEST_F(TestStaticSingleThreadedExecutor, remove_node_trigger_guard_failed) { "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); RCLCPP_EXPECT_THROW_EQ( executor.remove_node(node, true), - std::runtime_error("error not set")); + std::runtime_error("Failed to trigger guard condition on node remove: error not set")); } } diff --git a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp index 49131638db..2aa4dcdd72 100644 --- a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp +++ b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp @@ -228,22 +228,31 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_t GTEST_SKIP(); } + auto count_callback_groups_in_node = [](auto node) { + size_t num = 0; + node->get_node_base_interface()->for_each_callback_group( + [&num](auto) { + num++; + }); + return num; + }; + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); executor.add_node(node->get_node_base_interface()); - ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); - std::atomic_int timer_count {0}; + ASSERT_EQ(executor.get_all_callback_groups().size(), count_callback_groups_in_node(node)); + std::atomic_size_t timer_count {0}; auto timer_callback = [&executor, &timer_count]() { - if (timer_count > 0) { - ASSERT_EQ(executor.get_all_callback_groups().size(), 3u); + auto cur_timer_count = timer_count++; + printf("in timer_callback(%zu)\n", cur_timer_count); + if (cur_timer_count > 0) { executor.cancel(); } - timer_count++; }; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive); rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( - 2s, timer_callback, cb_grp); + 1s, timer_callback, cb_grp); rclcpp::CallbackGroup::SharedPtr cb_grp2 = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, false); auto timer2_callback = []() {}; @@ -255,6 +264,7 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_t rclcpp::TimerBase::SharedPtr timer3_ = node->create_wall_timer( 2s, timer3_callback, cb_grp3); executor.spin(); + ASSERT_GT(timer_count.load(), 0u); } /* diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index bdbb0a1079..5ee9573c3f 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -46,23 +46,6 @@ class DummyExecutor : public rclcpp::Executor { spin_node_once_nanoseconds(node, std::chrono::milliseconds(100)); } - - rclcpp::memory_strategy::MemoryStrategy * memory_strategy_ptr() - { - return memory_strategy_.get(); - } - - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group( - rclcpp::CallbackGroup::SharedPtr group) - { - std::lock_guard guard_{mutex_}; // only to make the TSA happy - return get_node_by_group(weak_groups_to_nodes_, group); - } - - rclcpp::CallbackGroup::SharedPtr local_get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) - { - return get_group_by_timer(timer); - } }; class TestExecutor : public ::testing::Test @@ -130,7 +113,7 @@ TEST_F(TestExecutor, constructor_bad_wait_set_init) { auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_init, RCL_RET_ERROR); RCLCPP_EXPECT_THROW_EQ( static_cast(std::make_unique()), - std::runtime_error("Failed to create wait set in Executor constructor: error not set")); + std::runtime_error("Failed to create wait set: error not set")); } TEST_F(TestExecutor, add_callback_group_twice) { @@ -142,7 +125,7 @@ TEST_F(TestExecutor, add_callback_group_twice) { cb_group->get_associated_with_executor_atomic().exchange(false); RCLCPP_EXPECT_THROW_EQ( dummy.add_callback_group(cb_group, node->get_node_base_interface(), false), - std::runtime_error("Callback group was already added to executor.")); + std::runtime_error("Callback group has already been added to this executor.")); } TEST_F(TestExecutor, add_callback_group_failed_trigger_guard_condition) { @@ -168,9 +151,15 @@ TEST_F(TestExecutor, remove_callback_group_null_node) { node.reset(); + + /** + * TODO(mjcarroll): Assert this when we are enforcing that nodes must be destroyed + * after their created callback groups. RCLCPP_EXPECT_THROW_EQ( dummy.remove_callback_group(cb_group, false), std::runtime_error("Node must not be deleted before its callback group(s).")); + */ + EXPECT_NO_THROW(dummy.remove_callback_group(cb_group, false)); } TEST_F(TestExecutor, remove_callback_group_failed_trigger_guard_condition) { @@ -197,7 +186,7 @@ TEST_F(TestExecutor, remove_node_not_associated) { RCLCPP_EXPECT_THROW_EQ( dummy.remove_node(node->get_node_base_interface(), false), - std::runtime_error("Node needs to be associated with an executor.")); + std::runtime_error("Node '/ns/node' needs to be associated with an executor.")); } TEST_F(TestExecutor, remove_node_associated_with_different_executor) { @@ -211,7 +200,7 @@ TEST_F(TestExecutor, remove_node_associated_with_different_executor) { RCLCPP_EXPECT_THROW_EQ( dummy2.remove_node(node1->get_node_base_interface(), false), - std::runtime_error("Node needs to be associated with this executor.")); + std::runtime_error("Node '/ns/node1' needs to be associated with this executor.")); } TEST_F(TestExecutor, spin_node_once_nanoseconds) { @@ -328,42 +317,14 @@ TEST_F(TestExecutor, cancel_failed_trigger_guard_condition) { std::runtime_error("Failed to trigger guard condition in cancel: error not set")); } -TEST_F(TestExecutor, set_memory_strategy_nullptr) { - DummyExecutor dummy; - - RCLCPP_EXPECT_THROW_EQ( - dummy.set_memory_strategy(nullptr), - std::runtime_error("Received NULL memory strategy in executor.")); -} - -TEST_F(TestExecutor, set_memory_strategy) { - DummyExecutor dummy; - rclcpp::memory_strategy::MemoryStrategy::SharedPtr strategy = - std::make_shared< - rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>(); - - dummy.set_memory_strategy(strategy); - EXPECT_EQ(dummy.memory_strategy_ptr(), strategy.get()); -} - -TEST_F(TestExecutor, spin_once_failed_trigger_guard_condition) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - auto timer = - node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}); - - dummy.add_node(node); - // Wait for the wall timer to have expired. - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); +TEST_F(TestExecutor, create_executor_fail_wait_set_clear) { + auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_clear, RCL_RET_ERROR); RCLCPP_EXPECT_THROW_EQ( - dummy.spin_once(std::chrono::milliseconds(1)), - std::runtime_error( - "Failed to trigger guard condition from execute_any_executable: error not set")); + DummyExecutor dummy, + std::runtime_error("Couldn't clear the wait set: error not set")); } -TEST_F(TestExecutor, spin_some_fail_wait_set_clear) { +TEST_F(TestExecutor, spin_all_fail_wait_set_clear) { DummyExecutor dummy; auto node = std::make_shared("node", "ns"); auto timer = @@ -371,9 +332,10 @@ TEST_F(TestExecutor, spin_some_fail_wait_set_clear) { dummy.add_node(node); auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_clear, RCL_RET_ERROR); + RCLCPP_EXPECT_THROW_EQ( - dummy.spin_some(std::chrono::milliseconds(1)), - std::runtime_error("Couldn't clear wait set: error not set")); + dummy.spin_all(std::chrono::milliseconds(1)), + std::runtime_error("Couldn't clear the wait set: error not set")); } TEST_F(TestExecutor, spin_some_fail_wait_set_resize) { @@ -401,7 +363,7 @@ TEST_F(TestExecutor, spin_some_fail_add_handles_to_wait_set) { RCL_RET_ERROR); RCLCPP_EXPECT_THROW_EQ( dummy.spin_some(std::chrono::milliseconds(1)), - std::runtime_error("Couldn't fill wait set")); + std::runtime_error("Couldn't fill wait set: error not set")); } TEST_F(TestExecutor, spin_some_fail_wait) { @@ -417,71 +379,6 @@ TEST_F(TestExecutor, spin_some_fail_wait) { std::runtime_error("rcl_wait() failed: error not set")); } -TEST_F(TestExecutor, get_node_by_group_null_group) { - DummyExecutor dummy; - ASSERT_EQ(nullptr, dummy.local_get_node_by_group(nullptr)); -} - -TEST_F(TestExecutor, get_node_by_group) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - dummy.add_callback_group(cb_group, node->get_node_base_interface(), false); - ASSERT_EQ(node->get_node_base_interface().get(), dummy.local_get_node_by_group(cb_group).get()); -} - -TEST_F(TestExecutor, get_node_by_group_not_found) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - ASSERT_EQ(nullptr, dummy.local_get_node_by_group(cb_group).get()); -} - -TEST_F(TestExecutor, get_group_by_timer_nullptr) { - DummyExecutor dummy; - ASSERT_EQ(nullptr, dummy.local_get_group_by_timer(nullptr)); -} - -TEST_F(TestExecutor, get_group_by_timer) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - auto timer = - node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group); - dummy.add_node(node); - - ASSERT_EQ(cb_group.get(), dummy.local_get_group_by_timer(timer).get()); -} - -TEST_F(TestExecutor, get_group_by_timer_with_deleted_group) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - auto timer = - node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group); - dummy.add_node(node); - - cb_group.reset(); - - ASSERT_EQ(nullptr, dummy.local_get_group_by_timer(timer).get()); -} - -TEST_F(TestExecutor, get_group_by_timer_add_callback_group) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - auto timer = - node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group); - dummy.add_callback_group(cb_group, node->get_node_base_interface(), false); - - ASSERT_EQ(cb_group.get(), dummy.local_get_group_by_timer(timer).get()); -} - TEST_F(TestExecutor, spin_until_future_complete_in_spin_until_future_complete) { DummyExecutor dummy; auto node = std::make_shared("node", "ns");