|
def parameter_dict_from_yaml_file( |
|
parameter_file: str, |
|
use_wildcard: bool = False, |
|
target_nodes: Optional[List[str]] = None, |
|
namespace: str = '' |
|
) -> Dict[str, ParameterMsg]: |
|
""" |
|
Build a dict of parameters from a YAML file. |
|
|
|
Will load all parameters if ``target_nodes`` is None or empty. |
|
|
|
:raises RuntimeError: if a target node is not in the file |
|
:raises RuntimeError: if the is not a valid ROS parameter file |
|
|
|
:param parameter_file: Path to the YAML file to load parameters from. |
|
:param use_wildcard: Use wildcard matching for the target nodes. |
|
:param target_nodes: List of nodes in the YAML file to load parameters from. |
|
:param namespace: Namespace to prepend to all parameters. |
|
:return: A dict of Parameter messages keyed by the parameter names |
|
""" |
|
with open(parameter_file, 'r') as f: |
|
param_file = yaml.safe_load(f) |
|
param_dict = {} |
|
|
|
# check and add if wildcard is available in 1st keys |
|
# wildcard key must go to the front of param_keys so that |
|
# node-namespaced parameters will override the wildcard parameters |
|
if use_wildcard: |
|
if '/**' in param_file: |
|
value = param_file['/**'] |
|
if not isinstance(value, dict) or 'ros__parameters' not in value: |
|
raise RuntimeError( |
|
'YAML file is not a valid ROS parameter file for wildcard(/**)') |
|
param_dict.update(value['ros__parameters']) |
|
|
|
# If target_nodes is None or empty, parse '/*' and '/**/node_name' wildcards |
|
if not target_nodes: |
|
# /* |
|
# node_name: |
|
# ros__parameters: |
|
# ... |
|
if '/*' in param_file: |
|
value = param_file['/*'] |
|
if not isinstance(value, dict): |
|
raise RuntimeError( |
|
'YAML file is not a valid ROS parameter file for wildcard(/*)') |
|
# add all node_name under wildcard /* |
|
for node_name, node_value in value.items(): |
|
if not isinstance(node_value, dict) or 'ros__parameters' not in node_value: |
|
raise RuntimeError( |
|
f'YAML file is not a valid ROS parameter file for wildcard(/*) ' |
|
f'node {node_name}') |
|
param_dict.update(node_value['ros__parameters']) |
|
|
|
# /**/node_name |
|
# ros__parameters: |
|
# ... |
|
for k, v in param_file.items(): |
|
if k.startswith('/**/'): |
|
value = v |
|
if not isinstance(value, dict) or 'ros__parameters' not in value: |
|
raise RuntimeError( |
|
f'YAML file is not a valid ROS parameter file for wildcard {k}') |
|
param_dict.update(value['ros__parameters']) |
|
|
|
# parse parameter yaml file based on target node namespace and name |
|
if target_nodes: |
|
for n in target_nodes: |
|
abs_name = _get_absolute_node_name(n) |
|
if abs_name is None: |
|
continue |
|
|
|
# target node is '/node_name' without namespace |
|
if '/' not in abs_name[1:]: |
|
# Match definition in param file |
|
# /node_name: |
|
# ros__parameters: |
|
# ... |
|
if abs_name in param_file: |
|
value = param_file[abs_name] |
|
if (not isinstance(value, dict) or 'ros__parameters' not in value): |
|
raise RuntimeError( |
|
f'YAML file is not a valid ROS parameter file for node {n}') |
|
param_dict.update(value['ros__parameters']) |
|
# Match definition in param file |
|
# node_name: |
|
# ros__parameters: |
|
# ... |
|
if abs_name[1:] in param_file: |
|
value = param_file[abs_name[1:]] |
|
if (not isinstance(value, dict) or 'ros__parameters' not in value): |
|
raise RuntimeError( |
|
f'YAML file is not a valid ROS parameter file for node {n}') |
|
param_dict.update(value['ros__parameters']) |
|
continue |
|
|
|
# target node include namespaces, e.g. /namespace/node_name |
|
# Matched definition in param file |
|
# /namespace |
|
# node_name: |
|
# ros__parameters: |
|
# ... |
|
# or |
|
# /namespace/node_name: |
|
# ros__parameters: |
|
# ... |
|
ns, node_basename = abs_name.rsplit('/', 1) |
|
|
|
# If ns is single level namespace and wildcard is true |
|
# Match definition in param file |
|
# /* |
|
# node_name: |
|
# ros__parameters: |
|
# ... |
|
if use_wildcard and '/' not in ns[1:] and '/*' in param_file: |
|
if not isinstance(param_file['/*'], dict): |
|
raise RuntimeError( |
|
'YAML file is not a valid ROS parameter file for namespace "/*"') |
|
|
|
if node_basename in param_file['/*']: |
|
value = param_file['/*'][node_basename] |
|
if not isinstance(value, dict) or 'ros__parameters' not in value: |
|
raise RuntimeError( |
|
f'YAML file is not a valid ROS parameter file for node {n}') |
|
param_dict.update(value['ros__parameters']) |
|
|
|
# If 'ns' in target node is single level or multi level namespace and |
|
# wildcard is true, |
|
# Match definition in param file |
|
# /**/node_name: |
|
# ros__parameters: |
|
# ... |
|
if use_wildcard and '/**/' + node_basename in param_file: |
|
value = param_file['/**/' + node_basename] |
|
if not isinstance(value, dict) or 'ros__parameters' not in value: |
|
raise RuntimeError( |
|
f'YAML file is not a valid ROS parameter file for node {n}') |
|
param_dict.update(value['ros__parameters']) |
|
|
|
# If 'ns' in target node is single level or multi level namespace, |
|
# Match definition in param file. |
|
# /namespace |
|
# node_name: |
|
# ros__parameters: |
|
# ... |
|
if ns in param_file and isinstance(param_file[ns], dict): |
|
if node_basename in param_file[ns]: |
|
value = param_file[ns][node_basename] |
|
if not isinstance(value, dict) or 'ros__parameters' not in value: |
|
raise RuntimeError( |
|
f'YAML file is not a valid ROS parameter file for node {n}') |
|
param_dict.update(value['ros__parameters']) |
|
|
|
# If 'ns' in target node is single level or multi level namespace, |
|
# Match definition in param file. |
|
# /namespace/node_name: |
|
# ros__parameters: |
|
# ... |
|
if abs_name in param_file: |
|
value = param_file[abs_name] |
|
if not isinstance(value, dict) or 'ros__parameters' not in value: |
|
raise RuntimeError( |
|
f'YAML file is not a valid ROS parameter file for node {n}') |
|
param_dict.update(value['ros__parameters']) |
|
else: |
|
# Except the wildcard key, add all other keys |
|
for k, v in param_file.items(): |
|
if k == '/**' or k == '/*' or k.startswith('/**/'): |
|
continue |
|
if isinstance(v, dict): |
|
# k is node_name or /node_name or /namespace/node_name |
|
if 'ros__parameters' in v: |
|
param_dict.update(v['ros__parameters']) |
|
continue |
|
# k is namespace |
|
for node_name, node_value in v.items(): |
|
if isinstance(node_value, dict) and 'ros__parameters' in node_value: |
|
param_dict.update(node_value['ros__parameters']) |
|
else: |
|
raise RuntimeError( |
|
f'YAML file is not a valid ROS parameter file for node' |
|
f'{k}/{node_name}') |
|
else: |
|
raise RuntimeError( |
|
f'YAML file is not a valid ROS parameter file for node {k}') |
|
|
|
if not param_dict: |
|
raise RuntimeError('Param file does not contain any valid parameters') |
|
|
|
return _unpack_parameter_dict(namespace, param_dict) |
Generated by Generative AI
No response
Operating System:
Linux tomoyafujita-B760M-Pro-RS-D4 6.17.0-14-generic #14~24.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Jan 15 15:52:10 UTC 2 x86_64 x86_64 x86_64 GNU/LinuxROS version or commit hash:
rolling
RMW implementation (if applicable):
Any
RMW Configuration (if applicable):
No response
Client library (if applicable):
rclpy
'ros2 doctor --report' output
ros2 doctor --report
<COPY OUTPUT HERE>Steps to reproduce issue
see
rclpy/rclpy/rclpy/parameter.py
Lines 363 to 552 in e056ea6
it implements the yaml parsing process in rclpy, it does not call on
rcl_parse_yaml_file. (this is why rclpy has a dependency to python3-yaml)i believe this breaks the basic design between
rclandrclpyresponsibility.rcl_yaml_param_parserpackage is meant to provide the consistent behavior for all client libraries such as rclcpp and rclpy. actuallyrclcppdoes usercl_parse_yaml_filewhen it loads the yaml file to the node parameters.Expected behavior
i believe we should do,
parameter_map_from_yaml_file.rcl_yaml_param_parserpackage to callrcl_parse_yaml_filewhen it loads the yaml file to the node parameters.this provides the same behavior with
rclcpp.Actual behavior
the behavior of load_parameter(s) between rclpy and rclcpp are different.
Additional information
No response