diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4c1a9f2..1188ac1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -45,5 +45,5 @@ jobs: - name: "Run crs linter tests for ${{ matrix.crs-version }}" run: | curl -SLs https://github.com/coreruleset/coreruleset/archive/refs/tags/v${{ matrix.crs-version }}.tar.gz -o - | \ - tar xzvf - --strip-components=1 --wildcards "*/rules/*" --wildcards "*/crs-setup.conf.example" --wildcards "*/util/APPROVED_TAGS" - uv run crs-linter --output=github -r crs-setup.conf.example -d . -r 'rules/*.conf' -t util/APPROVED_TAGS -v ${{ matrix.crs-version }} + tar xzvf - --strip-components=1 --wildcards "*/rules/*" --wildcards "*/crs-setup.conf.example" + uv run crs-linter -o github -d . -r crs-setup.conf.example -r 'rules/*.conf' -t APPROVED_TAGS -v ${{ matrix.crs-version }} diff --git a/pyproject.toml b/pyproject.toml index d63c53d..66f3773 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,8 @@ packages = [ dependencies = [ "msc_pyparser >=1.2.1", "dulwich (>=0.22.7,<0.23.0)", - "semver (>=3.0.2,<4.0.0)" + "semver (>=3.0.2,<4.0.0)", + "github-action-utils>=1.1.0,<2.0.0", ] [project.scripts] diff --git a/src/crs_linter/cli.py b/src/crs_linter/cli.py index ca49a84..0eec9d2 100755 --- a/src/crs_linter/cli.py +++ b/src/crs_linter/cli.py @@ -11,40 +11,7 @@ from semver import Version from crs_linter.linter import Check - -oformat = "native" - -logger = logging.getLogger(__name__) - -def errmsg(msg): - if oformat == "github": - print("::error::%s" % (msg)) - else: - print(msg) - - -def errmsgf(msg): - if oformat == "github": - if 'message' in msg and msg['message'].strip() != "": - print("::error%sfile={file},line={line},endLine={endLine},title={title}:: {message}".format(**msg) % ( - msg['indent'] * " ")) - else: - print("::error%sfile={file},line={line},endLine={endLine},title={title}::".format(**msg) % ( - msg['indent'] * " ")) - else: - if 'message' in msg and msg['message'].strip() != "": - print("%sfile={file}, line={line}, endLine={endLine}, title={title}: {message}".format(**msg) % ( - msg['indent'] * " ")) - else: - print("%sfile={file}, line={line}, endLine={endLine}, title={title}".format(**msg) % (msg['indent'] * " ")) - - -def msg(msg): - if oformat == "github": - print("::debug::%s" % (msg)) - else: - print(msg) - +from crs_linter.logger import Logger def remove_comments(data): """ @@ -127,6 +94,7 @@ def generate_version_string(directory): return f"OWASP_CRS/{version}" + def get_tags_from_file(filename): try: with open(filename, "r") as fp: @@ -134,11 +102,12 @@ def get_tags_from_file(filename): # remove empty items, if any tags = list(filter(lambda x: len(x) > 0, tags)) except: - errmsg(f"Can't open tags list: {filename}") + logger.error(f"Can't open tags list: {filename}") sys.exit(1) return tags + def get_crs_version(directory, version=None): crs_version = "" if version is None: @@ -152,6 +121,7 @@ def get_crs_version(directory, version=None): return crs_version + def check_indentation(filename, content): error = False @@ -163,7 +133,7 @@ def check_indentation(filename, content): from_lines = remove_comments("".join(from_lines)).split("\n") from_lines = [l + "\n" for l in from_lines] except: - errmsg(" Can't open file for indent check: %s" % (f)) + logger.error(" Can't open file for indent check: %s" % (f)) error = True # virtual output @@ -183,9 +153,9 @@ def check_indentation(filename, content): diff = difflib.unified_diff(from_lines, output) if from_lines == output: - msg(" Indentation check ok.") + logger.debug(" Indentation check ok.") else: - errmsg(" Indentation check found error(s)") + logger.error(" Indentation check found error(s)") error = True for d in diff: @@ -202,11 +172,14 @@ def check_indentation(filename, content): 'message': "an indentation error has found" } errmsgf(e) - errmsg(d.strip("\n")) + logger.error(d.strip("\n")) return error + def read_files(filenames): + global logger + parsed = {} for f in filenames: try: @@ -216,7 +189,7 @@ def read_files(filenames): if f.startswith("crs-setup.conf.example"): data = remove_comments(data) except: - errmsg("Can't open file: %s" % f) + logger.error(f"Can't open file: {f}") sys.exit(1) ### check file syntax @@ -224,7 +197,7 @@ def read_files(filenames): try: mparser = msc_pyparser.MSCParser() mparser.parser.parse(data) - logger.info("Parsing OK") + logger.debug("Parsing OK") parsed[f] = mparser.configlines except Exception as e: err = e.args[1] @@ -232,24 +205,20 @@ def read_files(filenames): cause = "Lexer" else: cause = "Parser" - errmsg("Can't parse config file: %s" % (f)) - errmsgf({ - 'indent': 2, - 'file': f, - 'title': "%s error" % (cause), - 'line': err['line'], - 'endLine': err['line'], - 'message': "can't parse file"}) + logger.error(f"Can't parse config file: {f}", title=f"{cause} error", file=f, line=err['line'], endLine=err['line']) retval = 1 continue return parsed + def parse_args(argv): parser = argparse.ArgumentParser(prog="crs-linter", description="CRS Rules Check tool") - parser.add_argument("-o", "--output", dest="output", help="Output format native[default]|github", required=False) + parser.add_argument("-o", "--output", dest="output", default="native", + help="Output format", choices=["native", "github"], required=False) parser.add_argument("-d", "--directory", dest="directory", default=pathlib.Path("."), type=pathlib.Path, help='Directory path to CRS git repository', required=True) + parser.add_argument("--debug", dest="debug", help="Show debug information.", action='store_true') parser.add_argument("-r", "--rules", type=str, dest="crs_rules", help='CRS rules file to check. Can be used multiple times.', action='append', required=True) parser.add_argument("-t", "--tags-list", dest="tagslist", help="Path to file with permitted tags", required=True) @@ -258,6 +227,7 @@ def parse_args(argv): return parser.parse_args(argv) def main(): + global logger retval = 0 args = parse_args(sys.argv[1:]) @@ -265,11 +235,7 @@ def main(): for r in args.crs_rules: files.extend(glob.glob(r)) - if args.output is not None: - if args.output not in ["native", "github"]: - print("--output can be one of the 'native' or 'github'. Default value is 'native'") - sys.exit(1) - oformat = args.output + logger = Logger(output=args.output, debug=args.debug) crs_version = get_crs_version(args.directory, args.version) tags = get_tags_from_file(args.tagslist) @@ -278,32 +244,25 @@ def main(): logger.info("Checking parsed rules...") for f in parsed.keys(): - msg(f) + logger.debug(f) c = Check(parsed[f], f, txvars) ### check case usings c.check_ignore_case() if len(c.caseerror) == 0: - logger.info(" Ignore case check ok.") + logger.info("Ignore case check ok.") else: - errmsg(" Ignore case check found error(s)") + logger.error("Ignore case check found error(s)") for a in c.caseerror: - a['indent'] = 2 - a['file'] = f - a['title'] = "Case check" - errmsgf(a) + logger.error(file=f, title="Case check") ### check action's order c.check_action_order() if len(c.orderacts) == 0: - msg(" Action order check ok.") + logger.debug("Action order check ok.") else: - errmsg(" Action order check found error(s)") for a in c.orderacts: - a['indent'] = 2 - a['file'] = f - a['title'] = 'Action order check' - errmsgf(a) + logger.error("Action order check found error(s)", file=f, title='Action order check') error = check_indentation(f, parsed[f]) if error: @@ -312,14 +271,11 @@ def main(): ### check `ctl:auditLogParts=+E` right place in chained rules c.check_ctl_audit_log() if len(c.auditlogparts) == 0: - msg(" no 'ctl:auditLogParts' action found.") + logger.debug(" no 'ctl:auditLogParts' action found.") else: - errmsg(" Found 'ctl:auditLogParts' action") + logger.error() for a in c.auditlogparts: - a['indent'] = 2 - a['file'] = f - a['title'] = "'ctl:auditLogParts' isn't allowed in CRS" - errmsgf(a) + logger.error("Found 'ctl:auditLogParts' action", file=f, title="'ctl:auditLogParts' isn't allowed in CRS") ### collect TX variables # this method collects the TX variables, which set via a @@ -330,129 +286,84 @@ def main(): ### check duplicate ID's # c.dupes filled during the tx variable collected if len(c.dupes) == 0: - msg(" no duplicate id's") + logger.debug("No duplicate id's") else: - errmsg(" Found duplicated id('s)") - for a in c.dupes: - a['indent'] = 2 - a['file'] = f - a['title'] = "'id' is duplicated" - errmsgf(a) + logger.error("Found duplicated id('s)", file=f, title="'id' is duplicated") ### check PL consistency c.check_pl_consistency() if len(c.pltags) == 0: - msg(" paranoia-level tags are correct.") + logger.debug("Paranoia-level tags are correct.") else: - errmsg(" Found incorrect paranoia-level/N tag(s)") for a in c.pltags: - a['indent'] = 2 - a['file'] = f - a['title'] = "wrong or missing paranoia-level/N tag" - errmsgf(a) + logger.error("Found incorrect paranoia-level/N tag(s)", file=f, title="wrong or missing paranoia-level/N tag") if len(c.plscores) == 0: - msg(" PL anomaly_scores are correct.") + logger.debug("PL anomaly_scores are correct.") else: - errmsg(" Found incorrect (inbound|outbout)_anomaly_score value(s)") for a in c.plscores: - a['indent'] = 2 - a['file'] = f - a['title'] = "wrong (inbound|outbout)_anomaly_score variable or value" - errmsgf(a) + logger.error(" Found incorrect (inbound|outbout)_anomaly_score value(s)", file=f, title="wrong (inbound|outbout)_anomaly_score variable or value") ### check existence of used TX variables c.check_tx_variable() if len(c.undef_txvars) == 0: - msg(" All TX variables are set.") + logger.debug("All TX variables are set.") else: - errmsg(" There are one or more unset TX variables.") for a in c.undef_txvars: - a['indent'] = 2 - a['file'] = f - a['title'] = "unset TX variable" - errmsgf(a) + logger.error("There are one or more unset TX variables.", file=f, title="unset TX variable") ### check new unlisted tags c.check_tags(tags) if len(c.newtags) == 0: - msg(" No new tags added.") + logger.debug("No new tags added.") else: - errmsg(" There are one or more new tag(s).") - for a in c.newtags: - a['indent'] = 2 - a['file'] = f - a['title'] = "new unlisted tag" - errmsgf(a) + logger.error(" There are one or more new tag(s).", file=f, title="new unlisted tag") ### check for t:lowercase in combination with (?i) in regex c.check_lowercase_ignorecase() if len(c.ignorecase) == 0: - msg(" No t:lowercase and (?i) flag used.") + logger.debug("No t:lowercase and (?i) flag used.") else: - errmsg(" There are one or more combinations of t:lowercase and (?i) flag.") - for a in c.ignorecase: - a['indent'] = 2 - a['file'] = f - a['title'] = "t:lowercase and (?i)" - errmsgf(a) + logger.error("There are one or more combinations of t:lowercase and (?i) flag", file=f, title="t:lowercase and (?i)") ### check for tag:'OWASP_CRS' c.check_crs_tag() if len(c.nocrstags) == 0: - msg(" No rule without OWASP_CRS tag.") + logger.debug("No rule without OWASP_CRS tag.") else: - errmsg(" There are one or more rules without OWASP_CRS tag.") - for a in c.nocrstags: - a['indent'] = 2 - a['file'] = f - a['title'] = "tag:OWASP_CRS is missing" - errmsgf(a) + logger.error("There are one or more rules without OWASP_CRS tag", file=f, title="tag:OWASP_CRS is missing") ### check for ver action c.check_ver_action(crs_version) if len(c.noveract) == 0: - msg(" No rule without correct ver action.") + logger.debug("No rule without correct ver action.") else: - errmsg(" There are one or more rules without ver action.") - for a in c.noveract: - a['indent'] = 2 - a['file'] = f - a['title'] = "ver is missing / incorrect" - errmsgf(a) + logger.error("There are one or more rules without ver action.", file=f, title="ver is missing / incorrect") c.check_capture_action() if len(c.nocaptact) == 0: - msg(" No rule uses TX.N without capture action.") + logger.debug("No rule uses TX.N without capture action.") else: - errmsg(" There are one or more rules using TX.N without capture action.") - for a in c.nocaptact: - a['indent'] = 2 - a['file'] = f - a['title'] = "capture is missing" - errmsgf(a) + logger.error("There are one or more rules using TX.N without capture action.", file=f, title="capture is missing") # set it once if there is an error if c.is_error(): retval = 1 - logger.info("End of checking parsed rules") + logger.debug("End of checking parsed rules") - logger.info("Cumulated report about unused TX variables") + logger.debug("Cumulated report about unused TX variables") has_unused = False for tk in txvars: if txvars[tk]['used'] == False: if has_unused == False: - msg(" Unused TX variable(s):") + logger.debug("Unused TX variable(s):") a = txvars[tk] - a['indent'] = 2 - a['title'] = "unused TX variable" - a['message'] = "unused variable: %s" % (tk) - errmsgf(a) + logger.error(f"unused variable: {tk}", title="unused TX variable") has_unused = True if not has_unused: - logger.info(" No unused TX variable") + logger.debug("No unused TX variable") return retval diff --git a/src/crs_linter/logger.py b/src/crs_linter/logger.py new file mode 100644 index 0000000..dfa3577 --- /dev/null +++ b/src/crs_linter/logger.py @@ -0,0 +1,43 @@ +import logging + +import github_action_utils as gha_utils + + +class Logger: + def __init__(self, output="native", debug=False): + self.output = output + self.debugging = debug + level = logging.INFO + if self.output == "native": + self.logger = logging.getLogger() + if self.debugging: + level = logging.DEBUG + self.logger.setLevel(level) + else: + self.logger = gha_utils + + def debug(self, *args, **kwargs): + if self.debugging: + if self.output == "native": + self.logger.debug(*args) + else: + self.logger.debug(*args, **kwargs) + + pass + + def error(self, *args, **kwargs): + if self.output == "native": + self.logger.error(*args) + else: + self.logger.error(*args, **kwargs) + + def warning(self, *args, **kwargs): + if self.output == "native": + self.logger.warning(*args) + else: + self.logger.warning(*args, **kwargs) + + def info(self, *args, **kwargs): + if self.output == "native": + self.logger.info(*args, **kwargs) +