Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

taking allow_elevated out of expected fields in ARM template #25

Merged
merged 1 commit into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 93 additions & 120 deletions src/confcom/azext_confcom/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import json
import os
from typing import Any, List, Dict
from itertools import product
from azext_confcom.template_util import (
case_insensitive_dict_get,
replace_params_and_vars,
Expand Down Expand Up @@ -230,44 +229,6 @@ def extract_exec_process(container_json: Any) -> List:
return exec_processes_output


def extract_allow_elevated(container_json: Any) -> bool:
# privileged is used for arm templates
security_context = case_insensitive_dict_get(
container_json, config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT
)

# get the field for privileged, default to false
privileged_value = case_insensitive_dict_get(
security_context, config.ACI_FIELD_CONTAINERS_PRIVILEGED
)
if privileged_value and not isinstance(privileged_value, bool) and not isinstance(privileged_value, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_PRIVILEGED}"] can only be a boolean or string value.'
)

# force the field into a bool
if isinstance(privileged_value, str):
privileged_value = privileged_value.lower() == "true"

if privileged_value:
return privileged_value

# allow_elevated is used for input.json
_allow_elevated = case_insensitive_dict_get(
container_json, config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED
)
if _allow_elevated is not None:
if not isinstance(_allow_elevated, bool):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED}"] can only be boolean value.'
)
return _allow_elevated
# default value is false
return False


def extract_allow_stdio_access(container_json: Any) -> bool:
# get the field for Standard IO access, default to true
allow_stdio_value = case_insensitive_dict_get(
Expand Down Expand Up @@ -321,110 +282,119 @@ def extract_user(container_json: Any) -> Dict:
return user


def extract_capabilities(container_json: Any, allow_elevated: bool):
def extract_capabilities(container_json: Any, privileged_value: bool):
security_context = case_insensitive_dict_get(
container_json, config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT
)
# get the field for privileged, default to false
privileged_value = case_insensitive_dict_get(
security_context, config.ACI_FIELD_CONTAINERS_PRIVILEGED
)
if privileged_value and not isinstance(privileged_value, bool) and not isinstance(privileged_value, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_PRIVILEGED}"] can only be a boolean or string value.'
)

# force the field into a bool
if isinstance(privileged_value, str):
privileged_value = privileged_value.lower() == "true"

output_capabilities = copy.deepcopy(_CAPABILITIES)
non_added_fields = [
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_BOUNDING,
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_EFFECTIVE,
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_PERMITTED,
]

# if privileged and allow elevated is true, then set all capabilities to true
# else, get the capabilities field from the ARM Template
if privileged_value and allow_elevated:
for key in output_capabilities:
# we still want the ambient set to be empty
if key != config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_AMBIENT:
output_capabilities[key] = copy.deepcopy(config.DEFAULT_PRIVILEGED_CAPABILITIES)
# add privileged default capabilities if true, otherwise add unprivileged default capabilities
if privileged_value:
# only ambient should be empty
non_added_fields.append(config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_INHERITABLE)
for key in non_added_fields:
output_capabilities[key] = copy.deepcopy(config.DEFAULT_PRIVILEGED_CAPABILITIES)
else:
non_added_fields = [
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_BOUNDING,
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_EFFECTIVE,
config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_PERMITTED,
]

# add the default capabilities to the output
for key in non_added_fields:
output_capabilities[key] = copy.deepcopy(config.DEFAULT_UNPRIVILEGED_CAPABILITIES)
# get the capabilities field
capabilities = case_insensitive_dict_get(
security_context, config.ACI_FIELD_CONTAINERS_CAPABILITIES

# add and drop capabilities if they are explicitly set in the ARM template
capabilities = case_insensitive_dict_get(
security_context, config.ACI_FIELD_CONTAINERS_CAPABILITIES
)

# user can ADD and DROP capabilities in the ARM template
if capabilities:
# error check if capabilities is not a dict
if not isinstance(capabilities, dict):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES}"] can only be a dictionary.'
)

# get the add field
add = case_insensitive_dict_get(
capabilities, config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD
)
# if allow elevated is true and container is not privileged,
# user can ADD and DROP capabilities in the ARM template
# if not allow elevated and container is not privileged, use default unprivileged capabilities
if capabilities and allow_elevated:
# error check if capabilities is not a dict
if not isinstance(capabilities, dict):
if add:
# error check if add is not a list
if not isinstance(add, list):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES}"] can only be a dictionary.'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD}"] can only be a list.'
)

# get the add field
add = case_insensitive_dict_get(
capabilities, config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD
)
if add:
# error check if add is not a list
if not isinstance(add, list):
# error check if add contains non-string values
for capability in add:
# error check that all the items in "add" are strings
if not isinstance(capability, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD}"] can only be a list.'
f'Field ["{config.ACI_FIELD_CONTAINERS}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD}"] can only contain strings.'
)
for key, capability in product(output_capabilities, add):
# error check that all the items in "add" are strings
if not isinstance(capability, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_ADD}"] can only contain strings.'
)
# add the capabilities to the output, except ambient list
# we still want the ambient set to be empty
if key != config.POLICY_FIELD_CONTAINERS_ELEMENTS_CAPABILITIES_AMBIENT:
output_capabilities[key].append(capability)

# get the drop field
drop = case_insensitive_dict_get(
capabilities, config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP
)
if drop:
# error check if drop is not a list
if not isinstance(drop, list):
for key in non_added_fields:
# add the capabilities to the output, except ambient list
# we still want the ambient set to be empty
output_capabilities[key] += add

# get the drop field
drop = case_insensitive_dict_get(
capabilities, config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP
)
if drop:
# error check if drop is not a list
if not isinstance(drop, list):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP}"] can only be a list.'
)
# error check that all the items in "drop" are strings
for capability in drop:
if not isinstance(capability, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP}"] can only be a list.'
f'Field ["{config.ACI_FIELD_CONTAINERS}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP}"] can only contain strings.'
)
# error check that all the items in "drop" are strings
for capability in drop:
if not isinstance(capability, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_CAPABILITIES_DROP}"] can only contain strings.'
)
# drop the capabilities from the output
for value in non_added_fields:
output_capabilities[value] = [x for x in output_capabilities[value] if x not in drop]
# drop the capabilities from the output
for keys in output_capabilities:
output_capabilities[keys] = [x for x in output_capabilities[keys] if x not in drop]
# de-duplicate the capabilities
for key, value in output_capabilities.items():
output_capabilities[key] = sorted(list(set(value)))

return output_capabilities


def extract_allow_elevated(container_json: Any) -> bool:
security_context = case_insensitive_dict_get(
container_json, config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT
)

# get the field for privileged, default to false
privileged_value = case_insensitive_dict_get(
security_context, config.ACI_FIELD_CONTAINERS_PRIVILEGED
)
if privileged_value and not isinstance(privileged_value, bool) and not isinstance(privileged_value, str):
eprint(
f'Field ["{config.ACI_FIELD_CONTAINERS}"]["{config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT}"]'
+ f'["{config.ACI_FIELD_CONTAINERS_PRIVILEGED}"] can only be a boolean or string value.'
)

# force the field into a bool
if isinstance(privileged_value, str):
privileged_value = privileged_value.lower() == "true"
# default to false
return privileged_value or False


def extract_seccomp_profile_sha256(container_json: Any) -> Dict:
security_context = case_insensitive_dict_get(
container_json, config.ACI_FIELD_CONTAINERS_SECURITY_CONTEXT
Expand Down Expand Up @@ -521,7 +491,10 @@ def from_json(
command = extract_command(container_json)
working_dir = extract_working_dir(container_json)
mounts = extract_mounts(container_json)
allow_elevated = extract_allow_elevated(container_json)
# the first half of the conditional is for backwards compatibility with input.json-formatted files
allow_elevated = case_insensitive_dict_get(
container_json,
config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED) or extract_allow_elevated(container_json)
exec_processes = extract_exec_process(
container_json
)
Expand Down
3 changes: 0 additions & 3 deletions src/confcom/azext_confcom/security_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,6 @@ def load_policy_from_arm_template_str(
)
or [],
config.ACI_FIELD_CONTAINERS_MOUNTS: process_mounts(image_properties, volumes),
config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED: case_insensitive_dict_get(
image_properties, config.ACI_FIELD_CONTAINERS_ALLOW_ELEVATED
),
config.ACI_FIELD_CONTAINERS_EXEC_PROCESSES: exec_processes
+ config.DEBUG_MODE_SETTINGS.get("execProcesses")
if debug_mode
Expand Down
Loading