
Data segmentation in JSON file #81

Open

Nawtest wants to merge 10 commits into main
3 changes: 3 additions & 0 deletions README.md
@@ -33,6 +33,9 @@ and will prompt you for credentials and save the retrieved API key to "API_KEY.t
* Run all validators on an XML configuration file downloaded with "Export Panorama configuration version":
`pan_analyzer --xml 12345.xml`

* Run all validators on an XML configuration file downloaded with "Export Panorama configuration version" and choose the output file format (supported formats: text and json):
`pan_analyzer --xml 12345.xml --output text`
`pan_analyzer --xml 12345.xml --output json`

If you're not sure where to start, I recommend downloading an XML file from:
`Panorama -> Setup -> Operations -> Export Panorama configuration version` and running: `pan_analyzer.py --xml 12345.xml`
39 changes: 34 additions & 5 deletions src/palo_alto_firewall_analyzer/core.py
@@ -8,7 +8,6 @@
import socket
import typing
import xml.etree.ElementTree

import xmltodict

from palo_alto_firewall_analyzer.pan_config import PanConfig
@@ -140,9 +139,16 @@ class ProfilePackage:
    devicegroup_exclusive_objects: typing.Dict
    rule_limit_enabled: bool


BadEntry = collections.namedtuple('BadEntry', ['data', 'text', 'device_group', 'entry_type'])

Detail = collections.namedtuple('Detail', ['policy_type', 'policy_name', 'device_group',
                                           'entry_type', 'entry_name', 'entry_value',
                                           'rule_type', 'rule_name', 'protocol',
                                           'port', 'allowed_group_profiles', 'group_profile_setting', 'address',
                                           'fqdn', 'ip', 'ip_mask', 'loc', 'mandated_log_profile', 'log_setting',
                                           'object_entry_name', 'policy_entry_name', 'shadowing_address_name',
                                           'zone_type', 'zones', 'extra'])
BadEntry = collections.namedtuple('BadEntry', ['data', 'text', 'device_group', 'entry_type', 'Detail'])

@functools.lru_cache(maxsize=None)
def cached_dns_lookup(domain):
@@ -178,11 +184,34 @@ def cached_fqdn_lookup(domain):


@functools.lru_cache(maxsize=None)
def xml_object_to_dict(xml_obj):
def xml_object_to_dict1(xml_obj):
    obj_xml_string = xml.etree.ElementTree.tostring(xml_obj)
    obj_dict = xmltodict.parse(obj_xml_string)
    return obj_dict

def xml_object_to_dict(xml_obj):
    obj_xml_string = xml.etree.ElementTree.tostring(xml_obj)

    root = xml.etree.ElementTree.fromstring(obj_xml_string)

    list_atr_remove = ['loc']
Owner (moshekaplan):

Could you add a comment here as to why you're removing the 'loc' attribute?

Contributor Author (@Nawtest), Feb 28, 2024:

Okay, I'll add a comment, but I'll also explain it here in case you see another solution. The issue is that they've added the 'loc' attribute, and now the script doesn't retrieve the data properly from the XML. Here's an example:

[screenshot]

What I've done is clean the XML of attributes. I did it this way in case they add other attributes later on.
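
(For context, xmltodict maps XML attributes to '@'-prefixed keys, so an entry carrying the 'loc' attribute described above parses roughly as in this illustrative sketch; the values are made up.)

import xmltodict

# Hypothetical address entry with the added 'loc' attribute.
xml_snippet = '<entry name="ip-127.0.0.1" loc="shared"><ip-netmask>127.0.0.1</ip-netmask></entry>'
doc = xmltodict.parse(xml_snippet)
# Attributes become '@'-prefixed keys:
#   doc['entry']['@name']      == 'ip-127.0.0.1'
#   doc['entry']['@loc']       == 'shared'
#   doc['entry']['ip-netmask'] == '127.0.0.1'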

Owner (moshekaplan):

Thanks for explaining.

Owner (moshekaplan):

Sorry for the long response time, my work and personal lives have both been very busy.

I was doing some testing and noticed a problem: some validators retrieve attributes, such as the name, as is done here: https://github.com/moshekaplan/palo_alto_firewall_analyzer/blob/main/src/palo_alto_firewall_analyzer/validators/misleading_objects.py#L27 :

address_dict = xml_object_to_dict(address_entry)
entry_name = address_dict['entry']['@name']

Therefore, removing the attributes breaks at least that validator, and probably more.

Contributor Author (@Nawtest):

Hi, no problem at all, I understand this is a hobby and there are more important things. But I don't understand what you mean: I've been testing and wasn't getting any errors, except with my own files. The only attribute I remove is 'loc'; I don't remove the name or anything else. Could you provide an example from your tests? As I mentioned, with the exported files I've been working with, both with the '@loc' attribute and without it, I haven't had any issues. Thanks.

Owner (moshekaplan):

Sorry for the super-delayed follow-up on this.

Say I have an address object with a single IP-netmask of 127.0.0.1.

The old code (now called xml_object_to_dict1) would result in a dict like the following:

(Pdb) xml_object_to_dict1(address_entry)
{'entry': {'@name': 'ip-127.0.0.1', 'ip-netmask': '127.0.0.1'}}

The new code (what this PR calls xml_object_to_dict) removes the @name attribute, resulting in the following:

(Pdb) xml_object_to_dict(address_entry)
{'entry': {'ip-netmask': '127.0.0.1'}}

This is problematic because of code that accesses the @name attribute, like:
https://github.com/Nawtest/palo_alto_firewall_analyzer/blob/main/src/palo_alto_firewall_analyzer/validators/misleading_objects.py#L28 :

entry_name = address_dict['entry']['@name']

Because the @name attribute is missing, the dictionary access fails.
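
(For illustration only, a minimal sketch that is not code from this PR: stripping just the 'loc' attribute while leaving '@name' and any other attributes intact.)

import copy
import xml.etree.ElementTree
import xmltodict

def xml_object_to_dict_without_loc(xml_obj):
    # Work on a copy so the original ElementTree object is not mutated.
    root = copy.deepcopy(xml_obj)
    # Drop only the 'loc' attribute, recursively; '@name' and everything else
    # survive, so validators that read entry['@name'] keep working.
    for elem in root.iter():
        elem.attrib.pop('loc', None)
    return xmltodict.parse(xml.etree.ElementTree.tostring(root))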

Contributor Author (@Nawtest):

Okay, don't worry, I'll review it. It's strange to me that this attribute gets removed; the one I remove is '@loc'. I'll check it and fix it.

Contributor Author (@Nawtest):

Hi, I'm checking the code, and I don't understand why removing the 'loc' attribute causes an error when looking up the 'name' attribute. It is true that in my XML files there are some entries without the 'name' attribute. I added an if statement to check the issue, and this is the result:

* If the 'loc' attribute is not deleted, the member is:
  [screenshot]

* If the 'loc' attribute is deleted and the if is included, the generated JSON is:
  [screenshot]

This is an example from my file (it does not include the 'name' attribute):
[screenshot]
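
(The if statement mentioned above is not shown in the diff; a hypothetical version of that kind of guard might look like this.)

entry = address_dict.get('entry', {})
if '@name' in entry:
    entry_name = entry['@name']
else:
    # Entry has no 'name' attribute; skip or log it instead of raising KeyError.
    entry_name = None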


    def remove_loc(elements, atr):
        if atr in elements.attrib:
            del elements.attrib[atr]
        for elem in elements:
            remove_loc(elem, atr)

    for atr in list_atr_remove:
        for entry in root:
            remove_loc(entry, atr)

    xml_atr_remove = xml.etree.ElementTree.tostring(root)

    obj_dict = xmltodict.parse(xml_atr_remove)

    return obj_dict


@functools.lru_cache(maxsize=None)
def get_single_ip_from_address(address_entry):
6 changes: 3 additions & 3 deletions src/palo_alto_firewall_analyzer/pan_config.py
@@ -29,6 +29,7 @@ def __init__(self, configdata: str, from_file=False):
fake_result.append(conf)
# fake_response.append(fake_result)
self.configroot = fake_result
self.config_xml = {"version": conf.get("version"),"urldb": conf.get("urldb"),"detail-version":conf.get("detail-version")}
else:
self.configroot = xml.etree.ElementTree.fromstring(configdata).find('./result')

@@ -40,10 +41,9 @@ def get_device_groups(self):

@functools.lru_cache(maxsize=None)
def get_device_groups_hierarchy(self):
xpath = "./config/readonly/devices/entry[@name='localhost.localdomain']/device-group/entry"
device_group_hierarchy_children = collections.defaultdict(list)
device_group_hierarchy_parent = {}
(Nawtest marked this conversation as resolved.)
for devicegroup_elem in self.configroot.findall(xpath):
name = devicegroup_elem.get('name')
parent = devicegroup_elem.find('parent-dg')
84 changes: 68 additions & 16 deletions src/palo_alto_firewall_analyzer/scripts/pan_analyzer.py
@@ -5,6 +5,7 @@
import os.path
import sys
import time
import json

# Used to trigger loading the validators and fixers
import palo_alto_firewall_analyzer.validators
@@ -13,11 +14,13 @@
from palo_alto_firewall_analyzer.core import get_policy_validators, get_policy_fixers, ConfigurationSettings
from palo_alto_firewall_analyzer.pan_helpers import load_config_package, load_API_key

from palo_alto_firewall_analyzer.scripts.pan_details import get_json_detail

DEFAULT_CONFIG_DIR = os.path.expanduser("~" + os.sep + ".pan_policy_analyzer" + os.sep)
DEFAULT_CONFIGFILE = DEFAULT_CONFIG_DIR + "PAN_CONFIG.cfg"
DEFAULT_API_KEYFILE = DEFAULT_CONFIG_DIR + "API_KEY.txt"
EXECUTION_START_TIME = datetime.datetime.today().strftime('%Y%m%d_%H%M%S')

RUNTIME_START = time.time()
logger = logging.getLogger('palo_alto_firewall_analyzer')


@@ -59,19 +62,25 @@ def run_policy_fixers(fixers, profilepackage, output_fname):
def run_policy_validators(validators, profilepackage, output_fname):
problems = {}
total_problems = 0
total_checks = 0
logger.info("Running validators")

for name, validator_values in validators.items():
validator_name, validator_description, validator_function = validator_values
validator_problems = validator_function(profilepackage)
problems[(validator_name, validator_description)] = validator_problems
total_problems += len(validator_problems)

return problems, total_problems


def write_analyzer_output(problems, fname, out_format):
supported_output_formats = ["text"]
validator_problems, count_checks = validator_function(profilepackage)
problems[(validator_name, validator_description),count_checks] = validator_problems
total_problems += len(validator_problems)
total_checks += count_checks

return problems, total_problems, total_checks


def write_analyzer_output(problems, fname, profilepackage, sum_total_checks, out_format = 'text'):

supported_output_formats = ["text", "json"]
if out_format is None:
out_format = 'text'

if out_format not in supported_output_formats:
raise Exception(
f"Unsupported output format of {out_format}! Output format must be one of {supported_output_formats}")
@@ -82,7 +91,7 @@ def write_analyzer_output(problems, fname, out_format):
validator_name, validator_description = validator_info

fh.write("#" * 80 + '\n')
fh.write(f"{validator_name}: {validator_description} ({len(problem_entries)})\n")
fh.write(f"{validator_name}: {validator_description} ({len(problem_entries)}/{sum_total_checks})\n")
fh.write("#" * 80 + '\n')
for problem_entry in problem_entries:
# fh.write(f"Output for config name: {config_name} \n\n")
Expand All @@ -91,7 +100,43 @@ def write_analyzer_output(problems, fname, out_format):
# else:
# fh.write('(none)\n')
fh.write('\n')

elif out_format == 'json':
(moshekaplan marked this conversation as resolved.)
#build json
total_problems = 0
entries = []
#print(problems)
for validator_info, problem_entries in problems.items():
# TODO: can this be done better?
validator_name, validator_description = validator_info[0]
total_checks = validator_info[1]

problems = []
for problem_entry in problem_entries:
if problem_entry.Detail is not None:
problem = {"desc":problem_entry.text,"detail":get_json_detail(problem_entry.Detail)}
else:
problem = {"desc":problem_entry.text}

problems.append(problem)
total_problems+=1

entry = {"validator_name":validator_name, "total_checks": total_checks,"problems":problems}
entries.append(entry)

end_time = time.time()

data = {"config_version":profilepackage.pan_config.config_xml['version'],
"detail-version":profilepackage.pan_config.config_xml['detail-version'],
"urldb":profilepackage.pan_config.config_xml['urldb'],
"date_execution": EXECUTION_START_TIME,
"runtime":round(end_time - RUNTIME_START, 2),
"total_problems": total_problems,
"total_checks": sum_total_checks,
"entries":entries
}

with open(fname,'w') as fh:
json.dump(data,fh)
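
(As a rough consumption sketch with a hypothetical file name; the keys match the data dict built above.)

import json

# Hypothetical report name; the real one comes from build_output_fname().
with open('pan_analyzer_output_20240228_120000.json') as fh:
    report = json.load(fh)

print(report['config_version'], report['total_problems'], report['total_checks'])
for entry in report['entries']:
    # Each entry carries a validator name, its check count, and its problem list.
    print(entry['validator_name'], entry['total_checks'], len(entry['problems']))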

def build_output_fname(parsed_args):
# Build the name of the output file
@@ -119,7 +164,12 @@ def build_output_fname(parsed_args):
else:
limit_string = ""

output_fname = f'pan_analyzer_output_{EXECUTION_START_TIME}{devicegroup_string}{xml_string}{validators_string}{fixers_string}{limit_string}.txt'
if parsed_args.output == 'json':
extension = '.json'
else:
extension = '.txt'

output_fname = f'pan_analyzer_output_{EXECUTION_START_TIME}{devicegroup_string}{xml_string}{validators_string}{fixers_string}{limit_string}'+extension
(Nawtest marked this conversation as resolved.)
return output_fname


@@ -152,6 +202,7 @@ def main():

parser.add_argument("--debug", help="Write all debug output to pan_validator_debug_YYMMDD_HHMMSS.log", action='store_true')
parser.add_argument("--limit", help="Limit processing to the first N rules (useful for debugging)", type=int)
parser.add_argument("--output", help="Type File Output (text, json), default = text", type=str)
(Nawtest marked this conversation as resolved.)
parsed_args = parser.parse_args()

configure_logging(parsed_args.debug, not parsed_args.quiet)
@@ -182,6 +233,7 @@ def main():
start_time = time.time()
profilepackage = load_config_package(configuration_settings, api_key, parsed_args.device_group,
parsed_args.limit, parsed_args.xml)

if parsed_args.fixer:
fixers = {parsed_args.fixer: get_policy_fixers()[parsed_args.fixer]}
problems, total_problems = run_policy_fixers(fixers, profilepackage, output_fname)
@@ -190,9 +242,9 @@ def main():
validators = {validator: get_policy_validators()[validator] for validator in parsed_args.validator}
else:
validators = get_policy_validators()
problems, total_problems = run_policy_validators(validators, profilepackage, output_fname)

write_analyzer_output(problems, output_fname, 'text')
problems, total_problems, total_checks = run_policy_validators(validators, profilepackage, output_fname)
write_analyzer_output(problems, output_fname, profilepackage, total_checks, parsed_args.output)
end_time = time.time()

logger.info(f"Full run took {round(end_time - start_time, 2)} seconds")
72 changes: 72 additions & 0 deletions src/palo_alto_firewall_analyzer/scripts/pan_details.py
@@ -0,0 +1,72 @@
import logging

from palo_alto_firewall_analyzer.core import Detail


def get_value(data):
    if data is None:
        data = ""
    return data


def parsed_details(details):
    detail = Detail(
        policy_type=get_value(details.get('policy_type')),
        policy_name=get_value(details.get('policy_name')),
        device_group=get_value(details.get('device_group')),
        entry_type=get_value(details.get('entry_type')),
        entry_name=get_value(details.get('entry_name')),
        entry_value=get_value(details.get('entry_value')),
        rule_type=get_value(details.get('rule_type')),
        rule_name=get_value(details.get('rule_name')),
        protocol=get_value(details.get('protocol')),
        port=get_value(details.get('port')),
        allowed_group_profiles=get_value(details.get('allowed_group_profiles')),
        group_profile_setting=get_value(details.get('group_profile_setting')),
        address=get_value(details.get('address')),
        fqdn=get_value(details.get('fqdn')),
        ip=get_value(details.get('ip')),
        ip_mask=get_value(details.get('ip_mask')),
        loc=get_value(details.get('loc')),
        shadowing_address_name=get_value(details.get('shadowing_address_name')),
        mandated_log_profile=get_value(details.get('mandated_log_profile')),
        log_setting=get_value(details.get('log_setting')),
        object_entry_name=get_value(details.get('object_entry_name')),
        policy_entry_name=get_value(details.get('policy_entry_name')),
        zone_type=get_value(details.get('zone_type')),
        zones=get_value(details.get('zones')),
        extra=get_value(details.get('extra'))
    )
    return detail


def get_json_detail(detail):
    json_detail = {
        "policy_type": detail.policy_type,
        "policy_name": detail.policy_name,
        "device_group": detail.device_group,
        "entry_type": detail.entry_type,
        "entry_name": detail.entry_name,
        "rule_type": detail.rule_type,
        "rule_name": detail.rule_name,
        "protocol": detail.protocol,
        "port": detail.port,
        "allowed_group_profiles": detail.allowed_group_profiles,
        "group_profile_setting": detail.group_profile_setting,
        "address": detail.address,
        "fqdn": detail.fqdn,
        "ip": detail.ip,
        "ip_mask": detail.ip_mask,
        "loc": detail.loc,
        "shadowing_address_name": detail.shadowing_address_name,
        "mandated_log_profile": detail.mandated_log_profile,
        "log_setting": detail.log_setting,
        "object_entry_name": detail.object_entry_name,
        "policy_entry_name": detail.policy_entry_name,
        "zone_type": detail.zone_type,
        "zones": detail.zones,
        "extra": detail.extra
    }
    return json_detail
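
(As a rough usage sketch with hypothetical values: a validator can build a Detail from a plain dict and serialize it for the JSON report.)

from palo_alto_firewall_analyzer.scripts.pan_details import parsed_details, get_json_detail

# Only the keys a validator actually knows need to be provided;
# get_value() turns the missing ones into empty strings.
detail = parsed_details({
    "entry_type": "SecurityPreRules",
    "device_group": "branch-offices",
    "rule_name": "allow-dns",
    "protocol": "udp",
    "port": "53",
})
print(get_json_detail(detail)["rule_name"])  # -> 'allow-dns'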
2 changes: 1 addition & 1 deletion src/palo_alto_firewall_analyzer/validators/__init__.py
@@ -11,7 +11,7 @@
from . import misleading_objects
from . import redundant_rule_members
from . import rules_missing_security_profile
from . import shadowing_addresses_and_groups
from . import shadowing_rules
from . import shadowing_services_and_groups
from . import similar_objects
29 changes: 20 additions & 9 deletions src/palo_alto_firewall_analyzer/validators/bad_group_profile.py
@@ -1,25 +1,27 @@
import logging

from palo_alto_firewall_analyzer.core import register_policy_validator, BadEntry
from palo_alto_firewall_analyzer.scripts.pan_details import parsed_details

logger = logging.getLogger(__name__)

@register_policy_validator("BadGroupProfile", "Rule uses an incorrect group profile")
def find_bad_group_profile_setting(profilepackage):
device_groups = profilepackage.device_groups
devicegroup_exclusive_objects = profilepackage.devicegroup_exclusive_objects

count_checks=0

if not profilepackage.settings.get('Allowed Group Profiles'):
logger.debug("Allowed Group Profiles are not set; skipping")
return []
return [],count_checks

allowed_group_profiles = profilepackage.settings.get('Allowed Group Profiles').split(',')

badentries = []

logger.info("*"*80)
logger.info("Checking for incorrect group profile")

for i, device_group in enumerate(device_groups):
for ruletype in ('SecurityPreRules', 'SecurityPostRules'):
rules = devicegroup_exclusive_objects[device_group][ruletype]
@@ -37,10 +39,19 @@ def find_bad_group_profile_setting(profilepackage):
else:
group_profile_setting = ""

if group_profile_setting not in allowed_group_profiles:
text = f"Device Group {device_group}'s {ruletype} '{rule_name}' doesn't use an approved group " \
f"profile '{allowed_group_profiles}', instead it uses '{group_profile_setting}' "
logger.debug(text)
badentries.append( BadEntry(data=entry, text=text, device_group=device_group, entry_type=ruletype) )

return badentries
detail={
"entry_type":ruletype,
"device_group":device_group,
"rule_type":ruletype,
"rule_name":rule_name,
"allowed_group_profiles":allowed_group_profiles,
"group_profile_setting":group_profile_setting
}
badentries.append( BadEntry(data=entry, text=text, device_group=device_group, entry_type=ruletype, Detail=parsed_details(detail)) )
count_checks+=1
print(count_checks)
return badentries, count_checks