-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtest_tf.py
151 lines (128 loc) · 8.43 KB
/
test_tf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import os
import subprocess
import unittest
import pytest
from parameterized import parameterized
from main import get_policy_check_type, get_required_inputs, get_optional_inputs, get_sub_command
from main import get_treat_findings_as_non_blocking_flag, build_command, execute_command, set_output
from main import CLI_POLICY_VALIDATOR, POLICY_CHECK_TYPE, VALIDATE_POLICY, CHECK_NO_NEW_ACCESS, CHECK_ACCESS_NOT_GRANTED, CHECK_NO_PUBLIC_ACCESS
from main import COMMON_REQUIRED_INPUTS, TREAT_FINDINGS_AS_NON_BLOCKING
from unittest.mock import patch
INVALID_POLICY_CHECK = "INVALID_POLICY_CHECK"
class TfpvTest(unittest.TestCase):
# case 1: test_get_type_ENVIRON_NOT_SET: failure expected because required os.environ[] are not set
def test_get_type_ENVIRON_NOT_SET(self):
self.assertRaises(KeyError, get_policy_check_type)
# case 2: test_get_type_INVALID_POLICY_CHECK: failure expected because os.environ[] is set to an invalid value
def test_get_type_INVALID_POLICY_CHECK(self):
os.environ[POLICY_CHECK_TYPE] = INVALID_POLICY_CHECK
self.assertRaises(ValueError, get_policy_check_type)
# case 3, 4, 5: test_get_type_WITH_VALIDATE_POLICY: success with valid POLICY_CHECK_TYPE
@parameterized.expand([VALIDATE_POLICY, CHECK_NO_NEW_ACCESS, CHECK_ACCESS_NOT_GRANTED, CHECK_NO_PUBLIC_ACCESS])
def test_get_type_WITH_VALID_POLICY_CHECK(self, policy_check_type):
os.environ[POLICY_CHECK_TYPE] = policy_check_type
policy_type = get_policy_check_type()
self.assertEqual(policy_type, policy_check_type)
# case 6: test_get_required_input_INVALID_POLICY_CHECK: failure expected because an invalid policy_check_type is provided
@unittest.expectedFailure
def test_get_required_input_INVALID_POLICY_CHECK(self):
policy_check = INVALID_POLICY_CHECK
self.assertEqual(get_required_inputs(policy_check), "")
# case 7, 8, 9: test_get_required_input_WITH_VALIDATE_POLICY: success as a valid POLICY_CHECK_TYPE is provided: VALIDATE_POLICY
@parameterized.expand([VALIDATE_POLICY, CHECK_NO_NEW_ACCESS, CHECK_ACCESS_NOT_GRANTED, CHECK_NO_PUBLIC_ACCESS])
def test_get_required_input_WITH_VALID_POLICY_CHECK(self, policy_check_type):
result = get_required_inputs(policy_check_type)
if policy_check_type == CHECK_NO_NEW_ACCESS:
self.assertEqual(result, {"INPUT_TEMPLATE-PATH", "INPUT_REGION", "INPUT_REFERENCE-POLICY", "INPUT_REFERENCE-POLICY-TYPE"})
elif policy_check_type == CHECK_ACCESS_NOT_GRANTED:
self.assertEqual(result, {"INPUT_TEMPLATE-PATH", "INPUT_REGION", ("INPUT_ACTIONS", "INPUT_RESOURCES")})
else:
self.assertEqual(result, {"INPUT_TEMPLATE-PATH", "INPUT_REGION"})
# case 10: test_get_optional_input_INVALID_POLICY_CHECK: failure expected because an invalid policy_check_type is provided
@unittest.expectedFailure
def test_get_optional_input_INVALID_POLICY_CHECK(self):
policy_check = INVALID_POLICY_CHECK
result = get_optional_inputs(policy_check)
assertEqual(result, {"INPUT_IGNORE-FINDING", "INPUT_ALLOW-DYNAMIC-REF-WITHOUT-VERSION", "INPUT_EXCLUDE-RESOURCE-TYPES"})
# case 11, 12, 13: test_get_optional_input_WITH_VALID_POLICY_CHECK: success as a valid POLICY_CHECK_TYPE is provided
@parameterized.expand([VALIDATE_POLICY, CHECK_NO_NEW_ACCESS, CHECK_ACCESS_NOT_GRANTED, CHECK_NO_NEW_ACCESS])
def test_get_optional_input_WITH_VALID_POLICY_CHECK(self, policy_check_type):
result = get_optional_inputs(policy_check_type)
if policy_check_type == VALIDATE_POLICY:
self.assertEqual(result, {"INPUT_ALLOW-EXTERNAL-PRINCIPALS", "INPUT_TREAT-FINDING-TYPE-AS-BLOCKING", "INPUT_IGNORE-FINDING",
"INPUT_ALLOW-DYNAMIC-REF-WITHOUT-VERSION", "INPUT_EXCLUDE-RESOURCE-TYPES"})
else:
self.assertEqual(result, {"INPUT_IGNORE-FINDING", "INPUT_ALLOW-DYNAMIC-REF-WITHOUT-VERSION", "INPUT_EXCLUDE-RESOURCE-TYPES"})
# case 14: test_get_sub_command_ENVIRON_NOT_SET: failure expected because required os.environ[]s are not set
def test_get_sub_command_ENVIRON_NOT_SET(self):
os.environ['INPUT_REGION'] = ''
required = ['INPUT_REGION']
self.assertRaises(ValueError, get_sub_command, required, True)
# case 15: test_get_sub_command_MEET_REQUIRED_INPUTS: success as valid inputs are provided.
def test_get_sub_command_MEET_REQUIRED_INPUTS(self):
tft = os.getcwd() + './main.py'
os.environ['INPUT_TEMPLATE-PATH'] = tft
os.environ['INPUT_REGION'] = 'us-west-2'
required = {"INPUT_TEMPLATE-PATH", "INPUT_REGION"}
expected = ['--template-path', tft, '--region', 'us-west-2']
flags = get_sub_command(required, True)
self.assertEqual(set(flags), set(expected))
# case 16: test_get_treat_findings_as_non_blocking_flag_ENVIRON_NOT_SET: failure expected because os.environ[TREAT_FINDINGS_AS_NON_BLOCKING] is not set
def test_get_treat_findings_as_non_blocking_flag_ENVIRON_NOT_SET(self):
os.environ[TREAT_FINDINGS_AS_NON_BLOCKING] = ''
policy_check = CHECK_NO_NEW_ACCESS
self.assertRaises(ValueError, get_treat_findings_as_non_blocking_flag, policy_check)
# case 17: test_get_treat_findings_as_non_blocking_flag_VALIDATE_POLICY: pass
def test_get_treat_findings_as_non_blocking_flag_VALIDATE_POLICY(self):
policy_check = VALIDATE_POLICY
result = get_treat_findings_as_non_blocking_flag(policy_check)
self.assertEqual(result, "")
# case 18: test_get_treat_findings_as_non_blocking_flag_CHECK_ACCESS_NOT_GRANTED_True: pass, a string of flag returned
def test_get_treat_findings_as_non_blocking_flag_CHECK_ACCESS_NOT_GRANTED_True(self):
policy_check = CHECK_ACCESS_NOT_GRANTED
os.environ[TREAT_FINDINGS_AS_NON_BLOCKING] = 'True'
result = get_treat_findings_as_non_blocking_flag(policy_check)
self.assertEqual(result, ['--treat-findings-as-non-blocking'])
# case 19: test_get_treat_findings_as_non_blocking_flag_CHECK_ACCESS_NOT_GRANTED_False: pass, an empty string returned
def test_get_treat_findings_as_non_blocking_flag_CHECK_ACCESS_NOT_GRANTED_False(self):
policy_check = CHECK_ACCESS_NOT_GRANTED
os.environ[TREAT_FINDINGS_AS_NON_BLOCKING] = 'False'
result = get_treat_findings_as_non_blocking_flag(policy_check)
self.assertEqual(result, "")
# case 20: test_build_command_VALIDATE_POLICY: pass with correct parameters.
def test_build_command_VALIDATE_POLICY(self):
policy_check = VALIDATE_POLICY
tft = os.getcwd() + './main.py'
os.environ['INPUT_TEMPLATE-PATH'] = tft
os.environ['INPUT_REGION'] = 'us-west-2'
os.environ['TREAT_FINDINGS_AS_NON_BLOCKING'] = 'False'
os.environ['INPUT_IGNORE-FINDING'] = 'PASS_ROLE_WITH_STAR_IN_RESOURCE'
required = {"INPUT_TEMPLATE-PATH", "INPUT_REGION"}
optional = {"INPUT_IGNORE-FINDING"}
command = build_command(policy_check, required, optional)
expected = set([CLI_POLICY_VALIDATOR, 'validate', '--template-path', tft, '--region', 'us-west-2', '--ignore-finding', 'PASS_ROLE_WITH_STAR_IN_RESOURCE'])
command_set = set(command)
self.assertEqual(command_set, expected)
# case 21: test_execute_command_EXCEPTION: exception raised
@patch("subprocess.run", side_effect=Exception("invalid-input"))
def test_execute_command_EXCEPTION(self, mock_run):
with pytest.raises(Exception) as exc:
result = execute_command()
assertNotEqual(str(exc.value).find("invalid-input"), -1)
# case 21: test_execute_command_VALIDATE: pass with mockrun
@patch("main.subprocess.run")
def test_execute_command_VALIDATE(self, mock_run):
command = [CLI_POLICY_VALIDATOR, 'validate']
expected = "pass"
completed_process = subprocess.CompletedProcess(args=['command', 'args'], returncode=0, stdout=expected, stderr=b'error')
mock_run.return_value = completed_process
result = execute_command(command)
self.assertEqual(result, expected)
# case 23: test_set_output: pass
def test_set_output_WRITE_TO_FILE(self):
os.environ['GITHUB_OUTPUT'] = 'o1'
val = '{"BlockingFindings": [],"NonBlockingFindings": []}'
set_output(val)
res = subprocess.run(["grep", "BlockingFindings", "o1"], check=True, capture_output=True, encoding="utf-8").stdout
subprocess.run(["rm", "o1"])
self.assertNotEqual(res.find("BlockingFindings"), -1)