diff --git a/ros2action/package.xml b/ros2action/package.xml
index bbccb3234..dac59edde 100644
--- a/ros2action/package.xml
+++ b/ros2action/package.xml
@@ -23,6 +23,7 @@
ament_pep257
ament_xmllint
python3-pytest
+ ros_testing
test_msgs
diff --git a/ros2action/test/fixtures/fibonacci_action_server.py b/ros2action/test/fixtures/fibonacci_action_server.py
new file mode 100644
index 000000000..19f3948f2
--- /dev/null
+++ b/ros2action/test/fixtures/fibonacci_action_server.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.import time
+
+import sys
+
+import rclpy
+from rclpy.action import ActionServer
+from rclpy.node import Node
+
+from test_msgs.action import Fibonacci
+
+
+class FibonacciActionServer(Node):
+
+ def __init__(self):
+ super().__init__('fibonacci_action_server')
+ self._action_server = ActionServer(
+ self,
+ Fibonacci,
+ 'fibonacci',
+ self.execute_callback)
+
+ def destroy_node(self):
+ self._action_server.destroy()
+ super().destroy_node()
+
+ def execute_callback(self, goal_handle):
+ feedback = Fibonacci.Feedback()
+ feedback.sequence = [0, 1]
+
+ for i in range(1, goal_handle.request.order):
+ feedback.sequence.append(feedback.sequence[i] + feedback.sequence[i-1])
+ goal_handle.publish_feedback(feedback)
+
+ goal_handle.succeed()
+
+ result = Fibonacci.Result()
+ result.sequence = feedback.sequence
+ return result
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ node = FibonacciActionServer()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ print('server stopped cleanly')
+ except BaseException:
+ print('exception in server:', file=sys.stderr)
+ raise
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/ros2action/test/test_cli.py b/ros2action/test/test_cli.py
new file mode 100644
index 000000000..90a4cd390
--- /dev/null
+++ b/ros2action/test/test_cli.py
@@ -0,0 +1,313 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+import os
+import re
+import sys
+import unittest
+
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch.actions import OpaqueFunction
+
+import launch_testing
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+import launch_testing_ros.tools
+
+import pytest
+
+from rmw_implementation import get_available_rmw_implementations
+
+import yaml
+
+
+@pytest.mark.rostest
+@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
+def generate_test_description(rmw_implementation, ready_fn):
+ path_to_action_server_executable = os.path.join(
+ os.path.dirname(__file__), 'fixtures', 'fibonacci_action_server.py'
+ )
+ return LaunchDescription([
+ # Always restart daemon to isolate tests.
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'stop'],
+ name='daemon-stop',
+ on_exit=[
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'start'],
+ name='daemon-start',
+ on_exit=[
+ ExecuteProcess(
+ cmd=[sys.executable, path_to_action_server_executable],
+ additional_env={'RMW_IMPLEMENTATION': rmw_implementation}
+ ),
+ OpaqueFunction(function=lambda context: ready_fn())
+ ],
+ additional_env={'RMW_IMPLEMENTATION': rmw_implementation}
+ )
+ ]
+ ),
+ ])
+
+
+def get_fibonacci_send_goal_output(*, order=1, with_feedback=False):
+ assert order > 0
+ output = [
+ 'Waiting for an action server to become available...',
+ 'Sending goal:',
+ ' order: {}'.format(order),
+ '',
+ re.compile('Goal accepted with ID: [a-f0-9]+'),
+ '',
+ ]
+ sequence = [0, 1]
+ for _ in range(order - 1):
+ sequence.append(sequence[-1] + sequence[-2])
+ if with_feedback:
+ output.append('Feedback:')
+ output.extend((' ' + yaml.dump({
+ 'sequence': sequence
+ })).splitlines())
+ output.append('')
+ output.append('Result:'),
+ output.extend((' ' + yaml.dump({
+ 'sequence': sequence
+ })).splitlines())
+ output.append('')
+ output.append('Goal finished with status: SUCCEEDED')
+ return output
+
+
+class TestROS2ActionCLI(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(
+ cls,
+ launch_service,
+ proc_info,
+ proc_output,
+ rmw_implementation
+ ):
+ @contextlib.contextmanager
+ def launch_action_command(self, arguments):
+ action_command_action = ExecuteProcess(
+ cmd=['ros2', 'action', *arguments],
+ name='ros2action-cli', output='screen',
+ additional_env={
+ 'RMW_IMPLEMENTATION': rmw_implementation,
+ 'PYTHONUNBUFFERED': '1'
+ }
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, action_command_action, proc_info, proc_output,
+ output_filter=launch_testing_ros.tools.basic_output_filter(
+ filtered_rmw_implementation=rmw_implementation
+ )
+ ) as action_command:
+ yield action_command
+ cls.launch_action_command = launch_action_command
+
+ def test_info_on_nonexistent_action(self):
+ with self.launch_action_command(arguments=['info', '/not_an_action']) as action_command:
+ assert action_command.wait_for_shutdown(timeout=10)
+ assert action_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'Action: /not_an_action',
+ 'Action clients: 0',
+ 'Action servers: 0',
+ ],
+ text=action_command.output,
+ strict=False
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_fibonacci_info(self):
+ with self.launch_action_command(arguments=['info', '/fibonacci']) as action_command:
+ assert action_command.wait_for_shutdown(timeout=10)
+ assert action_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'Action: /fibonacci',
+ 'Action clients: 0',
+ 'Action servers: 1',
+ ' /fibonacci_action_server'
+ ],
+ text=action_command.output,
+ strict=False
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_fibonacci_info_with_types(self):
+ with self.launch_action_command(arguments=['info', '-t', '/fibonacci']) as action_command:
+ assert action_command.wait_for_shutdown(timeout=10)
+ assert action_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'Action: /fibonacci',
+ 'Action clients: 0',
+ 'Action servers: 1',
+ ' /fibonacci_action_server [test_msgs/action/Fibonacci]'
+ ],
+ text=action_command.output,
+ strict=False
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_fibonacci_info_count(self):
+ with self.launch_action_command(arguments=['info', '-c', '/fibonacci']) as action_command:
+ assert action_command.wait_for_shutdown(timeout=10)
+ assert action_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'Action: /fibonacci',
+ 'Action clients: 0',
+ 'Action servers: 1',
+ ],
+ text=action_command.output,
+ strict=False
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list(self):
+ with self.launch_action_command(arguments=['list']) as action_command:
+ assert action_command.wait_for_shutdown(timeout=10)
+ assert action_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['/fibonacci'],
+ text=action_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_with_types(self):
+ with self.launch_action_command(arguments=['list', '-t']) as action_command:
+ assert action_command.wait_for_shutdown(timeout=10)
+ assert action_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['/fibonacci [test_msgs/action/Fibonacci]'],
+ text=action_command.output, strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_count(self):
+ with self.launch_action_command(arguments=['list', '-c']) as action_command:
+ assert action_command.wait_for_shutdown(timeout=10)
+ assert action_command.exit_code == launch_testing.asserts.EXIT_OK
+ command_output_lines = action_command.output.splitlines()
+ assert len(command_output_lines) == 1
+ assert int(command_output_lines[0]) == 1
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_send_fibonacci_goal(self):
+ with self.launch_action_command(
+ arguments=[
+ 'send_goal',
+ '/fibonacci',
+ 'test_msgs/action/Fibonacci',
+ '{order: 5}'
+ ],
+ ) as action_command:
+ assert action_command.wait_for_shutdown(timeout=10)
+ assert action_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=get_fibonacci_send_goal_output(order=5),
+ text=action_command.output, strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_send_fibonacci_goal_with_feedback(self):
+ with self.launch_action_command(
+ arguments=[
+ 'send_goal',
+ '-f',
+ '/fibonacci',
+ 'test_msgs/action/Fibonacci',
+ '{order: 5}'
+ ],
+ ) as action_command:
+ assert action_command.wait_for_shutdown(timeout=10)
+ assert action_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=get_fibonacci_send_goal_output(
+ order=5, with_feedback=True
+ ),
+ text=action_command.output, strict=True
+ )
+
+ def test_show_fibonacci(self):
+ with self.launch_action_command(
+ arguments=['show', 'test_msgs/action/Fibonacci'],
+ ) as action_command:
+ assert action_command.wait_for_shutdown(timeout=2)
+ assert action_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'int32 order',
+ '---',
+ 'int32[] sequence',
+ '---',
+ 'int32[] sequence'
+ ],
+ text=action_command.output,
+ strict=False
+ )
+
+ def test_show_not_a_package(self):
+ with self.launch_action_command(
+ arguments=['show', 'not_a_package/action/Fibonacci'],
+ ) as action_command:
+ assert action_command.wait_for_shutdown(timeout=2)
+ assert action_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Unknown package name'],
+ text=action_command.output, strict=True
+ )
+
+ # TODO(hidmic): make 'ros2 action show' fail accordingly
+ # def test_show_not_an_action_ns(self):
+ # with self.launch_action_command(
+ # arguments=['show', 'test_msgs/foo/Fibonacci'],
+ # ) as action_command:
+ # assert action_command.wait_for_shutdown(timeout=2)
+ # assert action_command.exit_code == 1
+ # assert launch_testing.tools.expect_output(
+ # expected_lines=['Unknown action type'],
+ # text=action_command.output, strict=True
+ # )
+
+ def test_show_not_an_action_typename(self):
+ with self.launch_action_command(
+ arguments=['show', 'test_msgs/action/NotAnActionTypeName'],
+ ) as action_command:
+ assert action_command.wait_for_shutdown(timeout=2)
+ assert action_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Unknown action type'],
+ text=action_command.output, strict=True
+ )
+
+ def test_show_not_an_action_type(self):
+ with self.launch_action_command(
+ arguments=['show', 'not_an_action_type']
+ ) as action_command:
+ assert action_command.wait_for_shutdown(timeout=2)
+ assert action_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['The passed action type is invalid'],
+ text=action_command.output, strict=True
+ )
diff --git a/ros2interface/package.xml b/ros2interface/package.xml
index 57d7e8a1f..fe0495a58 100644
--- a/ros2interface/package.xml
+++ b/ros2interface/package.xml
@@ -19,8 +19,10 @@
ament_pep257
ament_xmllint
python3-pytest
+ ros_testing
std_msgs
std_srvs
+ test_msgs
ament_python
diff --git a/ros2interface/ros2interface/api/__init__.py b/ros2interface/ros2interface/api/__init__.py
index f7cb0f4ee..2c193854c 100644
--- a/ros2interface/ros2interface/api/__init__.py
+++ b/ros2interface/ros2interface/api/__init__.py
@@ -41,6 +41,8 @@ def get_interfaces(package_name):
def get_interface_path(parts):
prefix_path = has_resource('packages', parts[0])
+ if not prefix_path:
+ raise LookupError('Unknown package {}'.format(parts[0]))
joined = '/'.join(parts)
if len(parts[-1].rsplit('.', 1)) == 1:
joined += '.idl'
diff --git a/ros2interface/test/test_cli.py b/ros2interface/test/test_cli.py
index d161cc11f..07d3ff1c2 100644
--- a/ros2interface/test/test_cli.py
+++ b/ros2interface/test/test_cli.py
@@ -12,42 +12,350 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import subprocess
-
-
-def test_cli():
- packages_cmd = ['ros2', 'interface', 'packages']
- packages_result = subprocess.run(packages_cmd, stdout=subprocess.PIPE, check=True)
- package_names = packages_result.stdout.decode().splitlines()
-
- # explicit dependencies of this package will for sure be available
- assert 'std_msgs' in package_names
-
- count = 0
- for package_name in package_names:
- package_cmd = ['ros2', 'interface', 'package', package_name]
- package_result = subprocess.run(
- package_cmd, stdout=subprocess.PIPE, check=True)
- message_types = package_result.stdout.decode().splitlines()
- assert all(t.startswith(package_name + '/') for t in message_types)
- count += len(message_types)
-
- if package_name != 'std_msgs':
- continue
- for message_name in [t[len(package_name) + 1:] for t in message_types]:
- show_cmd = [
- 'ros2', 'interface', 'show', package_name + '/' + message_name]
- show_result = subprocess.run(
- show_cmd, stdout=subprocess.PIPE, check=True)
- if message_name == 'String':
- assert show_result.stdout.rstrip() == b'string data'
-
- package_cmd = ['ros2', 'interface', 'package', '_not_existing_package_name']
- package_result = subprocess.run(
- package_cmd, stdout=subprocess.PIPE)
- assert package_result.returncode
-
- show_cmd = ['ros2', 'interface', 'show', 'std_msgs/_not_existing_message_name']
- show_result = subprocess.run(
- show_cmd, stdout=subprocess.PIPE)
- assert show_result.returncode
+import contextlib
+import itertools
+import re
+import unittest
+
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch.actions import OpaqueFunction
+
+import launch_testing
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+
+import pytest
+
+
+some_messages_from_std_msgs = [
+ 'std_msgs/msg/Bool',
+ 'std_msgs/msg/Float32',
+ 'std_msgs/msg/Float64',
+]
+
+some_services_from_std_srvs = [
+ 'std_srvs/srv/Empty',
+ 'std_srvs/srv/SetBool',
+ 'std_srvs/srv/Trigger',
+]
+
+some_actions_from_test_msgs = [
+ 'test_msgs/action/Fibonacci'
+]
+
+some_interfaces = (
+ some_messages_from_std_msgs +
+ some_services_from_std_srvs +
+ some_actions_from_test_msgs
+)
+
+
+@pytest.mark.rostest
+@launch_testing.markers.keep_alive
+def generate_test_description(ready_fn):
+ return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())])
+
+
+class TestROS2MsgCLI(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(
+ cls,
+ launch_service,
+ proc_info,
+ proc_output
+ ):
+ @contextlib.contextmanager
+ def launch_interface_command(self, arguments):
+ interface_command_action = ExecuteProcess(
+ cmd=['ros2', 'interface', *arguments],
+ additional_env={'PYTHONUNBUFFERED': '1'},
+ name='ros2interface-cli',
+ output='screen'
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, interface_command_action, proc_info, proc_output
+ ) as interface_command:
+ yield interface_command
+ cls.launch_interface_command = launch_interface_command
+
+ def test_list_interfaces(self):
+ with self.launch_interface_command(arguments=['list']) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ filter_ = launch_testing.tools.basic_output_filter(
+ filtered_prefixes=['Messages:', 'Services:', 'Actions:']
+ )
+ output_lines = filter_(interface_command.output).splitlines()
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.repeat(
+ re.compile(r'\s*[A-z0-9_]+/(msg|srv|action)/[A-z0-9_]+'), len(output_lines)
+ ),
+ lines=output_lines,
+ strict=True
+ )
+ assert launch_testing.tools.expect_output(
+ expected_lines=some_interfaces,
+ lines=output_lines,
+ strict=False
+ )
+
+ def test_list_messages(self):
+ with self.launch_interface_command(arguments=['list', '-m']) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = interface_command.output.splitlines()
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.chain(
+ ['Messages:'], itertools.repeat(
+ re.compile(r'\s*[A-z0-9_]+/msg/[A-z0-9_]+'), len(output_lines) - 1
+ )
+ ),
+ lines=output_lines,
+ strict=True
+ )
+ assert launch_testing.tools.expect_output(
+ expected_lines=some_messages_from_std_msgs,
+ lines=output_lines,
+ strict=False
+ )
+
+ def test_list_services(self):
+ with self.launch_interface_command(arguments=['list', '-s']) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = interface_command.output.splitlines()
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.chain(
+ ['Services:'], itertools.repeat(
+ re.compile(r'\s*[A-z0-9_]+/srv/[A-z0-9_]+'), len(output_lines) - 1
+ )
+ ),
+ lines=output_lines,
+ strict=True
+ )
+ assert launch_testing.tools.expect_output(
+ expected_lines=some_services_from_std_srvs,
+ lines=output_lines,
+ strict=False
+ )
+
+ def test_list_actions(self):
+ with self.launch_interface_command(arguments=['list', '-a']) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = interface_command.output.splitlines()
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.chain(
+ ['Actions:'], itertools.repeat(
+ re.compile(r'\s*[A-z0-9_]+/action/[A-z0-9_]+'), len(output_lines) - 1
+ )
+ ),
+ lines=output_lines,
+ strict=True
+ )
+ assert launch_testing.tools.expect_output(
+ expected_lines=some_actions_from_test_msgs,
+ lines=output_lines,
+ strict=False
+ )
+
+ def test_package_on_nonexistent_package(self):
+ with self.launch_interface_command(
+ arguments=['package', 'not_a_package']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Unknown package not_a_package'],
+ text=interface_command.output,
+ strict=True
+ )
+
+ def test_package_on_std_msgs(self):
+ with self.launch_interface_command(
+ arguments=['package', 'std_msgs']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = interface_command.output.splitlines()
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.repeat(
+ re.compile(r'std_msgs/msg/[A-z0-9_]+'), len(output_lines)
+ ),
+ lines=output_lines,
+ strict=True
+ )
+ assert all(msg in output_lines for msg in some_messages_from_std_msgs)
+
+ def test_package_on_std_srvs(self):
+ with self.launch_interface_command(
+ arguments=['package', 'std_srvs']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = interface_command.output.splitlines()
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.repeat(
+ re.compile(r'std_srvs/srv/[A-z0-9_]+'), len(output_lines)
+ ),
+ lines=output_lines,
+ strict=True
+ )
+ assert all(srv in output_lines for srv in some_services_from_std_srvs)
+
+ def test_package_on_test_msgs(self):
+ with self.launch_interface_command(
+ arguments=['package', 'test_msgs']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = interface_command.output.splitlines()
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.repeat(
+ re.compile(r'test_msgs/(msg|srv|action)/[A-z0-9_]+'), len(output_lines)
+ ),
+ lines=output_lines,
+ strict=True
+ )
+ assert all(action in output_lines for action in some_actions_from_test_msgs)
+
+ def test_packages(self):
+ with self.launch_interface_command(arguments=['packages']) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = interface_command.output.splitlines()
+ assert 'std_msgs' in output_lines
+ assert 'std_srvs' in output_lines
+ assert 'test_msgs' in output_lines
+
+ def test_packages_with_messages(self):
+ with self.launch_interface_command(
+ arguments=['packages', '-m']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = interface_command.output.splitlines()
+ assert 'std_msgs' in output_lines
+ assert 'std_srvs' not in output_lines
+ assert 'test_msgs' in output_lines
+
+ def test_packages_with_services(self):
+ with self.launch_interface_command(
+ arguments=['packages', '-s']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = interface_command.output.splitlines()
+ assert 'std_msgs' not in output_lines
+ assert 'std_srvs' in output_lines
+ assert 'test_msgs' in output_lines
+
+ def test_packages_with_actions(self):
+ with self.launch_interface_command(
+ arguments=['packages', '-a']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = interface_command.output.splitlines()
+ assert 'std_msgs' not in output_lines
+ assert 'std_srvs' not in output_lines
+ assert 'test_msgs' in output_lines
+
+ def test_show_message(self):
+ with self.launch_interface_command(
+ arguments=['show', 'std_msgs/msg/String']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'module std_msgs {',
+ ' module msg {',
+ ' struct String {',
+ ' string data;',
+ ' };',
+ ' };',
+ '};'
+ ],
+ text=interface_command.output,
+ strict=False
+ )
+
+ def test_show_service(self):
+ with self.launch_interface_command(
+ arguments=['show', 'std_srvs/srv/SetBool']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'module std_srvs {',
+ ' module srv {',
+ ' struct SetBool_Request {',
+ ' boolean data;',
+ ' };',
+ ' struct SetBool_Response {',
+ ' boolean success;',
+ ' string message;',
+ ' };',
+ ' };',
+ '};'
+ ],
+ text=interface_command.output,
+ strict=False
+ )
+
+ def test_show_action(self):
+ with self.launch_interface_command(
+ arguments=['show', 'test_msgs/action/Fibonacci']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'module test_msgs {',
+ ' module action {',
+ ' struct Fibonacci_Goal {',
+ ' int32 order;',
+ ' };',
+ ' struct Fibonacci_Result {',
+ ' sequence sequence;',
+ ' };',
+ ' struct Fibonacci_Feedback {',
+ ' sequence sequence;',
+ ' };',
+ ' };',
+ '};'
+ ],
+ text=interface_command.output,
+ strict=False
+ )
+
+ def test_show_not_a_package(self):
+ with self.launch_interface_command(
+ arguments=['show', 'not_a_package/msg/NotAMessageTypeName']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Unknown package not_a_package'],
+ text=interface_command.output,
+ strict=True
+ )
+
+ def test_show_not_an_interface(self):
+ with self.launch_interface_command(
+ arguments=['show', 'std_msgs/msg/NotAMessageTypeName']
+ ) as interface_command:
+ assert interface_command.wait_for_shutdown(timeout=2)
+ assert interface_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=[re.compile(
+ r"Could not find the interface '.+NotAMessageTypeName\.idl'"
+ )],
+ text=interface_command.output,
+ strict=True
+ )
diff --git a/ros2msg/package.xml b/ros2msg/package.xml
index f8a5139de..9cc7e38ec 100644
--- a/ros2msg/package.xml
+++ b/ros2msg/package.xml
@@ -18,6 +18,7 @@
ament_pep257
ament_xmllint
python3-pytest
+ ros_testing
std_msgs
std_srvs
diff --git a/ros2msg/test/test_cli.py b/ros2msg/test/test_cli.py
index 8bd92f7d2..142464e79 100644
--- a/ros2msg/test/test_cli.py
+++ b/ros2msg/test/test_cli.py
@@ -1,4 +1,4 @@
-# Copyright 2017 Open Source Robotics Foundation, Inc.
+# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,47 +12,140 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import subprocess
-
-
-def test_cli():
- packages_cmd = ['ros2', 'msg', 'packages']
- packages_result = subprocess.run(packages_cmd, stdout=subprocess.PIPE, check=True)
- package_names = packages_result.stdout.decode().splitlines()
-
- # explicit dependencies of this package will for sure be available
- assert 'std_msgs' in package_names
-
- count = 0
- for package_name in package_names:
- package_cmd = ['ros2', 'msg', 'package', package_name]
- package_result = subprocess.run(
- package_cmd, stdout=subprocess.PIPE, check=True)
- message_types = package_result.stdout.decode().splitlines()
- assert all(t.startswith(package_name + '/') for t in message_types)
- count += len(message_types)
-
- if package_name != 'std_msgs':
- continue
- for message_name in [t[len(package_name) + 1:] for t in message_types]:
- show_cmd = [
- 'ros2', 'msg', 'show', package_name + '/' + message_name]
- show_result = subprocess.run(
- show_cmd, stdout=subprocess.PIPE, check=True)
- if message_name == 'String':
- assert show_result.stdout.rstrip() == b'string data'
-
- list_cmd = ['ros2', 'msg', 'list']
- list_result = subprocess.run(list_cmd, stdout=subprocess.PIPE, check=True)
- message_types = list_result.stdout.decode().splitlines()
- assert len(message_types) == count
-
- package_cmd = ['ros2', 'msg', 'package', '_not_existing_package_name']
- package_result = subprocess.run(
- package_cmd, stdout=subprocess.PIPE)
- assert package_result.returncode
-
- show_cmd = ['ros2', 'msg', 'show', 'std_msgs/_not_existing_message_name']
- show_result = subprocess.run(
- show_cmd, stdout=subprocess.PIPE)
- assert show_result.returncode
+import contextlib
+import re
+import unittest
+
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch.actions import OpaqueFunction
+
+import launch_testing
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+
+import pytest
+
+
+@pytest.mark.rostest
+@launch_testing.markers.keep_alive
+def generate_test_description(ready_fn):
+ return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())])
+
+
+some_messages_from_std_msgs = [
+ 'std_msgs/msg/Bool',
+ 'std_msgs/msg/Float32',
+ 'std_msgs/msg/Float64',
+]
+
+
+class TestROS2MsgCLI(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(
+ cls,
+ launch_service,
+ proc_info,
+ proc_output
+ ):
+ @contextlib.contextmanager
+ def launch_msg_command(self, arguments):
+ msg_command_action = ExecuteProcess(
+ cmd=['ros2', 'msg', *arguments],
+ additional_env={'PYTHONUNBUFFERED': '1'},
+ name='ros2msg-cli', output='screen'
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, msg_command_action, proc_info, proc_output
+ ) as msg_command:
+ yield msg_command
+ cls.launch_msg_command = launch_msg_command
+
+ def test_list_messages(self):
+ with self.launch_msg_command(arguments=['list']) as msg_command:
+ assert msg_command.wait_for_shutdown(timeout=10)
+ assert msg_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = msg_command.output.splitlines()
+ assert all(msg in output_lines for msg in some_messages_from_std_msgs)
+ assert all(re.match(r'.*/msg/.*', line) is not None for line in output_lines)
+
+ def test_package_messages(self):
+ with self.launch_msg_command(arguments=['package', 'std_msgs']) as msg_command:
+ assert msg_command.wait_for_shutdown(timeout=10)
+ assert msg_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = msg_command.output.splitlines()
+ assert all(msg in output_lines for msg in some_messages_from_std_msgs)
+ assert all(re.match(r'std_msgs/msg/.*', line) is not None for line in output_lines)
+
+ def test_not_a_package_messages(self):
+ with self.launch_msg_command(arguments=['package', 'not_a_package']) as msg_command:
+ assert msg_command.wait_for_shutdown(timeout=10)
+ assert msg_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Unknown package name'],
+ text=msg_command.output, strict=True
+ )
+
+ def test_list_packages_with_messages(self):
+ with self.launch_msg_command(arguments=['packages']) as msg_command:
+ assert msg_command.wait_for_shutdown(timeout=10)
+ assert msg_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert 'std_msgs' in msg_command.output.splitlines()
+
+ def test_show_message(self):
+ with self.launch_msg_command(
+ arguments=['show', 'std_msgs/msg/String']
+ ) as msg_command:
+ assert msg_command.wait_for_shutdown(timeout=10)
+ assert msg_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['string data'],
+ text=msg_command.output, strict=False
+ )
+
+ def test_show_not_a_message_typename(self):
+ with self.launch_msg_command(
+ arguments=['show', 'std_msgs/msg/NotAMessageTypeName']
+ ) as msg_command:
+ assert msg_command.wait_for_shutdown(timeout=10)
+ assert msg_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Unknown message name'],
+ text=msg_command.output, strict=True
+ )
+
+ # TODO(hidmic): make 'ros2 msg show' fail accordingly
+ # def test_show_not_a_message_ns(self):
+ # with self.launch_msg_command(
+ # arguments=['show', 'std_msgs/foo/String']
+ # ) as msg_command:
+ # assert msg_command.wait_for_shutdown(timeout=10)
+ # assert msg_command.exit_code == 1
+ # assert launch_testing.tools.expect_output(
+ # expected_lines=['Unknown message name'],
+ # text=msg_command.output, strict=True
+ # )
+
+ def test_show_not_a_package(self):
+ with self.launch_msg_command(
+ arguments=['show', 'not_a_package/msg/String']
+ ) as msg_command:
+ assert msg_command.wait_for_shutdown(timeout=10)
+ assert msg_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Unknown package name'],
+ text=msg_command.output, strict=True
+ )
+
+ def test_show_not_a_message_type(self):
+ with self.launch_msg_command(
+ arguments=['show', 'not_a_message_type']
+ ) as msg_command:
+ assert msg_command.wait_for_shutdown(timeout=10)
+ assert msg_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['The passed message type is invalid'],
+ text=msg_command.output, strict=True
+ )
diff --git a/ros2node/package.xml b/ros2node/package.xml
index 99b76a2c4..3cdce7134 100644
--- a/ros2node/package.xml
+++ b/ros2node/package.xml
@@ -16,6 +16,9 @@
ament_pep257
ament_xmllint
python3-pytest
+ rclpy
+ ros_testing
+ test_msgs
ament_python
diff --git a/ros2node/test/fixtures/complex_node.py b/ros2node/test/fixtures/complex_node.py
new file mode 100644
index 000000000..d715e218b
--- /dev/null
+++ b/ros2node/test/fixtures/complex_node.py
@@ -0,0 +1,76 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+import rclpy
+from rclpy.action import ActionServer
+from rclpy.node import Node
+from rclpy.qos import qos_profile_system_default
+
+from test_msgs.action import Fibonacci
+from test_msgs.msg import Arrays
+from test_msgs.msg import Strings
+from test_msgs.srv import BasicTypes
+
+
+class ComplexNode(Node):
+
+ def __init__(self):
+ super().__init__('complex_node')
+ self.publisher = self.create_publisher(Arrays, 'arrays', qos_profile_system_default)
+ self.subscription = self.create_subscription(
+ Strings, 'strings', lambda msg: None, qos_profile_system_default
+ )
+ self.server = self.create_service(BasicTypes, 'basic', lambda req, res: res)
+ self.action_server = ActionServer(
+ self, Fibonacci, 'fibonacci', self.action_callback
+ )
+ self.timer = self.create_timer(1.0, self.pub_callback)
+
+ def destroy_node(self):
+ self.timer.destroy()
+ self.publisher.destroy()
+ self.subscription.destroy()
+ self.server.destroy()
+ self.action_server.destroy()
+ super().destroy_node()
+
+ def pub_callback(self):
+ self.publisher.publish(Arrays())
+
+ def action_callback(self, goal_handle):
+ goal_handle.succeed()
+ return Fibonacci.Result()
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ node = ComplexNode()
+
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ print('node stopped cleanly')
+ except BaseException:
+ print('exception in node:', file=sys.stderr)
+ raise
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/ros2node/test/test_cli.py b/ros2node/test/test_cli.py
new file mode 100644
index 000000000..a1f6e254c
--- /dev/null
+++ b/ros2node/test/test_cli.py
@@ -0,0 +1,181 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+import itertools
+import os
+import re
+import sys
+import unittest
+
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch.actions import OpaqueFunction
+
+from launch_ros.actions import Node
+
+import launch_testing
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+import launch_testing_ros.tools
+
+import pytest
+
+from rmw_implementation import get_available_rmw_implementations
+
+
+@pytest.mark.rostest
+@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
+def generate_test_description(rmw_implementation, ready_fn):
+ path_to_complex_node_script = os.path.join(
+ os.path.dirname(__file__), 'fixtures', 'complex_node.py'
+ )
+ additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
+ return LaunchDescription([
+ # Always restart daemon to isolate tests.
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'stop'],
+ name='daemon-stop',
+ on_exit=[
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'start'],
+ name='daemon-start',
+ on_exit=[
+ # Add test fixture actions.
+ Node(
+ node_executable=sys.executable,
+ arguments=[path_to_complex_node_script],
+ node_name='complex_node',
+ additional_env=additional_env
+ ),
+ Node(
+ node_executable=sys.executable,
+ arguments=[path_to_complex_node_script],
+ node_name='_hidden_complex_node',
+ additional_env=additional_env
+ ),
+ OpaqueFunction(function=lambda context: ready_fn())
+ ],
+ additional_env=additional_env
+ )
+ ]
+ ),
+ ])
+
+
+class TestROS2NodeCLI(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(
+ cls,
+ launch_service,
+ proc_info,
+ proc_output,
+ rmw_implementation
+ ):
+ @contextlib.contextmanager
+ def launch_node_command(self, arguments):
+ node_command_action = ExecuteProcess(
+ cmd=['ros2', 'node', *arguments],
+ additional_env={
+ 'RMW_IMPLEMENTATION': rmw_implementation,
+ 'PYTHONUNBUFFERED': '1'
+ },
+ name='ros2node-cli',
+ output='screen'
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, node_command_action, proc_info, proc_output,
+ output_filter=launch_testing_ros.tools.basic_output_filter(
+ # ignore launch_ros and ros2cli daemon nodes
+ filtered_patterns=['.*launch_ros.*', '.*ros2cli.*'],
+ filtered_rmw_implementation=rmw_implementation
+ )
+ ) as node_command:
+ yield node_command
+ cls.launch_node_command = launch_node_command
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_nodes(self):
+ with self.launch_node_command(arguments=['list']) as node_command:
+ assert node_command.wait_for_shutdown(timeout=10)
+ assert node_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['/complex_node'],
+ text=node_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_all_nodes(self):
+ with self.launch_node_command(arguments=['list', '-a']) as node_command:
+ assert node_command.wait_for_shutdown(timeout=10)
+ assert node_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ '/_hidden_complex_node',
+ '/complex_node'
+ ],
+ text=node_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_node_count(self):
+ with self.launch_node_command(arguments=['list', '-c']) as node_command:
+ assert node_command.wait_for_shutdown(timeout=10)
+ assert node_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = node_command.output.splitlines()
+ assert len(output_lines) == 1
+ # Fixture nodes that are not hidden plus launch_ros node.
+ assert int(output_lines[0]) == 2
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_all_nodes_count(self):
+ with self.launch_node_command(arguments=['list', '-c', '-a']) as node_command:
+ assert node_command.wait_for_shutdown(timeout=10)
+ assert node_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = node_command.output.splitlines()
+ assert len(output_lines) == 1
+ # All fixture nodes plus launch_ros and ros2cli daemon nodes.
+ assert int(output_lines[0]) == 4
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_info_node(self):
+ with self.launch_node_command(arguments=['info', '/complex_node']) as node_command:
+ assert node_command.wait_for_shutdown(timeout=10)
+ assert node_command.exit_code == launch_testing.asserts.EXIT_OK
+ # TODO(hidmic): only optionally show hidden topics and services
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.chain([
+ '/complex_node',
+ ' Subscribers:',
+ ' /strings: test_msgs/msg/Strings',
+ ' Publishers:',
+ ' /arrays: test_msgs/msg/Arrays',
+ ' /parameter_events: rcl_interfaces/msg/ParameterEvent',
+ ' /rosout: rcl_interfaces/msg/Log',
+ ' Services:',
+ ' /basic: test_msgs/srv/BasicTypes',
+ ], itertools.repeat(re.compile(
+ r'\s*/complex_node/.*parameter.*: rcl_interfaces/srv/.*Parameter.*'
+ ), 6), [
+ ' Actions Servers:',
+ ' /fibonacci: test_msgs/action/Fibonacci',
+ ' Actions Clients:',
+ ''
+ ]),
+ text=node_command.output, strict=False
+ ), 'Output does not match:\n' + node_command.output
diff --git a/ros2pkg/package.xml b/ros2pkg/package.xml
index 92a2b50d6..816622f59 100644
--- a/ros2pkg/package.xml
+++ b/ros2pkg/package.xml
@@ -21,6 +21,7 @@
ament_pep257
ament_xmllint
python3-pytest
+ ros_testing
ament_python
diff --git a/ros2pkg/test/test_cli.py b/ros2pkg/test/test_cli.py
index 0a71854fa..291e14416 100644
--- a/ros2pkg/test/test_cli.py
+++ b/ros2pkg/test/test_cli.py
@@ -12,26 +12,175 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import contextlib
import os
-import subprocess
+import tempfile
+import unittest
+import xml.etree.ElementTree as ET
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch.actions import OpaqueFunction
-def test_cli():
- list_cmd = ['ros2', 'pkg', 'list']
- list_result = subprocess.run(list_cmd, stdout=subprocess.PIPE, check=True)
- package_names = list_result.stdout.decode().splitlines()
+import launch_testing
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
- # explicit dependencies of this package will for sure be available
- assert 'ros2cli' in package_names
+import pytest
- prefix_cmd = ['ros2', 'pkg', 'prefix', 'ros2cli']
- prefix_result = subprocess.run(
- prefix_cmd, stdout=subprocess.PIPE, check=True)
- prefix_path = prefix_result.stdout.decode().splitlines()
- assert len(prefix_path) == 1
- assert os.path.isdir(prefix_path[0])
- prefix_cmd = ['ros2', 'pkg', 'prefix', '_not_existing_package_name']
- prefix_result = subprocess.run(
- prefix_cmd, stdout=subprocess.PIPE)
- assert prefix_result.returncode
+some_cli_packages = [
+ 'ros2cli',
+ 'ros2pkg'
+]
+
+
+@pytest.mark.rostest
+@launch_testing.markers.keep_alive
+def generate_test_description(ready_fn):
+ return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())])
+
+
+class TestROS2PkgCLI(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(
+ cls,
+ launch_service,
+ proc_info,
+ proc_output
+ ):
+ @contextlib.contextmanager
+ def launch_pkg_command(self, arguments, **kwargs):
+ pkg_command_action = ExecuteProcess(
+ cmd=['ros2', 'pkg', *arguments],
+ additional_env={'PYTHONUNBUFFERED': '1'},
+ name='ros2pkg-cli',
+ output='screen',
+ **kwargs
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, pkg_command_action, proc_info, proc_output
+ ) as pkg_command:
+ yield pkg_command
+ cls.launch_pkg_command = launch_pkg_command
+
+ def test_list_packages(self):
+ with self.launch_pkg_command(arguments=['list']) as pkg_command:
+ assert pkg_command.wait_for_shutdown(timeout=2)
+ assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = pkg_command.output.splitlines()
+ assert all(pkg in output_lines for pkg in some_cli_packages)
+
+ def test_package_prefix(self):
+ with self.launch_pkg_command(arguments=['prefix', 'ros2cli']) as pkg_command:
+ assert pkg_command.wait_for_shutdown(timeout=2)
+ assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = pkg_command.output.splitlines()
+ assert len(output_lines) == 1
+ prefix_path = output_lines[0]
+ assert os.path.isdir(prefix_path)
+
+ def test_not_a_package_prefix(self):
+ with self.launch_pkg_command(arguments=['prefix', 'not_a_package']) as pkg_command:
+ assert pkg_command.wait_for_shutdown(timeout=2)
+ assert pkg_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Package not found'],
+ text=pkg_command.output,
+ strict=True
+ )
+
+ def test_xml(self):
+ with self.launch_pkg_command(arguments=['xml', 'ros2cli']) as pkg_command:
+ assert pkg_command.wait_for_shutdown(timeout=2)
+ assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK
+ root = ET.XML(pkg_command.output)
+ assert root.tag == 'package'
+ assert root.find('name').text == 'ros2cli'
+
+ def test_not_a_package_xml(self):
+ with self.launch_pkg_command(arguments=['xml', 'not_a_package']) as pkg_command:
+ assert pkg_command.wait_for_shutdown(timeout=2)
+ assert pkg_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Package not found'],
+ text=pkg_command.output,
+ strict=True
+ )
+
+ def test_create_package(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ with self.launch_pkg_command(
+ arguments=[
+ 'create', 'a_test_package',
+ '--package-format', '3',
+ '--description', 'A test package dummy description',
+ '--license', 'Apache License 2.0',
+ '--build-type', 'ament_cmake',
+ '--dependencies', 'ros2pkg',
+ '--maintainer-email', 'nobody@nowhere.com',
+ '--maintainer-name', 'Nobody',
+ '--node-name', 'test_node',
+ '--library-name', 'test_library'
+ ], cwd=tmpdir
+ ) as pkg_command:
+ assert pkg_command.wait_for_shutdown(timeout=5)
+ assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'going to create a new package',
+ 'package name: a_test_package',
+ 'destination directory: ' + tmpdir,
+ 'package format: 3',
+ 'version: 0.0.0',
+ 'description: A test package dummy description',
+ "maintainer: ['Nobody ']",
+ "licenses: ['Apache License 2.0']",
+ 'build type: ament_cmake',
+ "dependencies: ['ros2pkg']",
+ 'node_name: test_node',
+ 'library_name: test_library',
+ 'creating folder ./a_test_package',
+ 'creating ./a_test_package/package.xml',
+ 'creating source and include folder',
+ 'creating folder ./a_test_package/src',
+ 'creating folder ./a_test_package/include/a_test_package',
+ 'creating ./a_test_package/CMakeLists.txt',
+ 'creating ./a_test_package/src/test_node.cpp',
+ 'creating ./a_test_package/include/a_test_package/test_library.hpp',
+ 'creating ./a_test_package/src/test_library.cpp',
+ 'creating ./a_test_package/include/a_test_package/visibility_control.h',
+ ],
+ text=pkg_command.output,
+ strict=True
+ )
+ # Check layout
+ assert os.path.isdir(os.path.join(tmpdir, 'a_test_package'))
+ assert os.path.isfile(os.path.join(tmpdir, 'a_test_package', 'package.xml'))
+ assert os.path.isfile(os.path.join(tmpdir, 'a_test_package', 'CMakeLists.txt'))
+ assert os.path.isfile(
+ os.path.join(tmpdir, 'a_test_package', 'src', 'test_node.cpp')
+ )
+ assert os.path.isfile(
+ os.path.join(tmpdir, 'a_test_package', 'src', 'test_library.cpp')
+ )
+ assert os.path.isfile(os.path.join(
+ tmpdir, 'a_test_package', 'include', 'a_test_package', 'test_library.hpp'
+ ))
+ assert os.path.isfile(os.path.join(
+ tmpdir, 'a_test_package', 'include', 'a_test_package', 'visibility_control.h'
+ ))
+ # Check package.xml
+ tree = ET.parse(os.path.join(tmpdir, 'a_test_package', 'package.xml'))
+ root = tree.getroot()
+ assert root.tag == 'package'
+ assert root.attrib['format'] == '3'
+ assert root.find('name').text == 'a_test_package'
+ assert root.find('description').text == 'A test package dummy description'
+ assert root.find('maintainer').text == 'Nobody'
+ assert root.find('maintainer').attrib['email'] == 'nobody@nowhere.com'
+ assert root.find('license').text == 'Apache License 2.0'
+ assert root.find('depend').text == 'ros2pkg'
+ assert root.find('.//build_type').text == 'ament_cmake'
diff --git a/ros2service/package.xml b/ros2service/package.xml
index e162f1e69..54e10df23 100644
--- a/ros2service/package.xml
+++ b/ros2service/package.xml
@@ -21,6 +21,8 @@
ament_pep257
ament_xmllint
python3-pytest
+ ros_testing
+ test_msgs
ament_python
diff --git a/ros2service/test/fixtures/echo_server.py b/ros2service/test/fixtures/echo_server.py
new file mode 100644
index 000000000..ca3584d7c
--- /dev/null
+++ b/ros2service/test/fixtures/echo_server.py
@@ -0,0 +1,54 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+import rclpy
+from rclpy.node import Node
+
+from test_msgs.srv import BasicTypes
+
+
+class EchoServer(Node):
+
+ def __init__(self):
+ super().__init__('echo_server')
+ self.server = self.create_service(BasicTypes, 'echo', self.callback)
+
+ def callback(self, request, response):
+ for field_name in request.get_fields_and_field_types():
+ setattr(response, field_name, getattr(request, field_name))
+ return response
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ node = EchoServer()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ print('server stopped cleanly')
+ except BaseException:
+ print('exception in server:', file=sys.stderr)
+ raise
+ finally:
+ # Destroy the node explicitly
+ # (optional - Done automatically when node is garbage collected)
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/ros2service/test/test_cli.py b/ros2service/test/test_cli.py
new file mode 100644
index 000000000..c1bbb5a77
--- /dev/null
+++ b/ros2service/test/test_cli.py
@@ -0,0 +1,314 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+import functools
+import itertools
+import os
+import re
+import sys
+import unittest
+
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch.actions import OpaqueFunction
+from launch_ros.actions import Node
+
+import launch_testing
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+import launch_testing_ros.tools
+
+import pytest
+
+from rmw_implementation import get_available_rmw_implementations
+
+from test_msgs.srv import BasicTypes
+
+
+def get_echo_call_output(**kwargs):
+ request = BasicTypes.Request()
+ for field_name, field_value in kwargs.items():
+ setattr(request, field_name, field_value)
+ response = BasicTypes.Response()
+ for field_name, field_value in kwargs.items():
+ setattr(response, field_name, field_value)
+ return [
+ 'requester: making request: ' + repr(request),
+ '',
+ 'response:',
+ repr(response),
+ ''
+ ]
+
+
+# TODO(hidmic): check Opensplice service types
+@pytest.mark.rostest
+@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
+def generate_test_description(rmw_implementation, ready_fn):
+ path_to_echo_server_script = os.path.join(
+ os.path.dirname(__file__), 'fixtures', 'echo_server.py'
+ )
+ additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
+ return LaunchDescription([
+ # Always restart daemon to isolate tests.
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'stop'],
+ name='daemon-stop',
+ on_exit=[
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'start'],
+ name='daemon-start',
+ on_exit=[
+ # Add test fixture actions.
+ Node(
+ node_executable=sys.executable,
+ arguments=[path_to_echo_server_script],
+ node_name='echo_server',
+ node_namespace='my_ns',
+ additional_env=additional_env,
+ ),
+ Node(
+ node_executable=sys.executable,
+ arguments=[path_to_echo_server_script],
+ node_name='_hidden_echo_server',
+ node_namespace='my_ns',
+ remappings=[('echo', '_echo')],
+ additional_env=additional_env,
+ ),
+ OpaqueFunction(function=lambda context: ready_fn())
+ ],
+ additional_env=additional_env
+ )
+ ]
+ ),
+ ])
+
+
+class TestROS2ServiceCLI(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(
+ cls,
+ launch_service,
+ proc_info,
+ proc_output,
+ rmw_implementation
+ ):
+ @contextlib.contextmanager
+ def launch_service_command(self, arguments):
+ service_command_action = ExecuteProcess(
+ cmd=['ros2', 'service', *arguments],
+ additional_env={
+ 'RMW_IMPLEMENTATION': rmw_implementation,
+ 'PYTHONUNBUFFERED': '1'
+ },
+ name='ros2service-cli',
+ output='screen'
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, service_command_action, proc_info, proc_output,
+ output_filter=launch_testing_ros.tools.basic_output_filter(
+ filtered_prefixes=[
+ 'waiting for service to become available...',
+ '/launch_ros' # cope with launch_ros internal node.
+ ],
+ filtered_rmw_implementation=rmw_implementation
+ )
+ ) as service_command:
+ yield service_command
+ cls.launch_service_command = launch_service_command
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_services(self):
+ with self.launch_service_command(arguments=['list']) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.chain(
+ ['/my_ns/echo'],
+ itertools.repeat(re.compile(
+ r'/my_ns/echo_server/.*parameter.*'
+ ), 6)
+ ),
+ text=service_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_hidden(self):
+ with self.launch_service_command(
+ arguments=['--include-hidden-services', 'list']
+ ) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.chain(
+ ['/my_ns/_echo'],
+ itertools.repeat(re.compile(
+ r'/my_ns/_hidden_echo_server/.*parameter.*'
+ ), 6),
+ ['/my_ns/echo'],
+ itertools.repeat(re.compile(
+ r'/my_ns/echo_server/.*parameter.*'
+ ), 6)
+ ),
+ text=service_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_with_types(self):
+ with self.launch_service_command(arguments=['list', '-t']) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.chain(
+ ['/my_ns/echo [test_msgs/srv/BasicTypes]'],
+ itertools.repeat(re.compile(
+ r'/my_ns/echo_server/.*parameter.*'
+ r' \[rcl_interfaces/srv/.*Parameter.*\]'
+ ), 6)
+ ),
+ text=service_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_count(self):
+ with self.launch_service_command(arguments=['list', '-c']) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = service_command.output.splitlines()
+ assert len(output_lines) == 1
+ assert int(output_lines[0]) == 7 + 6 # cope with launch_ros internal node.
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_find(self):
+ with self.launch_service_command(
+ arguments=['find', 'test_msgs/srv/BasicTypes']
+ ) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['/my_ns/echo'],
+ text=service_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_find_hidden(self):
+ with self.launch_service_command(
+ arguments=['find', '--include-hidden-services', 'test_msgs/srv/BasicTypes']
+ ) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['/my_ns/_echo', '/my_ns/echo'],
+ text=service_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_find_count(self):
+ with self.launch_service_command(
+ arguments=['find', '-c', 'test_msgs/srv/BasicTypes']
+ ) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = service_command.output.splitlines()
+ assert len(output_lines) == 1
+ assert int(output_lines[0]) == 1
+
+ def test_find_not_a_service_type(self):
+ with self.launch_service_command(
+ arguments=['find', 'not_a_service_type']
+ ) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert service_command.output == ''
+
+ def test_type(self):
+ with self.launch_service_command(
+ arguments=['type', '/my_ns/echo']
+ ) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['test_msgs/srv/BasicTypes'],
+ text=service_command.output,
+ strict=True
+ )
+
+ def test_type_on_not_a_service(self):
+ with self.launch_service_command(
+ arguments=['type', '/not_a_service']
+ ) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == 1
+ assert service_command.output == ''
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_call_no_args(self):
+ with self.launch_service_command(
+ arguments=['call', '/my_ns/echo', 'test_msgs/srv/BasicTypes']
+ ) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=get_echo_call_output(),
+ text=service_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_call(self):
+ with self.launch_service_command(
+ arguments=[
+ 'call',
+ '/my_ns/echo',
+ 'test_msgs/srv/BasicTypes',
+ '{bool_value: false, int32_value: -1, float64_value: 0.1, string_value: bazbar}'
+ ]
+ ) as service_command:
+ assert service_command.wait_for_shutdown(timeout=10)
+ assert service_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=get_echo_call_output(
+ bool_value=False,
+ int32_value=-1,
+ float64_value=0.1,
+ string_value='bazbar'
+ ),
+ text=service_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_repeated_call(self):
+ with self.launch_service_command(
+ arguments=[
+ 'call',
+ '-r', '1',
+ '/my_ns/echo',
+ 'test_msgs/srv/BasicTypes',
+ '{bool_value: true, int32_value: 1, float64_value: 1.0, string_value: foobar}'
+ ],
+ ) as service_command:
+ assert service_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=2 * get_echo_call_output(
+ bool_value=True, int32_value=1, float64_value=1.0, string_value='foobar'
+ )
+ ), timeout=10)
diff --git a/ros2srv/package.xml b/ros2srv/package.xml
index 9b1a5ae15..b388bb379 100644
--- a/ros2srv/package.xml
+++ b/ros2srv/package.xml
@@ -18,6 +18,7 @@
ament_pep257
ament_xmllint
python3-pytest
+ ros_testing
std_msgs
std_srvs
diff --git a/ros2srv/test/test_cli.py b/ros2srv/test/test_cli.py
index de0906840..2c9e0edd9 100644
--- a/ros2srv/test/test_cli.py
+++ b/ros2srv/test/test_cli.py
@@ -1,4 +1,4 @@
-# Copyright 2017 Open Source Robotics Foundation, Inc.
+# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,47 +12,166 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import subprocess
-
-
-def test_cli():
- packages_cmd = ['ros2', 'srv', 'packages']
- packages_result = subprocess.run(packages_cmd, stdout=subprocess.PIPE, check=True)
- package_names = packages_result.stdout.decode().splitlines()
-
- # explicit dependencies of this package will for sure be available
- assert 'std_srvs' in package_names
-
- count = 0
- for package_name in package_names:
- package_cmd = ['ros2', 'srv', 'package', package_name]
- package_result = subprocess.run(
- package_cmd, stdout=subprocess.PIPE, check=True)
- service_types = package_result.stdout.decode().splitlines()
- assert all(t.startswith(package_name + '/') for t in service_types)
- count += len(service_types)
-
- if package_name != 'std_srvs':
- continue
- for service_name in [t[len(package_name) + 1:] for t in service_types]:
- show_cmd = [
- 'ros2', 'srv', 'show', package_name + '/' + service_name]
- show_result = subprocess.run(
- show_cmd, stdout=subprocess.PIPE, check=True)
- if service_name == 'Empty':
- assert show_result.stdout.rstrip() == b'---'
-
- list_cmd = ['ros2', 'srv', 'list']
- list_result = subprocess.run(list_cmd, stdout=subprocess.PIPE, check=True)
- service_types = list_result.stdout.decode().splitlines()
- assert len(service_types) == count
-
- package_cmd = ['ros2', 'srv', 'package', '_not_existing_package_name']
- package_result = subprocess.run(
- package_cmd, stdout=subprocess.PIPE)
- assert package_result.returncode
-
- show_cmd = ['ros2', 'srv', 'show', 'std_srvs/_not_existing_service_name']
- show_result = subprocess.run(
- show_cmd, stdout=subprocess.PIPE)
- assert show_result.returncode
+import contextlib
+import re
+import unittest
+
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch.actions import OpaqueFunction
+
+import launch_testing
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+
+import pytest
+
+
+@pytest.mark.rostest
+@launch_testing.markers.keep_alive
+def generate_test_description(ready_fn):
+ return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())])
+
+
+some_services_from_std_srvs = [
+ 'std_srvs/srv/Empty',
+ 'std_srvs/srv/SetBool',
+ 'std_srvs/srv/Trigger',
+]
+
+
+class TestROS2SrvCLI(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(
+ cls,
+ launch_service,
+ proc_info,
+ proc_output
+ ):
+ @contextlib.contextmanager
+ def launch_srv_command(self, arguments):
+ srv_command_action = ExecuteProcess(
+ cmd=['ros2', 'srv', *arguments],
+ additional_env={'PYTHONUNBUFFERED': '1'},
+ name='ros2srv-cli',
+ output='screen'
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, srv_command_action, proc_info, proc_output
+ ) as srv_command:
+ yield srv_command
+ cls.launch_srv_command = launch_srv_command
+
+ def test_list_service_types(self):
+ with self.launch_srv_command(arguments=['list']) as srv_command:
+ assert srv_command.wait_for_shutdown(timeout=10)
+ assert srv_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = srv_command.output.splitlines()
+ assert all(srv in output_lines for srv in some_services_from_std_srvs)
+ assert all(re.match(r'.*/srv/.*', line) is not None for line in output_lines)
+
+ def test_list_service_types_in_a_package(self):
+ with self.launch_srv_command(arguments=['package', 'std_srvs']) as srv_command:
+ assert srv_command.wait_for_shutdown(timeout=10)
+ assert srv_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = srv_command.output.splitlines()
+ assert all(srv in output_lines for srv in some_services_from_std_srvs)
+ assert all(re.match(r'std_srvs/srv/.*', line) is not None for line in output_lines)
+
+ def test_list_service_types_in_not_a_package(self):
+ with self.launch_srv_command(arguments=['package', 'not_a_package']) as srv_command:
+ assert srv_command.wait_for_shutdown(timeout=10)
+ assert srv_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Unknown package name'],
+ text=srv_command.output
+ )
+
+ def test_list_packages_with_service_types(self):
+ with self.launch_srv_command(arguments=['packages']) as srv_command:
+ assert srv_command.wait_for_shutdown(timeout=10)
+ assert srv_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert 'std_srvs' in srv_command.output.splitlines()
+
+ def test_show_service_type(self):
+ with self.launch_srv_command(
+ arguments=['show', 'std_srvs/srv/SetBool']
+ ) as srv_command:
+ assert srv_command.wait_for_shutdown(timeout=10)
+ assert srv_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'bool data',
+ '---',
+ 'bool success',
+ 'string message'
+ ],
+ text=srv_command.output,
+ strict=False
+ )
+
+ with self.launch_srv_command(
+ arguments=['show', 'std_srvs/srv/Trigger']
+ ) as srv_command:
+ assert srv_command.wait_for_shutdown(timeout=10)
+ assert srv_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ '---',
+ 'bool success',
+ 'string message'
+ ],
+ text=srv_command.output,
+ strict=False
+ )
+
+ def test_show_not_a_service_typename(self):
+ with self.launch_srv_command(
+ arguments=['show', 'std_srvs/srv/NotAServiceTypeName']
+ ) as srv_command:
+ assert srv_command.wait_for_shutdown(timeout=10)
+ assert srv_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Unknown service name'],
+ text=srv_command.output,
+ strict=True
+ )
+
+ # TODO(hidmic): make 'ros2 srv show' more robust
+ # def test_show_not_a_service_ns(self):
+ # with self.launch_srv_command(
+ # arguments=['show', 'std_srvs/foo/Empty']
+ # ) as srv_command:
+ # assert srv_command.wait_for_shutdown(timeout=10)
+ # assert srv_command.exit_code == 1
+ # assert launch_testing.tools.expect_output(
+ # expected_lines=['Unknown service name'],
+ # text=srv_command.output,
+ # strict=True
+ # )
+
+ def test_show_not_a_package(self):
+ with self.launch_srv_command(
+ arguments=['show', 'not_a_package/srv/Empty']
+ ) as srv_command:
+ assert srv_command.wait_for_shutdown(timeout=10)
+ assert srv_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['Unknown package name'],
+ text=srv_command.output,
+ strict=True
+ )
+
+ def test_show_not_a_service_type(self):
+ with self.launch_srv_command(
+ arguments=['show', 'not_a_service_type']
+ ) as srv_command:
+ assert srv_command.wait_for_shutdown(timeout=10)
+ assert srv_command.exit_code == 1
+ assert launch_testing.tools.expect_output(
+ expected_lines=['The passed service type is invalid'],
+ text=srv_command.output,
+ strict=True
+ )
diff --git a/ros2topic/package.xml b/ros2topic/package.xml
index 5ddb75977..6d2c70b4b 100644
--- a/ros2topic/package.xml
+++ b/ros2topic/package.xml
@@ -22,7 +22,11 @@
ament_flake8
ament_pep257
ament_xmllint
+ geometry_msgs
python3-pytest
+ ros_testing
+ std_msgs
+ test_msgs
ament_python
diff --git a/ros2topic/test/fixtures/controller_node.py b/ros2topic/test/fixtures/controller_node.py
new file mode 100644
index 000000000..1dd926597
--- /dev/null
+++ b/ros2topic/test/fixtures/controller_node.py
@@ -0,0 +1,58 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+from geometry_msgs.msg import TwistStamped
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import qos_profile_system_default
+
+
+class ControllerNode(Node):
+
+ def __init__(self):
+ super().__init__('controller')
+ self.pub = self.create_publisher(
+ TwistStamped, 'cmd_vel', qos_profile_system_default
+ )
+ self.tmr = self.create_timer(1.0, self.callback)
+
+ def callback(self):
+ msg = TwistStamped()
+ msg.header.stamp = self.get_clock().now().to_msg()
+ msg.twist.linear.x = 1.0
+ self.pub.publish(msg)
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ node = ControllerNode()
+
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ print('controller stopped cleanly')
+ except BaseException:
+ print('exception in controller:', file=sys.stderr)
+ raise
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/ros2topic/test/fixtures/listener_node.py b/ros2topic/test/fixtures/listener_node.py
new file mode 100644
index 000000000..8ee4fddfa
--- /dev/null
+++ b/ros2topic/test/fixtures/listener_node.py
@@ -0,0 +1,54 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import qos_profile_system_default
+
+from std_msgs.msg import String
+
+
+class ListenerNode(Node):
+
+ def __init__(self):
+ super().__init__('listener')
+ self.sub = self.create_subscription(
+ String, 'chatter', self.callback, qos_profile_system_default
+ )
+
+ def callback(self, msg):
+ self.get_logger().info('I heard: [%s]' % msg.data)
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ node = ListenerNode()
+
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ print('listener stopped cleanly')
+ except BaseException:
+ print('exception in listener:', file=sys.stderr)
+ raise
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/ros2topic/test/fixtures/repeater_node.py b/ros2topic/test/fixtures/repeater_node.py
new file mode 100644
index 000000000..8fd4a41a6
--- /dev/null
+++ b/ros2topic/test/fixtures/repeater_node.py
@@ -0,0 +1,73 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import sys
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import qos_profile_system_default
+from rclpy.utilities import remove_ros_args
+
+from ros2topic.api import import_message_type
+
+
+class RepeaterNode(Node):
+
+ def __init__(self, message_type):
+ super().__init__('repeater_node')
+ self.message_type = message_type
+ self.pub = self.create_publisher(
+ self.message_type, '~/output', qos_profile_system_default
+ )
+ self.tmr = self.create_timer(1.0, self.callback)
+
+ def callback(self):
+ self.pub.publish(self.message_type())
+
+
+def message_type(message_typename):
+ return import_message_type('~/output', message_typename)
+
+
+def parse_arguments(args=None):
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ 'message_type', type=message_type,
+ help='Message type for the repeater to publish.'
+ )
+ return parser.parse_args(args=remove_ros_args(args))
+
+
+def main(args=None):
+ parsed_args = parse_arguments(args=args)
+
+ rclpy.init(args=args)
+
+ node = RepeaterNode(message_type=parsed_args.message_type)
+
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ print('repeater stopped cleanly')
+ except BaseException:
+ print('exception in repeater:', file=sys.stderr)
+ raise
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main(args=sys.argv[1:])
diff --git a/ros2topic/test/fixtures/talker_node.py b/ros2topic/test/fixtures/talker_node.py
new file mode 100644
index 000000000..a29f6ecdc
--- /dev/null
+++ b/ros2topic/test/fixtures/talker_node.py
@@ -0,0 +1,54 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+import rclpy
+from rclpy.node import Node
+
+from std_msgs.msg import String
+
+
+class TalkerNode(Node):
+
+ def __init__(self):
+ super().__init__('talker_node')
+ self.count = 1
+ self.pub = self.create_publisher(String, 'chatter', 10)
+ self.tmr = self.create_timer(1.0, self.callback)
+
+ def callback(self):
+ self.pub.publish(String(data='Hello World: {0}'.format(self.count)))
+ self.count += 1
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ node = TalkerNode()
+
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ print('talker stopped cleanly')
+ except BaseException:
+ print('exception in talker:', file=sys.stderr)
+ raise
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/ros2topic/test/test_cli.py b/ros2topic/test/test_cli.py
new file mode 100644
index 000000000..228283a64
--- /dev/null
+++ b/ros2topic/test/test_cli.py
@@ -0,0 +1,652 @@
+# Copyright 2019 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+import functools
+import math
+import os
+import re
+import sys
+import unittest
+
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+from launch.actions import OpaqueFunction
+
+from launch_ros.actions import Node
+
+import launch_testing
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+import launch_testing_ros.tools
+
+import pytest
+
+from rmw_implementation import get_available_rmw_implementations
+
+
+@pytest.mark.rostest
+@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
+def generate_test_description(rmw_implementation, ready_fn):
+ path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures')
+ additional_env = {
+ 'RMW_IMPLEMENTATION': rmw_implementation, 'PYTHONUNBUFFERED': '1'
+ }
+
+ path_to_talker_node_script = os.path.join(path_to_fixtures, 'talker_node.py')
+ path_to_listener_node_script = os.path.join(path_to_fixtures, 'listener_node.py')
+
+ hidden_talker_node_action = Node(
+ node_executable=sys.executable,
+ arguments=[path_to_talker_node_script],
+ remappings=[('chatter', '_hidden_chatter')],
+ additional_env=additional_env
+ )
+ talker_node_action = Node(
+ node_executable=sys.executable,
+ arguments=[path_to_talker_node_script],
+ additional_env=additional_env
+ )
+ listener_node_action = Node(
+ node_executable=sys.executable,
+ arguments=[path_to_listener_node_script],
+ remappings=[('chatter', 'chit_chatter')],
+ additional_env=additional_env
+ )
+
+ path_to_repeater_node_script = os.path.join(path_to_fixtures, 'repeater_node.py')
+
+ array_repeater_node_action = Node(
+ node_executable=sys.executable,
+ arguments=[path_to_repeater_node_script, 'test_msgs/msg/Arrays'],
+ node_name='array_repeater',
+ remappings=[('/array_repeater/output', '/arrays')],
+ output='screen',
+ additional_env=additional_env
+ )
+ defaults_repeater_node_action = Node(
+ node_executable=sys.executable,
+ arguments=[path_to_repeater_node_script, 'test_msgs/msg/Defaults'],
+ node_name='defaults_repeater',
+ remappings=[('/defaults_repeater/output', '/defaults')],
+ additional_env=additional_env,
+ )
+ bounded_sequences_repeater_node_action = Node(
+ node_executable=sys.executable,
+ arguments=[
+ path_to_repeater_node_script, 'test_msgs/msg/BoundedSequences'
+ ],
+ node_name='bounded_sequences_repeater',
+ remappings=[('/bounded_sequences_repeater/output', '/bounded_sequences')],
+ additional_env=additional_env
+ )
+ unbounded_sequences_repeater_node_action = Node(
+ node_executable=sys.executable,
+ arguments=[
+ path_to_repeater_node_script, 'test_msgs/msg/UnboundedSequences'
+ ],
+ node_name='unbounded_sequences_repeater',
+ remappings=[('/unbounded_sequences_repeater/output', '/unbounded_sequences')],
+ additional_env=additional_env
+ )
+
+ path_to_controller_node_script = os.path.join(path_to_fixtures, 'controller_node.py')
+
+ cmd_vel_controller_node_action = Node(
+ node_executable=sys.executable,
+ arguments=[path_to_controller_node_script],
+ additional_env=additional_env
+ )
+
+ return LaunchDescription([
+ # Always restart daemon to isolate tests.
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'stop'],
+ name='daemon-stop',
+ on_exit=[
+ ExecuteProcess(
+ cmd=['ros2', 'daemon', 'start'],
+ name='daemon-start',
+ on_exit=[
+ # Add talker/listener pair.
+ talker_node_action,
+ listener_node_action,
+ # Add hidden talker.
+ hidden_talker_node_action,
+ # Add topic repeaters.
+ array_repeater_node_action,
+ defaults_repeater_node_action,
+ bounded_sequences_repeater_node_action,
+ unbounded_sequences_repeater_node_action,
+ # Add stamped data publisher.
+ cmd_vel_controller_node_action,
+ OpaqueFunction(function=lambda context: ready_fn())
+ ],
+ additional_env=additional_env
+ )
+ ]
+ ),
+ ]), locals()
+
+
+class TestROS2TopicCLI(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(
+ cls,
+ launch_service,
+ proc_info,
+ proc_output,
+ rmw_implementation,
+ listener_node_action
+ ):
+ rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter(
+ filtered_patterns=['WARNING: topic .* does not appear to be published yet'],
+ filtered_rmw_implementation=rmw_implementation
+ )
+
+ @contextlib.contextmanager
+ def launch_topic_command(self, arguments):
+ topic_command_action = ExecuteProcess(
+ cmd=['ros2', 'topic', *arguments],
+ additional_env={
+ 'RMW_IMPLEMENTATION': rmw_implementation,
+ 'PYTHONUNBUFFERED': '1'
+ },
+ name='ros2topic-cli',
+ output='screen'
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, topic_command_action, proc_info, proc_output,
+ output_filter=rmw_implementation_filter
+ ) as topic_command:
+ yield topic_command
+ cls.launch_topic_command = launch_topic_command
+
+ cls.listener_node = launch_testing.tools.ProcessProxy(
+ listener_node_action, proc_info, proc_output,
+ output_filter=rmw_implementation_filter
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_topics(self):
+ with self.launch_topic_command(arguments=['list']) as topic_command:
+ assert topic_command.wait_for_shutdown(timeout=10)
+ assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ '/arrays',
+ '/bounded_sequences',
+ '/chatter',
+ '/chit_chatter',
+ '/cmd_vel',
+ '/defaults',
+ '/parameter_events',
+ '/rosout',
+ '/unbounded_sequences',
+ ],
+ text=topic_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_all_topics(self):
+ with self.launch_topic_command(
+ arguments=['list', '--include-hidden-topics']
+ ) as topic_command:
+ assert topic_command.wait_for_shutdown(timeout=10)
+ assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ '/_hidden_chatter',
+ '/arrays',
+ '/bounded_sequences',
+ '/chatter',
+ '/chit_chatter',
+ '/cmd_vel',
+ '/defaults',
+ '/parameter_events',
+ '/rosout',
+ '/unbounded_sequences',
+ ],
+ text=topic_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_with_types(self):
+ with self.launch_topic_command(arguments=['list', '-t']) as topic_command:
+ assert topic_command.wait_for_shutdown(timeout=10)
+ assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ '/arrays [test_msgs/msg/Arrays]',
+ '/bounded_sequences [test_msgs/msg/BoundedSequences]',
+ '/chatter [std_msgs/msg/String]',
+ '/chit_chatter [std_msgs/msg/String]',
+ '/cmd_vel [geometry_msgs/msg/TwistStamped]',
+ '/defaults [test_msgs/msg/Defaults]',
+ '/parameter_events [rcl_interfaces/msg/ParameterEvent]',
+ '/rosout [rcl_interfaces/msg/Log]',
+ '/unbounded_sequences [test_msgs/msg/UnboundedSequences]',
+ ],
+ text=topic_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_list_count(self):
+ with self.launch_topic_command(arguments=['list', '-c']) as topic_command:
+ assert topic_command.wait_for_shutdown(timeout=10)
+ assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
+ output_lines = topic_command.output.splitlines()
+ assert len(output_lines) == 1
+ assert int(output_lines[0]) == 9
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_topic_info(self):
+ with self.launch_topic_command(arguments=['info', '/chatter']) as topic_command:
+ assert topic_command.wait_for_shutdown(timeout=10)
+ assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'Topic: /chatter',
+ 'Publisher count: 1',
+ 'Subscriber count: 0'
+ ],
+ text=topic_command.output,
+ strict=True
+ )
+
+ def test_info_on_unknown_topic(self):
+ with self.launch_topic_command(arguments=['info', '/unknown_topic']) as topic_command:
+ assert topic_command.wait_for_shutdown(timeout=10)
+ assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=[
+ 'Topic: /unknown_topic',
+ 'Publisher count: 0',
+ 'Subscriber count: 0'
+ ],
+ text=topic_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_topic_type(self):
+ with self.launch_topic_command(arguments=['type', '/chatter']) as topic_command:
+ assert topic_command.wait_for_shutdown(timeout=10)
+ assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['std_msgs/msg/String'],
+ text=topic_command.output,
+ strict=True
+ )
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_hidden_topic_type(self):
+ with self.launch_topic_command(
+ arguments=['type', '/_hidden_chatter']
+ ) as topic_command:
+ assert topic_command.wait_for_shutdown(timeout=10)
+ assert topic_command.exit_code == 1
+ assert topic_command.output == ''
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_find_topic_type(self):
+ with self.launch_topic_command(
+ arguments=['find', 'rcl_interfaces/msg/Log']
+ ) as topic_command:
+ assert topic_command.wait_for_shutdown(timeout=2)
+ assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=['/rosout'], text=topic_command.output, strict=True
+ )
+
+ def test_find_not_a_topic_typename(self):
+ with self.launch_topic_command(
+ arguments=['find', 'rcl_interfaces/msg/NotAMessageTypeName']
+ ) as topic_command:
+ assert topic_command.wait_for_shutdown(timeout=2)
+ assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert not topic_command.output
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_topic_echo(self):
+ with self.launch_topic_command(
+ arguments=['echo', '/chatter']
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ re.compile(r"data: 'Hello World: \d+'"),
+ '---'
+ ], strict=True
+ ), timeout=10)
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_no_str_topic_echo(self):
+ with self.launch_topic_command(
+ arguments=['echo', '--no-str', '/chatter']
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ re.compile(r"data: '>'"),
+ '---'
+ ], strict=True
+ ), timeout=10)
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_csv_topic_echo(self):
+ with self.launch_topic_command(
+ arguments=['echo', '--csv', '/defaults']
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ "True,b'2',100,1.125,1.125,-50,200,-1000,2000,-30000,60000,-40000000,50000000"
+ ], strict=True
+ ), timeout=10)
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_no_arr_topic_echo_on_array_message(self):
+ with self.launch_topic_command(
+ arguments=['echo', '--no-arr', '/arrays'],
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ "bool_values: ''",
+ "byte_values: ''",
+ "char_values: ''",
+ "float32_values: ''",
+ "float64_values: ''",
+ "int8_values: ''",
+ "uint8_values: ''",
+ "int16_values: ''",
+ "uint16_values: ''",
+ "int32_values: ''",
+ "uint32_values: ''",
+ "int64_values: ''",
+ "uint64_values: ''",
+ "string_values: ''",
+ "basic_types_values: ''",
+ "constants_values: ''",
+ "defaults_values: ''",
+ "bool_values_default: ''",
+ "byte_values_default: ''",
+ "char_values_default: ''",
+ "float32_values_default: ''",
+ "float64_values_default: ''",
+ "int8_values_default: ''",
+ "uint8_values_default: ''",
+ "int16_values_default: ''",
+ "uint16_values_default: ''",
+ "int32_values_default: ''",
+ "uint32_values_default: ''",
+ "int64_values_default: ''",
+ "uint64_values_default: ''",
+ "string_values_default: ''",
+ 'alignment_check: 0',
+ '---'
+ ], strict=False
+ ), timeout=5), 'Output does not match: ' + topic_command.output
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_no_arr_topic_echo_on_seq_message(self):
+ with self.launch_topic_command(
+ arguments=['echo', '--no-arr', '/unbounded_sequences'],
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ "bool_values: ''",
+ "byte_values: ''",
+ "char_values: ''",
+ "float32_values: ''",
+ "float64_values: ''",
+ "int8_values: ''",
+ "uint8_values: ''",
+ "int16_values: ''",
+ "uint16_values: ''",
+ "int32_values: ''",
+ "uint32_values: ''",
+ "int64_values: ''",
+ "uint64_values: ''",
+ "string_values: ''",
+ "basic_types_values: ''",
+ "constants_values: ''",
+ "defaults_values: ''",
+ "bool_values_default: ''",
+ "byte_values_default: ''",
+ "char_values_default: ''",
+ "float32_values_default: ''",
+ "float64_values_default: ''",
+ "int8_values_default: ''",
+ "uint8_values_default: ''",
+ "int16_values_default: ''",
+ "uint16_values_default: ''",
+ "int32_values_default: ''",
+ "uint32_values_default: ''",
+ "int64_values_default: ''",
+ "uint64_values_default: ''",
+ "string_values_default: ''",
+ 'alignment_check: 0',
+ '---'
+ ], strict=True
+ ), timeout=5)
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_no_arr_topic_echo_on_bounded_seq_message(self):
+ with self.launch_topic_command(
+ arguments=['echo', '--no-arr', '/bounded_sequences'],
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ "bool_values: ''",
+ "byte_values: ''",
+ "char_values: ''",
+ "float32_values: ''",
+ "float64_values: ''",
+ "int8_values: ''",
+ "uint8_values: ''",
+ "int16_values: ''",
+ "uint16_values: ''",
+ "int32_values: ''",
+ "uint32_values: ''",
+ "int64_values: ''",
+ "uint64_values: ''",
+ "string_values: ''",
+ 'basic_types_values: '
+ "''",
+ "constants_values: ''",
+ "defaults_values: ''",
+ "bool_values_default: ''",
+ "byte_values_default: ''",
+ "char_values_default: ''",
+ "float32_values_default: ''",
+ "float64_values_default: ''",
+ "int8_values_default: ''",
+ "uint8_values_default: ''",
+ "int16_values_default: ''",
+ "uint16_values_default: ''",
+ "int32_values_default: ''",
+ "uint32_values_default: ''",
+ "int64_values_default: ''",
+ "uint64_values_default: ''",
+ "string_values_default: ''",
+ 'alignment_check: 0',
+ '---'
+ ], strict=True
+ ), timeout=5)
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_truncate_length_topic_echo(self):
+ with self.launch_topic_command(
+ arguments=['echo', '--truncate-length', '5', '/chatter'],
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ re.compile(r'data: Hello...'),
+ '---'
+ ], strict=True
+ ), timeout=10)
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ def test_topic_pub(self):
+ with self.launch_topic_command(
+ arguments=['pub', '/chit_chatter', 'std_msgs/msg/String', '{data: foo}'],
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ 'publisher: beginning loop',
+ "publishing #1: std_msgs.msg.String(data='foo')",
+ ''
+ ], strict=True
+ ), timeout=10)
+ self.listener_node.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ '[INFO] [listener]: I heard: [foo]'
+ ] * 3, strict=True
+ ), timeout=10)
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ def test_topic_pub_once(self):
+ with self.launch_topic_command(
+ arguments=[
+ 'pub', '--once',
+ '/chit_chatter',
+ 'std_msgs/msg/String',
+ '{data: bar}'
+ ]
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ 'publisher: beginning loop',
+ "publishing #1: std_msgs.msg.String(data='bar')",
+ ''
+ ], strict=True
+ ), timeout=5)
+ assert topic_command.wait_for_shutdown(timeout=5)
+ self.listener_node.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ '[INFO] [listener]: I heard: [bar]'
+ ], strict=True
+ ), timeout=10)
+ assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
+
+ def test_topic_pub_print_every_two(self):
+ with self.launch_topic_command(
+ arguments=[
+ 'pub',
+ '-p', '2',
+ '/chit_chatter',
+ 'std_msgs/msg/String',
+ '{data: fizz}'
+ ]
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ 'publisher: beginning loop',
+ "publishing #2: std_msgs.msg.String(data='fizz')",
+ '',
+ "publishing #4: std_msgs.msg.String(data='fizz')",
+ ''
+ ], strict=True
+ ), timeout=6), 'Output does not match: ' + topic_command.output
+ self.listener_node.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ '[INFO] [listener]: I heard: [fizz]'
+ ], strict=True
+ ), timeout=5)
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_topic_delay(self):
+ average_delay_line_pattern = re.compile(r'average delay: (\d+.\d{3})')
+ stats_line_pattern = re.compile(
+ r'\s*min: \d+.\d{3}s max: \d+.\d{3}s std dev: \d+.\d{5}s window: \d+'
+ )
+ with self.launch_topic_command(arguments=['delay', '/cmd_vel']) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ average_delay_line_pattern, stats_line_pattern
+ ], strict=True
+ ), timeout=10)
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ head_line = topic_command.output.splitlines()[0]
+ average_delay = float(average_delay_line_pattern.match(head_line).group(1))
+ assert math.isclose(average_delay, 0.0, abs_tol=10e-3)
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_topic_hz(self):
+ average_rate_line_pattern = re.compile(r'average rate: (\d+.\d{3})')
+ stats_line_pattern = re.compile(
+ r'\s*min: \d+.\d{3}s max: \d+.\d{3}s std dev: \d+.\d{5}s window: \d+'
+ )
+ with self.launch_topic_command(arguments=['hz', '/chatter']) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ average_rate_line_pattern, stats_line_pattern
+ ], strict=True
+ ), timeout=10)
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ head_line = topic_command.output.splitlines()[0]
+ average_rate = float(average_rate_line_pattern.match(head_line).group(1))
+ assert math.isclose(average_rate, 1., rel_tol=1e-3)
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_filtered_topic_hz(self):
+ average_rate_line_pattern = re.compile(r'average rate: (\d+.\d{3})')
+ stats_line_pattern = re.compile(
+ r'\s*min: \d+.\d{3}s max: \d+.\d{3}s std dev: \d+.\d{5}s window: \d+'
+ )
+ with self.launch_topic_command(
+ arguments=[
+ 'hz',
+ '--filter',
+ 'int(m.data.rpartition(\":\")[-1]) % 2 == 0',
+ '/chatter'
+ ]
+ ) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ average_rate_line_pattern, stats_line_pattern
+ ], strict=True
+ ), timeout=10), 'Output does not match: ' + topic_command.output
+ assert topic_command.wait_for_shutdown(timeout=5)
+
+ head_line = topic_command.output.splitlines()[0]
+ average_rate = float(average_rate_line_pattern.match(head_line).group(1))
+ assert math.isclose(average_rate, 0.5, rel_tol=1e-3)
+
+ @launch_testing.markers.retry_on_failure(times=5)
+ def test_topic_bw(self):
+ with self.launch_topic_command(arguments=['bw', '/defaults']) as topic_command:
+ assert topic_command.wait_for_output(functools.partial(
+ launch_testing.tools.expect_output, expected_lines=[
+ 'Subscribed to [/defaults]',
+ re.compile(r'average: \d{2}\.\d{2}B/s'),
+ re.compile(
+ r'\s*mean: \d{2}\.\d{2}B/s min: \d{2}\.\d{2}B/s'
+ r' max: \d{2}\.\d{2}B/s window: \d+'
+ )
+ ], strict=True
+ ), timeout=10)
+ assert topic_command.wait_for_shutdown(timeout=5)
diff --git a/test_ros2cli/COLCON_IGNORE b/test_ros2cli/COLCON_IGNORE
new file mode 100644
index 000000000..e69de29bb
diff --git a/test_ros2cli/test/ros2action_test_configuration.py b/test_ros2cli/test/ros2action_test_configuration.py
index a6f9f5e16..a640ce41a 100644
--- a/test_ros2cli/test/ros2action_test_configuration.py
+++ b/test_ros2cli/test/ros2action_test_configuration.py
@@ -195,3 +195,4 @@ def get_test_configurations(rmw_implementation):
exit_codes=[1]
),
]
+
diff --git a/test_ros2cli/test/ros2topic_test_configuration.py b/test_ros2cli/test/ros2topic_test_configuration.py
index c9dd60a68..43257aa37 100644
--- a/test_ros2cli/test/ros2topic_test_configuration.py
+++ b/test_ros2cli/test/ros2topic_test_configuration.py
@@ -155,12 +155,6 @@ def get_test_configurations(rmw_implementation):
fixture_actions=[get_talker_node_action()],
expected_output=None,
),
- CLITestConfiguration(
- command='topic',
- arguments=['find', 'rcl_interfaces/msg/NotAMessageType'],
- fixture_actions=[get_talker_node_action()],
- expected_output=None,
- ),
CLITestConfiguration(
command='topic',
arguments=['echo', '/my_ns/chatter'],
@@ -385,7 +379,7 @@ def get_test_configurations(rmw_implementation):
),
CLITestConfiguration(
command='topic',
- arguments=['pub', '--once', '/my_ns/chatter', 'std_msgs/msg/String', '{data: test}'],
+ arguments=['pub', '--once', '/my_ns/chatter', 'std_msgs/msg/String', '{data: test_once}'],
expected_output=[
'publisher: beginning loop',
re.compile(r"publishing #1: std_msgs\.msg\.String\(data='test'\)"),