diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
index 89c39652e2..db44fa9983 100644
--- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
+++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
@@ -577,11 +577,14 @@ class IntraProcessManager
     auto server = get_matching_intra_process_action_server(ipc_action_client_id);
     if (server) {
+      {
+        std::unique_lock lock(mutex_);
+        clients_uuid_to_id_[goal_id] = ipc_action_client_id;
+      }
+
       server->store_ipc_action_goal_request(
         ipc_action_client_id, std::move(goal_request));
-      std::unique_lock lock(mutex_);
-      clients_uuid_to_id_[goal_id] = ipc_action_client_id;
       return true;
     }
     return false;
diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp
index 19df759334..0b9f7aad6b 100644
--- a/rclcpp/include/rclcpp/publisher.hpp
+++ b/rclcpp/include/rclcpp/publisher.hpp
@@ -351,7 +351,11 @@ class Publisher : public PublisherBase
     bool inter_process_publish_needed =
       get_subscription_count() > get_intra_process_subscription_count();
 
-    if (inter_process_publish_needed) {
+    // If the publisher is configured with transient local durability, we must publish
+    // inter-process. This ensures that the RMW stores the messages for late joiner subscriptions.
+    // This has the consequence of subscriptions experiencing the double-delivery issue
+    // mentioned in https://github.com/ros2/rclcpp/issues/1750
+    if (inter_process_publish_needed || buffer_) {
       auto ros_msg_ptr = std::make_shared<ROSMessageType>();
       // TODO(clalancette): This is unnecessarily doing an additional conversion
       // that may have already been done in do_intra_process_publish_and_return_shared().
@@ -363,11 +367,6 @@ class Publisher : public PublisherBase
         buffer_->add_shared(ros_msg_ptr);
       }
     } else {
-      if (buffer_) {
-        auto ros_msg_ptr = std::make_shared<ROSMessageType>();
-        rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
-        buffer_->add_shared(ros_msg_ptr);
-      }
       this->do_intra_process_publish(std::move(msg));
     }
   }
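Note on the publisher.hpp change above: with `|| buffer_` in the condition, a publisher that owns an intra-process buffer (transient-local durability with intra-process communication enabled) now always performs the inter-process publish as well, so the RMW can replay the sample to late joiners. Below is a minimal sketch of the configuration that exercises this branch; the node name, topic name and the `std_msgs` dependency are illustrative and not part of this patch.

```cpp
#include <memory>
#include <utility>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);

  // Intra-process communication enabled for the whole node.
  auto node = std::make_shared<rclcpp::Node>(
    "latched_talker",
    rclcpp::NodeOptions().use_intra_process_comms(true));

  // transient_local durability: the RMW must retain samples for late joiners,
  // so this publisher also has an intra-process buffer (buffer_ is non-null).
  auto qos = rclcpp::QoS(rclcpp::KeepLast(10)).transient_local();
  auto pub = node->create_publisher<std_msgs::msg::String>("chatter", qos);

  auto msg = std::make_unique<std_msgs::msg::String>();
  msg->data = "latched hello";
  // Goes through the unique_ptr publish path touched by the hunk above.
  pub->publish(std::move(msg));

  rclcpp::spin(node);
  rclcpp::shutdown();
  return 0;
}
```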
diff --git a/rclcpp_action/src/client.cpp b/rclcpp_action/src/client.cpp
index 16702dfe82..62dd641c1f 100644
--- a/rclcpp_action/src/client.cpp
+++ b/rclcpp_action/src/client.cpp
@@ -19,6 +19,7 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <variant>
 
 #include "rcl_action/action_client.h"
 #include "rcl_action/wait.h"
@@ -31,6 +32,67 @@
 namespace rclcpp_action
 {
+
+struct ClientBaseData
+{
+  struct FeedbackReadyData
+  {
+    FeedbackReadyData(rcl_ret_t retIn, std::shared_ptr<void> msg)
+    : ret(retIn), feedback_message(msg) {}
+    rcl_ret_t ret;
+    std::shared_ptr<void> feedback_message;
+  };
+  struct StatusReadyData
+  {
+    StatusReadyData(rcl_ret_t retIn, std::shared_ptr<void> msg)
+    : ret(retIn), status_message(msg) {}
+    rcl_ret_t ret;
+    std::shared_ptr<void> status_message;
+  };
+  struct GoalResponseData
+  {
+    GoalResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr<void> response)
+    : ret(retIn), response_header(header), goal_response(response) {}
+    rcl_ret_t ret;
+    rmw_request_id_t response_header;
+    std::shared_ptr<void> goal_response;
+  };
+  struct CancelResponseData
+  {
+    CancelResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr<void> response)
+    : ret(retIn), response_header(header), cancel_response(response) {}
+    rcl_ret_t ret;
+    rmw_request_id_t response_header;
+    std::shared_ptr<void> cancel_response;
+  };
+  struct ResultResponseData
+  {
+    ResultResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr<void> response)
+    : ret(retIn), response_header(header), result_response(response) {}
+    rcl_ret_t ret;
+    rmw_request_id_t response_header;
+    std::shared_ptr<void> result_response;
+  };
+
+  std::variant<
+    FeedbackReadyData,
+    StatusReadyData,
+    GoalResponseData,
+    CancelResponseData,
+    ResultResponseData
+  > data;
+
+  explicit ClientBaseData(FeedbackReadyData && data_in)
+  : data(std::move(data_in)) {}
+  explicit ClientBaseData(StatusReadyData && data_in)
+  : data(std::move(data_in)) {}
+  explicit ClientBaseData(GoalResponseData && data_in)
+  : data(std::move(data_in)) {}
+  explicit ClientBaseData(CancelResponseData && data_in)
+  : data(std::move(data_in)) {}
+  explicit ClientBaseData(ResultResponseData && data_in)
+  : data(std::move(data_in)) {}
+};
+
 class ClientBaseImpl
 {
 public:
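The `ClientBaseData` struct above replaces the five `is_*_ready` flags as the hand-off between `take_data()` and `execute()`: whatever was taken from rcl is packed into one `std::variant` alternative, and `execute()` dispatches on that alternative with `std::visit`. A standalone sketch of the same pattern follows, using hypothetical event types rather than the rclcpp_action ones.

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <variant>

struct FeedbackEvent { std::string text; };
struct ResultEvent { int code; };

// One taken "event" per call, stored as exactly one variant alternative.
struct TakenData
{
  std::variant<FeedbackEvent, ResultEvent> data;
  explicit TakenData(FeedbackEvent && d) : data(std::move(d)) {}
  explicit TakenData(ResultEvent && d) : data(std::move(d)) {}
};

// Dispatch without any shared "ready" flags: the variant itself says what was taken.
void execute(const std::shared_ptr<TakenData> & taken)
{
  std::visit(
    [](auto && event) {
      using T = std::decay_t<decltype(event)>;
      if constexpr (std::is_same_v<T, FeedbackEvent>) {
        std::cout << "feedback: " << event.text << "\n";
      }
      if constexpr (std::is_same_v<T, ResultEvent>) {
        std::cout << "result: " << event.code << "\n";
      }
    },
    taken->data);
}

int main()
{
  execute(std::make_shared<TakenData>(FeedbackEvent{"half way"}));
  execute(std::make_shared<TakenData>(ResultEvent{0}));
  return 0;
}
```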
@@ -94,11 +156,13 @@ class ClientBaseImpl
   size_t num_clients{0u};
   size_t num_services{0u};
 
-  bool is_feedback_ready{false};
-  bool is_status_ready{false};
-  bool is_goal_response_ready{false};
-  bool is_cancel_response_ready{false};
-  bool is_result_response_ready{false};
+  // Lock for action_client_
+  std::recursive_mutex action_client_mutex_;
+
+  // next ready event for taking, will be set by is_ready and will be processed by take_data
+  std::atomic<size_t> next_ready_event;
+  // used to indicate that next_ready_event has no ready event for processing
+  static constexpr size_t NO_EVENT_READY = std::numeric_limits<size_t>::max();
 
   rclcpp::Context::SharedPtr context_;
   rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
@@ -142,6 +206,7 @@ bool
 ClientBase::action_server_is_ready() const
 {
   bool is_ready;
+  std::lock_guard lock(pimpl_->action_client_mutex_);
   rcl_ret_t ret = rcl_action_server_is_available(
     this->pimpl_->node_handle.get(),
     this->pimpl_->client_handle.get(),
@@ -255,6 +320,7 @@ ClientBase::get_number_of_ready_services()
 void
 ClientBase::add_to_wait_set(rcl_wait_set_t * wait_set)
 {
+  std::lock_guard lock(pimpl_->action_client_mutex_);
   rcl_ret_t ret = rcl_action_wait_set_add_action_client(
     wait_set, pimpl_->client_handle.get(), nullptr, nullptr);
   if (RCL_RET_OK != ret) {
@@ -265,23 +331,56 @@ ClientBase::add_to_wait_set(rcl_wait_set_t * wait_set)
 bool
 ClientBase::is_ready(rcl_wait_set_t * wait_set)
 {
-  rcl_ret_t ret = rcl_action_client_wait_set_get_entities_ready(
-    wait_set, pimpl_->client_handle.get(),
-    &pimpl_->is_feedback_ready,
-    &pimpl_->is_status_ready,
-    &pimpl_->is_goal_response_ready,
-    &pimpl_->is_cancel_response_ready,
-    &pimpl_->is_result_response_ready);
-  if (RCL_RET_OK != ret) {
-    rclcpp::exceptions::throw_from_rcl_error(
-      ret, "failed to check for any ready entities");
+  bool is_feedback_ready{false};
+  bool is_status_ready{false};
+  bool is_goal_response_ready{false};
+  bool is_cancel_response_ready{false};
+  bool is_result_response_ready{false};
+
+  rcl_ret_t ret;
+  {
+    std::lock_guard lock(pimpl_->action_client_mutex_);
+    ret = rcl_action_client_wait_set_get_entities_ready(
+      wait_set, pimpl_->client_handle.get(),
+      &is_feedback_ready,
+      &is_status_ready,
+      &is_goal_response_ready,
+      &is_cancel_response_ready,
+      &is_result_response_ready);
+    if (RCL_RET_OK != ret) {
+      rclcpp::exceptions::throw_from_rcl_error(
+        ret, "failed to check for any ready entities");
+    }
+  }
+
+  pimpl_->next_ready_event = ClientBaseImpl::NO_EVENT_READY;
+
+  if (is_goal_response_ready) {
+    pimpl_->next_ready_event = static_cast<size_t>(EntityType::GoalClient);
+    return true;
+  }
+
+  if (is_result_response_ready) {
+    pimpl_->next_ready_event = static_cast<size_t>(EntityType::ResultClient);
+    return true;
+  }
+
+  if (is_cancel_response_ready) {
+    pimpl_->next_ready_event = static_cast<size_t>(EntityType::CancelClient);
+    return true;
+  }
+
+  if (is_feedback_ready) {
+    pimpl_->next_ready_event = static_cast<size_t>(EntityType::FeedbackSubscription);
+    return true;
+  }
+
+  if (is_status_ready) {
+    pimpl_->next_ready_event = static_cast<size_t>(EntityType::StatusSubscription);
+    return true;
   }
-  return
-    pimpl_->is_feedback_ready ||
-    pimpl_->is_status_ready ||
-    pimpl_->is_goal_response_ready ||
-    pimpl_->is_cancel_response_ready ||
-    pimpl_->is_result_response_ready;
+
+  return false;
 }
 
 void
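With the hunks above, `is_ready()` latches at most one ready entity into the atomic `next_ready_event`, and `take_data()` consumes it with an `exchange`, so a second `take_data()` call for the same wakeup (the known Iron double-take) observes `NO_EVENT_READY` and returns `nullptr` instead of taking from rcl again. A standalone sketch of that latch-and-consume handoff, with hypothetical names:

```cpp
#include <atomic>
#include <cstddef>
#include <limits>
#include <optional>

class ReadyEventLatch
{
public:
  static constexpr std::size_t kNoEventReady = std::numeric_limits<std::size_t>::max();

  // Wait-set side: remember which entity was found ready.
  void set_ready(std::size_t entity_id) {next_ready_event_.store(entity_id);}

  // Take side: consume the latched event, if any. A second call before the
  // next set_ready() returns std::nullopt, mirroring take_data() returning
  // nullptr instead of throwing.
  std::optional<std::size_t> consume()
  {
    const std::size_t id = next_ready_event_.exchange(kNoEventReady);
    if (id == kNoEventReady) {
      return std::nullopt;
    }
    return id;
  }

private:
  std::atomic<std::size_t> next_ready_event_{kNoEventReady};
};
```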
@@ -432,7 +531,6 @@ ClientBase::set_callback_to_entity(
       }
     };
 
-
   // Set it temporarily to the new callback, while we replace the old one.
   // This two-step setting, prevents a gap where the old std::function has
   // been replaced but the middleware hasn't been told about the new one yet.
@@ -550,140 +648,159 @@ ClientBase::clear_on_ready_callback()
 std::shared_ptr<void>
 ClientBase::take_data()
 {
-  if (pimpl_->is_goal_response_ready) {
-    rmw_request_id_t response_header;
-    std::shared_ptr<void> goal_response = this->create_goal_response();
-    rcl_ret_t ret = rcl_action_take_goal_response(
-      pimpl_->client_handle.get(), &response_header, goal_response.get());
-    return std::static_pointer_cast<void>(
-      std::make_shared<std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(
-        ret, response_header, goal_response));
-  } else if (pimpl_->is_result_response_ready) {
-    rmw_request_id_t response_header;
-    std::shared_ptr<void> result_response = this->create_result_response();
-    rcl_ret_t ret = rcl_action_take_result_response(
-      pimpl_->client_handle.get(), &response_header, result_response.get());
-    return std::static_pointer_cast<void>(
-      std::make_shared<std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(
-        ret, response_header, result_response));
-  } else if (pimpl_->is_cancel_response_ready) {
-    rmw_request_id_t response_header;
-    std::shared_ptr<void> cancel_response = this->create_cancel_response();
-    rcl_ret_t ret = rcl_action_take_cancel_response(
-      pimpl_->client_handle.get(), &response_header, cancel_response.get());
-    return std::static_pointer_cast<void>(
-      std::make_shared<std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(
-        ret, response_header, cancel_response));
-  } else if (pimpl_->is_feedback_ready) {
-    std::shared_ptr<void> feedback_message = this->create_feedback_message();
-    rcl_ret_t ret = rcl_action_take_feedback(
-      pimpl_->client_handle.get(), feedback_message.get());
-    return std::static_pointer_cast<void>(
-      std::make_shared<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(
-        ret, feedback_message));
-  } else if (pimpl_->is_status_ready) {
-    std::shared_ptr<void> status_message = this->create_status_message();
-    rcl_ret_t ret = rcl_action_take_status(
-      pimpl_->client_handle.get(), status_message.get());
-    return std::static_pointer_cast<void>(
-      std::make_shared<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(
-        ret, status_message));
-  } else {
-    throw std::runtime_error("Taking data from action client but nothing is ready");
+  // next_ready_event is an atomic; cache it locally
+  size_t next_ready_event = pimpl_->next_ready_event.exchange(ClientBaseImpl::NO_EVENT_READY);
+
+  if (next_ready_event == ClientBaseImpl::NO_EVENT_READY) {
+    // There is a known bug in Iron where take_data might be called multiple
+    // times. Therefore, instead of throwing, we just return a nullptr as a workaround.
+    return nullptr;
   }
+
+  return take_data_by_entity_id(next_ready_event);
 }
 
 std::shared_ptr<void>
 ClientBase::take_data_by_entity_id(size_t id)
 {
+  std::shared_ptr<ClientBaseData> data_ptr;
+  rcl_ret_t ret;
+
   // Mark as ready the entity from which we want to take data
   switch (static_cast<EntityType>(id)) {
     case EntityType::GoalClient:
-      pimpl_->is_goal_response_ready = true;
+      {
+        rmw_request_id_t response_header;
+        std::shared_ptr<void> goal_response;
+        {
+          std::lock_guard lock(pimpl_->action_client_mutex_);
+
+          goal_response = this->create_goal_response();
+          ret = rcl_action_take_goal_response(
+            pimpl_->client_handle.get(), &response_header, goal_response.get());
+        }
+        data_ptr = std::make_shared<ClientBaseData>(
+          ClientBaseData::GoalResponseData(
+            ret, response_header, goal_response));
+      }
       break;
     case EntityType::ResultClient:
-      pimpl_->is_result_response_ready = true;
+      {
+        rmw_request_id_t response_header;
+        std::shared_ptr<void> result_response;
+        {
+          std::lock_guard lock(pimpl_->action_client_mutex_);
+          result_response = this->create_result_response();
+          ret = rcl_action_take_result_response(
+            pimpl_->client_handle.get(), &response_header, result_response.get());
+        }
+        data_ptr =
+          std::make_shared<ClientBaseData>(
+          ClientBaseData::ResultResponseData(
+            ret, response_header, result_response));
+      }
       break;
     case EntityType::CancelClient:
-      pimpl_->is_cancel_response_ready = true;
+      {
+        rmw_request_id_t response_header;
+        std::shared_ptr<void> cancel_response;
+        {
+          std::lock_guard lock(pimpl_->action_client_mutex_);
+          cancel_response = this->create_cancel_response();
+          ret = rcl_action_take_cancel_response(
+            pimpl_->client_handle.get(), &response_header, cancel_response.get());
+        }
+        data_ptr =
+          std::make_shared<ClientBaseData>(
+          ClientBaseData::CancelResponseData(
+            ret, response_header, cancel_response));
+      }
       break;
     case EntityType::FeedbackSubscription:
-      pimpl_->is_feedback_ready = true;
+      {
+        std::shared_ptr<void> feedback_message;
+        {
+          std::lock_guard lock(pimpl_->action_client_mutex_);
+          feedback_message = this->create_feedback_message();
+          ret = rcl_action_take_feedback(
+            pimpl_->client_handle.get(), feedback_message.get());
+        }
+        data_ptr =
+          std::make_shared<ClientBaseData>(
+          ClientBaseData::FeedbackReadyData(
+            ret, feedback_message));
+      }
       break;
     case EntityType::StatusSubscription:
-      pimpl_->is_status_ready = true;
+      {
+        std::shared_ptr<void> status_message;
+        {
+          std::lock_guard lock(pimpl_->action_client_mutex_);
+          status_message = this->create_status_message();
+          ret = rcl_action_take_status(
+            pimpl_->client_handle.get(), status_message.get());
+        }
+        data_ptr =
+          std::make_shared<ClientBaseData>(
+          ClientBaseData::StatusReadyData(
+            ret, status_message));
+      }
       break;
   }
 
-  return take_data();
+  return std::static_pointer_cast<void>(data_ptr);
 }
 
 void
-ClientBase::execute(std::shared_ptr<void> & data)
+ClientBase::execute(std::shared_ptr<void> & data_in)
 {
-  if (!data) {
-    throw std::runtime_error("'data' is empty");
+  if (!data_in) {
+    // Workaround: if take_data was called multiple times, it returns a nullptr.
+    // Normally we should throw here, but as an API-stable bug fix we just ignore this case.
+    return;
   }
 
-  if (pimpl_->is_goal_response_ready) {
-    auto shared_ptr = std::static_pointer_cast<
-      std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
-    auto ret = std::get<0>(*shared_ptr);
-    pimpl_->is_goal_response_ready = false;
-    if (RCL_RET_OK == ret) {
-      auto response_header = std::get<1>(*shared_ptr);
-      auto goal_response = std::get<2>(*shared_ptr);
-      this->handle_goal_response(response_header, goal_response);
-    } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
-      rclcpp::exceptions::throw_from_rcl_error(ret, "error taking goal response");
-    }
-  } else if (pimpl_->is_result_response_ready) {
-    auto shared_ptr = std::static_pointer_cast<
-      std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
-    auto ret = std::get<0>(*shared_ptr);
-    pimpl_->is_result_response_ready = false;
-    if (RCL_RET_OK == ret) {
-      auto response_header = std::get<1>(*shared_ptr);
-      auto result_response = std::get<2>(*shared_ptr);
-      this->handle_result_response(response_header, result_response);
-    } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
-      rclcpp::exceptions::throw_from_rcl_error(ret, "error taking result response");
-    }
-  } else if (pimpl_->is_cancel_response_ready) {
-    auto shared_ptr = std::static_pointer_cast<
-      std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
-    auto ret = std::get<0>(*shared_ptr);
-    pimpl_->is_cancel_response_ready = false;
-    if (RCL_RET_OK == ret) {
-      auto response_header = std::get<1>(*shared_ptr);
-      auto cancel_response = std::get<2>(*shared_ptr);
-      this->handle_cancel_response(response_header, cancel_response);
-    } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
-      rclcpp::exceptions::throw_from_rcl_error(ret, "error taking cancel response");
-    }
-  } else if (pimpl_->is_feedback_ready) {
-    auto shared_ptr = std::static_pointer_cast<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(data);
-    auto ret = std::get<0>(*shared_ptr);
-    pimpl_->is_feedback_ready = false;
-    if (RCL_RET_OK == ret) {
-      auto feedback_message = std::get<1>(*shared_ptr);
-      this->handle_feedback_message(feedback_message);
-    } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
-      rclcpp::exceptions::throw_from_rcl_error(ret, "error taking feedback");
-    }
-  } else if (pimpl_->is_status_ready) {
-    auto shared_ptr = std::static_pointer_cast<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(data);
-    auto ret = std::get<0>(*shared_ptr);
-    pimpl_->is_status_ready = false;
-    if (RCL_RET_OK == ret) {
-      auto status_message = std::get<1>(*shared_ptr);
-      this->handle_status_message(status_message);
-    } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
-      rclcpp::exceptions::throw_from_rcl_error(ret, "error taking status");
-    }
-  } else {
-    throw std::runtime_error("Executing action client but nothing is ready");
-  }
+  std::shared_ptr<ClientBaseData> data_ptr = std::static_pointer_cast<ClientBaseData>(data_in);
+
+  std::visit(
+    [&](auto && data) -> void {
+      using T = std::decay_t<decltype(data)>;
+      if constexpr (std::is_same_v<T, ClientBaseData::FeedbackReadyData>) {
+        if (RCL_RET_OK == data.ret) {
+          this->handle_feedback_message(data.feedback_message);
+        } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
+          rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking feedback");
+        }
+      }
+      if constexpr (std::is_same_v<T, ClientBaseData::StatusReadyData>) {
+        if (RCL_RET_OK == data.ret) {
+          this->handle_status_message(data.status_message);
+        } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
+          rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking status");
+        }
+      }
+      if constexpr (std::is_same_v<T, ClientBaseData::GoalResponseData>) {
+        if (RCL_RET_OK == data.ret) {
+          this->handle_goal_response(data.response_header, data.goal_response);
+        } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
+          rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking goal response");
+        }
+      }
+      if constexpr (std::is_same_v<T, ClientBaseData::ResultResponseData>) {
+        if (RCL_RET_OK == data.ret) {
+          this->handle_result_response(data.response_header, data.result_response);
+        } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
+          rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking result response");
        }
      }
+      if constexpr (std::is_same_v<T, ClientBaseData::CancelResponseData>) {
+        if (RCL_RET_OK == data.ret) {
+          this->handle_cancel_response(data.response_header, data.cancel_response);
+        } else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
+          rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking cancel response");
+        }
+      }
+    }, data_ptr->data);
 }
 
 void
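For context, this is roughly the order in which an executor drives a waitable such as `ClientBase`. The helper below is illustrative only (not rclcpp API); it shows why `take_data()` and `execute()` must now tolerate a missing event rather than throw when the sequence is run one extra time for the same wakeup.

```cpp
#include <memory>

// Hypothetical driver; WaitableT stands in for ClientBase/ServerBase and
// WaitSetT for rcl_wait_set_t. Only the call order matters here.
template<typename WaitableT, typename WaitSetT>
void spin_once_on(WaitableT & waitable, WaitSetT & wait_set)
{
  waitable.add_to_wait_set(&wait_set);
  // ... wait on the wait set ...
  if (waitable.is_ready(&wait_set)) {
    // With the Iron double-take bug, take_data() can be reached twice for a
    // single is_ready(); the second call now yields nullptr and execute()
    // silently ignores it instead of throwing.
    std::shared_ptr<void> data = waitable.take_data();
    waitable.execute(data);
  }
}
```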
diff --git a/rclcpp_action/src/server.cpp b/rclcpp_action/src/server.cpp
index c7c6f24778..9a1c1c3303 100644
--- a/rclcpp_action/src/server.cpp
+++ b/rclcpp_action/src/server.cpp
@@ -18,6 +18,7 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <variant>
 #include <unordered_map>
 
 #include "rcl_action/action_server.h"
@@ -33,8 +34,50 @@ using rclcpp_action::ServerBase;
 using rclcpp_action::GoalUUID;
 
+struct ServerBaseData;
+
 namespace rclcpp_action
 {
+
+struct ServerBaseData
+{
+  using GoalRequestData = std::tuple<
+    rcl_ret_t,
+    const rcl_action_goal_info_t,
+    rmw_request_id_t,
+    std::shared_ptr<void>
+  >;
+
+  using CancelRequestData = std::tuple<
+    rcl_ret_t,
+    std::shared_ptr<action_msgs::srv::CancelGoal::Request>,
+    rmw_request_id_t
+  >;
+
+  using ResultRequestData = std::tuple<rcl_ret_t, std::shared_ptr<void>, rmw_request_id_t>;
+
+  using GoalExpiredData = struct Empty {};
+
+  std::variant<GoalRequestData, CancelRequestData, ResultRequestData, GoalExpiredData> data;
+
+  explicit ServerBaseData(GoalRequestData && data_in)
+  : data(std::move(data_in)) {}
+  explicit ServerBaseData(CancelRequestData && data_in)
+  : data(std::move(data_in)) {}
+  explicit ServerBaseData(ResultRequestData && data_in)
+  : data(std::move(data_in)) {}
+  explicit ServerBaseData(GoalExpiredData && data_in)
+  : data(std::move(data_in)) {}
+};
+
+enum class ActionEventType : std::size_t
+{
+  GoalService,
+  ResultService,
+  CancelService,
+  Expired,
+};
+
 class ServerBaseImpl
 {
 public:
@@ -60,11 +103,6 @@ class ServerBaseImpl
   size_t num_services_ = 0;
   size_t num_guard_conditions_ = 0;
 
-  std::atomic<bool> goal_request_ready_{false};
-  std::atomic<bool> cancel_request_ready_{false};
-  std::atomic<bool> result_request_ready_{false};
-  std::atomic<bool> goal_expired_{false};
-
   // Lock for unordered_maps
   std::recursive_mutex unordered_map_mutex_;
@@ -75,8 +113,15 @@ class ServerBaseImpl
 
   // rcl goal handles are kept so api to send result doesn't try to access freed memory
   std::unordered_map<GoalUUID, std::shared_ptr<rcl_action_goal_handle_t>> goal_handles_;
+
+  // next ready event for taking, will be set by is_ready and will be processed by take_data
+  std::atomic<size_t> next_ready_event;
+  // used to indicate that next_ready_event has no ready event for processing
+  static constexpr size_t NO_EVENT_READY = std::numeric_limits<size_t>::max();
+
   rclcpp::Logger logger_;
 };
+
 }  // namespace rclcpp_action
 
 ServerBase::ServerBase(
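`ServerBaseData` keeps each alternative as a `std::tuple` alias, so the `execute_*_received()` functions further down first select the alternative with `std::get<Alternative>(variant)` and then unpack the tuple by index. A standalone sketch of that two-step access, using hypothetical stand-in types:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <tuple>
#include <variant>

struct TakenServerData
{
  // Hypothetical stand-ins for GoalRequestData / CancelRequestData.
  using GoalRequest = std::tuple<int32_t, std::string>;  // (return code, goal id)
  using CancelRequest = std::tuple<int32_t>;             // (return code)

  std::variant<GoalRequest, CancelRequest> data;
};

void handle_goal_request(const TakenServerData & taken)
{
  // std::get by alternative type throws std::bad_variant_access if the variant
  // holds something else; the dispatch in execute() guarantees it does not.
  const auto & goal = std::get<TakenServerData::GoalRequest>(taken.data);
  const int32_t ret = std::get<0>(goal);       // unpack tuple elements by index
  const std::string & goal_id = std::get<1>(goal);
  std::cout << "ret=" << ret << " goal=" << goal_id << "\n";
}

int main()
{
  TakenServerData taken{TakenServerData::GoalRequest{0, "abc-123"}};
  handle_goal_request(taken);
  return 0;
}
```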
@@ -194,124 +239,170 @@ ServerBase::is_ready(rcl_wait_set_t * wait_set)
       &goal_expired);
   }
 
-  pimpl_->goal_request_ready_ = goal_request_ready;
-  pimpl_->cancel_request_ready_ = cancel_request_ready;
-  pimpl_->result_request_ready_ = result_request_ready;
-  pimpl_->goal_expired_ = goal_expired;
-
   if (RCL_RET_OK != ret) {
     rclcpp::exceptions::throw_from_rcl_error(ret);
   }
 
-  return pimpl_->goal_request_ready_.load() ||
-    pimpl_->cancel_request_ready_.load() ||
-    pimpl_->result_request_ready_.load() ||
-    pimpl_->goal_expired_.load();
-}
+  pimpl_->next_ready_event = ServerBaseImpl::NO_EVENT_READY;
 
-std::shared_ptr<void>
-ServerBase::take_data()
-{
-  if (pimpl_->goal_request_ready_.load()) {
-    rcl_ret_t ret;
-    rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info();
-    rmw_request_id_t request_header;
+  if (goal_request_ready) {
+    pimpl_->next_ready_event = static_cast<size_t>(ActionEventType::GoalService);
+    return true;
+  }
 
-    std::lock_guard lock(pimpl_->action_server_reentrant_mutex_);
+  if (cancel_request_ready) {
+    pimpl_->next_ready_event = static_cast<size_t>(ActionEventType::CancelService);
+    return true;
+  }
 
-    std::shared_ptr<void> message = create_goal_request();
-    ret = rcl_action_take_goal_request(
-      pimpl_->action_server_.get(),
-      &request_header,
-      message.get());
-
-    return std::static_pointer_cast<void>(
-      std::make_shared<std::tuple<rcl_ret_t, rcl_action_goal_info_t, rmw_request_id_t,
-      std::shared_ptr<void>>>(
-        ret,
-        goal_info,
-        request_header, message));
-  } else if (pimpl_->cancel_request_ready_.load()) {
-    rcl_ret_t ret;
-    rmw_request_id_t request_header;
+  if (result_request_ready) {
+    pimpl_->next_ready_event = static_cast<size_t>(ActionEventType::ResultService);
+    return true;
+  }
 
-    // Initialize cancel request
-    auto request = std::make_shared<action_msgs::srv::CancelGoal::Request>();
+  if (goal_expired) {
+    pimpl_->next_ready_event = static_cast<size_t>(ActionEventType::Expired);
+    return true;
+  }
 
-    std::lock_guard lock(pimpl_->action_server_reentrant_mutex_);
-    ret = rcl_action_take_cancel_request(
-      pimpl_->action_server_.get(),
-      &request_header,
-      request.get());
+  return false;
+}
 
-    return std::static_pointer_cast<void>(
-      std::make_shared<std::tuple<rcl_ret_t,
-      std::shared_ptr<action_msgs::srv::CancelGoal::Request>,
-      rmw_request_id_t>>(ret, request, request_header));
-  } else if (pimpl_->result_request_ready_.load()) {
-    rcl_ret_t ret;
-    // Get the result request message
-    rmw_request_id_t request_header;
-    std::shared_ptr<void> result_request = create_result_request();
-    std::lock_guard lock(pimpl_->action_server_reentrant_mutex_);
-    ret = rcl_action_take_result_request(
-      pimpl_->action_server_.get(), &request_header, result_request.get());
+std::shared_ptr<void>
+ServerBase::take_data()
+{
+  size_t next_ready_event = pimpl_->next_ready_event.exchange(ServerBaseImpl::NO_EVENT_READY);
 
-    return std::static_pointer_cast<void>(
-      std::make_shared<std::tuple<rcl_ret_t, std::shared_ptr<void>, rmw_request_id_t>>(
-        ret, result_request, request_header));
-  } else if (pimpl_->goal_expired_.load()) {
+  if (next_ready_event == ServerBaseImpl::NO_EVENT_READY) {
+    // There is a known bug in Iron where take_data might be called multiple
+    // times. Therefore, instead of throwing, we just return a nullptr as a workaround.
     return nullptr;
-  } else {
-    throw std::runtime_error("Taking data from action server but nothing is ready");
   }
+
+  return take_data_by_entity_id(next_ready_event);
 }
 
 std::shared_ptr<void>
 ServerBase::take_data_by_entity_id(size_t id)
 {
+  static_assert(
+    static_cast<size_t>(EntityType::GoalService) ==
+    static_cast<size_t>(ActionEventType::GoalService));
+  static_assert(
+    static_cast<size_t>(EntityType::ResultService) ==
+    static_cast<size_t>(ActionEventType::ResultService));
+  static_assert(
+    static_cast<size_t>(EntityType::CancelService) ==
+    static_cast<size_t>(ActionEventType::CancelService));
+
+  std::shared_ptr<ServerBaseData> data_ptr;
   // Mark as ready the entity from which we want to take data
-  switch (static_cast<EntityType>(id)) {
-    case EntityType::GoalService:
-      pimpl_->goal_request_ready_ = true;
+  switch (static_cast<ActionEventType>(id)) {
+    case ActionEventType::GoalService:
+      {
+        rcl_ret_t ret;
+        rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info();
+        rmw_request_id_t request_header;
+
+        std::lock_guard lock(pimpl_->action_server_reentrant_mutex_);
+
+        std::shared_ptr<void> message = create_goal_request();
+        ret = rcl_action_take_goal_request(
+          pimpl_->action_server_.get(),
+          &request_header,
+          message.get());
+
+        data_ptr = std::make_shared<ServerBaseData>(
+          ServerBaseData::GoalRequestData(ret, goal_info, request_header, message));
+      }
       break;
-    case EntityType::ResultService:
-      pimpl_->result_request_ready_ = true;
+    case ActionEventType::ResultService:
+      {
+        rcl_ret_t ret;
+        // Get the result request message
+        rmw_request_id_t request_header;
+        std::shared_ptr<void> result_request = create_result_request();
+        std::lock_guard lock(pimpl_->action_server_reentrant_mutex_);
+        ret = rcl_action_take_result_request(
+          pimpl_->action_server_.get(), &request_header, result_request.get());
+
+        data_ptr =
+          std::make_shared<ServerBaseData>(
+          ServerBaseData::ResultRequestData(ret, result_request, request_header));
+      }
       break;
-    case EntityType::CancelService:
-      pimpl_->cancel_request_ready_ = true;
+    case ActionEventType::CancelService:
+      {
+        rcl_ret_t ret;
+        rmw_request_id_t request_header;
+
+        // Initialize cancel request
+        auto request = std::make_shared<action_msgs::srv::CancelGoal::Request>();
+
+        std::lock_guard lock(pimpl_->action_server_reentrant_mutex_);
+        ret = rcl_action_take_cancel_request(
+          pimpl_->action_server_.get(),
+          &request_header,
+          request.get());
+
+        data_ptr =
+          std::make_shared<ServerBaseData>(
+          ServerBaseData::CancelRequestData(ret, request, request_header));
+      }
+      break;
+    case ActionEventType::Expired:
+      {
+        data_ptr =
+          std::make_shared<ServerBaseData>(ServerBaseData::GoalExpiredData());
+      }
       break;
   }
 
-  return take_data();
+  return std::static_pointer_cast<void>(data_ptr);
 }
 
 void
-ServerBase::execute(std::shared_ptr<void> & data)
+ServerBase::execute(std::shared_ptr<void> & data_in)
 {
-  if (!data && !pimpl_->goal_expired_.load()) {
-    throw std::runtime_error("'data' is empty");
+  if (!data_in) {
+    // Workaround: if take_data was called multiple times, it returns a nullptr.
+    // Normally we should throw here, but as an API-stable bug fix we just ignore this case.
+    return;
   }
 
-  if (pimpl_->goal_request_ready_.load()) {
-    execute_goal_request_received(data);
-  } else if (pimpl_->cancel_request_ready_.load()) {
-    execute_cancel_request_received(data);
-  } else if (pimpl_->result_request_ready_.load()) {
-    execute_result_request_received(data);
-  } else if (pimpl_->goal_expired_.load()) {
-    execute_check_expired_goals();
-  } else {
-    throw std::runtime_error("Executing action server but nothing is ready");
-  }
+  std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data_in);
+
+  std::visit(
+    [&](auto && data) -> void {
+      using T = std::decay_t<decltype(data)>;
+      if constexpr (std::is_same_v<T, ServerBaseData::GoalRequestData>) {
+        execute_goal_request_received(data_in);
+      }
+      if constexpr (std::is_same_v<T, ServerBaseData::CancelRequestData>) {
+        execute_cancel_request_received(data_in);
+      }
+      if constexpr (std::is_same_v<T, ServerBaseData::ResultRequestData>) {
+        execute_result_request_received(data_in);
+      }
+      if constexpr (std::is_same_v<T, ServerBaseData::GoalExpiredData>) {
+        execute_check_expired_goals();
+      }
+    },
+    data_ptr->data);
 }
 
 void
 ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
 {
-  auto shared_ptr = std::static_pointer_cast
-    <std::tuple<rcl_ret_t, rcl_action_goal_info_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
-  rcl_ret_t ret = std::get<0>(*shared_ptr);
+  std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data);
+  const ServerBaseData::GoalRequestData & gData(
+    std::get<ServerBaseData::GoalRequestData>(data_ptr->data));
+
+  rcl_ret_t ret = std::get<0>(gData);
+  rcl_action_goal_info_t goal_info = std::get<1>(gData);
+  rmw_request_id_t request_header = std::get<2>(gData);
+  const std::shared_ptr<void> message = std::get<3>(gData);
+
   if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
     // Ignore take failure because connext fails if it receives a sample without valid data.
     // This happens when a client shuts down and connext receives a sample saying the client is
@@ -320,14 +411,6 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
   } else if (RCL_RET_OK != ret) {
     rclcpp::exceptions::throw_from_rcl_error(ret);
   }
-  rcl_action_goal_info_t goal_info = std::get<1>(*shared_ptr);
-  rmw_request_id_t request_header = std::get<2>(*shared_ptr);
-  std::shared_ptr<void> message = std::get<3>(*shared_ptr);
-
-  bool expected = true;
-  if (!pimpl_->goal_request_ready_.compare_exchange_strong(expected, false)) {
-    return;
-  }
 
   GoalUUID uuid = get_goal_id_from_goal_request(message.get());
   convert(uuid, &goal_info);
@@ -352,8 +435,36 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
   // if goal is accepted, create a goal handle, and store it
   if (GoalResponse::ACCEPT_AND_EXECUTE == status || GoalResponse::ACCEPT_AND_DEFER == status) {
     RCLCPP_DEBUG(pimpl_->logger_, "Accepted goal %s", to_string(uuid).c_str());
+    // rcl_action will set time stamp
+    auto deleter = [](rcl_action_goal_handle_t * ptr)
+      {
+        if (nullptr != ptr) {
+          rcl_ret_t fail_ret = rcl_action_goal_handle_fini(ptr);
+          if (RCL_RET_OK != fail_ret) {
+            RCLCPP_DEBUG(
+              rclcpp::get_logger("rclcpp_action"),
+              "failed to fini rcl_action_goal_handle_t in deleter");
+          }
+          delete ptr;
+        }
+      };
+    rcl_action_goal_handle_t * rcl_handle;
+    {
+      std::lock_guard lock(pimpl_->action_server_reentrant_mutex_);
+      rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
+    }
+    if (!rcl_handle) {
+      throw std::runtime_error("Failed to accept new goal\n");
+    }
 
-    auto handle = get_rcl_action_goal_handle(goal_info, uuid);
+    std::shared_ptr<rcl_action_goal_handle_t> handle(new rcl_action_goal_handle_t, deleter);
+    // Copy out goal handle since action server storage disappears when it is fini'd
+    *handle = *rcl_handle;
+
+    {
+      std::lock_guard lock(pimpl_->unordered_map_mutex_);
+      pimpl_->goal_handles_[uuid] = handle;
+    }
 
     if (GoalResponse::ACCEPT_AND_EXECUTE == status) {
       // Change status to executing
@@ -471,12 +582,15 @@ ServerBase::process_cancel_request(rcl_action_cancel_request_t & cancel_request
 void
 ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
 {
-  pimpl_->cancel_request_ready_ = false;
+  std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data);
+  const ServerBaseData::CancelRequestData & gData(
+    std::get<ServerBaseData::CancelRequestData>(data_ptr->data));
+
+  rcl_ret_t ret = std::get<0>(gData);
+  std::shared_ptr<action_msgs::srv::CancelGoal::Request> request = std::get<1>(gData);
+  rmw_request_id_t request_header = std::get<2>(gData);
+
 
-  auto shared_ptr = std::static_pointer_cast
-    <std::tuple<rcl_ret_t, std::shared_ptr<action_msgs::srv::CancelGoal::Request>,
-    rmw_request_id_t>>(data);
-  auto ret = std::get<0>(*shared_ptr);
   if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
     // Ignore take failure because connext fails if it receives a sample without valid data.
     // This happens when a client shuts down and connext receives a sample saying the client is
@@ -485,8 +599,6 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
   } else if (RCL_RET_OK != ret) {
     rclcpp::exceptions::throw_from_rcl_error(ret);
   }
-  auto request = std::get<1>(*shared_ptr);
-  auto request_header = std::get<2>(*shared_ptr);
 
   // Convert c++ message to C message
   rcl_action_cancel_request_t cancel_request = rcl_action_get_zero_initialized_cancel_request();
@@ -494,7 +606,53 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
   cancel_request.goal_info.stamp.sec = request->goal_info.stamp.sec;
   cancel_request.goal_info.stamp.nanosec = request->goal_info.stamp.nanosec;
 
-  auto response = process_cancel_request(cancel_request);
+  // Get a list of goal info that should be attempted to be cancelled
+  rcl_action_cancel_response_t cancel_response = rcl_action_get_zero_initialized_cancel_response();
+
+  {
+    std::lock_guard lock(pimpl_->action_server_reentrant_mutex_);
+    ret = rcl_action_process_cancel_request(
+      pimpl_->action_server_.get(),
+      &cancel_request,
+      &cancel_response);
+  }
+
+  if (RCL_RET_OK != ret) {
+    rclcpp::exceptions::throw_from_rcl_error(ret);
+  }
+
+  RCPPUTILS_SCOPE_EXIT(
+    {
+      ret = rcl_action_cancel_response_fini(&cancel_response);
+      if (RCL_RET_OK != ret) {
+        RCLCPP_ERROR(pimpl_->logger_, "Failed to fini cancel response");
+      }
+    });
+
+  auto response = std::make_shared<action_msgs::srv::CancelGoal::Response>();
+
+  response->return_code = cancel_response.msg.return_code;
+  auto & goals = cancel_response.msg.goals_canceling;
+  // For each canceled goal, call cancel callback
+  for (size_t i = 0; i < goals.size; ++i) {
+    const rcl_action_goal_info_t & goal_info = goals.data[i];
+    GoalUUID uuid;
+    convert(goal_info, &uuid);
+    auto response_code = call_handle_cancel_callback(uuid);
+    if (CancelResponse::ACCEPT == response_code) {
+      action_msgs::msg::GoalInfo cpp_info;
+      cpp_info.goal_id.uuid = uuid;
+      cpp_info.stamp.sec = goal_info.stamp.sec;
+      cpp_info.stamp.nanosec = goal_info.stamp.nanosec;
+      response->goals_canceling.push_back(cpp_info);
+    }
+  }
+
+  // If the user rejects all individual requests to cancel goals,
+  // then we consider the top-level cancel request as rejected.
+  if (goals.size >= 1u && 0u == response->goals_canceling.size()) {
+    response->return_code = action_msgs::srv::CancelGoal::Response::ERROR_REJECTED;
+  }
 
   if (!response->goals_canceling.empty()) {
     // at least one goal state changed, publish a new status message
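The inlined cancel handling above relies on `RCPPUTILS_SCOPE_EXIT` to fini the C response structure on every return path, including the throwing ones. A minimal sketch of that cleanup pattern, reusing only the rcl_action and rcpputils calls that already appear in this diff:

```cpp
#include "rcl_action/types.h"
#include "rcpputils/scope_exit.hpp"

void process_with_cleanup()
{
  rcl_action_cancel_response_t cancel_response =
    rcl_action_get_zero_initialized_cancel_response();

  // Runs when the enclosing scope exits, whether by return or by exception,
  // so cancel_response is always fini'd exactly once.
  RCPPUTILS_SCOPE_EXIT(
    {
      if (RCL_RET_OK != rcl_action_cancel_response_fini(&cancel_response)) {
        // The server logs this with RCLCPP_ERROR; kept silent in this sketch.
      }
    });

  // ... fill and use cancel_response; early returns and throws are now safe ...
}
```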
@@ -516,9 +674,14 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
 void
 ServerBase::execute_result_request_received(std::shared_ptr<void> & data)
 {
-  auto shared_ptr = std::static_pointer_cast
-    <std::tuple<rcl_ret_t, std::shared_ptr<void>, rmw_request_id_t>>(data);
-  auto ret = std::get<0>(*shared_ptr);
+  std::shared_ptr<ServerBaseData> data_ptr = std::static_pointer_cast<ServerBaseData>(data);
+  const ServerBaseData::ResultRequestData & gData(
+    std::get<ServerBaseData::ResultRequestData>(data_ptr->data));
+
+  rcl_ret_t ret = std::get<0>(gData);
+  std::shared_ptr<void> result_request = std::get<1>(gData);
+  rmw_request_id_t request_header = std::get<2>(gData);
+
   if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
     // Ignore take failure because connext fails if it receives a sample without valid data.
     // This happens when a client shuts down and connext receives a sample saying the client is
@@ -527,10 +690,7 @@ ServerBase::execute_result_request_received(std::shared_ptr<void> & data)
   } else if (RCL_RET_OK != ret) {
     rclcpp::exceptions::throw_from_rcl_error(ret);
   }
-  auto result_request = std::get<1>(*shared_ptr);
-  auto request_header = std::get<2>(*shared_ptr);
 
-  pimpl_->result_request_ready_ = false;
   std::shared_ptr<void> result_response;
 
   // check if the goal exists
@@ -653,10 +813,56 @@ ServerBase::get_status_array()
 void
 ServerBase::publish_status()
 {
-  auto status_msg = get_status_array();
+  rcl_ret_t ret;
+
+  // We need to hold the lock across this entire method because
+  // rcl_action_server_get_goal_handles() returns an internal pointer to the
+  // goal data.
+  std::lock_guard lock(pimpl_->action_server_reentrant_mutex_);
+
+  // Get all goal handles known to C action server
+  rcl_action_goal_handle_t ** goal_handles = NULL;
+  size_t num_goals = 0;
+  ret = rcl_action_server_get_goal_handles(
+    pimpl_->action_server_.get(), &goal_handles, &num_goals);
+
+  if (RCL_RET_OK != ret) {
+    rclcpp::exceptions::throw_from_rcl_error(ret);
+  }
+
+  auto status_msg = std::make_shared<action_msgs::msg::GoalStatusArray>();
+  status_msg->status_list.reserve(num_goals);
+  // Populate a c++ status message with the goals and their statuses
+  rcl_action_goal_status_array_t c_status_array =
+    rcl_action_get_zero_initialized_goal_status_array();
+  ret = rcl_action_get_goal_status_array(pimpl_->action_server_.get(), &c_status_array);
+  if (RCL_RET_OK != ret) {
+    rclcpp::exceptions::throw_from_rcl_error(ret);
+  }
+
+  RCPPUTILS_SCOPE_EXIT(
+    {
+      ret = rcl_action_goal_status_array_fini(&c_status_array);
+      if (RCL_RET_OK != ret) {
+        RCLCPP_ERROR(pimpl_->logger_, "Failed to fini status array message");
+      }
+    });
+
+  for (size_t i = 0; i < c_status_array.msg.status_list.size; ++i) {
+    auto & c_status_msg = c_status_array.msg.status_list.data[i];
+
+    action_msgs::msg::GoalStatus msg;
+    msg.status = c_status_msg.status;
+    // Convert C goal info to C++ goal info
+    convert(c_status_msg.goal_info, &msg.goal_info.goal_id.uuid);
+    msg.goal_info.stamp.sec = c_status_msg.goal_info.stamp.sec;
+    msg.goal_info.stamp.nanosec = c_status_msg.goal_info.stamp.nanosec;
+
+    status_msg->status_list.push_back(msg);
+  }
 
   // Publish the message through the status publisher
-  rcl_ret_t ret = rcl_action_publish_status(pimpl_->action_server_.get(), status_msg.get());
+  ret = rcl_action_publish_status(pimpl_->action_server_.get(), status_msg.get());
 
   if (RCL_RET_OK != ret) {
     rclcpp::exceptions::throw_from_rcl_error(ret);