Skip to content

Commit 3e2bb2f

Browse files
author
Victor Lopez
committed
Add test_verb_load
1 parent f2ffb6a commit 3e2bb2f

1 file changed

Lines changed: 284 additions & 0 deletions

File tree

ros2param/test/test_verb_load.py

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
# Copyright 2021 PAL Robotics S.L.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import contextlib
16+
import os
17+
import sys
18+
import tempfile
19+
import time
20+
import unittest
21+
import xmlrpc
22+
23+
from launch import LaunchDescription
24+
from launch.actions import ExecuteProcess
25+
from launch_ros.actions import Node
26+
import launch_testing
27+
import launch_testing.actions
28+
import launch_testing.asserts
29+
import launch_testing.markers
30+
import launch_testing.tools
31+
import launch_testing_ros.tools
32+
33+
import pytest
34+
35+
import rclpy
36+
from rclpy.utilities import get_available_rmw_implementations
37+
38+
from ros2cli.node.strategy import NodeStrategy
39+
40+
TEST_NODE = 'test_node'
41+
TEST_NAMESPACE = '/foo'
42+
43+
TEST_TIMEOUT = 20.0
44+
45+
INPUT_PARAMETER_FILE = (
46+
f'{TEST_NODE}:\n'
47+
' ros__parameters:\n'
48+
' bool_array_param:\n'
49+
' - true\n'
50+
' - false\n'
51+
' - false\n'
52+
' bool_param: false\n'
53+
' double_array_param:\n'
54+
' - 2.125\n'
55+
' - 1.25\n'
56+
' - 2.5\n'
57+
' double_param: 1.3\n'
58+
' foo:\n'
59+
' bar:\n'
60+
' str_param: foo_bar\n'
61+
' str_param: foobar\n'
62+
' int_array_param:\n'
63+
' - 42\n'
64+
' - 3\n'
65+
' - 3\n'
66+
' int_param: -42\n'
67+
' str_array_param:\n'
68+
' - a_foo\n'
69+
' - a_bar\n'
70+
' - baz\n'
71+
' str_param: Bye World\n'
72+
' use_sim_time: false\n'
73+
)
74+
75+
# Skip cli tests on Windows while they exhibit pathological behavior
76+
# https://github.com/ros2/build_farmer/issues/248
77+
if sys.platform.startswith('win'):
78+
pytest.skip(
79+
'CLI tests can block for a pathological amount of time on Windows.',
80+
allow_module_level=True)
81+
82+
83+
@pytest.mark.rostest
84+
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
85+
def generate_test_description(rmw_implementation):
86+
path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures')
87+
additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
88+
89+
# Parameter node test fixture
90+
path_to_parameter_node_script = os.path.join(path_to_fixtures, 'parameter_node.py')
91+
parameter_node = Node(
92+
executable=sys.executable,
93+
name=TEST_NODE,
94+
namespace=TEST_NAMESPACE,
95+
arguments=[path_to_parameter_node_script],
96+
additional_env=additional_env
97+
)
98+
99+
return LaunchDescription([
100+
# TODO(jacobperron): Provide a common RestartCliDaemon launch action in ros2cli
101+
ExecuteProcess(
102+
cmd=['ros2', 'daemon', 'stop'],
103+
name='daemon-stop',
104+
on_exit=[
105+
ExecuteProcess(
106+
cmd=['ros2', 'daemon', 'start'],
107+
name='daemon-start',
108+
on_exit=[
109+
parameter_node,
110+
launch_testing.actions.ReadyToTest(),
111+
],
112+
additional_env=additional_env
113+
)
114+
]
115+
),
116+
])
117+
118+
119+
class TestVerbDump(unittest.TestCase):
120+
121+
@classmethod
122+
def setUpClass(
123+
cls,
124+
launch_service,
125+
proc_info,
126+
proc_output,
127+
rmw_implementation
128+
):
129+
rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter(
130+
filtered_rmw_implementation=rmw_implementation
131+
)
132+
133+
@contextlib.contextmanager
134+
def launch_param_load_command(self, arguments):
135+
param_load_command_action = ExecuteProcess(
136+
cmd=['ros2', 'param', 'load', *arguments],
137+
additional_env={
138+
'RMW_IMPLEMENTATION': rmw_implementation,
139+
},
140+
name='ros2param-load-cli',
141+
output='screen'
142+
)
143+
with launch_testing.tools.launch_process(
144+
launch_service, param_load_command_action, proc_info, proc_output,
145+
output_filter=rmw_implementation_filter
146+
) as param_load_command:
147+
yield param_load_command
148+
cls.launch_param_load_command = launch_param_load_command
149+
150+
@contextlib.contextmanager
151+
def launch_param_dump_command(self, arguments):
152+
param_dump_command_action = ExecuteProcess(
153+
cmd=['ros2', 'param', 'dump', *arguments],
154+
additional_env={
155+
'RMW_IMPLEMENTATION': rmw_implementation,
156+
},
157+
name='ros2param-dump-cli',
158+
output='screen'
159+
)
160+
with launch_testing.tools.launch_process(
161+
launch_service, param_dump_command_action, proc_info, proc_output,
162+
output_filter=rmw_implementation_filter
163+
) as param_dump_command:
164+
yield param_dump_command
165+
cls.launch_param_dump_command = launch_param_dump_command
166+
167+
def setUp(self):
168+
# Ensure the daemon node is running and discovers the test node
169+
start_time = time.time()
170+
timed_out = True
171+
with NodeStrategy(None) as node:
172+
while (time.time() - start_time) < TEST_TIMEOUT:
173+
# TODO(jacobperron): Create a generic 'CliNodeError' so we can treat errors
174+
# from DirectNode and DaemonNode the same
175+
try:
176+
services = node.get_service_names_and_types_by_node(TEST_NODE, TEST_NAMESPACE)
177+
except rclpy.node.NodeNameNonExistentError:
178+
continue
179+
except xmlrpc.client.Fault as e:
180+
if 'NodeNameNonExistentError' in e.faultString:
181+
continue
182+
raise
183+
184+
service_names = [name_type_tuple[0] for name_type_tuple in services]
185+
if (
186+
len(service_names) > 0
187+
and f'{TEST_NAMESPACE}/{TEST_NODE}/get_parameters' in service_names
188+
):
189+
timed_out = False
190+
break
191+
if timed_out:
192+
self.fail(f'CLI daemon failed to find test node after {TEST_TIMEOUT} seconds')
193+
194+
def _write_param_file(self, tmpdir, filename):
195+
yaml_path = os.path.join(tmpdir, filename)
196+
with open(yaml_path, 'w') as f:
197+
f.write(INPUT_PARAMETER_FILE)
198+
return yaml_path
199+
200+
def _output_file(self):
201+
return f'{TEST_NAMESPACE}/{TEST_NODE}'.lstrip('/').replace('/', '__') + '.yaml'
202+
203+
def test_verb_load_missing_args(self):
204+
with self.launch_param_load_command(arguments=[]) as param_load_command:
205+
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
206+
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
207+
assert launch_testing.tools.expect_output(
208+
expected_lines=['ros2 param load: error: the following arguments are required: '
209+
'node_name, parameter_file'],
210+
text=param_load_command.output,
211+
strict=False
212+
)
213+
with self.launch_param_load_command(arguments=['some_node']) as param_load_command:
214+
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
215+
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
216+
assert launch_testing.tools.expect_output(
217+
expected_lines=['ros2 param load: error: the following arguments are required: '
218+
'parameter_file'],
219+
text=param_load_command.output,
220+
strict=False
221+
)
222+
223+
# TODO
224+
def test_verb_load_invalid_node(self):
225+
with tempfile.TemporaryDirectory() as tmpdir:
226+
filepath = self._write_param_file(tmpdir, 'params.yaml')
227+
with self.launch_param_load_command(
228+
arguments=['invalid_node', filepath]
229+
) as param_load_command:
230+
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
231+
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
232+
assert launch_testing.tools.expect_output(
233+
expected_lines=['Node not found'],
234+
text=param_load_command.output,
235+
strict=True
236+
)
237+
with self.launch_param_load_command(
238+
arguments=[f'invalid_ns/{TEST_NODE}', filepath]
239+
) as param_load_command:
240+
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
241+
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
242+
assert launch_testing.tools.expect_output(
243+
expected_lines=['Node not found'],
244+
text=param_load_command.output,
245+
strict=True
246+
)
247+
248+
# TODO
249+
def test_verb_load_invalid_path(self):
250+
with self.launch_param_load_command(
251+
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', 'invalid_path']
252+
) as param_load_command:
253+
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
254+
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
255+
assert launch_testing.tools.expect_output(
256+
expected_lines=['No such file or directory'],
257+
text=param_load_command.output,
258+
strict=False
259+
)
260+
261+
def test_verb_load(self):
262+
with tempfile.TemporaryDirectory() as tmpdir:
263+
filepath = self._write_param_file(tmpdir, 'params.yaml')
264+
with self.launch_param_load_command(
265+
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', filepath]
266+
) as param_load_command:
267+
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
268+
assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK
269+
assert launch_testing.tools.expect_output(
270+
expected_lines=[''],
271+
text=param_load_command.output,
272+
strict=True
273+
)
274+
# Dump with ros2 param dump and compare that output matches input file
275+
with self.launch_param_dump_command(
276+
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--print']
277+
) as param_dump_command:
278+
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
279+
assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
280+
assert launch_testing.tools.expect_output(
281+
expected_text=INPUT_PARAMETER_FILE + '\n',
282+
text=param_dump_command.output,
283+
strict=True
284+
)

0 commit comments

Comments
 (0)