forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
System-tests for auditbeat show feature
New system-tests: - check that `show` is recognised as a command. - check that `show auditd-rules` shows rules. - check that `show auditd-status` displays valid status. The last two require `sudo`, `auditctl` and access to the auditd kernel subsystem.
- Loading branch information
1 parent
8a03054
commit 33a5fa3
Showing
3 changed files
with
119 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
import os | ||
import re | ||
import sys | ||
import tempfile | ||
import unittest | ||
from auditbeat import * | ||
|
||
|
||
def is_root(): | ||
if 'geteuid' not in dir(os): | ||
return False | ||
euid = os.geteuid() | ||
print("euid is", euid) | ||
return euid == 0 | ||
|
||
|
||
@unittest.skipUnless(re.match("(?i)linux", sys.platform), "Requires Linux") | ||
class Test(BaseTest): | ||
|
||
def test_show_command(self): | ||
""" | ||
show sub-command is present | ||
Runs auditbeat show --help. The process should terminate with | ||
a successful status if show is recognised. | ||
""" | ||
self.run_beat(extra_args=['show', '--help'], exit_code=0) | ||
|
||
@unittest.skipUnless(is_root(), "Requires root") | ||
def test_show_auditd_rules(self): | ||
""" | ||
show auditd-rules sub-command | ||
Set some rules and read them. | ||
""" | ||
pid = os.getpid() | ||
rules = [ | ||
'-w {0} -p w -k rule0_{1}'.format(os.path.realpath(__file__), pid), | ||
'-a always,exit -S mount -F pid={0} -F key=rule1_{0}'.format(pid), | ||
] | ||
rules_body = '|\n' + ''.join([' ' + rule + '\n' for rule in rules]) | ||
self.render_config_template( | ||
modules=[{ | ||
"name": "auditd", | ||
"extras": { | ||
"audit_rules": rules_body | ||
} | ||
}] | ||
) | ||
proc = self.start_beat(extra_args=['-strict.perms=false']) | ||
# auditbeat adds an extra rule to ignore itself | ||
self.wait_log_contains('Successfully added {0} of {0} audit rules.'.format(len(rules) + 1), | ||
max_timeout=30) | ||
proc.kill() | ||
|
||
fd, output_file = tempfile.mkstemp() | ||
self.run_beat(extra_args=['show', 'auditd-rules'], | ||
exit_code=0, | ||
output=output_file) | ||
fhandle = os.fdopen(fd, 'rb') | ||
lines = fhandle.readlines() | ||
fhandle.close() | ||
os.unlink(output_file) | ||
assert len(lines) >= len(rules) | ||
# get rid of automatic rule | ||
if '-F key=rule' not in lines[0]: | ||
del lines[0] | ||
|
||
for i in range(len(rules)): | ||
expected = rules[i] | ||
got = lines[i].strip() | ||
assert expected == got, \ | ||
"rule {0} doesn't match. expected='{1}' got='{2}'".format( | ||
i, expected, got | ||
) | ||
|
||
@unittest.skipUnless(is_root(), "Requires root") | ||
def test_show_auditd_status(self): | ||
""" | ||
show auditd-status sub-command | ||
""" | ||
expected = [ | ||
'enabled', | ||
'failure', | ||
'pid', | ||
'rate_limit', | ||
'backlog_limit', | ||
'lost', | ||
'backlog', | ||
'backlog_wait_time', | ||
'features', | ||
] | ||
|
||
fields = dict((f, False) for f in expected) | ||
|
||
fd, output_file = tempfile.mkstemp() | ||
self.run_beat(extra_args=['show', 'auditd-status'], | ||
exit_code=0, | ||
output=output_file) | ||
fhandle = os.fdopen(fd, 'rb') | ||
lines = fhandle.readlines() | ||
fhandle.close() | ||
os.unlink(output_file) | ||
|
||
for line in lines: | ||
if line == "PASS\n": | ||
break | ||
k, v = line.strip().split() | ||
assert k in fields, "Unexpected field '{0}'".format(k) | ||
assert not fields[k], "Field '{0}' repeated".format(k) | ||
n = int(v, 0) | ||
assert n >= 0, "Field '{0}' has negative value {1}".format(k, v) | ||
fields[k] = True | ||
|
||
for (k, v) in fields.iteritems(): | ||
assert v, "Field {0} not found".format(k) |