Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

End-to-end test coverage for CLI commands output #304

Merged
merged 11 commits into from
Oct 22, 2019
1 change: 1 addition & 0 deletions ros2action/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<test_depend>ament_pep257</test_depend>
<test_depend>ament_xmllint</test_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>ros_testing</test_depend>
<test_depend>test_msgs</test_depend>

<export>
Expand Down
71 changes: 71 additions & 0 deletions ros2action/test/fixtures/fibonacci_action_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.import time

import sys

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from test_msgs.action import Fibonacci


class FibonacciActionServer(Node):

def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)

def destroy_node(self):
self._action_server.destroy()
super().destroy_node()

def execute_callback(self, goal_handle):
feedback = Fibonacci.Feedback()
feedback.sequence = [0, 1]

for i in range(1, goal_handle.request.order):
feedback.sequence.append(feedback.sequence[i] + feedback.sequence[i-1])
goal_handle.publish_feedback(feedback)

goal_handle.succeed()

result = Fibonacci.Result()
result.sequence = feedback.sequence
return result


def main(args=None):
rclpy.init(args=args)

node = FibonacciActionServer()
try:
rclpy.spin(node)
except KeyboardInterrupt:
print('server stopped cleanly')
except BaseException:
print('exception in server:', file=sys.stderr)
raise
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, we could provide a package with test nodes, so they can be used from other packages.
Probably, this is out of the scope of this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I entertained that idea for a bit, but besides basic talker / listeners, fixture nodes tend to be quite specific to a test and not so easy to reuse unless you start generalizing them -- which I purposefully didn't want to.

313 changes: 313 additions & 0 deletions ros2action/test/test_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import os
import re
import sys
import unittest

from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch.actions import OpaqueFunction

import launch_testing
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools

import pytest

from rmw_implementation import get_available_rmw_implementations

import yaml


@pytest.mark.rostest
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
def generate_test_description(rmw_implementation, ready_fn):
path_to_action_server_executable = os.path.join(
os.path.dirname(__file__), 'fixtures', 'fibonacci_action_server.py'
)
return LaunchDescription([
# Always restart daemon to isolate tests.
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
ExecuteProcess(
cmd=[sys.executable, path_to_action_server_executable],
additional_env={'RMW_IMPLEMENTATION': rmw_implementation}
),
OpaqueFunction(function=lambda context: ready_fn())
],
additional_env={'RMW_IMPLEMENTATION': rmw_implementation}
)
]
),
])


def get_fibonacci_send_goal_output(*, order=1, with_feedback=False):
assert order > 0
output = [
'Waiting for an action server to become available...',
'Sending goal:',
' order: {}'.format(order),
'',
re.compile('Goal accepted with ID: [a-f0-9]+'),
'',
]
sequence = [0, 1]
for _ in range(order - 1):
sequence.append(sequence[-1] + sequence[-2])
if with_feedback:
output.append('Feedback:')
output.extend((' ' + yaml.dump({
'sequence': sequence
})).splitlines())
output.append('')
output.append('Result:'),
output.extend((' ' + yaml.dump({
'sequence': sequence
})).splitlines())
output.append('')
output.append('Goal finished with status: SUCCEEDED')
return output


class TestROS2ActionCLI(unittest.TestCase):

@classmethod
def setUpClass(
cls,
launch_service,
proc_info,
proc_output,
rmw_implementation
):
@contextlib.contextmanager
def launch_action_command(self, arguments):
action_command_action = ExecuteProcess(
cmd=['ros2', 'action', *arguments],
name='ros2action-cli', output='screen',
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
'PYTHONUNBUFFERED': '1'
}
)
with launch_testing.tools.launch_process(
launch_service, action_command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=rmw_implementation
)
) as action_command:
yield action_command
cls.launch_action_command = launch_action_command

def test_info_on_nonexistent_action(self):
with self.launch_action_command(arguments=['info', '/not_an_action']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Action: /not_an_action',
'Action clients: 0',
'Action servers: 0',
],
text=action_command.output,
strict=False
)

@launch_testing.markers.retry_on_failure(times=5)
def test_fibonacci_info(self):
with self.launch_action_command(arguments=['info', '/fibonacci']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Action: /fibonacci',
'Action clients: 0',
'Action servers: 1',
' /fibonacci_action_server'
],
text=action_command.output,
strict=False
)

@launch_testing.markers.retry_on_failure(times=5)
def test_fibonacci_info_with_types(self):
with self.launch_action_command(arguments=['info', '-t', '/fibonacci']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Action: /fibonacci',
'Action clients: 0',
'Action servers: 1',
' /fibonacci_action_server [test_msgs/action/Fibonacci]'
],
text=action_command.output,
strict=False
)

@launch_testing.markers.retry_on_failure(times=5)
def test_fibonacci_info_count(self):
with self.launch_action_command(arguments=['info', '-c', '/fibonacci']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Action: /fibonacci',
'Action clients: 0',
'Action servers: 1',
],
text=action_command.output,
strict=False
)

@launch_testing.markers.retry_on_failure(times=5)
def test_list(self):
with self.launch_action_command(arguments=['list']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['/fibonacci'],
text=action_command.output,
strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_list_with_types(self):
with self.launch_action_command(arguments=['list', '-t']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['/fibonacci [test_msgs/action/Fibonacci]'],
text=action_command.output, strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_list_count(self):
with self.launch_action_command(arguments=['list', '-c']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
command_output_lines = action_command.output.splitlines()
assert len(command_output_lines) == 1
assert int(command_output_lines[0]) == 1

@launch_testing.markers.retry_on_failure(times=5)
def test_send_fibonacci_goal(self):
with self.launch_action_command(
arguments=[
'send_goal',
'/fibonacci',
'test_msgs/action/Fibonacci',
'{order: 5}'
],
) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=get_fibonacci_send_goal_output(order=5),
text=action_command.output, strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_send_fibonacci_goal_with_feedback(self):
with self.launch_action_command(
arguments=[
'send_goal',
'-f',
'/fibonacci',
'test_msgs/action/Fibonacci',
'{order: 5}'
],
) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=get_fibonacci_send_goal_output(
order=5, with_feedback=True
),
text=action_command.output, strict=True
)

def test_show_fibonacci(self):
with self.launch_action_command(
arguments=['show', 'test_msgs/action/Fibonacci'],
) as action_command:
assert action_command.wait_for_shutdown(timeout=2)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'int32 order',
'---',
'int32[] sequence',
'---',
'int32[] sequence'
],
text=action_command.output,
strict=False
)

def test_show_not_a_package(self):
with self.launch_action_command(
arguments=['show', 'not_a_package/action/Fibonacci'],
) as action_command:
assert action_command.wait_for_shutdown(timeout=2)
assert action_command.exit_code == 1
assert launch_testing.tools.expect_output(
expected_lines=['Unknown package name'],
text=action_command.output, strict=True
)

# TODO(hidmic): make 'ros2 action show' fail accordingly
# def test_show_not_an_action_ns(self):
# with self.launch_action_command(
# arguments=['show', 'test_msgs/foo/Fibonacci'],
# ) as action_command:
# assert action_command.wait_for_shutdown(timeout=2)
# assert action_command.exit_code == 1
# assert launch_testing.tools.expect_output(
# expected_lines=['Unknown action type'],
# text=action_command.output, strict=True
# )

def test_show_not_an_action_typename(self):
with self.launch_action_command(
arguments=['show', 'test_msgs/action/NotAnActionTypeName'],
) as action_command:
assert action_command.wait_for_shutdown(timeout=2)
assert action_command.exit_code == 1
assert launch_testing.tools.expect_output(
expected_lines=['Unknown action type'],
text=action_command.output, strict=True
)

def test_show_not_an_action_type(self):
with self.launch_action_command(
arguments=['show', 'not_an_action_type']
) as action_command:
assert action_command.wait_for_shutdown(timeout=2)
assert action_command.exit_code == 1
assert launch_testing.tools.expect_output(
expected_lines=['The passed action type is invalid'],
text=action_command.output, strict=True
)
Loading