Skip to content

Commit

Permalink
fix: logger
Browse files Browse the repository at this point in the history
Signed-off-by: Felipe Zipitria <felipe.zipitria@owasp.org>
  • Loading branch information
fzipi committed Feb 3, 2025
1 parent 13863c3 commit 8b1991a
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 144 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ jobs:
- name: "Run crs linter tests for ${{ matrix.crs-version }}"
run: |
curl -SLs https://github.com/coreruleset/coreruleset/archive/refs/tags/v${{ matrix.crs-version }}.tar.gz -o - | \
tar xzvf - --strip-components=1 --wildcards "*/rules/*" --wildcards "*/crs-setup.conf.example" --wildcards "*/util/APPROVED_TAGS"
uv run crs-linter --output=github -r crs-setup.conf.example -d . -r 'rules/*.conf' -t util/APPROVED_TAGS -v ${{ matrix.crs-version }}
tar xzvf - --strip-components=1 --wildcards "*/rules/*" --wildcards "*/crs-setup.conf.example"
uv run crs-linter -o github -d . -r crs-setup.conf.example -r 'rules/*.conf' -t APPROVED_TAGS -v ${{ matrix.crs-version }}
195 changes: 53 additions & 142 deletions src/crs_linter/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,7 @@
from semver import Version

from crs_linter.linter import Check

oformat = "native"

logger = logging.getLogger(__name__)

def errmsg(msg):
if oformat == "github":
print("::error::%s" % (msg))
else:
print(msg)


def errmsgf(msg):
if oformat == "github":
if 'message' in msg and msg['message'].strip() != "":
print("::error%sfile={file},line={line},endLine={endLine},title={title}:: {message}".format(**msg) % (
msg['indent'] * " "))
else:
print("::error%sfile={file},line={line},endLine={endLine},title={title}::".format(**msg) % (
msg['indent'] * " "))
else:
if 'message' in msg and msg['message'].strip() != "":
print("%sfile={file}, line={line}, endLine={endLine}, title={title}: {message}".format(**msg) % (
msg['indent'] * " "))
else:
print("%sfile={file}, line={line}, endLine={endLine}, title={title}".format(**msg) % (msg['indent'] * " "))


def msg(msg):
if oformat == "github":
print("::debug::%s" % (msg))
else:
print(msg)

from crs_linter.logger import Logger

def remove_comments(data):
"""
Expand Down Expand Up @@ -127,18 +94,20 @@ def generate_version_string(directory):

return f"OWASP_CRS/{version}"


def get_tags_from_file(filename):
try:
with open(filename, "r") as fp:
tags = [l.strip() for l in fp.readlines()]
# remove empty items, if any
tags = list(filter(lambda x: len(x) > 0, tags))
except:
errmsg(f"Can't open tags list: {filename}")
logger.error(f"Can't open tags list: {filename}")
sys.exit(1)

return tags


def get_crs_version(directory, version=None):
crs_version = ""
if version is None:
Expand All @@ -152,6 +121,7 @@ def get_crs_version(directory, version=None):

return crs_version


def check_indentation(filename, content):
error = False

Expand All @@ -163,7 +133,7 @@ def check_indentation(filename, content):
from_lines = remove_comments("".join(from_lines)).split("\n")
from_lines = [l + "\n" for l in from_lines]
except:
errmsg(" Can't open file for indent check: %s" % (f))
logger.error(" Can't open file for indent check: %s" % (f))
error = True

# virtual output
Expand All @@ -183,9 +153,9 @@ def check_indentation(filename, content):

diff = difflib.unified_diff(from_lines, output)
if from_lines == output:
msg(" Indentation check ok.")
logger.debug(" Indentation check ok.")
else:
errmsg(" Indentation check found error(s)")
logger.error(" Indentation check found error(s)")
error = True

for d in diff:
Expand All @@ -202,11 +172,14 @@ def check_indentation(filename, content):
'message': "an indentation error has found"
}
errmsgf(e)
errmsg(d.strip("\n"))
logger.error(d.strip("\n"))

return error


def read_files(filenames):
global logger

parsed = {}
for f in filenames:
try:
Expand All @@ -216,40 +189,36 @@ def read_files(filenames):
if f.startswith("crs-setup.conf.example"):
data = remove_comments(data)
except:
errmsg("Can't open file: %s" % f)
logger.error(f"Can't open file: {f}")
sys.exit(1)

### check file syntax
logger.info(f"Config file: {f}")
try:
mparser = msc_pyparser.MSCParser()
mparser.parser.parse(data)
logger.info("Parsing OK")
logger.debug("Parsing OK")
parsed[f] = mparser.configlines
except Exception as e:
err = e.args[1]
if err['cause'] == "lexer":
cause = "Lexer"
else:
cause = "Parser"
errmsg("Can't parse config file: %s" % (f))
errmsgf({
'indent': 2,
'file': f,
'title': "%s error" % (cause),
'line': err['line'],
'endLine': err['line'],
'message': "can't parse file"})
logger.error(f"Can't parse config file: {f}", title=f"{cause} error", file=f, line=err['line'], endLine=err['line'])
retval = 1
continue

return parsed


def parse_args(argv):
parser = argparse.ArgumentParser(prog="crs-linter", description="CRS Rules Check tool")
parser.add_argument("-o", "--output", dest="output", help="Output format native[default]|github", required=False)
parser.add_argument("-o", "--output", dest="output", default="native",
help="Output format", choices=["native", "github"], required=False)
parser.add_argument("-d", "--directory", dest="directory", default=pathlib.Path("."), type=pathlib.Path,
help='Directory path to CRS git repository', required=True)
parser.add_argument("--debug", dest="debug", help="Show debug information.", action='store_true')
parser.add_argument("-r", "--rules", type=str, dest="crs_rules",
help='CRS rules file to check. Can be used multiple times.', action='append', required=True)
parser.add_argument("-t", "--tags-list", dest="tagslist", help="Path to file with permitted tags", required=True)
Expand All @@ -258,18 +227,15 @@ def parse_args(argv):
return parser.parse_args(argv)

def main():
global logger
retval = 0
args = parse_args(sys.argv[1:])

files = []
for r in args.crs_rules:
files.extend(glob.glob(r))

if args.output is not None:
if args.output not in ["native", "github"]:
print("--output can be one of the 'native' or 'github'. Default value is 'native'")
sys.exit(1)
oformat = args.output
logger = Logger(output=args.output, debug=args.debug)

crs_version = get_crs_version(args.directory, args.version)
tags = get_tags_from_file(args.tagslist)
Expand All @@ -278,32 +244,25 @@ def main():

logger.info("Checking parsed rules...")
for f in parsed.keys():
msg(f)
logger.debug(f)
c = Check(parsed[f], f, txvars)

### check case usings
c.check_ignore_case()
if len(c.caseerror) == 0:
logger.info(" Ignore case check ok.")
logger.info("Ignore case check ok.")
else:
errmsg(" Ignore case check found error(s)")
logger.error("Ignore case check found error(s)")
for a in c.caseerror:
a['indent'] = 2
a['file'] = f
a['title'] = "Case check"
errmsgf(a)
logger.error(file=f, title="Case check")

### check action's order
c.check_action_order()
if len(c.orderacts) == 0:
msg(" Action order check ok.")
logger.debug("Action order check ok.")
else:
errmsg(" Action order check found error(s)")
for a in c.orderacts:
a['indent'] = 2
a['file'] = f
a['title'] = 'Action order check'
errmsgf(a)
logger.error("Action order check found error(s)", file=f, title='Action order check')

error = check_indentation(f, parsed[f])
if error:
Expand All @@ -312,14 +271,11 @@ def main():
### check `ctl:auditLogParts=+E` right place in chained rules
c.check_ctl_audit_log()
if len(c.auditlogparts) == 0:
msg(" no 'ctl:auditLogParts' action found.")
logger.debug(" no 'ctl:auditLogParts' action found.")
else:
errmsg(" Found 'ctl:auditLogParts' action")
logger.error()
for a in c.auditlogparts:
a['indent'] = 2
a['file'] = f
a['title'] = "'ctl:auditLogParts' isn't allowed in CRS"
errmsgf(a)
logger.error("Found 'ctl:auditLogParts' action", file=f, title="'ctl:auditLogParts' isn't allowed in CRS")

### collect TX variables
# this method collects the TX variables, which set via a
Expand All @@ -330,129 +286,84 @@ def main():
### check duplicate ID's
# c.dupes filled during the tx variable collected
if len(c.dupes) == 0:
msg(" no duplicate id's")
logger.debug("No duplicate id's")
else:
errmsg(" Found duplicated id('s)")
for a in c.dupes:
a['indent'] = 2
a['file'] = f
a['title'] = "'id' is duplicated"
errmsgf(a)
logger.error("Found duplicated id('s)", file=f, title="'id' is duplicated")

### check PL consistency
c.check_pl_consistency()
if len(c.pltags) == 0:
msg(" paranoia-level tags are correct.")
logger.debug("Paranoia-level tags are correct.")
else:
errmsg(" Found incorrect paranoia-level/N tag(s)")
for a in c.pltags:
a['indent'] = 2
a['file'] = f
a['title'] = "wrong or missing paranoia-level/N tag"
errmsgf(a)
logger.error("Found incorrect paranoia-level/N tag(s)", file=f, title="wrong or missing paranoia-level/N tag")

if len(c.plscores) == 0:
msg(" PL anomaly_scores are correct.")
logger.debug("PL anomaly_scores are correct.")
else:
errmsg(" Found incorrect (inbound|outbout)_anomaly_score value(s)")
for a in c.plscores:
a['indent'] = 2
a['file'] = f
a['title'] = "wrong (inbound|outbout)_anomaly_score variable or value"
errmsgf(a)
logger.error(" Found incorrect (inbound|outbout)_anomaly_score value(s)", file=f, title="wrong (inbound|outbout)_anomaly_score variable or value")

### check existence of used TX variables
c.check_tx_variable()
if len(c.undef_txvars) == 0:
msg(" All TX variables are set.")
logger.debug("All TX variables are set.")
else:
errmsg(" There are one or more unset TX variables.")
for a in c.undef_txvars:
a['indent'] = 2
a['file'] = f
a['title'] = "unset TX variable"
errmsgf(a)
logger.error("There are one or more unset TX variables.", file=f, title="unset TX variable")

### check new unlisted tags
c.check_tags(tags)
if len(c.newtags) == 0:
msg(" No new tags added.")
logger.debug("No new tags added.")
else:
errmsg(" There are one or more new tag(s).")
for a in c.newtags:
a['indent'] = 2
a['file'] = f
a['title'] = "new unlisted tag"
errmsgf(a)
logger.error(" There are one or more new tag(s).", file=f, title="new unlisted tag")

### check for t:lowercase in combination with (?i) in regex
c.check_lowercase_ignorecase()
if len(c.ignorecase) == 0:
msg(" No t:lowercase and (?i) flag used.")
logger.debug("No t:lowercase and (?i) flag used.")
else:
errmsg(" There are one or more combinations of t:lowercase and (?i) flag.")
for a in c.ignorecase:
a['indent'] = 2
a['file'] = f
a['title'] = "t:lowercase and (?i)"
errmsgf(a)
logger.error("There are one or more combinations of t:lowercase and (?i) flag", file=f, title="t:lowercase and (?i)")

### check for tag:'OWASP_CRS'
c.check_crs_tag()
if len(c.nocrstags) == 0:
msg(" No rule without OWASP_CRS tag.")
logger.debug("No rule without OWASP_CRS tag.")
else:
errmsg(" There are one or more rules without OWASP_CRS tag.")
for a in c.nocrstags:
a['indent'] = 2
a['file'] = f
a['title'] = "tag:OWASP_CRS is missing"
errmsgf(a)
logger.error("There are one or more rules without OWASP_CRS tag", file=f, title="tag:OWASP_CRS is missing")

### check for ver action
c.check_ver_action(crs_version)
if len(c.noveract) == 0:
msg(" No rule without correct ver action.")
logger.debug("No rule without correct ver action.")
else:
errmsg(" There are one or more rules without ver action.")
for a in c.noveract:
a['indent'] = 2
a['file'] = f
a['title'] = "ver is missing / incorrect"
errmsgf(a)
logger.error("There are one or more rules without ver action.", file=f, title="ver is missing / incorrect")

c.check_capture_action()
if len(c.nocaptact) == 0:
msg(" No rule uses TX.N without capture action.")
logger.debug("No rule uses TX.N without capture action.")
else:
errmsg(" There are one or more rules using TX.N without capture action.")
for a in c.nocaptact:
a['indent'] = 2
a['file'] = f
a['title'] = "capture is missing"
errmsgf(a)
logger.error("There are one or more rules using TX.N without capture action.", file=f, title="capture is missing")

# set it once if there is an error
if c.is_error():
retval = 1

logger.info("End of checking parsed rules")
logger.debug("End of checking parsed rules")

logger.info("Cumulated report about unused TX variables")
logger.debug("Cumulated report about unused TX variables")
has_unused = False
for tk in txvars:
if txvars[tk]['used'] == False:
if has_unused == False:
msg(" Unused TX variable(s):")
logger.debug("Unused TX variable(s):")
a = txvars[tk]
a['indent'] = 2
a['title'] = "unused TX variable"
a['message'] = "unused variable: %s" % (tk)
errmsgf(a)
logger.error(f"unused variable: {tk}", title="unused TX variable")
has_unused = True

if not has_unused:
logger.info(" No unused TX variable")
logger.debug("No unused TX variable")

return retval

Expand Down
Loading

0 comments on commit 8b1991a

Please sign in to comment.