Skip to content

Commit

Permalink
fix(components): try catch subscription callbacks (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
domire8 authored Dec 13, 2024
1 parent e5535d4 commit 935db82
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 10 deletions.
11 changes: 6 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ Release Versions:

## Upcoming changes

- fix(components): remove incorrect log line (#166)
- fix(controllers): move predicate publishing rate parameter to BaseControllerInterface (#168)
- feat(components): get clproto message type from attribute (#175)
- fix(components): add missing test case (#181)
- fix(components): clean up lifecycle nodes properly (#178)
- fix(components): remove incorrect log line (#166)
- fix(controllers): move predicate publishing rate parameter to BaseControllerInterface (#168)
- feat(components): get clproto message type from attribute (#175)
- fix(components): add missing test case (#181)
- fix(components): clean up lifecycle nodes properly (#178)
- fix(components): try catch subscription callbacks (#167)

## 5.0.2

Expand Down
2 changes: 1 addition & 1 deletion aica-package.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#syntax=ghcr.io/aica-technology/package-builder:v1.3.0

[metadata]
version = "5.1.0-rc0002"
version = "5.1.0-rc0003"
description = "Modular ROS 2 extension library for dynamic composition of components and controllers with the AICA robotics framework"

[metadata.collection]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,17 @@ inline void ComponentInterface::add_input(
RCLCPP_DEBUG_STREAM(
this->node_logging_->get_logger(),
"Adding input '" << parsed_signal_name << "' with topic name '" << topic_name << "'.");
auto subscription =
rclcpp::create_subscription<MsgT>(this->node_parameters_, this->node_topics_, topic_name, this->qos_, callback);
auto subscription = rclcpp::create_subscription<MsgT>(
this->node_parameters_, this->node_topics_, topic_name, this->qos_,
[this, signal_name, callback](const std::shared_ptr<MsgT> message) {
try {
callback(message);
} catch (const std::exception& ex) {
RCLCPP_WARN_STREAM_THROTTLE(
this->node_logging_->get_logger(), *this->node_clock_->get_clock(), 1000,
"Unhandled exception in callback of input '" << signal_name << "': " << ex.what());
}
});
auto subscription_interface =
std::make_shared<SubscriptionHandler<MsgT>>()->create_subscription_interface(subscription);
this->inputs_.insert_or_assign(parsed_signal_name, subscription_interface);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,13 @@ def __subscription_callback(
self.get_logger().error(f"Failed to execute user callback in subscription for attribute"
f" '{attribute_name}': {e}", throttle_duration_sec=1.0)

def __safe_callback(self, message: MsgT, signal_name: str, callback: Callable):
try:
callback(message)
except Exception as e:
self.get_logger().warn(f"Unhandled exception in callback of input '{
signal_name}': {e}", throttle_duration_sec=1.0)

def __declare_signal(self, signal_name: str, signal_type: str, default_topic="", fixed_topic=False):
"""
Declare an input to create the topic parameter without adding it to the map of inputs yet.
Expand Down Expand Up @@ -594,8 +601,8 @@ def add_input(self, signal_name: str, subscription: Union[str, Callable], messag
if user_callback:
self.get_logger().warn("Providing a callable for arguments 'subscription' and 'user_callback' is"
"not supported. The user callback will be ignored.")
self.__inputs[parsed_signal_name] = self.create_subscription(message_type, topic_name, subscription,
self.__qos)
self.__inputs[parsed_signal_name] = self.create_subscription(message_type, topic_name, partial(
self.__safe_callback, signal_name=parsed_signal_name, callback=subscription), self.__qos)
elif isinstance(subscription, str):
if callable(user_callback):
signature = inspect.signature(user_callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,32 @@ class MinimalCartesianInput : public ComponentT {
std::promise<void> received_;
};

template<class ComponentT>
class ExceptionCartesianInput : public ComponentT {
public:
ExceptionCartesianInput(const rclcpp::NodeOptions& node_options, const std::string& topic)
: ComponentT(node_options, "exception_cartesian_input"), thrown_(false) {
this->received_future = this->received_.get_future();
this->template add_input<modulo_core::EncodedState>(
"cartesian_state",
[this](const std::shared_ptr<modulo_core::EncodedState>) {
if (!this->thrown_) {
this->thrown_ = true;
throw std::runtime_error("Error");
} else {
this->received_.set_value();
}
},
topic);
}

std::shared_future<void> received_future;

private:
bool thrown_;
std::promise<void> received_;
};

template<class ComponentT>
class MinimalTwistOutput : public ComponentT {
public:
Expand Down
11 changes: 11 additions & 0 deletions source/modulo_components/test/cpp/test_component_communication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ TEST_F(ComponentCommunicationTest, InputOutput) {
EXPECT_THROW(output_node->publish(), modulo_core::exceptions::CoreException);
}

TEST_F(ComponentCommunicationTest, ExceptionInputOutput) {
auto cartesian_state = state_representation::CartesianState::Random("test");
auto input_node = std::make_shared<ExceptionCartesianInput<Component>>(rclcpp::NodeOptions(), "/topic");
auto output_node =
std::make_shared<MinimalCartesianOutput<Component>>(rclcpp::NodeOptions(), "/topic", cartesian_state, true);
this->exec_->add_node(input_node);
this->exec_->add_node(output_node);
auto return_code = this->exec_->spin_until_future_complete(input_node->received_future, 500ms);
ASSERT_EQ(return_code, rclcpp::FutureReturnCode::SUCCESS);
}

TEST_F(ComponentCommunicationTest, InputOutputManual) {
auto cartesian_state = state_representation::CartesianState::Random("test");
auto input_node = std::make_shared<MinimalCartesianInput<Component>>(rclcpp::NodeOptions(), "/topic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ TEST_F(LifecycleComponentCommunicationTest, InputOutput) {
EXPECT_THROW(output_node->publish(), modulo_core::exceptions::CoreException);
}

TEST_F(LifecycleComponentCommunicationTest, ExceptionInputOutput) {
auto cartesian_state = state_representation::CartesianState::Random("test");
auto input_node = std::make_shared<ExceptionCartesianInput<LifecycleComponent>>(rclcpp::NodeOptions(), "/topic");
auto output_node = std::make_shared<MinimalCartesianOutput<LifecycleComponent>>(
rclcpp::NodeOptions(), "/topic", cartesian_state, true);
add_configure_activate(this->exec_, input_node);
add_configure_activate(this->exec_, output_node);
auto return_code = this->exec_->spin_until_future_complete(input_node->received_future, 500ms);
ASSERT_EQ(return_code, rclcpp::FutureReturnCode::SUCCESS);
}

TEST_F(LifecycleComponentCommunicationTest, InputOutputManual) {
auto cartesian_state = state_representation::CartesianState::Random("test");
auto input_node = std::make_shared<MinimalCartesianInput<LifecycleComponent>>(rclcpp::NodeOptions(), "/topic");
Expand Down
21 changes: 21 additions & 0 deletions source/modulo_components/test/python/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import clproto
import pytest
import state_representation as sr
from types import MethodType
from modulo_components.component import Component
from modulo_core import EncodedState
from rclpy.task import Future
Expand Down Expand Up @@ -121,6 +122,26 @@ def _make_minimal_cartesian_input(component_type, topic):
yield _make_minimal_cartesian_input(request.param[0], request.param[1])


@pytest.fixture
def exception_cartesian_input(request):
def _make_exception_cartesian_input(component_type, topic):
def callback(self):
if not self.thrown:
self.thrown = True
raise RuntimeError()
else:
self.received_future.set_result(True)

component = component_type("exception_cartesian_input")
component.received_future = Future()
component.thrown = False
component.callback = MethodType(callback, component)
component.add_input("cartesian_pose", lambda msg: component.callback(), EncodedState, topic)
return component

yield _make_exception_cartesian_input(request.param[0], request.param[1])


@pytest.fixture
def minimal_sensor_input(request):
def _make_minimal_sensor_input(component_type, topic):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ def test_input_output(ros_exec, random_pose, minimal_cartesian_output, minimal_c
minimal_cartesian_output.publish()


@pytest.mark.parametrize("exception_cartesian_input", [[Component, "/topic"]], indirect=True)
@pytest.mark.parametrize("minimal_cartesian_output", [[Component, "/topic", True]], indirect=True)
def test_exception_input_output(ros_exec, minimal_cartesian_output, exception_cartesian_input):
ros_exec.add_node(exception_cartesian_input)
ros_exec.add_node(minimal_cartesian_output)
ros_exec.spin_until_future_complete(exception_cartesian_input.received_future, timeout_sec=0.5)
assert exception_cartesian_input.received_future.done() and exception_cartesian_input.received_future.result()


@pytest.mark.parametrize("minimal_cartesian_input", [[Component, "/topic"]], indirect=True)
@pytest.mark.parametrize("minimal_cartesian_output", [[Component, "/topic", False]], indirect=True)
def test_input_output_manual(ros_exec, random_pose, minimal_cartesian_output, minimal_cartesian_input):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@ def test_input_output(ros_exec, make_lifecycle_change_client, random_pose, minim
minimal_cartesian_output.publish()


@pytest.mark.parametrize("exception_cartesian_input", [[LifecycleComponent, "/topic"]], indirect=True)
@pytest.mark.parametrize("minimal_cartesian_output", [[LifecycleComponent, "/topic", True]], indirect=True)
def test_exception_input_output(
ros_exec, make_lifecycle_change_client, minimal_cartesian_output, exception_cartesian_input):
input_change_client = make_lifecycle_change_client("exception_cartesian_input")
output_change_client = make_lifecycle_change_client("minimal_cartesian_output")
ros_exec.add_node(input_change_client)
ros_exec.add_node(output_change_client)
ros_exec.add_node(exception_cartesian_input)
ros_exec.add_node(minimal_cartesian_output)
input_change_client.configure(ros_exec)
output_change_client.configure(ros_exec)
input_change_client.activate(ros_exec)
output_change_client.activate(ros_exec)
ros_exec.spin_until_future_complete(exception_cartesian_input.received_future, timeout_sec=0.5)
assert exception_cartesian_input.received_future.done() and exception_cartesian_input.received_future.result()


@pytest.mark.parametrize("minimal_cartesian_input", [[LifecycleComponent, "/topic"]], indirect=True)
@pytest.mark.parametrize("minimal_cartesian_output", [[LifecycleComponent, "/topic", False]], indirect=True)
def test_input_output_manual(ros_exec, make_lifecycle_change_client, random_pose, minimal_cartesian_output,
Expand Down

0 comments on commit 935db82

Please sign in to comment.