-
Notifications
You must be signed in to change notification settings - Fork 170
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
End-to-end test coverage for CLI commands output (#304)
* Add end-to-end CLI output tests for ros2: - ros2action - ros2service - ros2topic - ros2msg - ros2srv - ros2interface - ros2node - ros2pkg Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>
- Loading branch information
Showing
29 changed files
with
2,754 additions
and
583 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
#!/usr/bin/env python3 | ||
# Copyright 2019 Open Source Robotics Foundation, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License.import time | ||
|
||
import sys | ||
|
||
import rclpy | ||
from rclpy.action import ActionServer | ||
from rclpy.node import Node | ||
|
||
from test_msgs.action import Fibonacci | ||
|
||
|
||
class FibonacciActionServer(Node): | ||
|
||
def __init__(self): | ||
super().__init__('fibonacci_action_server') | ||
self._action_server = ActionServer( | ||
self, | ||
Fibonacci, | ||
'fibonacci', | ||
self.execute_callback) | ||
|
||
def destroy_node(self): | ||
self._action_server.destroy() | ||
super().destroy_node() | ||
|
||
def execute_callback(self, goal_handle): | ||
feedback = Fibonacci.Feedback() | ||
feedback.sequence = [0, 1] | ||
|
||
for i in range(1, goal_handle.request.order): | ||
feedback.sequence.append(feedback.sequence[i] + feedback.sequence[i-1]) | ||
goal_handle.publish_feedback(feedback) | ||
|
||
goal_handle.succeed() | ||
|
||
result = Fibonacci.Result() | ||
result.sequence = feedback.sequence | ||
return result | ||
|
||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
|
||
node = FibonacciActionServer() | ||
try: | ||
rclpy.spin(node) | ||
except KeyboardInterrupt: | ||
print('server stopped cleanly') | ||
except BaseException: | ||
print('exception in server:', file=sys.stderr) | ||
raise | ||
finally: | ||
node.destroy_node() | ||
rclpy.shutdown() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,313 @@ | ||
# Copyright 2019 Open Source Robotics Foundation, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import contextlib | ||
import os | ||
import re | ||
import sys | ||
import unittest | ||
|
||
from launch import LaunchDescription | ||
from launch.actions import ExecuteProcess | ||
from launch.actions import OpaqueFunction | ||
|
||
import launch_testing | ||
import launch_testing.asserts | ||
import launch_testing.markers | ||
import launch_testing.tools | ||
import launch_testing_ros.tools | ||
|
||
import pytest | ||
|
||
from rmw_implementation import get_available_rmw_implementations | ||
|
||
import yaml | ||
|
||
|
||
@pytest.mark.rostest | ||
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) | ||
def generate_test_description(rmw_implementation, ready_fn): | ||
path_to_action_server_executable = os.path.join( | ||
os.path.dirname(__file__), 'fixtures', 'fibonacci_action_server.py' | ||
) | ||
return LaunchDescription([ | ||
# Always restart daemon to isolate tests. | ||
ExecuteProcess( | ||
cmd=['ros2', 'daemon', 'stop'], | ||
name='daemon-stop', | ||
on_exit=[ | ||
ExecuteProcess( | ||
cmd=['ros2', 'daemon', 'start'], | ||
name='daemon-start', | ||
on_exit=[ | ||
ExecuteProcess( | ||
cmd=[sys.executable, path_to_action_server_executable], | ||
additional_env={'RMW_IMPLEMENTATION': rmw_implementation} | ||
), | ||
OpaqueFunction(function=lambda context: ready_fn()) | ||
], | ||
additional_env={'RMW_IMPLEMENTATION': rmw_implementation} | ||
) | ||
] | ||
), | ||
]) | ||
|
||
|
||
def get_fibonacci_send_goal_output(*, order=1, with_feedback=False): | ||
assert order > 0 | ||
output = [ | ||
'Waiting for an action server to become available...', | ||
'Sending goal:', | ||
' order: {}'.format(order), | ||
'', | ||
re.compile('Goal accepted with ID: [a-f0-9]+'), | ||
'', | ||
] | ||
sequence = [0, 1] | ||
for _ in range(order - 1): | ||
sequence.append(sequence[-1] + sequence[-2]) | ||
if with_feedback: | ||
output.append('Feedback:') | ||
output.extend((' ' + yaml.dump({ | ||
'sequence': sequence | ||
})).splitlines()) | ||
output.append('') | ||
output.append('Result:'), | ||
output.extend((' ' + yaml.dump({ | ||
'sequence': sequence | ||
})).splitlines()) | ||
output.append('') | ||
output.append('Goal finished with status: SUCCEEDED') | ||
return output | ||
|
||
|
||
class TestROS2ActionCLI(unittest.TestCase): | ||
|
||
@classmethod | ||
def setUpClass( | ||
cls, | ||
launch_service, | ||
proc_info, | ||
proc_output, | ||
rmw_implementation | ||
): | ||
@contextlib.contextmanager | ||
def launch_action_command(self, arguments): | ||
action_command_action = ExecuteProcess( | ||
cmd=['ros2', 'action', *arguments], | ||
name='ros2action-cli', output='screen', | ||
additional_env={ | ||
'RMW_IMPLEMENTATION': rmw_implementation, | ||
'PYTHONUNBUFFERED': '1' | ||
} | ||
) | ||
with launch_testing.tools.launch_process( | ||
launch_service, action_command_action, proc_info, proc_output, | ||
output_filter=launch_testing_ros.tools.basic_output_filter( | ||
filtered_rmw_implementation=rmw_implementation | ||
) | ||
) as action_command: | ||
yield action_command | ||
cls.launch_action_command = launch_action_command | ||
|
||
def test_info_on_nonexistent_action(self): | ||
with self.launch_action_command(arguments=['info', '/not_an_action']) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=10) | ||
assert action_command.exit_code == launch_testing.asserts.EXIT_OK | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=[ | ||
'Action: /not_an_action', | ||
'Action clients: 0', | ||
'Action servers: 0', | ||
], | ||
text=action_command.output, | ||
strict=False | ||
) | ||
|
||
@launch_testing.markers.retry_on_failure(times=5) | ||
def test_fibonacci_info(self): | ||
with self.launch_action_command(arguments=['info', '/fibonacci']) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=10) | ||
assert action_command.exit_code == launch_testing.asserts.EXIT_OK | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=[ | ||
'Action: /fibonacci', | ||
'Action clients: 0', | ||
'Action servers: 1', | ||
' /fibonacci_action_server' | ||
], | ||
text=action_command.output, | ||
strict=False | ||
) | ||
|
||
@launch_testing.markers.retry_on_failure(times=5) | ||
def test_fibonacci_info_with_types(self): | ||
with self.launch_action_command(arguments=['info', '-t', '/fibonacci']) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=10) | ||
assert action_command.exit_code == launch_testing.asserts.EXIT_OK | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=[ | ||
'Action: /fibonacci', | ||
'Action clients: 0', | ||
'Action servers: 1', | ||
' /fibonacci_action_server [test_msgs/action/Fibonacci]' | ||
], | ||
text=action_command.output, | ||
strict=False | ||
) | ||
|
||
@launch_testing.markers.retry_on_failure(times=5) | ||
def test_fibonacci_info_count(self): | ||
with self.launch_action_command(arguments=['info', '-c', '/fibonacci']) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=10) | ||
assert action_command.exit_code == launch_testing.asserts.EXIT_OK | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=[ | ||
'Action: /fibonacci', | ||
'Action clients: 0', | ||
'Action servers: 1', | ||
], | ||
text=action_command.output, | ||
strict=False | ||
) | ||
|
||
@launch_testing.markers.retry_on_failure(times=5) | ||
def test_list(self): | ||
with self.launch_action_command(arguments=['list']) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=10) | ||
assert action_command.exit_code == launch_testing.asserts.EXIT_OK | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=['/fibonacci'], | ||
text=action_command.output, | ||
strict=True | ||
) | ||
|
||
@launch_testing.markers.retry_on_failure(times=5) | ||
def test_list_with_types(self): | ||
with self.launch_action_command(arguments=['list', '-t']) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=10) | ||
assert action_command.exit_code == launch_testing.asserts.EXIT_OK | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=['/fibonacci [test_msgs/action/Fibonacci]'], | ||
text=action_command.output, strict=True | ||
) | ||
|
||
@launch_testing.markers.retry_on_failure(times=5) | ||
def test_list_count(self): | ||
with self.launch_action_command(arguments=['list', '-c']) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=10) | ||
assert action_command.exit_code == launch_testing.asserts.EXIT_OK | ||
command_output_lines = action_command.output.splitlines() | ||
assert len(command_output_lines) == 1 | ||
assert int(command_output_lines[0]) == 1 | ||
|
||
@launch_testing.markers.retry_on_failure(times=5) | ||
def test_send_fibonacci_goal(self): | ||
with self.launch_action_command( | ||
arguments=[ | ||
'send_goal', | ||
'/fibonacci', | ||
'test_msgs/action/Fibonacci', | ||
'{order: 5}' | ||
], | ||
) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=10) | ||
assert action_command.exit_code == launch_testing.asserts.EXIT_OK | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=get_fibonacci_send_goal_output(order=5), | ||
text=action_command.output, strict=True | ||
) | ||
|
||
@launch_testing.markers.retry_on_failure(times=5) | ||
def test_send_fibonacci_goal_with_feedback(self): | ||
with self.launch_action_command( | ||
arguments=[ | ||
'send_goal', | ||
'-f', | ||
'/fibonacci', | ||
'test_msgs/action/Fibonacci', | ||
'{order: 5}' | ||
], | ||
) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=10) | ||
assert action_command.exit_code == launch_testing.asserts.EXIT_OK | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=get_fibonacci_send_goal_output( | ||
order=5, with_feedback=True | ||
), | ||
text=action_command.output, strict=True | ||
) | ||
|
||
def test_show_fibonacci(self): | ||
with self.launch_action_command( | ||
arguments=['show', 'test_msgs/action/Fibonacci'], | ||
) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=2) | ||
assert action_command.exit_code == launch_testing.asserts.EXIT_OK | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=[ | ||
'int32 order', | ||
'---', | ||
'int32[] sequence', | ||
'---', | ||
'int32[] sequence' | ||
], | ||
text=action_command.output, | ||
strict=False | ||
) | ||
|
||
def test_show_not_a_package(self): | ||
with self.launch_action_command( | ||
arguments=['show', 'not_a_package/action/Fibonacci'], | ||
) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=2) | ||
assert action_command.exit_code == 1 | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=['Unknown package name'], | ||
text=action_command.output, strict=True | ||
) | ||
|
||
# TODO(hidmic): make 'ros2 action show' fail accordingly | ||
# def test_show_not_an_action_ns(self): | ||
# with self.launch_action_command( | ||
# arguments=['show', 'test_msgs/foo/Fibonacci'], | ||
# ) as action_command: | ||
# assert action_command.wait_for_shutdown(timeout=2) | ||
# assert action_command.exit_code == 1 | ||
# assert launch_testing.tools.expect_output( | ||
# expected_lines=['Unknown action type'], | ||
# text=action_command.output, strict=True | ||
# ) | ||
|
||
def test_show_not_an_action_typename(self): | ||
with self.launch_action_command( | ||
arguments=['show', 'test_msgs/action/NotAnActionTypeName'], | ||
) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=2) | ||
assert action_command.exit_code == 1 | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=['Unknown action type'], | ||
text=action_command.output, strict=True | ||
) | ||
|
||
def test_show_not_an_action_type(self): | ||
with self.launch_action_command( | ||
arguments=['show', 'not_an_action_type'] | ||
) as action_command: | ||
assert action_command.wait_for_shutdown(timeout=2) | ||
assert action_command.exit_code == 1 | ||
assert launch_testing.tools.expect_output( | ||
expected_lines=['The passed action type is invalid'], | ||
text=action_command.output, strict=True | ||
) |
Oops, something went wrong.