diff --git a/source/modulo_components/modulo_components/component_interface.py b/source/modulo_components/modulo_components/component_interface.py index 594a700f..030fe93e 100644 --- a/source/modulo_components/modulo_components/component_interface.py +++ b/source/modulo_components/modulo_components/component_interface.py @@ -10,6 +10,7 @@ import modulo_core.translators.message_writers as modulo_writers import state_representation as sr from geometry_msgs.msg import TransformStamped +from modulo_interfaces.msg import Assignment as AssignmentMsg from modulo_interfaces.msg import Predicate as PredicateMsg from modulo_interfaces.msg import PredicateCollection from modulo_interfaces.srv import EmptyTrigger, StringTrigger @@ -46,6 +47,7 @@ def __init__(self, node_name: str, *args, **kwargs): super().__init__(node_name, *args, **node_kwargs) self.__step_lock = Lock() self.__parameter_dict: Dict[str, Union[str, sr.Parameter]] = {} + self.__assignment_dict: Dict[str, sr.Parameter] = {} self.__read_only_parameters: Dict[str, bool] = {} self.__pre_set_parameters_callback_called = False self.__set_parameters_result = SetParametersResult() @@ -68,6 +70,9 @@ def __init__(self, node_name: str, *args, **kwargs): self.add_parameter(sr.Parameter("rate", 10.0, sr.ParameterType.DOUBLE), "The rate in Hertz for all periodic callbacks") + self.__assignment_publisher = self.create_publisher(AssignmentMsg, "/assignments", self.__qos) + self.__assignment_message = AssignmentMsg() + self.__assignment_message.node = self.get_fully_qualified_name() self.__predicate_publisher = self.create_publisher(PredicateCollection, "/predicates", self.__qos) self.__predicate_message = PredicateCollection() self.__predicate_message.node = self.get_fully_qualified_name() @@ -342,6 +347,70 @@ def add_predicate(self, name: str, predicate: Union[bool, Callable[[], bool]]) - except Exception as e: self.get_logger().error(f"Failed to add predicate '{name}': {e}") + def add_assignment(self, name: str, type: sr.ParameterType) -> None: + """ + Add an assignment to the dictionary of assignments. + + :param name: The name of the assignment + :param type: The type of the assignment + """ + parsed_name = parse_topic_name(name) + if not parsed_name: + self.get_logger().error( + f"The parsed name for assignment '{name}' is empty. Provide a " + "string with valid characters for the assignment name ([a-z0-9_]).") + return + if parsed_name != name: + self.get_logger().error( + f"The parsed name for assignment '{name}' is '{parsed_name}'. Use the parsed name " + "to refer to this assignment.") + if parsed_name in self.__assignment_dict.keys(): + self.get_logger().warn(f"Assignment with name '{parsed_name}' already exists, overwriting.") + else: + self.get_logger().debug(f"Adding assignment '{parsed_name}'.") + try: + self.__assignment_dict[parsed_name] = sr.Parameter(parsed_name, type) + except Exception as e: + self.get_logger().error(f"Failed to add assignment '{parsed_name}': {e}") + + def get_assignment(self, name: str) -> T: + """ + Get the assignment value from the assignment dictionary by its name. + + :param name: The name of the assignment + :raises InvalidAssignmentError: if the assignment does not exist + :raises EmptyStateError: if the assignment has not been set yet + :return: The value of the assignment, if the assignment exists and has been assigned + """ + if name not in self.__assignment_dict.keys(): + raise InvalidAssignmentError(f"Assignment '{name}' is not in the dict of assignments") + if self.__assignment_dict[name].is_empty(): + # TODO: remove after control libraries v9.3.1 + raise sr.exceptions.EmptyStateError(f"{name} state is empty") + return self.__assignment_dict[name].get_value() + + def set_assignment(self, name: str, value: T) -> None: + """ + Set the value of an assignment. The assignment must have been previously declared. + + :param name: The name of the assignment + :param value: The value of the assignment + """ + if name not in self.__assignment_dict.keys(): + self.get_logger().error( + f"Failed to set assignment '{name}': Assignment does not exist.", throttle_duration_sec=1.0) + return + try: + self.__assignment_dict[name].set_value(value) + ros_param = write_parameter(self.__assignment_dict[name]) + except Exception as e: + self.get_logger().error(f"Failed to set assignment '{name}': {e}", throttle_duration_sec=1.0) + return + + message = copy.copy(self.__assignment_message) + message.assignment = ros_param.to_parameter_msg() + self.__assignment_publisher.publish(message) + def get_predicate(self, name: str) -> bool: """ Get the value of the predicate given as parameter. If the predicate is not found or the callable function fails, diff --git a/source/modulo_components/test/python/test_component_interface.py b/source/modulo_components/test/python/test_component_interface.py index d1fba9ae..94b555cb 100644 --- a/source/modulo_components/test/python/test_component_interface.py +++ b/source/modulo_components/test/python/test_component_interface.py @@ -7,7 +7,7 @@ import state_representation as sr from modulo_interfaces.srv import EmptyTrigger, StringTrigger from modulo_components.component_interface import ComponentInterface -from modulo_core.exceptions import CoreError, LookupTransformError +from modulo_core.exceptions import CoreError, LookupTransformError, InvalidAssignmentError from rclpy.qos import QoSProfile from std_msgs.msg import Bool, String from sensor_msgs.msg import JointState @@ -71,6 +71,38 @@ def test_set_predicate(component_interface): assert not component_interface.get_predicate('bar') +def test_add_assignment(component_interface): + component_interface.add_assignment('string_assignment', sr.ParameterType.STRING) + assert len(component_interface._ComponentInterface__assignment_dict) == 1 + # adding an empty assignment should fail + component_interface.add_assignment('', sr.ParameterType.STRING) + assert len(component_interface._ComponentInterface__assignment_dict) == 1 + # adding the assignment again should just overwrite + component_interface.add_assignment('string_assignment', sr.ParameterType.STRING) + assert len(component_interface._ComponentInterface__assignment_dict) == 1 + # names should be cleaned up + component_interface.add_assignment('7cleEaGn_AaSssiGNgn#ment', sr.ParameterType.STRING) + assert 'clean_assignment' in component_interface._ComponentInterface__assignment_dict.keys() + assert len(component_interface._ComponentInterface__assignment_dict) == 2 + # names without valid characters should fail + component_interface.add_assignment('@@@@@@@', sr.ParameterType.STRING) + assert len(component_interface._ComponentInterface__assignment_dict) == 2 + + +def test_get_set_assignment(component_interface): + component_interface.add_assignment('int_assignment', sr.ParameterType.INT) + with pytest.raises(InvalidAssignmentError): + component_interface.get_assignment('string_assignment') + component_interface.set_assignment('string_assignment', 'test') + with pytest.raises(sr.exceptions.EmptyStateError): + component_interface.get_assignment('int_assignment') + # setting the wrong type of value should fail + component_interface.set_assignment('int_assignment', 'test') + assert component_interface._ComponentInterface__assignment_dict['int_assignment'].is_empty() + component_interface.set_assignment('int_assignment', 5) + assert component_interface.get_assignment('int_assignment') == 5 + + def test_declare_signal(component_interface): component_interface.declare_input("input", "test") assert component_interface.get_parameter_value("input_topic") == "test" diff --git a/source/modulo_core/modulo_core/exceptions.py b/source/modulo_core/modulo_core/exceptions.py index 08cada96..c901c710 100644 --- a/source/modulo_core/modulo_core/exceptions.py +++ b/source/modulo_core/modulo_core/exceptions.py @@ -74,3 +74,11 @@ class LookupJointPositionsException(CoreError): def __init__(self, message: str): super().__init__(message, "LookupJointPositionsException") + +class InvalidAssignmentError(CoreError): + """ + An exception class to notify errors when getting the value of an assignment. + """ + + def __init__(self, message: str): + super().__init__(message, "InvalidAssignmentError")