From 309f9c558e03c2072280be906e21e52ec85a8f43 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Tue, 22 Oct 2019 17:06:24 -0300 Subject: [PATCH] End-to-end test coverage for CLI commands output (#304) * Add end-to-end CLI output tests for ros2: - ros2action - ros2service - ros2topic - ros2msg - ros2srv - ros2interface - ros2node - ros2pkg Signed-off-by: Michel Hidalgo --- ros2action/package.xml | 1 + .../test/fixtures/fibonacci_action_server.py | 71 ++ ros2action/test/test_cli.py | 313 +++++++++ ros2interface/package.xml | 2 + ros2interface/test/test_cli.py | 392 +++++++++-- ros2msg/package.xml | 1 + ros2msg/test/test_cli.py | 188 +++-- ros2node/package.xml | 3 + ros2node/test/fixtures/complex_node.py | 76 ++ ros2node/test/test_cli.py | 181 +++++ ros2pkg/package.xml | 1 + ros2pkg/test/test_cli.py | 193 +++++- ros2service/package.xml | 2 + ros2service/test/fixtures/echo_server.py | 54 ++ ros2service/test/test_cli.py | 313 +++++++++ ros2srv/package.xml | 1 + ros2srv/test/test_cli.py | 214 ++++-- ros2topic/package.xml | 3 + ros2topic/test/fixtures/controller_node.py | 58 ++ ros2topic/test/fixtures/listener_node.py | 54 ++ ros2topic/test/fixtures/repeater_node.py | 73 ++ ros2topic/test/fixtures/talker_node.py | 54 ++ ros2topic/test/test_cli.py | 652 ++++++++++++++++++ test_ros2cli/CMakeLists.txt | 85 --- test_ros2cli/package.xml | 25 - test_ros2cli/test/config_ros2action_test.py | 132 ---- test_ros2cli/test/config_ros2msg_test.py | 54 -- test_ros2cli/test/test_config.py | 62 -- .../test_process_output_customizable.py.in | 79 --- 29 files changed, 2754 insertions(+), 583 deletions(-) create mode 100644 ros2action/test/fixtures/fibonacci_action_server.py create mode 100644 ros2action/test/test_cli.py create mode 100644 ros2node/test/fixtures/complex_node.py create mode 100644 ros2node/test/test_cli.py create mode 100644 ros2service/test/fixtures/echo_server.py create mode 100644 ros2service/test/test_cli.py create mode 100644 ros2topic/test/fixtures/controller_node.py create mode 100644 ros2topic/test/fixtures/listener_node.py create mode 100644 ros2topic/test/fixtures/repeater_node.py create mode 100644 ros2topic/test/fixtures/talker_node.py create mode 100644 ros2topic/test/test_cli.py delete mode 100644 test_ros2cli/CMakeLists.txt delete mode 100644 test_ros2cli/package.xml delete mode 100644 test_ros2cli/test/config_ros2action_test.py delete mode 100644 test_ros2cli/test/config_ros2msg_test.py delete mode 100644 test_ros2cli/test/test_config.py delete mode 100644 test_ros2cli/test/test_process_output_customizable.py.in diff --git a/ros2action/package.xml b/ros2action/package.xml index c331d4a7b..b8662d57d 100644 --- a/ros2action/package.xml +++ b/ros2action/package.xml @@ -23,6 +23,7 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing test_msgs diff --git a/ros2action/test/fixtures/fibonacci_action_server.py b/ros2action/test/fixtures/fibonacci_action_server.py new file mode 100644 index 000000000..19f3948f2 --- /dev/null +++ b/ros2action/test/fixtures/fibonacci_action_server.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.import time + +import sys + +import rclpy +from rclpy.action import ActionServer +from rclpy.node import Node + +from test_msgs.action import Fibonacci + + +class FibonacciActionServer(Node): + + def __init__(self): + super().__init__('fibonacci_action_server') + self._action_server = ActionServer( + self, + Fibonacci, + 'fibonacci', + self.execute_callback) + + def destroy_node(self): + self._action_server.destroy() + super().destroy_node() + + def execute_callback(self, goal_handle): + feedback = Fibonacci.Feedback() + feedback.sequence = [0, 1] + + for i in range(1, goal_handle.request.order): + feedback.sequence.append(feedback.sequence[i] + feedback.sequence[i-1]) + goal_handle.publish_feedback(feedback) + + goal_handle.succeed() + + result = Fibonacci.Result() + result.sequence = feedback.sequence + return result + + +def main(args=None): + rclpy.init(args=args) + + node = FibonacciActionServer() + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('server stopped cleanly') + except BaseException: + print('exception in server:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2action/test/test_cli.py b/ros2action/test/test_cli.py new file mode 100644 index 000000000..90a4cd390 --- /dev/null +++ b/ros2action/test/test_cli.py @@ -0,0 +1,313 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import os +import re +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +from rmw_implementation import get_available_rmw_implementations + +import yaml + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_action_server_executable = os.path.join( + os.path.dirname(__file__), 'fixtures', 'fibonacci_action_server.py' + ) + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + ExecuteProcess( + cmd=[sys.executable, path_to_action_server_executable], + additional_env={'RMW_IMPLEMENTATION': rmw_implementation} + ), + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env={'RMW_IMPLEMENTATION': rmw_implementation} + ) + ] + ), + ]) + + +def get_fibonacci_send_goal_output(*, order=1, with_feedback=False): + assert order > 0 + output = [ + 'Waiting for an action server to become available...', + 'Sending goal:', + ' order: {}'.format(order), + '', + re.compile('Goal accepted with ID: [a-f0-9]+'), + '', + ] + sequence = [0, 1] + for _ in range(order - 1): + sequence.append(sequence[-1] + sequence[-2]) + if with_feedback: + output.append('Feedback:') + output.extend((' ' + yaml.dump({ + 'sequence': sequence + })).splitlines()) + output.append('') + output.append('Result:'), + output.extend((' ' + yaml.dump({ + 'sequence': sequence + })).splitlines()) + output.append('') + output.append('Goal finished with status: SUCCEEDED') + return output + + +class TestROS2ActionCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + @contextlib.contextmanager + def launch_action_command(self, arguments): + action_command_action = ExecuteProcess( + cmd=['ros2', 'action', *arguments], + name='ros2action-cli', output='screen', + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + } + ) + with launch_testing.tools.launch_process( + launch_service, action_command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=rmw_implementation + ) + ) as action_command: + yield action_command + cls.launch_action_command = launch_action_command + + def test_info_on_nonexistent_action(self): + with self.launch_action_command(arguments=['info', '/not_an_action']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Action: /not_an_action', + 'Action clients: 0', + 'Action servers: 0', + ], + text=action_command.output, + strict=False + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_fibonacci_info(self): + with self.launch_action_command(arguments=['info', '/fibonacci']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Action: /fibonacci', + 'Action clients: 0', + 'Action servers: 1', + ' /fibonacci_action_server' + ], + text=action_command.output, + strict=False + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_fibonacci_info_with_types(self): + with self.launch_action_command(arguments=['info', '-t', '/fibonacci']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Action: /fibonacci', + 'Action clients: 0', + 'Action servers: 1', + ' /fibonacci_action_server [test_msgs/action/Fibonacci]' + ], + text=action_command.output, + strict=False + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_fibonacci_info_count(self): + with self.launch_action_command(arguments=['info', '-c', '/fibonacci']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Action: /fibonacci', + 'Action clients: 0', + 'Action servers: 1', + ], + text=action_command.output, + strict=False + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list(self): + with self.launch_action_command(arguments=['list']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/fibonacci'], + text=action_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_with_types(self): + with self.launch_action_command(arguments=['list', '-t']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/fibonacci [test_msgs/action/Fibonacci]'], + text=action_command.output, strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_count(self): + with self.launch_action_command(arguments=['list', '-c']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + command_output_lines = action_command.output.splitlines() + assert len(command_output_lines) == 1 + assert int(command_output_lines[0]) == 1 + + @launch_testing.markers.retry_on_failure(times=5) + def test_send_fibonacci_goal(self): + with self.launch_action_command( + arguments=[ + 'send_goal', + '/fibonacci', + 'test_msgs/action/Fibonacci', + '{order: 5}' + ], + ) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=get_fibonacci_send_goal_output(order=5), + text=action_command.output, strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_send_fibonacci_goal_with_feedback(self): + with self.launch_action_command( + arguments=[ + 'send_goal', + '-f', + '/fibonacci', + 'test_msgs/action/Fibonacci', + '{order: 5}' + ], + ) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=get_fibonacci_send_goal_output( + order=5, with_feedback=True + ), + text=action_command.output, strict=True + ) + + def test_show_fibonacci(self): + with self.launch_action_command( + arguments=['show', 'test_msgs/action/Fibonacci'], + ) as action_command: + assert action_command.wait_for_shutdown(timeout=2) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'int32 order', + '---', + 'int32[] sequence', + '---', + 'int32[] sequence' + ], + text=action_command.output, + strict=False + ) + + def test_show_not_a_package(self): + with self.launch_action_command( + arguments=['show', 'not_a_package/action/Fibonacci'], + ) as action_command: + assert action_command.wait_for_shutdown(timeout=2) + assert action_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package name'], + text=action_command.output, strict=True + ) + + # TODO(hidmic): make 'ros2 action show' fail accordingly + # def test_show_not_an_action_ns(self): + # with self.launch_action_command( + # arguments=['show', 'test_msgs/foo/Fibonacci'], + # ) as action_command: + # assert action_command.wait_for_shutdown(timeout=2) + # assert action_command.exit_code == 1 + # assert launch_testing.tools.expect_output( + # expected_lines=['Unknown action type'], + # text=action_command.output, strict=True + # ) + + def test_show_not_an_action_typename(self): + with self.launch_action_command( + arguments=['show', 'test_msgs/action/NotAnActionTypeName'], + ) as action_command: + assert action_command.wait_for_shutdown(timeout=2) + assert action_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown action type'], + text=action_command.output, strict=True + ) + + def test_show_not_an_action_type(self): + with self.launch_action_command( + arguments=['show', 'not_an_action_type'] + ) as action_command: + assert action_command.wait_for_shutdown(timeout=2) + assert action_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['The passed action type is invalid'], + text=action_command.output, strict=True + ) diff --git a/ros2interface/package.xml b/ros2interface/package.xml index 3af3b8989..fa500ed6a 100644 --- a/ros2interface/package.xml +++ b/ros2interface/package.xml @@ -19,8 +19,10 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing std_msgs std_srvs + test_msgs ament_python diff --git a/ros2interface/test/test_cli.py b/ros2interface/test/test_cli.py index d161cc11f..3d962f1dc 100644 --- a/ros2interface/test/test_cli.py +++ b/ros2interface/test/test_cli.py @@ -12,42 +12,356 @@ # See the License for the specific language governing permissions and # limitations under the License. -import subprocess - - -def test_cli(): - packages_cmd = ['ros2', 'interface', 'packages'] - packages_result = subprocess.run(packages_cmd, stdout=subprocess.PIPE, check=True) - package_names = packages_result.stdout.decode().splitlines() - - # explicit dependencies of this package will for sure be available - assert 'std_msgs' in package_names - - count = 0 - for package_name in package_names: - package_cmd = ['ros2', 'interface', 'package', package_name] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE, check=True) - message_types = package_result.stdout.decode().splitlines() - assert all(t.startswith(package_name + '/') for t in message_types) - count += len(message_types) - - if package_name != 'std_msgs': - continue - for message_name in [t[len(package_name) + 1:] for t in message_types]: - show_cmd = [ - 'ros2', 'interface', 'show', package_name + '/' + message_name] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE, check=True) - if message_name == 'String': - assert show_result.stdout.rstrip() == b'string data' - - package_cmd = ['ros2', 'interface', 'package', '_not_existing_package_name'] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE) - assert package_result.returncode - - show_cmd = ['ros2', 'interface', 'show', 'std_msgs/_not_existing_message_name'] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE) - assert show_result.returncode +import contextlib +import itertools +import re +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools + +import pytest + + +some_messages_from_std_msgs = [ + 'std_msgs/msg/Bool', + 'std_msgs/msg/Float32', + 'std_msgs/msg/Float64', +] + +some_services_from_std_srvs = [ + 'std_srvs/srv/Empty', + 'std_srvs/srv/SetBool', + 'std_srvs/srv/Trigger', +] + +some_actions_from_test_msgs = [ + 'test_msgs/action/Fibonacci' +] + +some_interfaces = ( + some_messages_from_std_msgs + + some_services_from_std_srvs + + some_actions_from_test_msgs +) + + +@pytest.mark.rostest +@launch_testing.markers.keep_alive +def generate_test_description(ready_fn): + return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())]) + + +class TestROS2InterfaceCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output + ): + @contextlib.contextmanager + def launch_interface_command(self, arguments): + interface_command_action = ExecuteProcess( + cmd=['ros2', 'interface', *arguments], + additional_env={'PYTHONUNBUFFERED': '1'}, + name='ros2interface-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, interface_command_action, proc_info, proc_output + ) as interface_command: + yield interface_command + cls.launch_interface_command = launch_interface_command + + def test_list_interfaces(self): + with self.launch_interface_command(arguments=['list']) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + filter_ = launch_testing.tools.basic_output_filter( + filtered_prefixes=['Messages:', 'Services:', 'Actions:'] + ) + output_lines = filter_(interface_command.output).splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.repeat( + re.compile(r'\s*[A-z0-9_]+/[A-z0-9_]+'), len(output_lines) + ), + lines=output_lines, + strict=True + ) + some_interfaces_without_ns = [] + for ifc in some_interfaces: + parts = ifc.split('/') + some_interfaces_without_ns.append( + '/'.join([parts[0], *parts[2:]]) + ) + assert launch_testing.tools.expect_output( + expected_lines=some_interfaces_without_ns, + lines=output_lines, + strict=False + ) + + def test_list_messages(self): + with self.launch_interface_command(arguments=['list', '-m']) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['Messages:'], itertools.repeat( + re.compile(r'\s*[A-z0-9_]+/[A-z0-9_]+'), len(output_lines) - 1 + ) + ), + lines=output_lines, + strict=True + ) + some_messages_from_std_msgs_without_ns = [] + for msg in some_messages_from_std_msgs: + parts = msg.split('/') + some_messages_from_std_msgs_without_ns.append( + '/'.join([parts[0], *parts[2:]]) + ) + assert launch_testing.tools.expect_output( + expected_lines=some_messages_from_std_msgs_without_ns, + lines=output_lines, + strict=False + ) + + def test_list_services(self): + with self.launch_interface_command(arguments=['list', '-s']) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['Services:'], itertools.repeat( + re.compile(r'\s*[A-z0-9_]+/[A-z0-9_]+'), len(output_lines) - 1 + ) + ), + lines=output_lines, + strict=True + ) + some_services_from_std_srvs_without_ns = [] + for srv in some_services_from_std_srvs: + parts = srv.split('/') + some_services_from_std_srvs_without_ns.append( + '/'.join([parts[0], *parts[2:]]) + ) + assert launch_testing.tools.expect_output( + expected_lines=some_services_from_std_srvs_without_ns, + lines=output_lines, + strict=False + ) + + def test_list_actions(self): + with self.launch_interface_command(arguments=['list', '-a']) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['Actions:'], itertools.repeat( + re.compile(r'\s*[A-z0-9_]+/[A-z0-9_]+'), len(output_lines) - 1 + ) + ), + lines=output_lines, + strict=True + ) + some_actions_from_test_msgs_without_ns = [] + for action in some_actions_from_test_msgs: + parts = action.split('/') + some_actions_from_test_msgs_without_ns.append( + '/'.join([parts[0], *parts[2:]]) + ) + assert launch_testing.tools.expect_output( + expected_lines=some_actions_from_test_msgs_without_ns, + lines=output_lines, + strict=False + ) + + def test_package_on_nonexistent_package(self): + with self.launch_interface_command( + arguments=['package', 'not_a_package'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=["Unknown package 'not_a_package'"], + text=interface_command.output, + strict=True + ) + + def test_package_on_std_msgs(self): + with self.launch_interface_command( + arguments=['package', 'std_msgs'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.repeat( + re.compile(r'std_msgs/msg/[A-z0-9_]+'), len(output_lines) + ), + lines=output_lines, + strict=True + ) + assert all(msg in output_lines for msg in some_messages_from_std_msgs) + + def test_package_on_std_srvs(self): + with self.launch_interface_command( + arguments=['package', 'std_srvs'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.repeat( + re.compile(r'std_srvs/srv/[A-z0-9_]+'), len(output_lines) + ), + lines=output_lines, + strict=True + ) + assert all(srv in output_lines for srv in some_services_from_std_srvs) + + def test_package_on_test_msgs(self): + with self.launch_interface_command( + arguments=['package', 'test_msgs'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.repeat( + re.compile(r'test_msgs/(msg|srv|action)/[A-z0-9_]+'), len(output_lines) + ), + lines=output_lines, + strict=True + ) + assert all(action in output_lines for action in some_actions_from_test_msgs) + + def test_packages(self): + with self.launch_interface_command(arguments=['packages']) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert 'std_msgs' in output_lines + assert 'std_srvs' in output_lines + assert 'test_msgs' in output_lines + + def test_packages_with_messages(self): + with self.launch_interface_command( + arguments=['packages', '-m'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert 'std_msgs' in output_lines + assert 'std_srvs' not in output_lines + assert 'test_msgs' in output_lines + + def test_packages_with_services(self): + with self.launch_interface_command( + arguments=['packages', '-s'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert 'std_msgs' not in output_lines + assert 'std_srvs' in output_lines + assert 'test_msgs' in output_lines + + def test_packages_with_actions(self): + with self.launch_interface_command( + arguments=['packages', '-a'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert 'std_msgs' not in output_lines + assert 'std_srvs' not in output_lines + assert 'test_msgs' in output_lines + + def test_show_message(self): + with self.launch_interface_command( + arguments=['show', 'std_msgs/msg/String'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'string data' + ], + text=interface_command.output, + strict=True + ) + + def test_show_service(self): + with self.launch_interface_command( + arguments=['show', 'std_srvs/srv/SetBool'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'bool data # e.g. for hardware enabling / disabling', + '---', + 'bool success # indicate successful run of triggered service', + 'string message # informational, e.g. for error messages' + ], + text=interface_command.output, + strict=True + ) + + def test_show_action(self): + with self.launch_interface_command( + arguments=['show', 'test_msgs/action/Fibonacci'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '#goal definition', + 'int32 order', + '---', + '#result definition', + 'int32[] sequence', + '---', + '#feedback', + 'int32[] sequence', + ], + text=interface_command.output, + strict=True + ) + + def test_show_not_a_package(self): + with self.launch_interface_command( + arguments=['show', 'not_a_package/msg/String'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=["Unknown package 'not_a_package'"], + text=interface_command.output, + strict=True + ) + + def test_show_not_an_interface(self): + with self.launch_interface_command( + arguments=['show', 'std_msgs/msg/NotAMessageTypeName'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=[re.compile( + r"Could not find the interface '.+NotAMessageTypeName\.idl'" + )], + text=interface_command.output, + strict=True + ) diff --git a/ros2msg/package.xml b/ros2msg/package.xml index 05474434c..d0db63ed2 100644 --- a/ros2msg/package.xml +++ b/ros2msg/package.xml @@ -18,6 +18,7 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing std_msgs std_srvs diff --git a/ros2msg/test/test_cli.py b/ros2msg/test/test_cli.py index 8bd92f7d2..d06a85e43 100644 --- a/ros2msg/test/test_cli.py +++ b/ros2msg/test/test_cli.py @@ -1,4 +1,4 @@ -# Copyright 2017 Open Source Robotics Foundation, Inc. +# Copyright 2019 Open Source Robotics Foundation, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,47 +12,145 @@ # See the License for the specific language governing permissions and # limitations under the License. -import subprocess - - -def test_cli(): - packages_cmd = ['ros2', 'msg', 'packages'] - packages_result = subprocess.run(packages_cmd, stdout=subprocess.PIPE, check=True) - package_names = packages_result.stdout.decode().splitlines() - - # explicit dependencies of this package will for sure be available - assert 'std_msgs' in package_names - - count = 0 - for package_name in package_names: - package_cmd = ['ros2', 'msg', 'package', package_name] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE, check=True) - message_types = package_result.stdout.decode().splitlines() - assert all(t.startswith(package_name + '/') for t in message_types) - count += len(message_types) - - if package_name != 'std_msgs': - continue - for message_name in [t[len(package_name) + 1:] for t in message_types]: - show_cmd = [ - 'ros2', 'msg', 'show', package_name + '/' + message_name] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE, check=True) - if message_name == 'String': - assert show_result.stdout.rstrip() == b'string data' - - list_cmd = ['ros2', 'msg', 'list'] - list_result = subprocess.run(list_cmd, stdout=subprocess.PIPE, check=True) - message_types = list_result.stdout.decode().splitlines() - assert len(message_types) == count - - package_cmd = ['ros2', 'msg', 'package', '_not_existing_package_name'] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE) - assert package_result.returncode - - show_cmd = ['ros2', 'msg', 'show', 'std_msgs/_not_existing_message_name'] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE) - assert show_result.returncode +import contextlib +import re +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools + +import pytest + + +@pytest.mark.rostest +@launch_testing.markers.keep_alive +def generate_test_description(ready_fn): + return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())]) + + +some_messages_from_std_msgs = [ + 'std_msgs/msg/Bool', + 'std_msgs/msg/Float32', + 'std_msgs/msg/Float64', +] + + +class TestROS2MsgCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output + ): + @contextlib.contextmanager + def launch_msg_command(self, arguments): + msg_command_action = ExecuteProcess( + cmd=['ros2', 'msg', *arguments], + additional_env={'PYTHONUNBUFFERED': '1'}, + name='ros2msg-cli', output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, msg_command_action, proc_info, proc_output, + output_filter=launch_testing.tools.basic_output_filter( + filtered_patterns=[ + r".*'ros2 msg' is deprecated and will be removed in a future ROS release.*" + ] + ) + ) as msg_command: + yield msg_command + cls.launch_msg_command = launch_msg_command + + def test_list_messages(self): + with self.launch_msg_command(arguments=['list']) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = msg_command.output.splitlines() + assert all(msg in output_lines for msg in some_messages_from_std_msgs) + assert all(re.match(r'.*/msg/.*', line) is not None for line in output_lines) + + def test_package_messages(self): + with self.launch_msg_command(arguments=['package', 'std_msgs']) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = msg_command.output.splitlines() + assert all(msg in output_lines for msg in some_messages_from_std_msgs) + assert all(re.match(r'std_msgs/msg/.*', line) is not None for line in output_lines) + + def test_not_a_package_messages(self): + with self.launch_msg_command(arguments=['package', 'not_a_package']) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package name'], + text=msg_command.output, strict=True + ) + + def test_list_packages_with_messages(self): + with self.launch_msg_command(arguments=['packages']) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == launch_testing.asserts.EXIT_OK + assert 'std_msgs' in msg_command.output.splitlines() + + def test_show_message(self): + with self.launch_msg_command( + arguments=['show', 'std_msgs/msg/String'] + ) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['string data'], + text=msg_command.output, strict=False + ) + + def test_show_not_a_message_typename(self): + with self.launch_msg_command( + arguments=['show', 'std_msgs/msg/NotAMessageTypeName'] + ) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown message name'], + text=msg_command.output, strict=True + ) + + # TODO(hidmic): make 'ros2 msg show' fail accordingly + # def test_show_not_a_message_ns(self): + # with self.launch_msg_command( + # arguments=['show', 'std_msgs/foo/String'] + # ) as msg_command: + # assert msg_command.wait_for_shutdown(timeout=10) + # assert msg_command.exit_code == 1 + # assert launch_testing.tools.expect_output( + # expected_lines=['Unknown message name'], + # text=msg_command.output, strict=True + # ) + + def test_show_not_a_package(self): + with self.launch_msg_command( + arguments=['show', 'not_a_package/msg/String'] + ) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package name'], + text=msg_command.output, strict=True + ) + + def test_show_not_a_message_type(self): + with self.launch_msg_command( + arguments=['show', 'not_a_message_type'] + ) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['The passed message type is invalid'], + text=msg_command.output, strict=True + ) diff --git a/ros2node/package.xml b/ros2node/package.xml index ef8a056e0..b6748f6e2 100644 --- a/ros2node/package.xml +++ b/ros2node/package.xml @@ -16,6 +16,9 @@ ament_pep257 ament_xmllint python3-pytest + rclpy + ros_testing + test_msgs ament_python diff --git a/ros2node/test/fixtures/complex_node.py b/ros2node/test/fixtures/complex_node.py new file mode 100644 index 000000000..d715e218b --- /dev/null +++ b/ros2node/test/fixtures/complex_node.py @@ -0,0 +1,76 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import rclpy +from rclpy.action import ActionServer +from rclpy.node import Node +from rclpy.qos import qos_profile_system_default + +from test_msgs.action import Fibonacci +from test_msgs.msg import Arrays +from test_msgs.msg import Strings +from test_msgs.srv import BasicTypes + + +class ComplexNode(Node): + + def __init__(self): + super().__init__('complex_node') + self.publisher = self.create_publisher(Arrays, 'arrays', qos_profile_system_default) + self.subscription = self.create_subscription( + Strings, 'strings', lambda msg: None, qos_profile_system_default + ) + self.server = self.create_service(BasicTypes, 'basic', lambda req, res: res) + self.action_server = ActionServer( + self, Fibonacci, 'fibonacci', self.action_callback + ) + self.timer = self.create_timer(1.0, self.pub_callback) + + def destroy_node(self): + self.timer.destroy() + self.publisher.destroy() + self.subscription.destroy() + self.server.destroy() + self.action_server.destroy() + super().destroy_node() + + def pub_callback(self): + self.publisher.publish(Arrays()) + + def action_callback(self, goal_handle): + goal_handle.succeed() + return Fibonacci.Result() + + +def main(args=None): + rclpy.init(args=args) + + node = ComplexNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('node stopped cleanly') + except BaseException: + print('exception in node:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2node/test/test_cli.py b/ros2node/test/test_cli.py new file mode 100644 index 000000000..0fdb586d9 --- /dev/null +++ b/ros2node/test/test_cli.py @@ -0,0 +1,181 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import itertools +import os +import re +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +from launch_ros.actions import Node + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +from rmw_implementation import get_available_rmw_implementations + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_complex_node_script = os.path.join( + os.path.dirname(__file__), 'fixtures', 'complex_node.py' + ) + additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + # Add test fixture actions. + Node( + node_executable=sys.executable, + arguments=[path_to_complex_node_script], + node_name='complex_node', + additional_env=additional_env + ), + Node( + node_executable=sys.executable, + arguments=[path_to_complex_node_script], + node_name='_hidden_complex_node', + additional_env=additional_env + ), + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env=additional_env + ) + ] + ), + ]) + + +class TestROS2NodeCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + @contextlib.contextmanager + def launch_node_command(self, arguments): + node_command_action = ExecuteProcess( + cmd=['ros2', 'node', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + }, + name='ros2node-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, node_command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + # ignore launch_ros and ros2cli daemon nodes + filtered_patterns=['.*launch_ros.*', '.*ros2cli.*'], + filtered_rmw_implementation=rmw_implementation + ) + ) as node_command: + yield node_command + cls.launch_node_command = launch_node_command + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_nodes(self): + with self.launch_node_command(arguments=['list']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/complex_node'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_all_nodes(self): + with self.launch_node_command(arguments=['list', '-a']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '/_hidden_complex_node', + '/complex_node' + ], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_node_count(self): + with self.launch_node_command(arguments=['list', '-c']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = node_command.output.splitlines() + assert len(output_lines) == 1 + # Fixture nodes that are not hidden plus launch_ros node. + assert int(output_lines[0]) == 2 + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_all_nodes_count(self): + with self.launch_node_command(arguments=['list', '-c', '-a']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = node_command.output.splitlines() + assert len(output_lines) == 1 + # All fixture nodes plus launch_ros and ros2cli daemon nodes. + assert int(output_lines[0]) == 4 + + @launch_testing.markers.retry_on_failure(times=5) + def test_info_node(self): + with self.launch_node_command(arguments=['info', '/complex_node']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + # TODO(hidmic): only optionally show hidden topics and services + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain([ + '/complex_node', + ' Subscribers:', + ' /strings: test_msgs/msg/Strings', + ' Publishers:', + ' /arrays: test_msgs/msg/Arrays', + ' /parameter_events: rcl_interfaces/msg/ParameterEvent', + ' /rosout: rcl_interfaces/msg/Log', + ' Services:', + ' /basic: test_msgs/srv/BasicTypes', + ], itertools.repeat(re.compile( + r'\s*/complex_node/.*parameter.*: rcl_interfaces/srv/.*Parameter.*' + ), 6), [ + ' Action Servers:', + ' /fibonacci: test_msgs/action/Fibonacci', + ' Action Clients:', + '' + ]), + text=node_command.output, strict=False + ), 'Output does not match:\n' + node_command.output diff --git a/ros2pkg/package.xml b/ros2pkg/package.xml index 8071f4c22..cb5899f04 100644 --- a/ros2pkg/package.xml +++ b/ros2pkg/package.xml @@ -21,6 +21,7 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing ament_python diff --git a/ros2pkg/test/test_cli.py b/ros2pkg/test/test_cli.py index 0a71854fa..c224772f8 100644 --- a/ros2pkg/test/test_cli.py +++ b/ros2pkg/test/test_cli.py @@ -12,26 +12,185 @@ # See the License for the specific language governing permissions and # limitations under the License. +import contextlib import os -import subprocess +import tempfile +import unittest +import xml.etree.ElementTree as ET +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction -def test_cli(): - list_cmd = ['ros2', 'pkg', 'list'] - list_result = subprocess.run(list_cmd, stdout=subprocess.PIPE, check=True) - package_names = list_result.stdout.decode().splitlines() +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools - # explicit dependencies of this package will for sure be available - assert 'ros2cli' in package_names +import pytest - prefix_cmd = ['ros2', 'pkg', 'prefix', 'ros2cli'] - prefix_result = subprocess.run( - prefix_cmd, stdout=subprocess.PIPE, check=True) - prefix_path = prefix_result.stdout.decode().splitlines() - assert len(prefix_path) == 1 - assert os.path.isdir(prefix_path[0]) - prefix_cmd = ['ros2', 'pkg', 'prefix', '_not_existing_package_name'] - prefix_result = subprocess.run( - prefix_cmd, stdout=subprocess.PIPE) - assert prefix_result.returncode +some_cli_packages = [ + 'ros2cli', + 'ros2pkg' +] + + +@pytest.mark.rostest +@launch_testing.markers.keep_alive +def generate_test_description(ready_fn): + return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())]) + + +class TestROS2PkgCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output + ): + @contextlib.contextmanager + def launch_pkg_command(self, arguments, **kwargs): + pkg_command_action = ExecuteProcess( + cmd=['ros2', 'pkg', *arguments], + additional_env={'PYTHONUNBUFFERED': '1'}, + name='ros2pkg-cli', + output='screen', + **kwargs + ) + with launch_testing.tools.launch_process( + launch_service, pkg_command_action, proc_info, proc_output + ) as pkg_command: + yield pkg_command + cls.launch_pkg_command = launch_pkg_command + + def test_list_packages(self): + with self.launch_pkg_command(arguments=['list']) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=2) + assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = pkg_command.output.splitlines() + assert all(pkg in output_lines for pkg in some_cli_packages) + + def test_package_prefix(self): + with self.launch_pkg_command(arguments=['prefix', 'ros2cli']) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=2) + assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = pkg_command.output.splitlines() + assert len(output_lines) == 1 + prefix_path = output_lines[0] + assert os.path.isdir(prefix_path) + + def test_not_a_package_prefix(self): + with self.launch_pkg_command(arguments=['prefix', 'not_a_package']) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=2) + assert pkg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Package not found'], + text=pkg_command.output, + strict=True + ) + + def test_xml(self): + with self.launch_pkg_command(arguments=['xml', 'ros2cli']) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=2) + assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK + root = ET.XML(pkg_command.output) + assert root.tag == 'package' + assert root.find('name').text == 'ros2cli' + + def test_not_a_package_xml(self): + with self.launch_pkg_command(arguments=['xml', 'not_a_package']) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=2) + assert pkg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Package not found'], + text=pkg_command.output, + strict=True + ) + + def test_create_package(self): + with tempfile.TemporaryDirectory() as tmpdir: + with self.launch_pkg_command( + arguments=[ + 'create', 'a_test_package', + '--package-format', '3', + '--description', 'A test package dummy description', + '--license', 'Apache License 2.0', + '--build-type', 'ament_cmake', + '--dependencies', 'ros2pkg', + '--maintainer-email', 'nobody@nowhere.com', + '--maintainer-name', 'Nobody', + '--node-name', 'test_node', + '--library-name', 'test_library' + ], cwd=tmpdir + ) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=5) + assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'going to create a new package', + 'package name: a_test_package', + 'destination directory: ' + os.path.realpath(tmpdir), + 'package format: 3', + 'version: 0.0.0', + 'description: A test package dummy description', + "maintainer: ['Nobody ']", + "licenses: ['Apache License 2.0']", + 'build type: ament_cmake', + "dependencies: ['ros2pkg']", + 'node_name: test_node', + 'library_name: test_library', + 'creating folder ' + os.path.join('.', 'a_test_package'), + 'creating ' + os.path.join('.', 'a_test_package', 'package.xml'), + 'creating source and include folder', + 'creating folder ' + os.path.join('.', 'a_test_package', 'src'), + 'creating folder ' + os.path.join( + '.', 'a_test_package', 'include', 'a_test_package' + ), + 'creating ' + os.path.join('.', 'a_test_package', 'CMakeLists.txt'), + 'creating ' + os.path.join( + '.', 'a_test_package', 'src', 'test_node.cpp' + ), + 'creating ' + os.path.join( + '.', 'a_test_package', 'include', 'a_test_package', 'test_library.hpp' + ), + 'creating ' + os.path.join( + '.', 'a_test_package', 'src', 'test_library.cpp' + ), + 'creating ' + os.path.join( + '.', 'a_test_package', 'include', 'a_test_package', 'visibility_control.h' + ), + ], + text=pkg_command.output, + strict=True + ) + # Check layout + assert os.path.isdir(os.path.join(tmpdir, 'a_test_package')) + assert os.path.isfile(os.path.join(tmpdir, 'a_test_package', 'package.xml')) + assert os.path.isfile(os.path.join(tmpdir, 'a_test_package', 'CMakeLists.txt')) + assert os.path.isfile( + os.path.join(tmpdir, 'a_test_package', 'src', 'test_node.cpp') + ) + assert os.path.isfile( + os.path.join(tmpdir, 'a_test_package', 'src', 'test_library.cpp') + ) + assert os.path.isfile(os.path.join( + tmpdir, 'a_test_package', 'include', 'a_test_package', 'test_library.hpp' + )) + assert os.path.isfile(os.path.join( + tmpdir, 'a_test_package', 'include', 'a_test_package', 'visibility_control.h' + )) + # Check package.xml + tree = ET.parse(os.path.join(tmpdir, 'a_test_package', 'package.xml')) + root = tree.getroot() + assert root.tag == 'package' + assert root.attrib['format'] == '3' + assert root.find('name').text == 'a_test_package' + assert root.find('description').text == 'A test package dummy description' + assert root.find('maintainer').text == 'Nobody' + assert root.find('maintainer').attrib['email'] == 'nobody@nowhere.com' + assert root.find('license').text == 'Apache License 2.0' + assert root.find('depend').text == 'ros2pkg' + assert root.find('.//build_type').text == 'ament_cmake' diff --git a/ros2service/package.xml b/ros2service/package.xml index d49759e75..519faf228 100644 --- a/ros2service/package.xml +++ b/ros2service/package.xml @@ -20,6 +20,8 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing + test_msgs ament_python diff --git a/ros2service/test/fixtures/echo_server.py b/ros2service/test/fixtures/echo_server.py new file mode 100644 index 000000000..ca3584d7c --- /dev/null +++ b/ros2service/test/fixtures/echo_server.py @@ -0,0 +1,54 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import rclpy +from rclpy.node import Node + +from test_msgs.srv import BasicTypes + + +class EchoServer(Node): + + def __init__(self): + super().__init__('echo_server') + self.server = self.create_service(BasicTypes, 'echo', self.callback) + + def callback(self, request, response): + for field_name in request.get_fields_and_field_types(): + setattr(response, field_name, getattr(request, field_name)) + return response + + +def main(args=None): + rclpy.init(args=args) + + node = EchoServer() + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('server stopped cleanly') + except BaseException: + print('exception in server:', file=sys.stderr) + raise + finally: + # Destroy the node explicitly + # (optional - Done automatically when node is garbage collected) + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2service/test/test_cli.py b/ros2service/test/test_cli.py new file mode 100644 index 000000000..c0ff38f2c --- /dev/null +++ b/ros2service/test/test_cli.py @@ -0,0 +1,313 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import functools +import itertools +import os +import re +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction +from launch_ros.actions import Node + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +from rmw_implementation import get_available_rmw_implementations + +from test_msgs.srv import BasicTypes + + +def get_echo_call_output(**kwargs): + request = BasicTypes.Request() + for field_name, field_value in kwargs.items(): + setattr(request, field_name, field_value) + response = BasicTypes.Response() + for field_name, field_value in kwargs.items(): + setattr(response, field_name, field_value) + return [ + 'requester: making request: ' + repr(request), + '', + 'response:', + repr(response), + '' + ] + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_echo_server_script = os.path.join( + os.path.dirname(__file__), 'fixtures', 'echo_server.py' + ) + additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + # Add test fixture actions. + Node( + node_executable=sys.executable, + arguments=[path_to_echo_server_script], + node_name='echo_server', + node_namespace='my_ns', + additional_env=additional_env, + ), + Node( + node_executable=sys.executable, + arguments=[path_to_echo_server_script], + node_name='_hidden_echo_server', + node_namespace='my_ns', + remappings=[('echo', '_echo')], + additional_env=additional_env, + ), + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env=additional_env + ) + ] + ), + ]) + + +class TestROS2ServiceCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + @contextlib.contextmanager + def launch_service_command(self, arguments): + service_command_action = ExecuteProcess( + cmd=['ros2', 'service', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + }, + name='ros2service-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, service_command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_prefixes=[ + 'waiting for service to become available...', + '/launch_ros' # cope with launch_ros internal node. + ], + filtered_rmw_implementation=rmw_implementation + ) + ) as service_command: + yield service_command + cls.launch_service_command = launch_service_command + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_services(self): + with self.launch_service_command(arguments=['list']) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['/my_ns/echo'], + itertools.repeat(re.compile( + r'/my_ns/echo_server/.*parameter.*' + ), 6) + ), + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_hidden(self): + with self.launch_service_command( + arguments=['--include-hidden-services', 'list'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['/my_ns/_echo'], + itertools.repeat(re.compile( + r'/my_ns/_hidden_echo_server/.*parameter.*' + ), 6), + ['/my_ns/echo'], + itertools.repeat(re.compile( + r'/my_ns/echo_server/.*parameter.*' + ), 6) + ), + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_with_types(self): + with self.launch_service_command(arguments=['list', '-t']) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['/my_ns/echo [test_msgs/srv/BasicTypes]'], + itertools.repeat(re.compile( + r'/my_ns/echo_server/.*parameter.*' + r' \[rcl_interfaces/srv/.*Parameter.*\]' + ), 6) + ), + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_count(self): + with self.launch_service_command(arguments=['list', '-c']) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = service_command.output.splitlines() + assert len(output_lines) == 1 + assert int(output_lines[0]) == 7 + 6 # cope with launch_ros internal node. + + @launch_testing.markers.retry_on_failure(times=5) + def test_find(self): + with self.launch_service_command( + arguments=['find', 'test_msgs/srv/BasicTypes'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/my_ns/echo'], + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_find_hidden(self): + with self.launch_service_command( + arguments=['find', '--include-hidden-services', 'test_msgs/srv/BasicTypes'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/my_ns/_echo', '/my_ns/echo'], + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_find_count(self): + with self.launch_service_command( + arguments=['find', '-c', 'test_msgs/srv/BasicTypes'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = service_command.output.splitlines() + assert len(output_lines) == 1 + assert int(output_lines[0]) == 1 + + def test_find_not_a_service_type(self): + with self.launch_service_command( + arguments=['find', 'not_a_service_type'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert service_command.output == '' + + def test_type(self): + with self.launch_service_command( + arguments=['type', '/my_ns/echo'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['test_msgs/srv/BasicTypes'], + text=service_command.output, + strict=True + ) + + def test_type_on_not_a_service(self): + with self.launch_service_command( + arguments=['type', '/not_a_service'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == 1 + assert service_command.output == '' + + @launch_testing.markers.retry_on_failure(times=5) + def test_call_no_args(self): + with self.launch_service_command( + arguments=['call', '/my_ns/echo', 'test_msgs/srv/BasicTypes'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=get_echo_call_output(), + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_call(self): + with self.launch_service_command( + arguments=[ + 'call', + '/my_ns/echo', + 'test_msgs/srv/BasicTypes', + '{bool_value: false, int32_value: -1, float64_value: 0.1, string_value: bazbar}' + ] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=get_echo_call_output( + bool_value=False, + int32_value=-1, + float64_value=0.1, + string_value='bazbar' + ), + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_repeated_call(self): + with self.launch_service_command( + arguments=[ + 'call', + '-r', '1', + '/my_ns/echo', + 'test_msgs/srv/BasicTypes', + '{bool_value: true, int32_value: 1, float64_value: 1.0, string_value: foobar}' + ], + ) as service_command: + assert service_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=2 * get_echo_call_output( + bool_value=True, int32_value=1, float64_value=1.0, string_value='foobar' + ) + ), timeout=10) diff --git a/ros2srv/package.xml b/ros2srv/package.xml index a7469c30f..d6603ff50 100644 --- a/ros2srv/package.xml +++ b/ros2srv/package.xml @@ -18,6 +18,7 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing std_msgs std_srvs diff --git a/ros2srv/test/test_cli.py b/ros2srv/test/test_cli.py index de0906840..b6322eafa 100644 --- a/ros2srv/test/test_cli.py +++ b/ros2srv/test/test_cli.py @@ -1,4 +1,4 @@ -# Copyright 2017 Open Source Robotics Foundation, Inc. +# Copyright 2019 Open Source Robotics Foundation, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,47 +12,171 @@ # See the License for the specific language governing permissions and # limitations under the License. -import subprocess - - -def test_cli(): - packages_cmd = ['ros2', 'srv', 'packages'] - packages_result = subprocess.run(packages_cmd, stdout=subprocess.PIPE, check=True) - package_names = packages_result.stdout.decode().splitlines() - - # explicit dependencies of this package will for sure be available - assert 'std_srvs' in package_names - - count = 0 - for package_name in package_names: - package_cmd = ['ros2', 'srv', 'package', package_name] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE, check=True) - service_types = package_result.stdout.decode().splitlines() - assert all(t.startswith(package_name + '/') for t in service_types) - count += len(service_types) - - if package_name != 'std_srvs': - continue - for service_name in [t[len(package_name) + 1:] for t in service_types]: - show_cmd = [ - 'ros2', 'srv', 'show', package_name + '/' + service_name] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE, check=True) - if service_name == 'Empty': - assert show_result.stdout.rstrip() == b'---' - - list_cmd = ['ros2', 'srv', 'list'] - list_result = subprocess.run(list_cmd, stdout=subprocess.PIPE, check=True) - service_types = list_result.stdout.decode().splitlines() - assert len(service_types) == count - - package_cmd = ['ros2', 'srv', 'package', '_not_existing_package_name'] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE) - assert package_result.returncode - - show_cmd = ['ros2', 'srv', 'show', 'std_srvs/_not_existing_service_name'] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE) - assert show_result.returncode +import contextlib +import re +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools + +import pytest + + +@pytest.mark.rostest +@launch_testing.markers.keep_alive +def generate_test_description(ready_fn): + return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())]) + + +some_services_from_std_srvs = [ + 'std_srvs/srv/Empty', + 'std_srvs/srv/SetBool', + 'std_srvs/srv/Trigger', +] + + +class TestROS2SrvCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output + ): + @contextlib.contextmanager + def launch_srv_command(self, arguments): + srv_command_action = ExecuteProcess( + cmd=['ros2', 'srv', *arguments], + additional_env={'PYTHONUNBUFFERED': '1'}, + name='ros2srv-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, srv_command_action, proc_info, proc_output, + output_filter=launch_testing.tools.basic_output_filter( + filtered_patterns=[ + r".*'ros2 srv' is deprecated and will be removed in a future ROS release.*" + ] + ) + ) as srv_command: + yield srv_command + cls.launch_srv_command = launch_srv_command + + def test_list_service_types(self): + with self.launch_srv_command(arguments=['list']) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = srv_command.output.splitlines() + assert all(srv in output_lines for srv in some_services_from_std_srvs) + assert all(re.match(r'.*/srv/.*', line) is not None for line in output_lines) + + def test_list_service_types_in_a_package(self): + with self.launch_srv_command(arguments=['package', 'std_srvs']) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = srv_command.output.splitlines() + assert all(srv in output_lines for srv in some_services_from_std_srvs) + assert all(re.match(r'std_srvs/srv/.*', line) is not None for line in output_lines) + + def test_list_service_types_in_not_a_package(self): + with self.launch_srv_command(arguments=['package', 'not_a_package']) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package name'], + text=srv_command.output + ) + + def test_list_packages_with_service_types(self): + with self.launch_srv_command(arguments=['packages']) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == launch_testing.asserts.EXIT_OK + assert 'std_srvs' in srv_command.output.splitlines() + + def test_show_service_type(self): + with self.launch_srv_command( + arguments=['show', 'std_srvs/srv/SetBool'] + ) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'bool data', + '---', + 'bool success', + 'string message' + ], + text=srv_command.output, + strict=False + ) + + with self.launch_srv_command( + arguments=['show', 'std_srvs/srv/Trigger'] + ) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '---', + 'bool success', + 'string message' + ], + text=srv_command.output, + strict=False + ) + + def test_show_not_a_service_typename(self): + with self.launch_srv_command( + arguments=['show', 'std_srvs/srv/NotAServiceTypeName'] + ) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown service name'], + text=srv_command.output, + strict=True + ) + + # TODO(hidmic): make 'ros2 srv show' more robust + # def test_show_not_a_service_ns(self): + # with self.launch_srv_command( + # arguments=['show', 'std_srvs/foo/Empty'] + # ) as srv_command: + # assert srv_command.wait_for_shutdown(timeout=10) + # assert srv_command.exit_code == 1 + # assert launch_testing.tools.expect_output( + # expected_lines=['Unknown service name'], + # text=srv_command.output, + # strict=True + # ) + + def test_show_not_a_package(self): + with self.launch_srv_command( + arguments=['show', 'not_a_package/srv/Empty'] + ) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package name'], + text=srv_command.output, + strict=True + ) + + def test_show_not_a_service_type(self): + with self.launch_srv_command( + arguments=['show', 'not_a_service_type'] + ) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['The passed service type is invalid'], + text=srv_command.output, + strict=True + ) diff --git a/ros2topic/package.xml b/ros2topic/package.xml index c2c9ce70d..598b9289f 100644 --- a/ros2topic/package.xml +++ b/ros2topic/package.xml @@ -21,8 +21,11 @@ ament_flake8 ament_pep257 ament_xmllint + geometry_msgs python3-pytest + ros_testing std_msgs + test_msgs ament_python diff --git a/ros2topic/test/fixtures/controller_node.py b/ros2topic/test/fixtures/controller_node.py new file mode 100644 index 000000000..1dd926597 --- /dev/null +++ b/ros2topic/test/fixtures/controller_node.py @@ -0,0 +1,58 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from geometry_msgs.msg import TwistStamped + +import rclpy +from rclpy.node import Node +from rclpy.qos import qos_profile_system_default + + +class ControllerNode(Node): + + def __init__(self): + super().__init__('controller') + self.pub = self.create_publisher( + TwistStamped, 'cmd_vel', qos_profile_system_default + ) + self.tmr = self.create_timer(1.0, self.callback) + + def callback(self): + msg = TwistStamped() + msg.header.stamp = self.get_clock().now().to_msg() + msg.twist.linear.x = 1.0 + self.pub.publish(msg) + + +def main(args=None): + rclpy.init(args=args) + + node = ControllerNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('controller stopped cleanly') + except BaseException: + print('exception in controller:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2topic/test/fixtures/listener_node.py b/ros2topic/test/fixtures/listener_node.py new file mode 100644 index 000000000..8ee4fddfa --- /dev/null +++ b/ros2topic/test/fixtures/listener_node.py @@ -0,0 +1,54 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import rclpy +from rclpy.node import Node +from rclpy.qos import qos_profile_system_default + +from std_msgs.msg import String + + +class ListenerNode(Node): + + def __init__(self): + super().__init__('listener') + self.sub = self.create_subscription( + String, 'chatter', self.callback, qos_profile_system_default + ) + + def callback(self, msg): + self.get_logger().info('I heard: [%s]' % msg.data) + + +def main(args=None): + rclpy.init(args=args) + + node = ListenerNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('listener stopped cleanly') + except BaseException: + print('exception in listener:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2topic/test/fixtures/repeater_node.py b/ros2topic/test/fixtures/repeater_node.py new file mode 100644 index 000000000..8fd4a41a6 --- /dev/null +++ b/ros2topic/test/fixtures/repeater_node.py @@ -0,0 +1,73 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import sys + +import rclpy +from rclpy.node import Node +from rclpy.qos import qos_profile_system_default +from rclpy.utilities import remove_ros_args + +from ros2topic.api import import_message_type + + +class RepeaterNode(Node): + + def __init__(self, message_type): + super().__init__('repeater_node') + self.message_type = message_type + self.pub = self.create_publisher( + self.message_type, '~/output', qos_profile_system_default + ) + self.tmr = self.create_timer(1.0, self.callback) + + def callback(self): + self.pub.publish(self.message_type()) + + +def message_type(message_typename): + return import_message_type('~/output', message_typename) + + +def parse_arguments(args=None): + parser = argparse.ArgumentParser() + parser.add_argument( + 'message_type', type=message_type, + help='Message type for the repeater to publish.' + ) + return parser.parse_args(args=remove_ros_args(args)) + + +def main(args=None): + parsed_args = parse_arguments(args=args) + + rclpy.init(args=args) + + node = RepeaterNode(message_type=parsed_args.message_type) + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('repeater stopped cleanly') + except BaseException: + print('exception in repeater:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main(args=sys.argv[1:]) diff --git a/ros2topic/test/fixtures/talker_node.py b/ros2topic/test/fixtures/talker_node.py new file mode 100644 index 000000000..a29f6ecdc --- /dev/null +++ b/ros2topic/test/fixtures/talker_node.py @@ -0,0 +1,54 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import rclpy +from rclpy.node import Node + +from std_msgs.msg import String + + +class TalkerNode(Node): + + def __init__(self): + super().__init__('talker_node') + self.count = 1 + self.pub = self.create_publisher(String, 'chatter', 10) + self.tmr = self.create_timer(1.0, self.callback) + + def callback(self): + self.pub.publish(String(data='Hello World: {0}'.format(self.count))) + self.count += 1 + + +def main(args=None): + rclpy.init(args=args) + + node = TalkerNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('talker stopped cleanly') + except BaseException: + print('exception in talker:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2topic/test/test_cli.py b/ros2topic/test/test_cli.py new file mode 100644 index 000000000..7d704fdc5 --- /dev/null +++ b/ros2topic/test/test_cli.py @@ -0,0 +1,652 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import functools +import math +import os +import re +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +from launch_ros.actions import Node + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +from rmw_implementation import get_available_rmw_implementations + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures') + additional_env = { + 'RMW_IMPLEMENTATION': rmw_implementation, 'PYTHONUNBUFFERED': '1' + } + + path_to_talker_node_script = os.path.join(path_to_fixtures, 'talker_node.py') + path_to_listener_node_script = os.path.join(path_to_fixtures, 'listener_node.py') + + hidden_talker_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_talker_node_script], + remappings=[('chatter', '_hidden_chatter')], + additional_env=additional_env + ) + talker_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_talker_node_script], + additional_env=additional_env + ) + listener_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_listener_node_script], + remappings=[('chatter', 'chit_chatter')], + additional_env=additional_env + ) + + path_to_repeater_node_script = os.path.join(path_to_fixtures, 'repeater_node.py') + + array_repeater_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_repeater_node_script, 'test_msgs/msg/Arrays'], + node_name='array_repeater', + remappings=[('/array_repeater/output', '/arrays')], + output='screen', + additional_env=additional_env + ) + defaults_repeater_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_repeater_node_script, 'test_msgs/msg/Defaults'], + node_name='defaults_repeater', + remappings=[('/defaults_repeater/output', '/defaults')], + additional_env=additional_env, + ) + bounded_sequences_repeater_node_action = Node( + node_executable=sys.executable, + arguments=[ + path_to_repeater_node_script, 'test_msgs/msg/BoundedSequences' + ], + node_name='bounded_sequences_repeater', + remappings=[('/bounded_sequences_repeater/output', '/bounded_sequences')], + additional_env=additional_env + ) + unbounded_sequences_repeater_node_action = Node( + node_executable=sys.executable, + arguments=[ + path_to_repeater_node_script, 'test_msgs/msg/UnboundedSequences' + ], + node_name='unbounded_sequences_repeater', + remappings=[('/unbounded_sequences_repeater/output', '/unbounded_sequences')], + additional_env=additional_env + ) + + path_to_controller_node_script = os.path.join(path_to_fixtures, 'controller_node.py') + + cmd_vel_controller_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_controller_node_script], + additional_env=additional_env + ) + + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + # Add talker/listener pair. + talker_node_action, + listener_node_action, + # Add hidden talker. + hidden_talker_node_action, + # Add topic repeaters. + array_repeater_node_action, + defaults_repeater_node_action, + bounded_sequences_repeater_node_action, + unbounded_sequences_repeater_node_action, + # Add stamped data publisher. + cmd_vel_controller_node_action, + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env=additional_env + ) + ] + ), + ]), locals() + + +class TestROS2TopicCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation, + listener_node_action + ): + rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter( + filtered_patterns=['WARNING: topic .* does not appear to be published yet'], + filtered_rmw_implementation=rmw_implementation + ) + + @contextlib.contextmanager + def launch_topic_command(self, arguments): + topic_command_action = ExecuteProcess( + cmd=['ros2', 'topic', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + }, + name='ros2topic-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, topic_command_action, proc_info, proc_output, + output_filter=rmw_implementation_filter + ) as topic_command: + yield topic_command + cls.launch_topic_command = launch_topic_command + + cls.listener_node = launch_testing.tools.ProcessProxy( + listener_node_action, proc_info, proc_output, + output_filter=rmw_implementation_filter + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_topics(self): + with self.launch_topic_command(arguments=['list']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '/arrays', + '/bounded_sequences', + '/chatter', + '/chit_chatter', + '/cmd_vel', + '/defaults', + '/parameter_events', + '/rosout', + '/unbounded_sequences', + ], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_all_topics(self): + with self.launch_topic_command( + arguments=['list', '--include-hidden-topics'] + ) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '/_hidden_chatter', + '/arrays', + '/bounded_sequences', + '/chatter', + '/chit_chatter', + '/cmd_vel', + '/defaults', + '/parameter_events', + '/rosout', + '/unbounded_sequences', + ], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_with_types(self): + with self.launch_topic_command(arguments=['list', '-t']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '/arrays [test_msgs/msg/Arrays]', + '/bounded_sequences [test_msgs/msg/BoundedSequences]', + '/chatter [std_msgs/msg/String]', + '/chit_chatter [std_msgs/msg/String]', + '/cmd_vel [geometry_msgs/msg/TwistStamped]', + '/defaults [test_msgs/msg/Defaults]', + '/parameter_events [rcl_interfaces/msg/ParameterEvent]', + '/rosout [rcl_interfaces/msg/Log]', + '/unbounded_sequences [test_msgs/msg/UnboundedSequences]', + ], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_count(self): + with self.launch_topic_command(arguments=['list', '-c']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = topic_command.output.splitlines() + assert len(output_lines) == 1 + assert int(output_lines[0]) == 9 + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_info(self): + with self.launch_topic_command(arguments=['info', '/chatter']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Topic: /chatter', + 'Publisher count: 1', + 'Subscriber count: 0' + ], + text=topic_command.output, + strict=True + ) + + def test_info_on_unknown_topic(self): + with self.launch_topic_command(arguments=['info', '/unknown_topic']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Topic: /unknown_topic', + 'Publisher count: 0', + 'Subscriber count: 0' + ], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_type(self): + with self.launch_topic_command(arguments=['type', '/chatter']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['std_msgs/msg/String'], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_hidden_topic_type(self): + with self.launch_topic_command( + arguments=['type', '/_hidden_chatter'] + ) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == 1 + assert topic_command.output == '' + + @launch_testing.markers.retry_on_failure(times=5) + def test_find_topic_type(self): + with self.launch_topic_command( + arguments=['find', 'rcl_interfaces/msg/Log'] + ) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/rosout'], text=topic_command.output, strict=True + ) + + def test_find_not_a_topic_typename(self): + with self.launch_topic_command( + arguments=['find', 'rcl_interfaces/msg/NotAMessageTypeName'] + ) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert not topic_command.output + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_echo(self): + with self.launch_topic_command( + arguments=['echo', '/chatter'] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r"data: 'Hello World: \d+'"), + '---' + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=5) + def test_no_str_topic_echo(self): + with self.launch_topic_command( + arguments=['echo', '--no-str', '/chatter'] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r"data: '>'"), + '---' + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=5) + def test_csv_topic_echo(self): + with self.launch_topic_command( + arguments=['echo', '--csv', '/defaults'] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + "True,b'2',100,1.125,1.125,-50,200,-1000,2000,-30000,60000,-40000000,50000000" + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=5) + def test_no_arr_topic_echo_on_array_message(self): + with self.launch_topic_command( + arguments=['echo', '--no-arr', '/arrays'], + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + "bool_values: ''", + "byte_values: ''", + "char_values: ''", + "float32_values: ''", + "float64_values: ''", + "int8_values: ''", + "uint8_values: ''", + "int16_values: ''", + "uint16_values: ''", + "int32_values: ''", + "uint32_values: ''", + "int64_values: ''", + "uint64_values: ''", + "string_values: ''", + "basic_types_values: ''", + "constants_values: ''", + "defaults_values: ''", + "bool_values_default: ''", + "byte_values_default: ''", + "char_values_default: ''", + "float32_values_default: ''", + "float64_values_default: ''", + "int8_values_default: ''", + "uint8_values_default: ''", + "int16_values_default: ''", + "uint16_values_default: ''", + "int32_values_default: ''", + "uint32_values_default: ''", + "int64_values_default: ''", + "uint64_values_default: ''", + "string_values_default: ''", + 'alignment_check: 0', + '---' + ], strict=False + ), timeout=10), 'Output does not match: ' + topic_command.output + assert topic_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=5) + def test_no_arr_topic_echo_on_seq_message(self): + with self.launch_topic_command( + arguments=['echo', '--no-arr', '/unbounded_sequences'], + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + "bool_values: ''", + "byte_values: ''", + "char_values: ''", + "float32_values: ''", + "float64_values: ''", + "int8_values: ''", + "uint8_values: ''", + "int16_values: ''", + "uint16_values: ''", + "int32_values: ''", + "uint32_values: ''", + "int64_values: ''", + "uint64_values: ''", + "string_values: ''", + "basic_types_values: ''", + "constants_values: ''", + "defaults_values: ''", + "bool_values_default: ''", + "byte_values_default: ''", + "char_values_default: ''", + "float32_values_default: ''", + "float64_values_default: ''", + "int8_values_default: ''", + "uint8_values_default: ''", + "int16_values_default: ''", + "uint16_values_default: ''", + "int32_values_default: ''", + "uint32_values_default: ''", + "int64_values_default: ''", + "uint64_values_default: ''", + "string_values_default: ''", + 'alignment_check: 0', + '---' + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=5) + def test_no_arr_topic_echo_on_bounded_seq_message(self): + with self.launch_topic_command( + arguments=['echo', '--no-arr', '/bounded_sequences'], + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + "bool_values: ''", + "byte_values: ''", + "char_values: ''", + "float32_values: ''", + "float64_values: ''", + "int8_values: ''", + "uint8_values: ''", + "int16_values: ''", + "uint16_values: ''", + "int32_values: ''", + "uint32_values: ''", + "int64_values: ''", + "uint64_values: ''", + "string_values: ''", + 'basic_types_values: ' + "''", + "constants_values: ''", + "defaults_values: ''", + "bool_values_default: ''", + "byte_values_default: ''", + "char_values_default: ''", + "float32_values_default: ''", + "float64_values_default: ''", + "int8_values_default: ''", + "uint8_values_default: ''", + "int16_values_default: ''", + "uint16_values_default: ''", + "int32_values_default: ''", + "uint32_values_default: ''", + "int64_values_default: ''", + "uint64_values_default: ''", + "string_values_default: ''", + 'alignment_check: 0', + '---' + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=5) + def test_truncate_length_topic_echo(self): + with self.launch_topic_command( + arguments=['echo', '--truncate-length', '5', '/chatter'], + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'data: Hello...'), + '---' + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + + def test_topic_pub(self): + with self.launch_topic_command( + arguments=['pub', '/chit_chatter', 'std_msgs/msg/String', '{data: foo}'], + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + 'publisher: beginning loop', + "publishing #1: std_msgs.msg.String(data='foo')", + '' + ], strict=True + ), timeout=10) + self.listener_node.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + '[INFO] [listener]: I heard: [foo]' + ] * 3, strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + + def test_topic_pub_once(self): + with self.launch_topic_command( + arguments=[ + 'pub', '--once', + '/chit_chatter', + 'std_msgs/msg/String', + '{data: bar}' + ] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + 'publisher: beginning loop', + "publishing #1: std_msgs.msg.String(data='bar')", + '' + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + self.listener_node.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + '[INFO] [listener]: I heard: [bar]' + ], strict=True + ), timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + + def test_topic_pub_print_every_two(self): + with self.launch_topic_command( + arguments=[ + 'pub', + '-p', '2', + '/chit_chatter', + 'std_msgs/msg/String', + '{data: fizz}' + ] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + 'publisher: beginning loop', + "publishing #2: std_msgs.msg.String(data='fizz')", + '', + "publishing #4: std_msgs.msg.String(data='fizz')", + '' + ], strict=True + ), timeout=10), 'Output does not match: ' + topic_command.output + self.listener_node.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + '[INFO] [listener]: I heard: [fizz]' + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_delay(self): + average_delay_line_pattern = re.compile(r'average delay: (\d+.\d{3})') + stats_line_pattern = re.compile( + r'\s*min: \d+.\d{3}s max: \d+.\d{3}s std dev: \d+.\d{5}s window: \d+' + ) + with self.launch_topic_command(arguments=['delay', '/cmd_vel']) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + average_delay_line_pattern, stats_line_pattern + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + + head_line = topic_command.output.splitlines()[0] + average_delay = float(average_delay_line_pattern.match(head_line).group(1)) + assert math.isclose(average_delay, 0.0, abs_tol=10e-3) + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_hz(self): + average_rate_line_pattern = re.compile(r'average rate: (\d+.\d{3})') + stats_line_pattern = re.compile( + r'\s*min: \d+.\d{3}s max: \d+.\d{3}s std dev: \d+.\d{5}s window: \d+' + ) + with self.launch_topic_command(arguments=['hz', '/chatter']) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + average_rate_line_pattern, stats_line_pattern + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + + head_line = topic_command.output.splitlines()[0] + average_rate = float(average_rate_line_pattern.match(head_line).group(1)) + assert math.isclose(average_rate, 1., rel_tol=1e-3) + + @launch_testing.markers.retry_on_failure(times=5) + def test_filtered_topic_hz(self): + average_rate_line_pattern = re.compile(r'average rate: (\d+.\d{3})') + stats_line_pattern = re.compile( + r'\s*min: \d+.\d{3}s max: \d+.\d{3}s std dev: \d+.\d{5}s window: \d+' + ) + with self.launch_topic_command( + arguments=[ + 'hz', + '--filter', + 'int(m.data.rpartition(\":\")[-1]) % 2 == 0', + '/chatter' + ] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + average_rate_line_pattern, stats_line_pattern + ], strict=True + ), timeout=10), 'Output does not match: ' + topic_command.output + assert topic_command.wait_for_shutdown(timeout=10) + + head_line = topic_command.output.splitlines()[0] + average_rate = float(average_rate_line_pattern.match(head_line).group(1)) + assert math.isclose(average_rate, 0.5, rel_tol=1e-3) + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_bw(self): + with self.launch_topic_command(arguments=['bw', '/defaults']) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + 'Subscribed to [/defaults]', + re.compile(r'average: \d{2}\.\d{2}B/s'), + re.compile( + r'\s*mean: \d{2}\.\d{2}B/s min: \d{2}\.\d{2}B/s' + r' max: \d{2}\.\d{2}B/s window: \d+' + ) + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) diff --git a/test_ros2cli/CMakeLists.txt b/test_ros2cli/CMakeLists.txt deleted file mode 100644 index 7464b1b6f..000000000 --- a/test_ros2cli/CMakeLists.txt +++ /dev/null @@ -1,85 +0,0 @@ -cmake_minimum_required(VERSION 3.5) - -project(test_ros2cli) - -# Default to C++14 -if(NOT CMAKE_CXX_STANDARD) - set(CMAKE_CXX_STANDARD 14) -endif() -if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") - add_compile_options(-Wall -Wextra -Wpedantic) -endif() - -find_package(ament_cmake REQUIRED) - -if(BUILD_TESTING) - find_package(ament_lint_auto REQUIRED) - ament_lint_auto_find_test_dependencies() - - find_package(ros_testing REQUIRED) - find_package(rmw_implementation_cmake REQUIRED) - - macro(custom_test_process_output test_name config_file_name timeout delay) - # Set variables for configuring the test files. - set(TEST_NAME ${test_name}) # test name used in the test cases - set(TEST_CONFIGURATION_FILE ${config_file_name}) # config file to use - set(DELAY ${delay}) # Delay between the actions are launch and the ros2 command is run - - configure_file( - test/test_process_output_customizable.py.in - ${test_name}${target_suffix}.py.genexp - @ONLY - ) - file(GENERATE - OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}_$.py" - INPUT "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}.py.genexp" - ) - install( - FILES test/${config_file_name}.py - DESTINATION "${CMAKE_CURRENT_BINARY_DIR}" - ) - add_ros_test( - "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}_$.py" - TARGET ${test_name}${target_suffix} - ENV RMW_IMPLEMENTATION=${rmw_implementation} - APPEND_LIBRARY_DIRS "${append_library_dirs}" - TIMEOUT ${timeout} - ) - list( - APPEND generated_python_files - "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}_$.py" - ) - endmacro() - - set(generated_python_files) - macro(tests) - set(TEST_ROS2ACTION_DELAY 0.0) - if("${rmw_implementation}" STREQUAL "rmw_connext_cpp") - # Connext startup is too slow. It needs a few seconds until discovery starts working. - set(TEST_ROS2ACTION_DELAY 5.0) - endif() - custom_test_process_output(test_ros2action config_ros2action_test 180 ${TEST_ROS2ACTION_DELAY}) - custom_test_process_output(test_ros2msg config_ros2msg_test 60 0.0) - endmacro() - install( - FILES test/test_config.py - DESTINATION "${CMAKE_CURRENT_BINARY_DIR}" - ) - - set(append_library_dirs "${CMAKE_CURRENT_BINARY_DIR}") - if(WIN32) - set(append_library_dirs "${append_library_dirs}/$") - endif() - - call_for_each_rmw_implementation(tests) - - find_package(ament_cmake_flake8 REQUIRED) - ament_flake8( - TESTNAME "flake8_generated_launch" - # the generated code might contain longer lines for templated types - MAX_LINE_LENGTH 999 - ${generated_python_files} - ) -endif() - -ament_package() diff --git a/test_ros2cli/package.xml b/test_ros2cli/package.xml deleted file mode 100644 index 26692d237..000000000 --- a/test_ros2cli/package.xml +++ /dev/null @@ -1,25 +0,0 @@ - - - - test_ros2cli - 0.8.2 - - Tests for ROS2 command line tools. - - Ivan Paunovic - Apache License 2.0 - - action_tutorials_interfaces - action_tutorials_cpp - ament_lint_auto - ament_lint_common - ros_testing - ros2action - ros2cli - ros2msg - std_msgs - - - ament_cmake - - diff --git a/test_ros2cli/test/config_ros2action_test.py b/test_ros2cli/test/config_ros2action_test.py deleted file mode 100644 index 895f69134..000000000 --- a/test_ros2cli/test/config_ros2action_test.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys - -from launch.substitutions import LaunchConfiguration - -from launch_ros.actions import Node - -sys.path.append(os.path.dirname(__file__)) - -from test_config import TestConfig # noqa - - -def get_action_server_node_action(): - return Node( - package='action_tutorials_cpp', - node_executable='fibonacci_action_server', - sigterm_timeout=LaunchConfiguration('sigterm_timeout', default=30) - ) - - -common_info_output = [ - 'Action: /fibonacci', - 'Action clients: 0', - 'Action servers: 1', -] - -common_send_goal_output = [ - 'Waiting for an action server to become available...', - 'Sending goal:', - 'order: 5', - 'Goal accepted with ID:', - 'Result:', - 'sequence:', - '0', - '1', - '2', - '3', - '5', - 'Goal finished with status: SUCCEEDED', -] - -configs = [ - TestConfig( - command='action', - arguments=['info', '/fibonacci'], - actions=[get_action_server_node_action()], - expected_output=common_info_output + ['/fibonacci_action_server'], - ), - TestConfig( - command='action', - arguments=['info', '-t', '/fibonacci'], - actions=[get_action_server_node_action()], - expected_output=common_info_output + [ - '/fibonacci_action_server [action_tutorials_interfaces/action/Fibonacci]' - ], - ), - TestConfig( - command='action', - arguments=['info', '-c', '/fibonacci'], - actions=[get_action_server_node_action()], - expected_output=common_info_output, - ), - TestConfig( - command='action', - arguments=['list'], - actions=[get_action_server_node_action()], - expected_output=['/fibonacci'], - ), - TestConfig( - command='action', - arguments=['list', '-t'], - actions=[get_action_server_node_action()], - expected_output=['/fibonacci [action_tutorials_interfaces/action/Fibonacci]'], - ), - TestConfig( - command='action', - arguments=['list', '-c'], - actions=[get_action_server_node_action()], - expected_output=['1'], - ), - TestConfig( - command='action', - arguments=[ - 'send_goal', - '/fibonacci', - 'action_tutorials_interfaces/action/Fibonacci', - '{order: 5}' - ], - actions=[get_action_server_node_action()], - expected_output=common_send_goal_output, - ), - TestConfig( - command='action', - arguments=[ - 'send_goal', - '-f', - '/fibonacci', - 'action_tutorials_interfaces/action/Fibonacci', - '{order: 5}' - ], - actions=[get_action_server_node_action()], - expected_output=common_send_goal_output + [ - 'Feedback:', - 'partial_sequence:', - ], - ), - TestConfig( - command='action', - arguments=['show', 'action_tutorials_interfaces/action/Fibonacci'], - expected_output=[ - 'int32 order', - '---', - 'int32[] sequence', - '---', - 'int32[] partial_sequence' - ], - ), -] diff --git a/test_ros2cli/test/config_ros2msg_test.py b/test_ros2cli/test/config_ros2msg_test.py deleted file mode 100644 index 8ab5adfe2..000000000 --- a/test_ros2cli/test/config_ros2msg_test.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys - -sys.path.append(os.path.dirname(__file__)) - -from test_config import TestConfig # noqa - -some_messages_from_std_msgs = [ - 'std_msgs/msg/Bool', - 'std_msgs/msg/Float32', - 'std_msgs/msg/Float64', -] - -configs = [ - TestConfig( - command='msg', - arguments=['list'], - expected_output=some_messages_from_std_msgs, - ), - TestConfig( - command='msg', - arguments=['package', 'std_msgs'], - expected_output=some_messages_from_std_msgs, - ), - TestConfig( - command='msg', - arguments=['packages'], - expected_output=['std_msgs'], - ), - TestConfig( - command='msg', - arguments=['show', 'std_msgs/msg/String'], - expected_output=['string data'], - ), - TestConfig( - command='msg', - arguments=['show', 'std_msgs/msg/String'], - expected_output=['string data'], - ), -] diff --git a/test_ros2cli/test/test_config.py b/test_ros2cli/test/test_config.py deleted file mode 100644 index 86c207453..000000000 --- a/test_ros2cli/test/test_config.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Iterable -from typing import Optional - -from launch import Action -from launch import SomeSubstitutionsType -from launch.utilities import normalize_to_list_of_substitutions - - -class TestConfig(): - """A class for configuring the `test_process_output_customizable.py.in` test.""" - - def __init__( - self, - *, - command: str, - arguments: Iterable[SomeSubstitutionsType], - actions: Iterable[Action] = [], - expected_output: Iterable[str] = [], - description: Optional[str] = None - ): - """ - Constructor. - - :param command: `ros2` command to be tested. - :param description: description of the test being done. command to be tested. - It usually contains the verb and arguments being tested. - :param arguments: A list of `SomeSubstitutionsType`, which are passed - as arguments to the command. - :param actions: A list of actions, which are launched before the `ros2` command. - :param expected_output: A list of str, which are checked to be contained in the output - of the `ros2` command. - """ - self.command = command - self.arguments = [normalize_to_list_of_substitutions(arg) for arg in arguments] - self.actions = actions - self.expected_output = expected_output - self.description = description - - def describe(some_subs: SomeSubstitutionsType): - return ''.join([sub.describe() for sub in some_subs]) - if description is None: - self.description = 'ros2 {} {}'.format( - command, - ' '.join([describe(some_subs) for some_subs in self.arguments])) - - def __repr__(self): - """Return the description.""" - return self.description diff --git a/test_ros2cli/test/test_process_output_customizable.py.in b/test_ros2cli/test/test_process_output_customizable.py.in deleted file mode 100644 index 72726e4c1..000000000 --- a/test_ros2cli/test/test_process_output_customizable.py.in +++ /dev/null @@ -1,79 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys - -import unittest - -from launch import LaunchDescription -from launch.actions import ExecuteProcess -from launch.actions import OpaqueFunction -from launch.actions import TimerAction -from launch.substitutions import LaunchConfiguration - -import launch_testing -import launch_testing.asserts - -sys.path.append(os.path.dirname(__file__)) - -from @TEST_CONFIGURATION_FILE@ import configs # noqa - - -@launch_testing.parametrize('config', configs) -def generate_test_description(config, ready_fn): - cmd = ['ros2', config.command] - cmd.extend(config.arguments) - process_under_test = ExecuteProcess( - cmd=cmd, - name='@TEST_NAME@', - output='screen', - sigterm_timeout=LaunchConfiguration('sigterm_timeout', default=60) - ) - actions = config.actions - actions.append(TimerAction( - period=@DELAY@, - actions=[process_under_test], - )) - launch_description = LaunchDescription(actions) - launch_description.add_action( - OpaqueFunction(function=lambda context: ready_fn()) - ) - msgs = config.expected_output - return launch_description, locals() - - -class TestCommandFinishesInAFiniteAmountOfTime(unittest.TestCase): - - def @TEST_NAME@(self, process_under_test): - """Test that ros2 command finished in a finite amount of time.""" - self.proc_info.assertWaitForShutdown(process=process_under_test, timeout=60) - - -@launch_testing.post_shutdown_test() -class TestCommandOutput(unittest.TestCase): - - def @TEST_NAME@(self, proc_info, proc_output, process_under_test, msgs): - """Test that both executables finished cleanly.""" - launch_testing.asserts.assertExitCodes( - proc_info, - [launch_testing.asserts.EXIT_OK], - process_under_test - ) - for msg in msgs: - launch_testing.asserts.assertInStdout( - proc_output, - msg, - process_under_test, - )