Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate input of config mirror_session add #1825

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@

asic_type = None

DSCP_RANGE = click.IntRange(min=0, max=63)
TTL_RANGE = click.IntRange(min=0, max=255)
QUEUE_RANGE = click.IntRange(min=0, max=255)
GRE_TYPE_RANGE = click.IntRange(min=0, max=65535)

#
# Helper functions
#
Expand Down Expand Up @@ -953,6 +958,19 @@ def cache_arp_entries():
open(restore_flag_file, 'w').close()
return success


def validate_ipv4_address(ctx, param, ip_addr):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please move this to common so it can be used by other files in future? -

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Impmenting a common function to validate IP address is a good suggestion, but this interface is not to be common because this one is implemented as a callback interface of click. I will implement that common interface in another PR.

"""Helper function to validate ipv4 address
"""
try:
ip_n = ipaddress.ip_network(ip_addr, False)
if ip_n.version != 4:
raise click.UsageError("{} is not a valid IPv4 address".format(ip_addr))
return ip_addr
except ValueError as e:
raise click.UsageError(str(e))


# This is our main entrypoint - the main 'config' command
@click.group(cls=clicommon.AbbreviationGroup, context_settings=CONTEXT_SETTINGS)
@click.pass_context
Expand Down Expand Up @@ -1775,12 +1793,12 @@ def mirror_session():

@mirror_session.command('add')
@click.argument('session_name', metavar='<session_name>', required=True)
@click.argument('src_ip', metavar='<src_ip>', required=True)
@click.argument('dst_ip', metavar='<dst_ip>', required=True)
@click.argument('dscp', metavar='<dscp>', required=True)
@click.argument('ttl', metavar='<ttl>', required=True)
@click.argument('gre_type', metavar='[gre_type]', required=False)
@click.argument('queue', metavar='[queue]', required=False)
@click.argument('src_ip', metavar='<src_ip>', callback=validate_ipv4_address, required=True)
@click.argument('dst_ip', metavar='<dst_ip>', callback=validate_ipv4_address, required=True)
@click.argument('dscp', metavar='<dscp>', type=DSCP_RANGE, required=True)
@click.argument('ttl', metavar='<ttl>', type=TTL_RANGE, required=True)
@click.argument('gre_type', metavar='[gre_type]', type=GRE_TYPE_RANGE, required=False)
@click.argument('queue', metavar='[queue]', type=QUEUE_RANGE, required=False)
@click.option('--policer')
def add(session_name, src_ip, dst_ip, dscp, ttl, gre_type, queue, policer):
""" Add ERSPAN mirror session.(Legacy support) """
Expand All @@ -1799,12 +1817,12 @@ def erspan(ctx):

@erspan.command('add')
@click.argument('session_name', metavar='<session_name>', required=True)
@click.argument('src_ip', metavar='<src_ip>', required=True)
@click.argument('dst_ip', metavar='<dst_ip>', required=True)
@click.argument('dscp', metavar='<dscp>', required=True)
@click.argument('ttl', metavar='<ttl>', required=True)
@click.argument('gre_type', metavar='[gre_type]', required=False)
@click.argument('queue', metavar='[queue]', required=False)
@click.argument('src_ip', metavar='<src_ip>', callback=validate_ipv4_address, required=True)
@click.argument('dst_ip', metavar='<dst_ip>', callback=validate_ipv4_address,required=True)
@click.argument('dscp', metavar='<dscp>', type=DSCP_RANGE, required=True)
@click.argument('ttl', metavar='<ttl>', type=TTL_RANGE, required=True)
@click.argument('gre_type', metavar='[gre_type]', type=GRE_TYPE_RANGE, required=False)
@click.argument('queue', metavar='[queue]', type=QUEUE_RANGE, required=False)
@click.argument('src_port', metavar='[src_port]', required=False)
@click.argument('direction', metavar='[direction]', required=False)
@click.option('--policer')
Expand Down Expand Up @@ -1877,7 +1895,7 @@ def span(ctx):
@click.argument('dst_port', metavar='<dst_port>', required=True)
@click.argument('src_port', metavar='[src_port]', required=False)
@click.argument('direction', metavar='[direction]', required=False)
@click.argument('queue', metavar='[queue]', required=False)
@click.argument('queue', metavar='[queue]', type=QUEUE_RANGE, required=False)
@click.option('--policer')
def add(session_name, dst_port, src_port, direction, queue, policer):
""" Add SPAN mirror session """
Expand Down
130 changes: 130 additions & 0 deletions tests/config_mirror_session_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import pytest
import config.main as config
from unittest import mock
from click.testing import CliRunner

ERR_MSG_IP_FAILURE = "does not appear to be an IPv4 or IPv6 network"
ERR_MSG_IP_VERSION_FAILURE = "not a valid IPv4 address"
ERR_MSG_VALUE_FAILURE = "Invalid value for"

def test_mirror_session_add():
runner = CliRunner()

# Verify invalid src_ip
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "400.1.1.1", "2.2.2.2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_FAILURE in result.stdout

# Verify invalid dst_ip
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1.1.1.1", "256.2.2.2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_FAILURE in result.stdout

# Verify invalid ip version
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1::1", "2::2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_VERSION_FAILURE in result.stdout

# Verify invalid dscp
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "65536", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid ttl
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "256", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid gre
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65536", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid queue
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65", "65536"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Positive case
with mock.patch('config.main.add_erspan') as mocked:
result = runner.invoke(
config.config.commands["mirror_session"].commands["add"],
["test_session", "100.1.1.1", "2.2.2.2", "8", "63", "10", "100"])

mocked.assert_called_with("test_session", "100.1.1.1", "2.2.2.2", 8, 63, 10, 100, None)



def test_mirror_session_erspan_add():
runner = CliRunner()

# Verify invalid src_ip
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "400.1.1.1", "2.2.2.2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_FAILURE in result.stdout

# Verify invalid dst_ip
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "256.2.2.2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_FAILURE in result.stdout

# Verify invalid ip version
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1::1", "2::2", "8", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_IP_VERSION_FAILURE in result.stdout

# Verify invalid dscp
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "65536", "63", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid ttl
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "256", "10", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid gre
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65536", "100"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Verify invalid queue
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "1.1.1.1", "2.2.2.2", "6", "63", "65", "65536"])
assert result.exit_code != 0
assert ERR_MSG_VALUE_FAILURE in result.stdout

# Positive case
with mock.patch('config.main.add_erspan') as mocked:
result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_session", "100.1.1.1", "2.2.2.2", "8", "63", "10", "100"])

mocked.assert_called_with("test_session", "100.1.1.1", "2.2.2.2", 8, 63, 10, 100, None, None, None)