Skip to content

Commit

Permalink
Merge pull request #334 from jjnicola/alive-test
Browse files Browse the repository at this point in the history
Apply alive test preferences if one method was selected.
  • Loading branch information
jjnicola authored Sep 10, 2020
2 parents 90746aa + eed0626 commit 9e04941
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 65 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Fixed
- Fix nvticache name when for stable version from sources. [#317](https://github.com/greenbone/ospd-openvas/pull/317)
- Fix stop scan during preferences handling, before spawining OpenVAS. [#332](https://github.com/greenbone/ospd-openvas/pull/332)
- Fix alive test preferences when a non default method is selected. [#334](https://github.com/greenbone/ospd-openvas/pull/334)

[20.8.1]: https://github.com/greenbone/ospd-openvas/compare/v20.8.0...ospd-openvas-20.08

Expand Down
5 changes: 5 additions & 0 deletions ospd_openvas/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,11 @@ def exec_scan(self, scan_id: str):
scan_prefs.prepare_scan_params_for_openvas(OSPD_PARAMS)
scan_prefs.prepare_reverse_lookup_opt_for_openvas()
scan_prefs.prepare_alive_test_option_for_openvas()

# VT preferences are stored after all preferences have been processed,
# since alive tests preferences have to be able to overwrite default
# preferences of ping_host.nasl for the classic method.
scan_prefs.prepare_nvt_preferences()
scan_prefs.prepare_boreas_alive_test()

# Release memory used for scan preferences.
Expand Down
86 changes: 39 additions & 47 deletions ospd_openvas/preferencehandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(
self._openvas_scan_id = None

self._target_options = None
self._nvts_params = None

self.nvti = nvticache

Expand Down Expand Up @@ -255,32 +256,33 @@ def _process_vts(
return vts_list, vts_params

def prepare_plugins_for_openvas(self) -> bool:
"""Get the plugin list to be launched from the Scan Collection
and prepare the vts preferences. Store the data in the kb.
"""Get the plugin list and it preferences from the Scan Collection.
The plugin list is inmediately stored in the kb.
"""
nvts = self.scan_collection.get_vts(self.scan_id)
if nvts:
nvts_list, nvts_params = self._process_vts(nvts)
nvts_list, self._nvts_params = self._process_vts(nvts)
# Add nvts list
separ = ';'
plugin_list = 'plugin_set|||%s' % separ.join(nvts_list)
self.kbdb.add_scan_preferences(self._openvas_scan_id, [plugin_list])

# Add nvts parameters
for key, val in nvts_params.items():
item = '%s|||%s' % (key, val)
self.kbdb.add_scan_preferences(self._openvas_scan_id, [item])

nvts_params = None
nvts_list = None
item = None
plugin_list = None
nvts = None

return True

return False

def prepare_nvt_preferences(self):
"""Prepare the vts preferences. Store the data in the kb."""

items_list = []
for key, val in self._nvts_params.items():
items_list.append('%s|||%s' % (key, val))
self.kbdb.add_scan_preferences(self._openvas_scan_id, items_list)

@staticmethod
def build_alive_test_opt_as_prefs(
target_options: Dict[str, str]
Expand All @@ -293,7 +295,8 @@ def build_alive_test_opt_as_prefs(
A list with the target options related to alive test method
in string format to be added to the redis KB.
"""
target_opt_prefs_list = []
target_opt_prefs_list = {}

if target_options and target_options.get('alive_test'):
try:
alive_test = int(target_options.get('alive_test'))
Expand All @@ -305,6 +308,8 @@ def build_alive_test_opt_as_prefs(
)
return target_opt_prefs_list

# No alive test or wrong value, uses the default
# preferences sent by the client.
if alive_test < 1 or alive_test > 31:
return target_opt_prefs_list

Expand All @@ -315,12 +320,9 @@ def build_alive_test_opt_as_prefs(
value = "yes"
else:
value = "no"
target_opt_prefs_list.append(
OID_PING_HOST
+ ':1:checkbox:'
+ 'Do a TCP ping|||'
+ '{0}'.format(value)
)
target_opt_prefs_list[
OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping'
] = value

if (
alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE
Expand All @@ -329,64 +331,56 @@ def build_alive_test_opt_as_prefs(
value = "yes"
else:
value = "no"
target_opt_prefs_list.append(
target_opt_prefs_list[
OID_PING_HOST
+ ':2:checkbox:'
+ 'TCP ping tries also TCP-SYN ping|||'
+ '{0}'.format(value)
)
+ 'TCP ping tries also TCP-SYN ping'
] = value

if (alive_test & AliveTest.ALIVE_TEST_TCP_SYN_SERVICE) and not (
alive_test & AliveTest.ALIVE_TEST_TCP_ACK_SERVICE
):
value = "yes"
else:
value = "no"
target_opt_prefs_list.append(
target_opt_prefs_list[
OID_PING_HOST
+ ':7:checkbox:'
+ 'TCP ping tries only TCP-SYN ping|||'
+ '{0}'.format(value)
)
+ 'TCP ping tries only TCP-SYN ping'
] = value

if alive_test & AliveTest.ALIVE_TEST_ICMP:
value = "yes"
else:
value = "no"
target_opt_prefs_list.append(
OID_PING_HOST
+ ':3:checkbox:'
+ 'Do an ICMP ping|||'
+ '{0}'.format(value)
)
target_opt_prefs_list[
OID_PING_HOST + ':3:checkbox:' + 'Do an ICMP ping'
] = value

if alive_test & AliveTest.ALIVE_TEST_ARP:
value = "yes"
else:
value = "no"
target_opt_prefs_list.append(
OID_PING_HOST
+ ':4:checkbox:'
+ 'Use ARP|||'
+ '{0}'.format(value)
)
target_opt_prefs_list[
OID_PING_HOST + ':4:checkbox:' + 'Use ARP'
] = value

if alive_test & AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
value = "no"
else:
value = "yes"
target_opt_prefs_list.append(
target_opt_prefs_list[
OID_PING_HOST
+ ':5:checkbox:'
+ 'Mark unrechable Hosts as dead (not scanning)|||'
+ '{0}'.format(value)
)
+ 'Mark unrechable Hosts as dead (not scanning)'
] = value

# Also select a method, otherwise Ping Host logs a warning.
if alive_test == AliveTest.ALIVE_TEST_CONSIDER_ALIVE:
target_opt_prefs_list.append(
OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping|||yes'
)
target_opt_prefs_list[
OID_PING_HOST + ':1:checkbox:' + 'Do a TCP ping'
] = 'yes'

return target_opt_prefs_list

def prepare_alive_test_option_for_openvas(self):
Expand All @@ -396,9 +390,7 @@ def prepare_alive_test_option_for_openvas(self):
alive_test_opt = self.build_alive_test_opt_as_prefs(
self.target_options
)
self.kbdb.add_scan_preferences(
self._openvas_scan_id, alive_test_opt
)
self._nvts_params.update(alive_test_opt)

def prepare_boreas_alive_test(self):
"""Set alive_test for Boreas if boreas scanner config
Expand Down
67 changes: 49 additions & 18 deletions tests/test_preferencehandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from unittest import TestCase
from unittest.mock import call, patch, Mock, MagicMock
from collections import OrderedDict

from ospd.vts import Vts

Expand Down Expand Up @@ -200,19 +201,20 @@ def test_build_alive_test_opt_empty(self):
p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
ret = p.build_alive_test_opt_as_prefs(target_options_dict)

self.assertEqual(ret, [])
self.assertEqual(ret, {})

def test_build_alive_test_opt(self):
w = DummyDaemon()

alive_test_out = [
"1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no",
"1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping|||no",
"1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping|||no",
"1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping|||yes",
"1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP|||no",
"1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)|||yes",
]
alive_test_out = {
"1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no",
"1.3.6.1.4.1.25623.1.0.100315:2:checkbox:TCP ping tries also TCP-SYN ping": "no",
"1.3.6.1.4.1.25623.1.0.100315:7:checkbox:TCP ping tries only TCP-SYN ping": "no",
"1.3.6.1.4.1.25623.1.0.100315:3:checkbox:Do an ICMP ping": "yes",
"1.3.6.1.4.1.25623.1.0.100315:4:checkbox:Use ARP": "no",
"1.3.6.1.4.1.25623.1.0.100315:5:checkbox:Mark unrechable Hosts as dead (not scanning)": "yes",
}

target_options_dict = {'alive_test': '2'}
p = PreferenceHandler('1234-1234', None, w.scan_collection, None)
ret = p.build_alive_test_opt_as_prefs(target_options_dict)
Expand Down Expand Up @@ -242,7 +244,8 @@ def test_set_target(self, mock_kb):
p.prepare_target_for_openvas()

p.kbdb.add_scan_preferences.assert_called_with(
p._openvas_scan_id, ['TARGET|||192.168.0.1'],
p._openvas_scan_id,
['TARGET|||192.168.0.1'],
)

@patch('ospd_openvas.db.KbDB')
Expand All @@ -257,7 +260,8 @@ def test_set_ports(self, mock_kb):
p.prepare_ports_for_openvas()

p.kbdb.add_scan_preferences.assert_called_with(
p._openvas_scan_id, ['port_range|||80,443'],
p._openvas_scan_id,
['port_range|||80,443'],
)

@patch('ospd_openvas.db.KbDB')
Expand All @@ -270,7 +274,8 @@ def test_set_main_kbindex(self, mock_kb):
p.prepare_main_kbindex_for_openvas()

p.kbdb.add_scan_preferences.assert_called_with(
p._openvas_scan_id, ['ov_maindbid|||2'],
p._openvas_scan_id,
['ov_maindbid|||2'],
)

@patch('ospd_openvas.db.KbDB')
Expand Down Expand Up @@ -363,7 +368,8 @@ def test_set_host_options(self, mock_kb):
p.prepare_host_options_for_openvas()

p.kbdb.add_scan_preferences.assert_called_with(
p._openvas_scan_id, ['exclude_hosts|||192.168.0.1'],
p._openvas_scan_id,
['exclude_hosts|||192.168.0.1'],
)

@patch('ospd_openvas.db.KbDB')
Expand Down Expand Up @@ -422,7 +428,10 @@ def test_set_reverse_lookup_opt(self, mock_kb):

p.kbdb.add_scan_preferences.assert_called_with(
p._openvas_scan_id,
['reverse_lookup_only|||yes', 'reverse_lookup_unify|||no',],
[
'reverse_lookup_only|||yes',
'reverse_lookup_unify|||no',
],
)

@patch('ospd_openvas.db.KbDB')
Expand Down Expand Up @@ -544,11 +553,33 @@ def test_set_alive_pinghost(self, mock_kb):

with patch.object(Openvas, 'get_settings', return_value=ov_setting):
p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)

p._nvts_params = {}
p._openvas_scan_id = '456-789'
p.kbdb.add_scan_preferences = MagicMock()
p.prepare_alive_test_option_for_openvas()

p.kbdb.add_scan_preferences.assert_called_with(
p._openvas_scan_id, alive_test_out,
)
for key, value in p._nvts_params.items():
self.assertTrue(
"{0}|||{1}".format(key, value) in alive_test_out
)

@patch('ospd_openvas.db.KbDB')
def test_prepare_nvt_prefs(self, mock_kb):
w = DummyDaemon()

alive_test_out = [
"1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping|||no"
]

p = PreferenceHandler('1234-1234', mock_kb, w.scan_collection, None)
p._nvts_params = {
"1.3.6.1.4.1.25623.1.0.100315:1:checkbox:Do a TCP ping": "no"
}
p._openvas_scan_id = '456-789'
p.kbdb.add_scan_preferences = MagicMock()
p.prepare_nvt_preferences()

p.kbdb.add_scan_preferences.assert_called_with(
p._openvas_scan_id,
alive_test_out,
)

0 comments on commit 9e04941

Please sign in to comment.