From f465608defde14e85423b4448eafba805ac43b24 Mon Sep 17 00:00:00 2001 From: Philipp Eder Date: Mon, 25 Jan 2021 07:19:00 +0100 Subject: [PATCH 1/3] test_create_paren_dirs_fail should not rely on the actual access rights When you're building a debian package within container.io definition than you tyically do that as root with access to /root/. Therefore it is more reliable to work with a mock than relying on actual access rights on a os while testing. --- tests/test_lock.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/tests/test_lock.py b/tests/test_lock.py index 84d94efb..5f3422cc 100644 --- a/tests/test_lock.py +++ b/tests/test_lock.py @@ -21,7 +21,7 @@ import tempfile import fcntl -from pathlib import Path +from pathlib import Path, PosixPath from unittest.mock import patch, MagicMock from ospd_openvas.lock import LockFile @@ -36,7 +36,7 @@ def tearDown(self): shutil.rmtree(str(self.temp_dir)) def test_acquire_lock(self): - lock_file_path = self.temp_dir / 'test.lock' + lock_file_path = self.temp_dir / "test.lock" lock_file = LockFile(lock_file_path) lock_file._acquire_lock() @@ -45,9 +45,9 @@ def test_acquire_lock(self): self.assertTrue(lock_file_path.exists()) lock_file._release_lock() - @patch('ospd_openvas.lock.logger') + @patch("ospd_openvas.lock.logger") def test_already_locked(self, mock_logger): - lock_file_path = self.temp_dir / 'test.lock' + lock_file_path = self.temp_dir / "test.lock" lock_file_aux = LockFile(lock_file_path) lock_file_aux._acquire_lock() @@ -61,7 +61,7 @@ def test_already_locked(self, mock_logger): lock_file_aux._release_lock() def test_create_parent_dirs(self): - lock_file_path = self.temp_dir / 'foo' / 'bar' / 'test.lock' + lock_file_path = self.temp_dir / "foo" / "bar" / "test.lock" lock_file = LockFile(lock_file_path) lock_file._acquire_lock() @@ -74,9 +74,13 @@ def test_create_parent_dirs(self): lock_file._release_lock() - @patch('ospd_openvas.lock.logger') + @patch("ospd_openvas.lock.logger") def test_create_paren_dirs_fail(self, mock_logger): - lock_file_path = Path('/root/lock/file/test.lock') + lock_file_path = MagicMock(spec=Path).return_value + parent = MagicMock(spec=PosixPath) + lock_file_path.parent = parent + parent.mkdir.side_effect = PermissionError + lock_file = LockFile(lock_file_path) lock_file._acquire_lock() @@ -85,7 +89,7 @@ def test_create_paren_dirs_fail(self, mock_logger): assert_called_once(mock_logger.error) def test_context_manager(self): - lock_file_path = self.temp_dir / 'test.lock' + lock_file_path = self.temp_dir / "test.lock" lock_file = LockFile(lock_file_path) From c5b8c102281be2fc966dcab05cb24828c53fd981 Mon Sep 17 00:00:00 2001 From: Philipp Eder Date: Mon, 25 Jan 2021 08:33:28 +0100 Subject: [PATCH 2/3] set run as root to false to trigger sudo availability check When running on root test_sudo_available did not check if sudo is available, to enforce check _is_running_as_root needs to be manually set to false. --- tests/test_daemon.py | 897 ++++++++++++++++++++++--------------------- 1 file changed, 449 insertions(+), 448 deletions(-) diff --git a/tests/test_daemon.py b/tests/test_daemon.py index fed633f6..93ef41a4 100644 --- a/tests/test_daemon.py +++ b/tests/test_daemon.py @@ -38,347 +38,348 @@ from ospd_openvas.openvas import Openvas OSPD_PARAMS_OUT = { - 'auto_enable_dependencies': { - 'type': 'boolean', - 'name': 'auto_enable_dependencies', - 'default': 1, - 'mandatory': 1, - 'visible_for_client': True, - 'description': 'Automatically enable the plugins that are depended on', + "auto_enable_dependencies": { + "type": "boolean", + "name": "auto_enable_dependencies", + "default": 1, + "mandatory": 1, + "visible_for_client": True, + "description": "Automatically enable the plugins that are depended on", }, - 'cgi_path': { - 'type': 'string', - 'name': 'cgi_path', - 'default': '/cgi-bin:/scripts', - 'mandatory': 1, - 'visible_for_client': True, - 'description': 'Look for default CGIs in /cgi-bin and /scripts', + "cgi_path": { + "type": "string", + "name": "cgi_path", + "default": "/cgi-bin:/scripts", + "mandatory": 1, + "visible_for_client": True, + "description": "Look for default CGIs in /cgi-bin and /scripts", }, - 'checks_read_timeout': { - 'type': 'integer', - 'name': 'checks_read_timeout', - 'default': 5, - 'mandatory': 1, - 'visible_for_client': True, - 'description': ( - 'Number of seconds that the security checks will ' - + 'wait for when doing a recv()' + "checks_read_timeout": { + "type": "integer", + "name": "checks_read_timeout", + "default": 5, + "mandatory": 1, + "visible_for_client": True, + "description": ( + "Number of seconds that the security checks will " + + "wait for when doing a recv()" ), }, - 'non_simult_ports': { - 'type': 'string', - 'name': 'non_simult_ports', - 'default': '139, 445, 3389, Services/irc', - 'mandatory': 1, - 'visible_for_client': True, - 'description': ( - 'Prevent to make two connections on the same given ' - + 'ports at the same time.' + "non_simult_ports": { + "type": "string", + "name": "non_simult_ports", + "default": "139, 445, 3389, Services/irc", + "mandatory": 1, + "visible_for_client": True, + "description": ( + "Prevent to make two connections on the same given " + + "ports at the same time." ), }, - 'open_sock_max_attempts': { - 'type': 'integer', - 'name': 'open_sock_max_attempts', - 'default': 5, - 'mandatory': 0, - 'visible_for_client': True, - 'description': ( - 'Number of unsuccessful retries to open the socket ' - + 'before to set the port as closed.' + "open_sock_max_attempts": { + "type": "integer", + "name": "open_sock_max_attempts", + "default": 5, + "mandatory": 0, + "visible_for_client": True, + "description": ( + "Number of unsuccessful retries to open the socket " + + "before to set the port as closed." ), }, - 'timeout_retry': { - 'type': 'integer', - 'name': 'timeout_retry', - 'default': 5, - 'mandatory': 0, - 'visible_for_client': True, - 'description': ( - 'Number of retries when a socket connection attempt ' + 'timesout.' + "timeout_retry": { + "type": "integer", + "name": "timeout_retry", + "default": 5, + "mandatory": 0, + "visible_for_client": True, + "description": ( + "Number of retries when a socket connection attempt " + "timesout." ), }, - 'optimize_test': { - 'type': 'boolean', - 'name': 'optimize_test', - 'default': 1, - 'mandatory': 0, - 'visible_for_client': True, - 'description': ( - 'By default, optimize_test is enabled which means openvas does ' - + 'trust the remote host banners and is only launching plugins ' - + 'against the services they have been designed to check. ' - + 'For example it will check a web server claiming to be IIS only ' - + 'for IIS related flaws but will skip plugins testing for Apache ' - + 'flaws, and so on. This default behavior is used to optimize ' - + 'the scanning performance and to avoid false positives. ' - + 'If you are not sure that the banners of the remote host ' - + 'have been tampered with, you can disable this option.' + "optimize_test": { + "type": "boolean", + "name": "optimize_test", + "default": 1, + "mandatory": 0, + "visible_for_client": True, + "description": ( + "By default, optimize_test is enabled which means openvas does " + + "trust the remote host banners and is only launching plugins " + + "against the services they have been designed to check. " + + "For example it will check a web server claiming to be IIS only " + + "for IIS related flaws but will skip plugins testing for Apache " + + "flaws, and so on. This default behavior is used to optimize " + + "the scanning performance and to avoid false positives. " + + "If you are not sure that the banners of the remote host " + + "have been tampered with, you can disable this option." ), }, - 'plugins_timeout': { - 'type': 'integer', - 'name': 'plugins_timeout', - 'default': 5, - 'mandatory': 0, - 'visible_for_client': True, - 'description': 'This is the maximum lifetime, in seconds of a plugin.', + "plugins_timeout": { + "type": "integer", + "name": "plugins_timeout", + "default": 5, + "mandatory": 0, + "visible_for_client": True, + "description": "This is the maximum lifetime, in seconds of a plugin.", }, - 'report_host_details': { - 'type': 'boolean', - 'name': 'report_host_details', - 'default': 1, - 'mandatory': 1, - 'visible_for_client': True, - 'description': '', + "report_host_details": { + "type": "boolean", + "name": "report_host_details", + "default": 1, + "mandatory": 1, + "visible_for_client": True, + "description": "", }, - 'safe_checks': { - 'type': 'boolean', - 'name': 'safe_checks', - 'default': 1, - 'mandatory': 1, - 'visible_for_client': True, - 'description': ( - 'Disable the plugins with potential to crash ' - + 'the remote services' + "safe_checks": { + "type": "boolean", + "name": "safe_checks", + "default": 1, + "mandatory": 1, + "visible_for_client": True, + "description": ( + "Disable the plugins with potential to crash " + + "the remote services" ), }, - 'scanner_plugins_timeout': { - 'type': 'integer', - 'name': 'scanner_plugins_timeout', - 'default': 36000, - 'mandatory': 1, - 'visible_for_client': True, - 'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.', + "scanner_plugins_timeout": { + "type": "integer", + "name": "scanner_plugins_timeout", + "default": 36000, + "mandatory": 1, + "visible_for_client": True, + "description": "Like plugins_timeout, but for ACT_SCANNER plugins.", }, - 'time_between_request': { - 'type': 'integer', - 'name': 'time_between_request', - 'default': 0, - 'mandatory': 0, - 'visible_for_client': True, - 'description': ( - 'Allow to set a wait time between two actions ' - + '(open, send, close).' + "time_between_request": { + "type": "integer", + "name": "time_between_request", + "default": 0, + "mandatory": 0, + "visible_for_client": True, + "description": ( + "Allow to set a wait time between two actions " + + "(open, send, close)." ), }, - 'unscanned_closed': { - 'type': 'boolean', - 'name': 'unscanned_closed', - 'default': 1, - 'mandatory': 1, - 'visible_for_client': True, - 'description': '', + "unscanned_closed": { + "type": "boolean", + "name": "unscanned_closed", + "default": 1, + "mandatory": 1, + "visible_for_client": True, + "description": "", }, - 'unscanned_closed_udp': { - 'type': 'boolean', - 'name': 'unscanned_closed_udp', - 'default': 1, - 'mandatory': 1, - 'visible_for_client': True, - 'description': '', + "unscanned_closed_udp": { + "type": "boolean", + "name": "unscanned_closed_udp", + "default": 1, + "mandatory": 1, + "visible_for_client": True, + "description": "", }, - 'expand_vhosts': { - 'type': 'boolean', - 'name': 'expand_vhosts', - 'default': 1, - 'mandatory': 0, - 'visible_for_client': True, - 'description': 'Whether to expand the target hosts ' - + 'list of vhosts with values gathered from sources ' - + 'such as reverse-lookup queries and VT checks ' - + 'for SSL/TLS certificates.', + "expand_vhosts": { + "type": "boolean", + "name": "expand_vhosts", + "default": 1, + "mandatory": 0, + "visible_for_client": True, + "description": "Whether to expand the target hosts " + + "list of vhosts with values gathered from sources " + + "such as reverse-lookup queries and VT checks " + + "for SSL/TLS certificates.", }, - 'test_empty_vhost': { - 'type': 'boolean', - 'name': 'test_empty_vhost', - 'default': 0, - 'mandatory': 0, - 'visible_for_client': True, - 'description': 'If set to yes, the scanner will ' - + 'also test the target by using empty vhost value ' - + 'in addition to the targets associated vhost values.', + "test_empty_vhost": { + "type": "boolean", + "name": "test_empty_vhost", + "default": 0, + "mandatory": 0, + "visible_for_client": True, + "description": "If set to yes, the scanner will " + + "also test the target by using empty vhost value " + + "in addition to the targets associated vhost values.", }, - 'max_hosts': { - 'type': 'integer', - 'name': 'max_hosts', - 'default': 30, - 'mandatory': 0, - 'visible_for_client': False, - 'description': ( - 'The maximum number of hosts to test at the same time which ' - + 'should be given to the client (which can override it). ' - + 'This value must be computed given your bandwidth, ' - + 'the number of hosts you want to test, your amount of ' - + 'memory and the performance of your processor(s).' + "max_hosts": { + "type": "integer", + "name": "max_hosts", + "default": 30, + "mandatory": 0, + "visible_for_client": False, + "description": ( + "The maximum number of hosts to test at the same time which " + + "should be given to the client (which can override it). " + + "This value must be computed given your bandwidth, " + + "the number of hosts you want to test, your amount of " + + "memory and the performance of your processor(s)." ), }, - 'max_checks': { - 'type': 'integer', - 'name': 'max_checks', - 'default': 10, - 'mandatory': 0, - 'visible_for_client': False, - 'description': ( - 'The number of plugins that will run against each host being ' - + 'tested. Note that the total number of process will be max ' - + 'checks x max_hosts so you need to find a balance between ' - + 'these two options. Note that launching too many plugins at ' - + 'the same time may disable the remote host, either temporarily ' - + '(ie: inetd closes its ports) or definitely (the remote host ' - + 'crash because it is asked to do too many things at the ' - + 'same time), so be careful.' + "max_checks": { + "type": "integer", + "name": "max_checks", + "default": 10, + "mandatory": 0, + "visible_for_client": False, + "description": ( + "The number of plugins that will run against each host being " + + "tested. Note that the total number of process will be max " + + "checks x max_hosts so you need to find a balance between " + + "these two options. Note that launching too many plugins at " + + "the same time may disable the remote host, either temporarily " + + "(ie: inetd closes its ports) or definitely (the remote host " + + "crash because it is asked to do too many things at the " + + "same time), so be careful." ), }, - 'port_range': { - 'type': 'string', - 'name': 'port_range', - 'default': '', - 'mandatory': 0, - 'visible_for_client': False, - 'description': ( - 'This is the default range of ports that the scanner plugins will ' - + 'probe. The syntax of this option is flexible, it can be a ' + "port_range": { + "type": "string", + "name": "port_range", + "default": "", + "mandatory": 0, + "visible_for_client": False, + "description": ( + "This is the default range of ports that the scanner plugins will " + + "probe. The syntax of this option is flexible, it can be a " + 'single range ("1-1500"), several ports ("21,23,80"), several ' + 'ranges of ports ("1-1500,32000-33000"). Note that you can ' - + 'specify UDP and TCP ports by prefixing each range by T or U. ' - + 'For instance, the following range will make openvas scan UDP ' - + 'ports 1 to 1024 and TCP ports 1 to 65535 : ' + + "specify UDP and TCP ports by prefixing each range by T or U. " + + "For instance, the following range will make openvas scan UDP " + + "ports 1 to 1024 and TCP ports 1 to 65535 : " + '"T:1-65535,U:1-1024".' ), }, - 'test_alive_hosts_only': { - 'type': 'boolean', - 'name': 'test_alive_hosts_only', - 'default': 0, - 'mandatory': 0, - 'visible_for_client': False, - 'description': ( - 'If this option is set, openvas will scan the target list for ' - + 'alive hosts in a separate process while only testing those ' - + 'hosts which are identified as alive. This boosts the scan ' - + 'speed of target ranges with a high amount of dead hosts ' - + 'significantly.' + "test_alive_hosts_only": { + "type": "boolean", + "name": "test_alive_hosts_only", + "default": 0, + "mandatory": 0, + "visible_for_client": False, + "description": ( + "If this option is set, openvas will scan the target list for " + + "alive hosts in a separate process while only testing those " + + "hosts which are identified as alive. This boosts the scan " + + "speed of target ranges with a high amount of dead hosts " + + "significantly." ), }, - 'source_iface': { - 'type': 'string', - 'name': 'source_iface', - 'default': '', - 'mandatory': 0, - 'visible_for_client': False, - 'description': ( - 'Name of the network interface that will be used as the source ' - + 'of connections established by openvas. The scan won\'t be ' - + 'launched if the value isn\'t authorized according to ' - + '(sys_)ifaces_allow / (sys_)ifaces_deny if present.' + "source_iface": { + "type": "string", + "name": "source_iface", + "default": "", + "mandatory": 0, + "visible_for_client": False, + "description": ( + "Name of the network interface that will be used as the source " + + "of connections established by openvas. The scan won't be " + + "launched if the value isn't authorized according to " + + "(sys_)ifaces_allow / (sys_)ifaces_deny if present." ), }, - 'ifaces_allow': { - 'type': 'string', - 'name': 'ifaces_allow', - 'default': '', - 'mandatory': 0, - 'visible_for_client': False, - 'description': ( - 'Comma-separated list of interfaces names that are authorized ' - + 'as source_iface values.' + "ifaces_allow": { + "type": "string", + "name": "ifaces_allow", + "default": "", + "mandatory": 0, + "visible_for_client": False, + "description": ( + "Comma-separated list of interfaces names that are authorized " + + "as source_iface values." ), }, - 'ifaces_deny': { - 'type': 'string', - 'name': 'ifaces_deny', - 'default': '', - 'mandatory': 0, - 'visible_for_client': False, - 'description': ( - 'Comma-separated list of interfaces names that are not ' - + 'authorized as source_iface values.' + "ifaces_deny": { + "type": "string", + "name": "ifaces_deny", + "default": "", + "mandatory": 0, + "visible_for_client": False, + "description": ( + "Comma-separated list of interfaces names that are not " + + "authorized as source_iface values." ), }, - 'hosts_allow': { - 'type': 'string', - 'name': 'hosts_allow', - 'default': '', - 'mandatory': 0, - 'visible_for_client': False, - 'description': ( - 'Comma-separated list of the only targets that are authorized ' - + 'to be scanned. Supports the same syntax as the list targets. ' - + 'Both target hostnames and the address to which they resolve ' - + 'are checked. Hostnames in hosts_allow list are not resolved ' - + 'however.' + "hosts_allow": { + "type": "string", + "name": "hosts_allow", + "default": "", + "mandatory": 0, + "visible_for_client": False, + "description": ( + "Comma-separated list of the only targets that are authorized " + + "to be scanned. Supports the same syntax as the list targets. " + + "Both target hostnames and the address to which they resolve " + + "are checked. Hostnames in hosts_allow list are not resolved " + + "however." ), }, - 'hosts_deny': { - 'type': 'string', - 'name': 'hosts_deny', - 'default': '', - 'mandatory': 0, - 'visible_for_client': False, - 'description': ( - 'Comma-separated list of targets that are not authorized to ' - + 'be scanned. Supports the same syntax as the list targets. ' - + 'Both target hostnames and the address to which they resolve ' - + 'are checked. Hostnames in hosts_deny list are not ' - + 'resolved however.' + "hosts_deny": { + "type": "string", + "name": "hosts_deny", + "default": "", + "mandatory": 0, + "visible_for_client": False, + "description": ( + "Comma-separated list of targets that are not authorized to " + + "be scanned. Supports the same syntax as the list targets. " + + "Both target hostnames and the address to which they resolve " + + "are checked. Hostnames in hosts_deny list are not " + + "resolved however." ), }, - 'table_driven_lsc': { - 'type': 'boolean', - 'name': 'table_driven_lsc', - 'default': 0, - 'mandatory': 0, - 'visible_for_client': False, - 'description': ( + "table_driven_lsc": { + "type": "boolean", + "name": "table_driven_lsc", + "default": 0, + "mandatory": 0, + "visible_for_client": False, + "description": ( 'If this options is set to "yes", openvas enables the local ' - + 'security checks via the table-driven Notus scanner, perfoming ' - + 'the Notus metadata checksum check which allows the metadata ' - + 'upload into redis.' + + "security checks via the table-driven Notus scanner, perfoming " + + "the Notus metadata checksum check which allows the metadata " + + "upload into redis." ), }, } class TestOspdOpenvas(TestCase): - @patch('ospd_openvas.daemon.Openvas') + @patch("ospd_openvas.daemon.Openvas") def test_set_params_from_openvas_settings(self, mock_openvas: Openvas): mock_openvas.get_settings.return_value = { - 'non_simult_ports': '139, 445, 3389, Services/irc', - 'plugins_folder': '/foo/bar', + "non_simult_ports": "139, 445, 3389, Services/irc", + "plugins_folder": "/foo/bar", } w = DummyDaemon() w.set_params_from_openvas_settings() self.assertEqual(mock_openvas.get_settings.call_count, 1) self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT) - self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar') + self.assertEqual(w.scan_only_params.get("plugins_folder"), "/foo/bar") - @patch('ospd_openvas.daemon.Openvas') + @patch("ospd_openvas.daemon.Openvas") def test_sudo_available(self, mock_openvas): mock_openvas.check_sudo.return_value = True w = DummyDaemon() w._sudo_available = None # pylint: disable=protected-access + w._is_running_as_root = False # pylint: disable=protected-access w.sudo_available # pylint: disable=pointless-statement self.assertTrue(w.sudo_available) def test_get_custom_xml(self): out = ( - '' - 'Services/www, 80' - '3' - 'Settings/disable_cgi_scanning' - 'Product detection' - 'mantis_detect.nasl' - '0' - '' + "" + "Services/www, 80" + "3" + "Settings/disable_cgi_scanning" + "Product detection" + "mantis_detect.nasl" + "0" + "" ) w = DummyDaemon() - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] res = w.get_custom_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom') + "1.3.6.1.4.1.25623.1.0.100061", vt.get("custom") ) self.assertEqual(len(res), len(out)) @@ -386,9 +387,9 @@ def test_get_custom_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - custom = {'a': u"\u0006"} + custom = {"a": u"\u0006"} w.get_custom_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', custom=custom + "1.3.6.1.4.1.25623.1.0.100061", custom=custom ) assert_called_once(logging.Logger.warning) @@ -397,18 +398,18 @@ def test_get_severities_xml(self): w = DummyDaemon() out = ( - '' + "" '' - 'AV:N/AC:L/Au:N/C:N/I:N/A:N' - 'Greenbone' - '1237458156' - '' - '' - ) - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - severities = vt.get('severities') + "AV:N/AC:L/Au:N/C:N/I:N/A:N" + "Greenbone" + "1237458156" + "" + "" + ) + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + severities = vt.get("severities") res = w.get_severities_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', severities + "1.3.6.1.4.1.25623.1.0.100061", severities ) self.assertEqual(res, out) @@ -417,9 +418,9 @@ def test_get_severities_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - sever = {'severity_base_vector': u"\u0006"} + sever = {"severity_base_vector": u"\u0006"} w.get_severities_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', severities=sever + "1.3.6.1.4.1.25623.1.0.100061", severities=sever ) assert_called_once(logging.Logger.warning) @@ -427,21 +428,21 @@ def test_get_severities_xml_failed(self): def test_get_params_xml(self): w = DummyDaemon() out = ( - '' + "" '' - 'Do not randomize the order in which ports are ' - 'scanned' - 'no' - '' + "Do not randomize the order in which ports are " + "scanned" + "no" + "" '' - 'Data length :' - '' - '' + "Data length :" + "" + "" ) - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - params = vt.get('vt_params') - res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params) + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + params = vt.get("vt_params") + res = w.get_params_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", params) self.assertEqual(len(res), len(out)) @@ -450,15 +451,15 @@ def test_get_params_xml_failed(self): logging.Logger.warning = Mock() params = { - '1': { - 'id': '1', - 'type': 'entry', - 'default': u'\u0006', - 'name': 'dns-fuzz.timelimit', - 'description': 'Description', + "1": { + "id": "1", + "type": "entry", + "default": u"\u0006", + "name": "dns-fuzz.timelimit", + "description": "Description", } } - w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params) + w.get_params_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", params) assert_called_once(logging.Logger.warning) @@ -466,9 +467,9 @@ def test_get_refs_xml(self): w = DummyDaemon() out = '' - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - refs = vt.get('vt_refs') - res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs) + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + refs = vt.get("vt_refs") + res = w.get_refs_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", refs) self.assertEqual(res, out) @@ -476,14 +477,14 @@ def test_get_dependencies_xml(self): w = DummyDaemon() out = ( - '' + "" '' '' - '' + "" ) - dep = ['1.3.6.1.4.1.25623.1.2.3.4', '1.3.6.1.4.1.25623.4.3.2.1'] + dep = ["1.3.6.1.4.1.25623.1.2.3.4", "1.3.6.1.4.1.25623.4.3.2.1"] res = w.get_dependencies_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', dep + "1.3.6.1.4.1.25623.1.0.100061", dep ) self.assertEqual(res, out) @@ -492,13 +493,13 @@ def test_get_dependencies_xml_missing_dep(self): w = DummyDaemon() out = ( - '' + "" '' - '' + "" ) - dep = ['1.3.6.1.4.1.25623.1.2.3.4', 'file_name.nasl'] + dep = ["1.3.6.1.4.1.25623.1.2.3.4", "file_name.nasl"] res = w.get_dependencies_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', dep + "1.3.6.1.4.1.25623.1.0.100061", dep ) self.assertEqual(res, out) @@ -509,7 +510,7 @@ def test_get_dependencies_xml_failed(self): dep = [u"\u0006"] w.get_dependencies_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep + "1.3.6.1.4.1.25623.1.0.100061", vt_dependencies=dep ) assert_called_once(logging.Logger.error) @@ -517,11 +518,11 @@ def test_get_dependencies_xml_failed(self): def test_get_ctime_xml(self): w = DummyDaemon() - out = '1237458156' - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - ctime = vt.get('creation_time') + out = "1237458156" + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + ctime = vt.get("creation_time") res = w.get_creation_time_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', ctime + "1.3.6.1.4.1.25623.1.0.100061", ctime ) self.assertEqual(res, out) @@ -530,9 +531,9 @@ def test_get_ctime_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - ctime = u'\u0006' + ctime = u"\u0006" w.get_creation_time_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime + "1.3.6.1.4.1.25623.1.0.100061", vt_creation_time=ctime ) assert_called_once(logging.Logger.warning) @@ -540,11 +541,11 @@ def test_get_ctime_xml_failed(self): def test_get_mtime_xml(self): w = DummyDaemon() - out = '1533906565' - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - mtime = vt.get('modification_time') + out = "1533906565" + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + mtime = vt.get("modification_time") res = w.get_modification_time_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', mtime + "1.3.6.1.4.1.25623.1.0.100061", mtime ) self.assertEqual(res, out) @@ -553,9 +554,9 @@ def test_get_mtime_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - mtime = u'\u0006' + mtime = u"\u0006" w.get_modification_time_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', mtime + "1.3.6.1.4.1.25623.1.0.100061", mtime ) assert_called_once(logging.Logger.warning) @@ -563,11 +564,11 @@ def test_get_mtime_xml_failed(self): def test_get_summary_xml(self): w = DummyDaemon() - out = 'some summary' - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - summary = vt.get('summary') + out = "some summary" + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + summary = vt.get("summary") res = w.get_summary_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', summary + "1.3.6.1.4.1.25623.1.0.100061", summary ) self.assertEqual(res, out) @@ -575,19 +576,19 @@ def test_get_summary_xml(self): def test_get_summary_xml_failed(self): w = DummyDaemon() - summary = u'\u0006' + summary = u"\u0006" logging.Logger.warning = Mock() - w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary) + w.get_summary_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", summary) assert_called_once(logging.Logger.warning) def test_get_impact_xml(self): w = DummyDaemon() - out = 'some impact' - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - impact = vt.get('impact') - res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact) + out = "some impact" + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + impact = vt.get("impact") + res = w.get_impact_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", impact) self.assertEqual(res, out) @@ -595,19 +596,19 @@ def test_get_impact_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - impact = u'\u0006' - w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact) + impact = u"\u0006" + w.get_impact_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", impact) assert_called_once(logging.Logger.warning) def test_get_insight_xml(self): w = DummyDaemon() - out = 'some insight' - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - insight = vt.get('insight') + out = "some insight" + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + insight = vt.get("insight") res = w.get_insight_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', insight + "1.3.6.1.4.1.25623.1.0.100061", insight ) self.assertEqual(res, out) @@ -616,8 +617,8 @@ def test_get_insight_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - insight = u'\u0006' - w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight) + insight = u"\u0006" + w.get_insight_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", insight) assert_called_once(logging.Logger.warning) @@ -626,16 +627,16 @@ def test_get_solution_xml(self): out = ( '' - 'some solution' - '' + "some solution" + "" ) - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - solution = vt.get('solution') - solution_type = vt.get('solution_type') - solution_method = vt.get('solution_method') + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + solution = vt.get("solution") + solution_type = vt.get("solution_type") + solution_method = vt.get("solution_method") res = w.get_solution_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', + "1.3.6.1.4.1.25623.1.0.100061", solution, solution_type, solution_method, @@ -647,8 +648,8 @@ def test_get_solution_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - solution = u'\u0006' - w.get_solution_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', solution) + solution = u"\u0006" + w.get_solution_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", solution) assert_called_once(logging.Logger.warning) @@ -656,11 +657,11 @@ def test_get_detection_xml(self): w = DummyDaemon() out = '' - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - detection_type = vt.get('qod_type') + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + detection_type = vt.get("qod_type") res = w.get_detection_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type + "1.3.6.1.4.1.25623.1.0.100061", qod_type=detection_type ) self.assertEqual(res, out) @@ -669,19 +670,19 @@ def test_get_detection_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - detection = u'\u0006' - w.get_detection_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', detection) + detection = u"\u0006" + w.get_detection_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", detection) assert_called_once(logging.Logger.warning) def test_get_affected_xml(self): w = DummyDaemon() - out = 'some affection' - vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] - affected = vt.get('affected') + out = "some affection" + vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + affected = vt.get("affected") res = w.get_affected_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', affected=affected + "1.3.6.1.4.1.25623.1.0.100061", affected=affected ) self.assertEqual(res, out) @@ -692,31 +693,31 @@ def test_get_affected_xml_failed(self): affected = u"\u0006" + "affected" w.get_affected_vt_as_xml_str( - '1.3.6.1.4.1.25623.1.0.100061', affected=affected + "1.3.6.1.4.1.25623.1.0.100061", affected=affected ) assert_called_once(logging.Logger.warning) - @patch('ospd_openvas.daemon.Path.exists') - @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings') + @patch("ospd_openvas.daemon.Path.exists") + @patch("ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings") def test_feed_is_outdated_none( self, mock_set_params: MagicMock, mock_path_exists: MagicMock ): w = DummyDaemon() - w.scan_only_params['plugins_folder'] = '/foo/bar' + w.scan_only_params["plugins_folder"] = "/foo/bar" # Return None mock_path_exists.return_value = False - ret = w.feed_is_outdated('1234') + ret = w.feed_is_outdated("1234") self.assertIsNone(ret) self.assertEqual(mock_set_params.call_count, 1) self.assertEqual(mock_path_exists.call_count, 1) - @patch('ospd_openvas.daemon.Path.exists') - @patch('ospd_openvas.daemon.Path.open') + @patch("ospd_openvas.daemon.Path.exists") + @patch("ospd_openvas.daemon.Path.open") def test_feed_is_outdated_true( self, mock_path_open: MagicMock, @@ -725,23 +726,23 @@ def test_feed_is_outdated_true( read_data = 'PLUGIN_SET = "1235";' mock_path_exists.return_value = True - mock_read = MagicMock(name='Path open context manager') + mock_read = MagicMock(name="Path open context manager") mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data)) mock_path_open.return_value = mock_read w = DummyDaemon() # Return True - w.scan_only_params['plugins_folder'] = '/foo/bar' + w.scan_only_params["plugins_folder"] = "/foo/bar" - ret = w.feed_is_outdated('1234') + ret = w.feed_is_outdated("1234") self.assertTrue(ret) self.assertEqual(mock_path_exists.call_count, 1) self.assertEqual(mock_path_open.call_count, 1) - @patch('ospd_openvas.daemon.Path.exists') - @patch('ospd_openvas.daemon.Path.open') + @patch("ospd_openvas.daemon.Path.exists") + @patch("ospd_openvas.daemon.Path.open") def test_feed_is_outdated_false( self, mock_path_open: MagicMock, @@ -751,14 +752,14 @@ def test_feed_is_outdated_false( read_data = 'PLUGIN_SET = "1234"' mock_path_exists.return_value = True - mock_read = MagicMock(name='Path open context manager') + mock_read = MagicMock(name="Path open context manager") mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data)) mock_path_open.return_value = mock_read w = DummyDaemon() - w.scan_only_params['plugins_folder'] = '/foo/bar' + w.scan_only_params["plugins_folder"] = "/foo/bar" - ret = w.feed_is_outdated('1234') + ret = w.feed_is_outdated("1234") self.assertFalse(ret) self.assertEqual(mock_path_exists.call_count, 1) @@ -773,14 +774,14 @@ def test_check_feed_cache_unavailable(self): self.assertFalse(res) w.feed_is_outdated.assert_not_called() - @patch('ospd_openvas.daemon.BaseDB') - @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list') + @patch("ospd_openvas.daemon.BaseDB") + @patch("ospd_openvas.daemon.ResultList.add_scan_log_to_list") def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass): w = DummyDaemon() target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan('123-456', targets, None, []) + w.create_scan("123-456", targets, None, []) results = [ "LOG|||192.168.0.1|||localhost|||general/Host_Details||||||Host dead", @@ -788,20 +789,20 @@ def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass): MockDBClass.get_result.return_value = results mock_add_scan_log_to_list.return_value = None - w.report_openvas_results(MockDBClass, '123-456') + w.report_openvas_results(MockDBClass, "123-456") mock_add_scan_log_to_list.assert_called_with( - host='192.168.0.1', - hostname='localhost', - name='', - port='general/Host_Details', - qod='', - test_id='', - uri='', - value='Host dead', - ) - - @patch('ospd_openvas.daemon.BaseDB') - @patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list') + host="192.168.0.1", + hostname="localhost", + name="", + port="general/Host_Details", + qod="", + test_id="", + uri="", + value="Host dead", + ) + + @patch("ospd_openvas.daemon.BaseDB") + @patch("ospd_openvas.daemon.ResultList.add_scan_error_to_list") def test_get_openvas_result_host_deny( self, mock_add_scan_error_to_list, MockDBClass ): @@ -809,7 +810,7 @@ def test_get_openvas_result_host_deny( target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan('123-456', targets, None, []) + w.create_scan("123-456", targets, None, []) results = [ "ERRMSG|||127.0.0.1|||localhost|||||||||Host access denied.", @@ -817,23 +818,23 @@ def test_get_openvas_result_host_deny( MockDBClass.get_result.return_value = results mock_add_scan_error_to_list.return_value = None - w.report_openvas_results(MockDBClass, '123-456') + w.report_openvas_results(MockDBClass, "123-456") mock_add_scan_error_to_list.assert_called_with( - host='127.0.0.1', - hostname='localhost', - name='', - port='', - test_id='', - uri='', - value='Host access denied.', + host="127.0.0.1", + hostname="localhost", + name="", + port="", + test_id="", + uri="", + value="Host access denied.", ) - @patch('ospd_openvas.daemon.BaseDB') + @patch("ospd_openvas.daemon.BaseDB") def test_get_openvas_result_dead_hosts(self, MockDBClass): w = DummyDaemon() target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan('123-456', targets, None, []) + w.create_scan("123-456", targets, None, []) results = [ "DEADHOST||| ||| ||| ||| |||4", @@ -841,21 +842,21 @@ def test_get_openvas_result_dead_hosts(self, MockDBClass): MockDBClass.get_result.return_value = results w.scan_collection.set_amount_dead_hosts = MagicMock() - w.report_openvas_results(MockDBClass, '123-456') + w.report_openvas_results(MockDBClass, "123-456") w.scan_collection.set_amount_dead_hosts.assert_called_with( - '123-456', + "123-456", total_dead=4, ) - @patch('ospd_openvas.daemon.BaseDB') - @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list') + @patch("ospd_openvas.daemon.BaseDB") + @patch("ospd_openvas.daemon.ResultList.add_scan_log_to_list") def test_get_openvas_result_host_start( self, mock_add_scan_log_to_list, MockDBClass ): w = DummyDaemon() target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan('123-456', targets, None, []) + w.create_scan("123-456", targets, None, []) results = [ "HOST_START|||192.168.10.124||| ||| ||||||today 1", @@ -864,20 +865,20 @@ def test_get_openvas_result_host_start( MockDBClass.get_result.return_value = results mock_add_scan_log_to_list.return_value = None - w.report_openvas_results(MockDBClass, '123-456') + w.report_openvas_results(MockDBClass, "123-456") mock_add_scan_log_to_list.assert_called_with( - host='192.168.10.124', - name='HOST_START', - value='today 1', + host="192.168.10.124", + name="HOST_START", + value="today 1", ) - @patch('ospd_openvas.daemon.BaseDB') + @patch("ospd_openvas.daemon.BaseDB") def test_get_openvas_result_hosts_count(self, MockDBClass): w = DummyDaemon() target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan('123-456', targets, None, []) + w.create_scan("123-456", targets, None, []) results = [ "HOSTS_COUNT||| ||| ||| ||| |||4", @@ -885,14 +886,14 @@ def test_get_openvas_result_hosts_count(self, MockDBClass): MockDBClass.get_result.return_value = results w.set_scan_total_hosts = MagicMock() - w.report_openvas_results(MockDBClass, '123-456') + w.report_openvas_results(MockDBClass, "123-456") w.set_scan_total_hosts.assert_called_with( - '123-456', + "123-456", 4, ) - @patch('ospd_openvas.daemon.BaseDB') - @patch('ospd_openvas.daemon.ResultList.add_scan_alarm_to_list') + @patch("ospd_openvas.daemon.BaseDB") + @patch("ospd_openvas.daemon.ResultList.add_scan_alarm_to_list") def test_result_without_vt_oid( self, mock_add_scan_alarm_to_list, MockDBClass ): @@ -901,38 +902,38 @@ def test_result_without_vt_oid( target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan('123-456', targets, None, []) - w.scan_collection.scans_table['123-456']['results'] = list() + w.create_scan("123-456", targets, None, []) + w.scan_collection.scans_table["123-456"]["results"] = list() results = ["ALARM||| ||| ||| ||| |||some alarm|||path", None] MockDBClass.get_result.return_value = results mock_add_scan_alarm_to_list.return_value = None - w.report_openvas_results(MockDBClass, '123-456') + w.report_openvas_results(MockDBClass, "123-456") assert_called_once(logging.Logger.warning) - @patch('ospd_openvas.db.KbDB') + @patch("ospd_openvas.db.KbDB") def test_openvas_is_alive_already_stopped(self, mock_db): w = DummyDaemon() # mock_psutil = MockPsutil.return_value mock_db.scan_is_stopped.return_value = True - ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4') + ret = w.is_openvas_process_alive(mock_db, "1234", "a1-b2-c3-d4") self.assertTrue(ret) - @patch('psutil.Process') - @patch('ospd_openvas.db.KbDB') + @patch("psutil.Process") + @patch("ospd_openvas.db.KbDB") def test_openvas_is_alive_still(self, mock_db, mock_psutil): w = DummyDaemon() mock_psutil.side_effect = TypeError mock_db.scan_is_stopped.return_value = False - ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d3') + ret = w.is_openvas_process_alive(mock_db, "1234", "a1-b2-c3-d3") self.assertFalse(ret) - @patch('ospd_openvas.daemon.OSPDaemon.set_scan_progress_batch') - @patch('ospd_openvas.daemon.OSPDaemon.sort_host_finished') - @patch('ospd_openvas.db.KbDB') + @patch("ospd_openvas.daemon.OSPDaemon.set_scan_progress_batch") + @patch("ospd_openvas.daemon.OSPDaemon.sort_host_finished") + @patch("ospd_openvas.db.KbDB") def test_report_openvas_scan_status( self, mock_db, mock_sort_host_finished, mock_set_scan_progress_batch ): @@ -941,65 +942,65 @@ def test_report_openvas_scan_status( mock_set_scan_progress_batch.return_value = None mock_sort_host_finished.return_value = None mock_db.get_scan_status.return_value = [ - '192.168.0.1/15/1000', - '192.168.0.2/15/0', - '192.168.0.3/15/-1', - '192.168.0.4/1500/1500', + "192.168.0.1/15/1000", + "192.168.0.2/15/0", + "192.168.0.3/15/-1", + "192.168.0.4/1500/1500", ] target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan('123-456', targets, None, []) - w.report_openvas_scan_status(mock_db, '123-456') + w.create_scan("123-456", targets, None, []) + w.report_openvas_scan_status(mock_db, "123-456") mock_set_scan_progress_batch.assert_called_with( - '123-456', + "123-456", host_progress={ - '192.168.0.1': 1, - '192.168.0.3': -1, - '192.168.0.4': 100, + "192.168.0.1": 1, + "192.168.0.3": -1, + "192.168.0.4": 100, }, ) mock_sort_host_finished.assert_called_with( - '123-456', ['192.168.0.3', '192.168.0.4'] + "123-456", ["192.168.0.3", "192.168.0.4"] ) class TestFilters(TestCase): def test_format_vt_modification_time(self): ovformat = OpenVasVtsFilter(None) - td = '1517443741' + td = "1517443741" formatted = ovformat.format_vt_modification_time(td) self.assertEqual(formatted, "20180201000901") def test_get_filtered_vts_false(self): w = DummyDaemon() - vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061'] + vts_collection = ["1234", "1.3.6.1.4.1.25623.1.0.100061"] ovfilter = OpenVasVtsFilter(w.nvti) res = ovfilter.get_filtered_vts_list( vts_collection, "modification_time<10" ) - self.assertNotIn('1.3.6.1.4.1.25623.1.0.100061', res) + self.assertNotIn("1.3.6.1.4.1.25623.1.0.100061", res) def test_get_filtered_vts_true(self): w = DummyDaemon() - vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061'] + vts_collection = ["1234", "1.3.6.1.4.1.25623.1.0.100061"] ovfilter = OpenVasVtsFilter(w.nvti) res = ovfilter.get_filtered_vts_list( vts_collection, "modification_time>10" ) - self.assertIn('1.3.6.1.4.1.25623.1.0.100061', res) + self.assertIn("1.3.6.1.4.1.25623.1.0.100061", res) def test_get_severity_score_v2(self): w = DummyDaemon() vtaux = { - 'severities': { - 'severity_type': 'cvss_base_v2', - 'severity_base_vector': 'AV:N/AC:L/Au:N/C:P/I:N/A:N', + "severities": { + "severity_type": "cvss_base_v2", + "severity_base_vector": "AV:N/AC:L/Au:N/C:P/I:N/A:N", } } @@ -1009,9 +1010,9 @@ def test_get_severity_score_v2(self): def test_get_severity_score_v3(self): w = DummyDaemon() vtaux = { - 'severities': { - 'severity_type': 'cvss_base_v3', - 'severity_base_vector': 'CVSS:3.0/AV:L/AC:H/PR:H/UI:R/S:U/C:N/I:L/A:L', + "severities": { + "severity_type": "cvss_base_v3", + "severity_base_vector": "CVSS:3.0/AV:L/AC:H/PR:H/UI:R/S:U/C:N/I:L/A:L", } } From 2bd4862d6ce25f9b48624534b99abb47ffd8b11c Mon Sep 17 00:00:00 2001 From: Philipp Eder Date: Mon, 25 Jan 2021 08:56:40 +0100 Subject: [PATCH 3/3] reset switching from ' to " due to huge diff --- tests/test_daemon.py | 897 +++++++++++++++++++++---------------------- 1 file changed, 448 insertions(+), 449 deletions(-) diff --git a/tests/test_daemon.py b/tests/test_daemon.py index 93ef41a4..8af03c6e 100644 --- a/tests/test_daemon.py +++ b/tests/test_daemon.py @@ -38,348 +38,347 @@ from ospd_openvas.openvas import Openvas OSPD_PARAMS_OUT = { - "auto_enable_dependencies": { - "type": "boolean", - "name": "auto_enable_dependencies", - "default": 1, - "mandatory": 1, - "visible_for_client": True, - "description": "Automatically enable the plugins that are depended on", + 'auto_enable_dependencies': { + 'type': 'boolean', + 'name': 'auto_enable_dependencies', + 'default': 1, + 'mandatory': 1, + 'visible_for_client': True, + 'description': 'Automatically enable the plugins that are depended on', }, - "cgi_path": { - "type": "string", - "name": "cgi_path", - "default": "/cgi-bin:/scripts", - "mandatory": 1, - "visible_for_client": True, - "description": "Look for default CGIs in /cgi-bin and /scripts", + 'cgi_path': { + 'type': 'string', + 'name': 'cgi_path', + 'default': '/cgi-bin:/scripts', + 'mandatory': 1, + 'visible_for_client': True, + 'description': 'Look for default CGIs in /cgi-bin and /scripts', }, - "checks_read_timeout": { - "type": "integer", - "name": "checks_read_timeout", - "default": 5, - "mandatory": 1, - "visible_for_client": True, - "description": ( - "Number of seconds that the security checks will " - + "wait for when doing a recv()" + 'checks_read_timeout': { + 'type': 'integer', + 'name': 'checks_read_timeout', + 'default': 5, + 'mandatory': 1, + 'visible_for_client': True, + 'description': ( + 'Number of seconds that the security checks will ' + + 'wait for when doing a recv()' ), }, - "non_simult_ports": { - "type": "string", - "name": "non_simult_ports", - "default": "139, 445, 3389, Services/irc", - "mandatory": 1, - "visible_for_client": True, - "description": ( - "Prevent to make two connections on the same given " - + "ports at the same time." + 'non_simult_ports': { + 'type': 'string', + 'name': 'non_simult_ports', + 'default': '139, 445, 3389, Services/irc', + 'mandatory': 1, + 'visible_for_client': True, + 'description': ( + 'Prevent to make two connections on the same given ' + + 'ports at the same time.' ), }, - "open_sock_max_attempts": { - "type": "integer", - "name": "open_sock_max_attempts", - "default": 5, - "mandatory": 0, - "visible_for_client": True, - "description": ( - "Number of unsuccessful retries to open the socket " - + "before to set the port as closed." + 'open_sock_max_attempts': { + 'type': 'integer', + 'name': 'open_sock_max_attempts', + 'default': 5, + 'mandatory': 0, + 'visible_for_client': True, + 'description': ( + 'Number of unsuccessful retries to open the socket ' + + 'before to set the port as closed.' ), }, - "timeout_retry": { - "type": "integer", - "name": "timeout_retry", - "default": 5, - "mandatory": 0, - "visible_for_client": True, - "description": ( - "Number of retries when a socket connection attempt " + "timesout." + 'timeout_retry': { + 'type': 'integer', + 'name': 'timeout_retry', + 'default': 5, + 'mandatory': 0, + 'visible_for_client': True, + 'description': ( + 'Number of retries when a socket connection attempt ' + 'timesout.' ), }, - "optimize_test": { - "type": "boolean", - "name": "optimize_test", - "default": 1, - "mandatory": 0, - "visible_for_client": True, - "description": ( - "By default, optimize_test is enabled which means openvas does " - + "trust the remote host banners and is only launching plugins " - + "against the services they have been designed to check. " - + "For example it will check a web server claiming to be IIS only " - + "for IIS related flaws but will skip plugins testing for Apache " - + "flaws, and so on. This default behavior is used to optimize " - + "the scanning performance and to avoid false positives. " - + "If you are not sure that the banners of the remote host " - + "have been tampered with, you can disable this option." + 'optimize_test': { + 'type': 'boolean', + 'name': 'optimize_test', + 'default': 1, + 'mandatory': 0, + 'visible_for_client': True, + 'description': ( + 'By default, optimize_test is enabled which means openvas does ' + + 'trust the remote host banners and is only launching plugins ' + + 'against the services they have been designed to check. ' + + 'For example it will check a web server claiming to be IIS only ' + + 'for IIS related flaws but will skip plugins testing for Apache ' + + 'flaws, and so on. This default behavior is used to optimize ' + + 'the scanning performance and to avoid false positives. ' + + 'If you are not sure that the banners of the remote host ' + + 'have been tampered with, you can disable this option.' ), }, - "plugins_timeout": { - "type": "integer", - "name": "plugins_timeout", - "default": 5, - "mandatory": 0, - "visible_for_client": True, - "description": "This is the maximum lifetime, in seconds of a plugin.", + 'plugins_timeout': { + 'type': 'integer', + 'name': 'plugins_timeout', + 'default': 5, + 'mandatory': 0, + 'visible_for_client': True, + 'description': 'This is the maximum lifetime, in seconds of a plugin.', }, - "report_host_details": { - "type": "boolean", - "name": "report_host_details", - "default": 1, - "mandatory": 1, - "visible_for_client": True, - "description": "", + 'report_host_details': { + 'type': 'boolean', + 'name': 'report_host_details', + 'default': 1, + 'mandatory': 1, + 'visible_for_client': True, + 'description': '', }, - "safe_checks": { - "type": "boolean", - "name": "safe_checks", - "default": 1, - "mandatory": 1, - "visible_for_client": True, - "description": ( - "Disable the plugins with potential to crash " - + "the remote services" + 'safe_checks': { + 'type': 'boolean', + 'name': 'safe_checks', + 'default': 1, + 'mandatory': 1, + 'visible_for_client': True, + 'description': ( + 'Disable the plugins with potential to crash ' + + 'the remote services' ), }, - "scanner_plugins_timeout": { - "type": "integer", - "name": "scanner_plugins_timeout", - "default": 36000, - "mandatory": 1, - "visible_for_client": True, - "description": "Like plugins_timeout, but for ACT_SCANNER plugins.", + 'scanner_plugins_timeout': { + 'type': 'integer', + 'name': 'scanner_plugins_timeout', + 'default': 36000, + 'mandatory': 1, + 'visible_for_client': True, + 'description': 'Like plugins_timeout, but for ACT_SCANNER plugins.', }, - "time_between_request": { - "type": "integer", - "name": "time_between_request", - "default": 0, - "mandatory": 0, - "visible_for_client": True, - "description": ( - "Allow to set a wait time between two actions " - + "(open, send, close)." + 'time_between_request': { + 'type': 'integer', + 'name': 'time_between_request', + 'default': 0, + 'mandatory': 0, + 'visible_for_client': True, + 'description': ( + 'Allow to set a wait time between two actions ' + + '(open, send, close).' ), }, - "unscanned_closed": { - "type": "boolean", - "name": "unscanned_closed", - "default": 1, - "mandatory": 1, - "visible_for_client": True, - "description": "", + 'unscanned_closed': { + 'type': 'boolean', + 'name': 'unscanned_closed', + 'default': 1, + 'mandatory': 1, + 'visible_for_client': True, + 'description': '', }, - "unscanned_closed_udp": { - "type": "boolean", - "name": "unscanned_closed_udp", - "default": 1, - "mandatory": 1, - "visible_for_client": True, - "description": "", + 'unscanned_closed_udp': { + 'type': 'boolean', + 'name': 'unscanned_closed_udp', + 'default': 1, + 'mandatory': 1, + 'visible_for_client': True, + 'description': '', }, - "expand_vhosts": { - "type": "boolean", - "name": "expand_vhosts", - "default": 1, - "mandatory": 0, - "visible_for_client": True, - "description": "Whether to expand the target hosts " - + "list of vhosts with values gathered from sources " - + "such as reverse-lookup queries and VT checks " - + "for SSL/TLS certificates.", + 'expand_vhosts': { + 'type': 'boolean', + 'name': 'expand_vhosts', + 'default': 1, + 'mandatory': 0, + 'visible_for_client': True, + 'description': 'Whether to expand the target hosts ' + + 'list of vhosts with values gathered from sources ' + + 'such as reverse-lookup queries and VT checks ' + + 'for SSL/TLS certificates.', }, - "test_empty_vhost": { - "type": "boolean", - "name": "test_empty_vhost", - "default": 0, - "mandatory": 0, - "visible_for_client": True, - "description": "If set to yes, the scanner will " - + "also test the target by using empty vhost value " - + "in addition to the targets associated vhost values.", + 'test_empty_vhost': { + 'type': 'boolean', + 'name': 'test_empty_vhost', + 'default': 0, + 'mandatory': 0, + 'visible_for_client': True, + 'description': 'If set to yes, the scanner will ' + + 'also test the target by using empty vhost value ' + + 'in addition to the targets associated vhost values.', }, - "max_hosts": { - "type": "integer", - "name": "max_hosts", - "default": 30, - "mandatory": 0, - "visible_for_client": False, - "description": ( - "The maximum number of hosts to test at the same time which " - + "should be given to the client (which can override it). " - + "This value must be computed given your bandwidth, " - + "the number of hosts you want to test, your amount of " - + "memory and the performance of your processor(s)." + 'max_hosts': { + 'type': 'integer', + 'name': 'max_hosts', + 'default': 30, + 'mandatory': 0, + 'visible_for_client': False, + 'description': ( + 'The maximum number of hosts to test at the same time which ' + + 'should be given to the client (which can override it). ' + + 'This value must be computed given your bandwidth, ' + + 'the number of hosts you want to test, your amount of ' + + 'memory and the performance of your processor(s).' ), }, - "max_checks": { - "type": "integer", - "name": "max_checks", - "default": 10, - "mandatory": 0, - "visible_for_client": False, - "description": ( - "The number of plugins that will run against each host being " - + "tested. Note that the total number of process will be max " - + "checks x max_hosts so you need to find a balance between " - + "these two options. Note that launching too many plugins at " - + "the same time may disable the remote host, either temporarily " - + "(ie: inetd closes its ports) or definitely (the remote host " - + "crash because it is asked to do too many things at the " - + "same time), so be careful." + 'max_checks': { + 'type': 'integer', + 'name': 'max_checks', + 'default': 10, + 'mandatory': 0, + 'visible_for_client': False, + 'description': ( + 'The number of plugins that will run against each host being ' + + 'tested. Note that the total number of process will be max ' + + 'checks x max_hosts so you need to find a balance between ' + + 'these two options. Note that launching too many plugins at ' + + 'the same time may disable the remote host, either temporarily ' + + '(ie: inetd closes its ports) or definitely (the remote host ' + + 'crash because it is asked to do too many things at the ' + + 'same time), so be careful.' ), }, - "port_range": { - "type": "string", - "name": "port_range", - "default": "", - "mandatory": 0, - "visible_for_client": False, - "description": ( - "This is the default range of ports that the scanner plugins will " - + "probe. The syntax of this option is flexible, it can be a " + 'port_range': { + 'type': 'string', + 'name': 'port_range', + 'default': '', + 'mandatory': 0, + 'visible_for_client': False, + 'description': ( + 'This is the default range of ports that the scanner plugins will ' + + 'probe. The syntax of this option is flexible, it can be a ' + 'single range ("1-1500"), several ports ("21,23,80"), several ' + 'ranges of ports ("1-1500,32000-33000"). Note that you can ' - + "specify UDP and TCP ports by prefixing each range by T or U. " - + "For instance, the following range will make openvas scan UDP " - + "ports 1 to 1024 and TCP ports 1 to 65535 : " + + 'specify UDP and TCP ports by prefixing each range by T or U. ' + + 'For instance, the following range will make openvas scan UDP ' + + 'ports 1 to 1024 and TCP ports 1 to 65535 : ' + '"T:1-65535,U:1-1024".' ), }, - "test_alive_hosts_only": { - "type": "boolean", - "name": "test_alive_hosts_only", - "default": 0, - "mandatory": 0, - "visible_for_client": False, - "description": ( - "If this option is set, openvas will scan the target list for " - + "alive hosts in a separate process while only testing those " - + "hosts which are identified as alive. This boosts the scan " - + "speed of target ranges with a high amount of dead hosts " - + "significantly." + 'test_alive_hosts_only': { + 'type': 'boolean', + 'name': 'test_alive_hosts_only', + 'default': 0, + 'mandatory': 0, + 'visible_for_client': False, + 'description': ( + 'If this option is set, openvas will scan the target list for ' + + 'alive hosts in a separate process while only testing those ' + + 'hosts which are identified as alive. This boosts the scan ' + + 'speed of target ranges with a high amount of dead hosts ' + + 'significantly.' ), }, - "source_iface": { - "type": "string", - "name": "source_iface", - "default": "", - "mandatory": 0, - "visible_for_client": False, - "description": ( - "Name of the network interface that will be used as the source " - + "of connections established by openvas. The scan won't be " - + "launched if the value isn't authorized according to " - + "(sys_)ifaces_allow / (sys_)ifaces_deny if present." + 'source_iface': { + 'type': 'string', + 'name': 'source_iface', + 'default': '', + 'mandatory': 0, + 'visible_for_client': False, + 'description': ( + 'Name of the network interface that will be used as the source ' + + 'of connections established by openvas. The scan won\'t be ' + + 'launched if the value isn\'t authorized according to ' + + '(sys_)ifaces_allow / (sys_)ifaces_deny if present.' ), }, - "ifaces_allow": { - "type": "string", - "name": "ifaces_allow", - "default": "", - "mandatory": 0, - "visible_for_client": False, - "description": ( - "Comma-separated list of interfaces names that are authorized " - + "as source_iface values." + 'ifaces_allow': { + 'type': 'string', + 'name': 'ifaces_allow', + 'default': '', + 'mandatory': 0, + 'visible_for_client': False, + 'description': ( + 'Comma-separated list of interfaces names that are authorized ' + + 'as source_iface values.' ), }, - "ifaces_deny": { - "type": "string", - "name": "ifaces_deny", - "default": "", - "mandatory": 0, - "visible_for_client": False, - "description": ( - "Comma-separated list of interfaces names that are not " - + "authorized as source_iface values." + 'ifaces_deny': { + 'type': 'string', + 'name': 'ifaces_deny', + 'default': '', + 'mandatory': 0, + 'visible_for_client': False, + 'description': ( + 'Comma-separated list of interfaces names that are not ' + + 'authorized as source_iface values.' ), }, - "hosts_allow": { - "type": "string", - "name": "hosts_allow", - "default": "", - "mandatory": 0, - "visible_for_client": False, - "description": ( - "Comma-separated list of the only targets that are authorized " - + "to be scanned. Supports the same syntax as the list targets. " - + "Both target hostnames and the address to which they resolve " - + "are checked. Hostnames in hosts_allow list are not resolved " - + "however." + 'hosts_allow': { + 'type': 'string', + 'name': 'hosts_allow', + 'default': '', + 'mandatory': 0, + 'visible_for_client': False, + 'description': ( + 'Comma-separated list of the only targets that are authorized ' + + 'to be scanned. Supports the same syntax as the list targets. ' + + 'Both target hostnames and the address to which they resolve ' + + 'are checked. Hostnames in hosts_allow list are not resolved ' + + 'however.' ), }, - "hosts_deny": { - "type": "string", - "name": "hosts_deny", - "default": "", - "mandatory": 0, - "visible_for_client": False, - "description": ( - "Comma-separated list of targets that are not authorized to " - + "be scanned. Supports the same syntax as the list targets. " - + "Both target hostnames and the address to which they resolve " - + "are checked. Hostnames in hosts_deny list are not " - + "resolved however." + 'hosts_deny': { + 'type': 'string', + 'name': 'hosts_deny', + 'default': '', + 'mandatory': 0, + 'visible_for_client': False, + 'description': ( + 'Comma-separated list of targets that are not authorized to ' + + 'be scanned. Supports the same syntax as the list targets. ' + + 'Both target hostnames and the address to which they resolve ' + + 'are checked. Hostnames in hosts_deny list are not ' + + 'resolved however.' ), }, - "table_driven_lsc": { - "type": "boolean", - "name": "table_driven_lsc", - "default": 0, - "mandatory": 0, - "visible_for_client": False, - "description": ( + 'table_driven_lsc': { + 'type': 'boolean', + 'name': 'table_driven_lsc', + 'default': 0, + 'mandatory': 0, + 'visible_for_client': False, + 'description': ( 'If this options is set to "yes", openvas enables the local ' - + "security checks via the table-driven Notus scanner, perfoming " - + "the Notus metadata checksum check which allows the metadata " - + "upload into redis." + + 'security checks via the table-driven Notus scanner, perfoming ' + + 'the Notus metadata checksum check which allows the metadata ' + + 'upload into redis.' ), }, } class TestOspdOpenvas(TestCase): - @patch("ospd_openvas.daemon.Openvas") + @patch('ospd_openvas.daemon.Openvas') def test_set_params_from_openvas_settings(self, mock_openvas: Openvas): mock_openvas.get_settings.return_value = { - "non_simult_ports": "139, 445, 3389, Services/irc", - "plugins_folder": "/foo/bar", + 'non_simult_ports': '139, 445, 3389, Services/irc', + 'plugins_folder': '/foo/bar', } w = DummyDaemon() w.set_params_from_openvas_settings() self.assertEqual(mock_openvas.get_settings.call_count, 1) self.assertEqual(OSPD_PARAMS, OSPD_PARAMS_OUT) - self.assertEqual(w.scan_only_params.get("plugins_folder"), "/foo/bar") + self.assertEqual(w.scan_only_params.get('plugins_folder'), '/foo/bar') - @patch("ospd_openvas.daemon.Openvas") + @patch('ospd_openvas.daemon.Openvas') def test_sudo_available(self, mock_openvas): mock_openvas.check_sudo.return_value = True w = DummyDaemon() w._sudo_available = None # pylint: disable=protected-access w._is_running_as_root = False # pylint: disable=protected-access - w.sudo_available # pylint: disable=pointless-statement self.assertTrue(w.sudo_available) def test_get_custom_xml(self): out = ( - "" - "Services/www, 80" - "3" - "Settings/disable_cgi_scanning" - "Product detection" - "mantis_detect.nasl" - "0" - "" + '' + 'Services/www, 80' + '3' + 'Settings/disable_cgi_scanning' + 'Product detection' + 'mantis_detect.nasl' + '0' + '' ) w = DummyDaemon() - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] res = w.get_custom_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", vt.get("custom") + '1.3.6.1.4.1.25623.1.0.100061', vt.get('custom') ) self.assertEqual(len(res), len(out)) @@ -387,9 +386,9 @@ def test_get_custom_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - custom = {"a": u"\u0006"} + custom = {'a': u"\u0006"} w.get_custom_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", custom=custom + '1.3.6.1.4.1.25623.1.0.100061', custom=custom ) assert_called_once(logging.Logger.warning) @@ -398,18 +397,18 @@ def test_get_severities_xml(self): w = DummyDaemon() out = ( - "" + '' '' - "AV:N/AC:L/Au:N/C:N/I:N/A:N" - "Greenbone" - "1237458156" - "" - "" - ) - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - severities = vt.get("severities") + 'AV:N/AC:L/Au:N/C:N/I:N/A:N' + 'Greenbone' + '1237458156' + '' + '' + ) + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + severities = vt.get('severities') res = w.get_severities_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", severities + '1.3.6.1.4.1.25623.1.0.100061', severities ) self.assertEqual(res, out) @@ -418,9 +417,9 @@ def test_get_severities_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - sever = {"severity_base_vector": u"\u0006"} + sever = {'severity_base_vector': u"\u0006"} w.get_severities_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", severities=sever + '1.3.6.1.4.1.25623.1.0.100061', severities=sever ) assert_called_once(logging.Logger.warning) @@ -428,21 +427,21 @@ def test_get_severities_xml_failed(self): def test_get_params_xml(self): w = DummyDaemon() out = ( - "" + '' '' - "Do not randomize the order in which ports are " - "scanned" - "no" - "" + 'Do not randomize the order in which ports are ' + 'scanned' + 'no' + '' '' - "Data length :" - "" - "" + 'Data length :' + '' + '' ) - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - params = vt.get("vt_params") - res = w.get_params_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", params) + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + params = vt.get('vt_params') + res = w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params) self.assertEqual(len(res), len(out)) @@ -451,15 +450,15 @@ def test_get_params_xml_failed(self): logging.Logger.warning = Mock() params = { - "1": { - "id": "1", - "type": "entry", - "default": u"\u0006", - "name": "dns-fuzz.timelimit", - "description": "Description", + '1': { + 'id': '1', + 'type': 'entry', + 'default': u'\u0006', + 'name': 'dns-fuzz.timelimit', + 'description': 'Description', } } - w.get_params_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", params) + w.get_params_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', params) assert_called_once(logging.Logger.warning) @@ -467,9 +466,9 @@ def test_get_refs_xml(self): w = DummyDaemon() out = '' - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - refs = vt.get("vt_refs") - res = w.get_refs_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", refs) + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + refs = vt.get('vt_refs') + res = w.get_refs_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', refs) self.assertEqual(res, out) @@ -477,14 +476,14 @@ def test_get_dependencies_xml(self): w = DummyDaemon() out = ( - "" + '' '' '' - "" + '' ) - dep = ["1.3.6.1.4.1.25623.1.2.3.4", "1.3.6.1.4.1.25623.4.3.2.1"] + dep = ['1.3.6.1.4.1.25623.1.2.3.4', '1.3.6.1.4.1.25623.4.3.2.1'] res = w.get_dependencies_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", dep + '1.3.6.1.4.1.25623.1.0.100061', dep ) self.assertEqual(res, out) @@ -493,13 +492,13 @@ def test_get_dependencies_xml_missing_dep(self): w = DummyDaemon() out = ( - "" + '' '' - "" + '' ) - dep = ["1.3.6.1.4.1.25623.1.2.3.4", "file_name.nasl"] + dep = ['1.3.6.1.4.1.25623.1.2.3.4', 'file_name.nasl'] res = w.get_dependencies_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", dep + '1.3.6.1.4.1.25623.1.0.100061', dep ) self.assertEqual(res, out) @@ -510,7 +509,7 @@ def test_get_dependencies_xml_failed(self): dep = [u"\u0006"] w.get_dependencies_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", vt_dependencies=dep + '1.3.6.1.4.1.25623.1.0.100061', vt_dependencies=dep ) assert_called_once(logging.Logger.error) @@ -518,11 +517,11 @@ def test_get_dependencies_xml_failed(self): def test_get_ctime_xml(self): w = DummyDaemon() - out = "1237458156" - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - ctime = vt.get("creation_time") + out = '1237458156' + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + ctime = vt.get('creation_time') res = w.get_creation_time_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", ctime + '1.3.6.1.4.1.25623.1.0.100061', ctime ) self.assertEqual(res, out) @@ -531,9 +530,9 @@ def test_get_ctime_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - ctime = u"\u0006" + ctime = u'\u0006' w.get_creation_time_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", vt_creation_time=ctime + '1.3.6.1.4.1.25623.1.0.100061', vt_creation_time=ctime ) assert_called_once(logging.Logger.warning) @@ -541,11 +540,11 @@ def test_get_ctime_xml_failed(self): def test_get_mtime_xml(self): w = DummyDaemon() - out = "1533906565" - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - mtime = vt.get("modification_time") + out = '1533906565' + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + mtime = vt.get('modification_time') res = w.get_modification_time_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", mtime + '1.3.6.1.4.1.25623.1.0.100061', mtime ) self.assertEqual(res, out) @@ -554,9 +553,9 @@ def test_get_mtime_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - mtime = u"\u0006" + mtime = u'\u0006' w.get_modification_time_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", mtime + '1.3.6.1.4.1.25623.1.0.100061', mtime ) assert_called_once(logging.Logger.warning) @@ -564,11 +563,11 @@ def test_get_mtime_xml_failed(self): def test_get_summary_xml(self): w = DummyDaemon() - out = "some summary" - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - summary = vt.get("summary") + out = 'some summary' + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + summary = vt.get('summary') res = w.get_summary_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", summary + '1.3.6.1.4.1.25623.1.0.100061', summary ) self.assertEqual(res, out) @@ -576,19 +575,19 @@ def test_get_summary_xml(self): def test_get_summary_xml_failed(self): w = DummyDaemon() - summary = u"\u0006" + summary = u'\u0006' logging.Logger.warning = Mock() - w.get_summary_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", summary) + w.get_summary_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', summary) assert_called_once(logging.Logger.warning) def test_get_impact_xml(self): w = DummyDaemon() - out = "some impact" - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - impact = vt.get("impact") - res = w.get_impact_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", impact) + out = 'some impact' + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + impact = vt.get('impact') + res = w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact) self.assertEqual(res, out) @@ -596,19 +595,19 @@ def test_get_impact_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - impact = u"\u0006" - w.get_impact_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", impact) + impact = u'\u0006' + w.get_impact_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', impact) assert_called_once(logging.Logger.warning) def test_get_insight_xml(self): w = DummyDaemon() - out = "some insight" - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - insight = vt.get("insight") + out = 'some insight' + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + insight = vt.get('insight') res = w.get_insight_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", insight + '1.3.6.1.4.1.25623.1.0.100061', insight ) self.assertEqual(res, out) @@ -617,8 +616,8 @@ def test_get_insight_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - insight = u"\u0006" - w.get_insight_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", insight) + insight = u'\u0006' + w.get_insight_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', insight) assert_called_once(logging.Logger.warning) @@ -627,16 +626,16 @@ def test_get_solution_xml(self): out = ( '' - "some solution" - "" + 'some solution' + '' ) - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - solution = vt.get("solution") - solution_type = vt.get("solution_type") - solution_method = vt.get("solution_method") + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + solution = vt.get('solution') + solution_type = vt.get('solution_type') + solution_method = vt.get('solution_method') res = w.get_solution_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", + '1.3.6.1.4.1.25623.1.0.100061', solution, solution_type, solution_method, @@ -648,8 +647,8 @@ def test_get_solution_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - solution = u"\u0006" - w.get_solution_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", solution) + solution = u'\u0006' + w.get_solution_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', solution) assert_called_once(logging.Logger.warning) @@ -657,11 +656,11 @@ def test_get_detection_xml(self): w = DummyDaemon() out = '' - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - detection_type = vt.get("qod_type") + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + detection_type = vt.get('qod_type') res = w.get_detection_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", qod_type=detection_type + '1.3.6.1.4.1.25623.1.0.100061', qod_type=detection_type ) self.assertEqual(res, out) @@ -670,19 +669,19 @@ def test_get_detection_xml_failed(self): w = DummyDaemon() logging.Logger.warning = Mock() - detection = u"\u0006" - w.get_detection_vt_as_xml_str("1.3.6.1.4.1.25623.1.0.100061", detection) + detection = u'\u0006' + w.get_detection_vt_as_xml_str('1.3.6.1.4.1.25623.1.0.100061', detection) assert_called_once(logging.Logger.warning) def test_get_affected_xml(self): w = DummyDaemon() - out = "some affection" - vt = w.VTS["1.3.6.1.4.1.25623.1.0.100061"] - affected = vt.get("affected") + out = 'some affection' + vt = w.VTS['1.3.6.1.4.1.25623.1.0.100061'] + affected = vt.get('affected') res = w.get_affected_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", affected=affected + '1.3.6.1.4.1.25623.1.0.100061', affected=affected ) self.assertEqual(res, out) @@ -693,31 +692,31 @@ def test_get_affected_xml_failed(self): affected = u"\u0006" + "affected" w.get_affected_vt_as_xml_str( - "1.3.6.1.4.1.25623.1.0.100061", affected=affected + '1.3.6.1.4.1.25623.1.0.100061', affected=affected ) assert_called_once(logging.Logger.warning) - @patch("ospd_openvas.daemon.Path.exists") - @patch("ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings") + @patch('ospd_openvas.daemon.Path.exists') + @patch('ospd_openvas.daemon.OSPDopenvas.set_params_from_openvas_settings') def test_feed_is_outdated_none( self, mock_set_params: MagicMock, mock_path_exists: MagicMock ): w = DummyDaemon() - w.scan_only_params["plugins_folder"] = "/foo/bar" + w.scan_only_params['plugins_folder'] = '/foo/bar' # Return None mock_path_exists.return_value = False - ret = w.feed_is_outdated("1234") + ret = w.feed_is_outdated('1234') self.assertIsNone(ret) self.assertEqual(mock_set_params.call_count, 1) self.assertEqual(mock_path_exists.call_count, 1) - @patch("ospd_openvas.daemon.Path.exists") - @patch("ospd_openvas.daemon.Path.open") + @patch('ospd_openvas.daemon.Path.exists') + @patch('ospd_openvas.daemon.Path.open') def test_feed_is_outdated_true( self, mock_path_open: MagicMock, @@ -726,23 +725,23 @@ def test_feed_is_outdated_true( read_data = 'PLUGIN_SET = "1235";' mock_path_exists.return_value = True - mock_read = MagicMock(name="Path open context manager") + mock_read = MagicMock(name='Path open context manager') mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data)) mock_path_open.return_value = mock_read w = DummyDaemon() # Return True - w.scan_only_params["plugins_folder"] = "/foo/bar" + w.scan_only_params['plugins_folder'] = '/foo/bar' - ret = w.feed_is_outdated("1234") + ret = w.feed_is_outdated('1234') self.assertTrue(ret) self.assertEqual(mock_path_exists.call_count, 1) self.assertEqual(mock_path_open.call_count, 1) - @patch("ospd_openvas.daemon.Path.exists") - @patch("ospd_openvas.daemon.Path.open") + @patch('ospd_openvas.daemon.Path.exists') + @patch('ospd_openvas.daemon.Path.open') def test_feed_is_outdated_false( self, mock_path_open: MagicMock, @@ -752,14 +751,14 @@ def test_feed_is_outdated_false( read_data = 'PLUGIN_SET = "1234"' mock_path_exists.return_value = True - mock_read = MagicMock(name="Path open context manager") + mock_read = MagicMock(name='Path open context manager') mock_read.__enter__ = MagicMock(return_value=io.StringIO(read_data)) mock_path_open.return_value = mock_read w = DummyDaemon() - w.scan_only_params["plugins_folder"] = "/foo/bar" + w.scan_only_params['plugins_folder'] = '/foo/bar' - ret = w.feed_is_outdated("1234") + ret = w.feed_is_outdated('1234') self.assertFalse(ret) self.assertEqual(mock_path_exists.call_count, 1) @@ -774,14 +773,14 @@ def test_check_feed_cache_unavailable(self): self.assertFalse(res) w.feed_is_outdated.assert_not_called() - @patch("ospd_openvas.daemon.BaseDB") - @patch("ospd_openvas.daemon.ResultList.add_scan_log_to_list") + @patch('ospd_openvas.daemon.BaseDB') + @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list') def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass): w = DummyDaemon() target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan("123-456", targets, None, []) + w.create_scan('123-456', targets, None, []) results = [ "LOG|||192.168.0.1|||localhost|||general/Host_Details||||||Host dead", @@ -789,20 +788,20 @@ def test_get_openvas_result(self, mock_add_scan_log_to_list, MockDBClass): MockDBClass.get_result.return_value = results mock_add_scan_log_to_list.return_value = None - w.report_openvas_results(MockDBClass, "123-456") + w.report_openvas_results(MockDBClass, '123-456') mock_add_scan_log_to_list.assert_called_with( - host="192.168.0.1", - hostname="localhost", - name="", - port="general/Host_Details", - qod="", - test_id="", - uri="", - value="Host dead", - ) - - @patch("ospd_openvas.daemon.BaseDB") - @patch("ospd_openvas.daemon.ResultList.add_scan_error_to_list") + host='192.168.0.1', + hostname='localhost', + name='', + port='general/Host_Details', + qod='', + test_id='', + uri='', + value='Host dead', + ) + + @patch('ospd_openvas.daemon.BaseDB') + @patch('ospd_openvas.daemon.ResultList.add_scan_error_to_list') def test_get_openvas_result_host_deny( self, mock_add_scan_error_to_list, MockDBClass ): @@ -810,7 +809,7 @@ def test_get_openvas_result_host_deny( target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan("123-456", targets, None, []) + w.create_scan('123-456', targets, None, []) results = [ "ERRMSG|||127.0.0.1|||localhost|||||||||Host access denied.", @@ -818,23 +817,23 @@ def test_get_openvas_result_host_deny( MockDBClass.get_result.return_value = results mock_add_scan_error_to_list.return_value = None - w.report_openvas_results(MockDBClass, "123-456") + w.report_openvas_results(MockDBClass, '123-456') mock_add_scan_error_to_list.assert_called_with( - host="127.0.0.1", - hostname="localhost", - name="", - port="", - test_id="", - uri="", - value="Host access denied.", + host='127.0.0.1', + hostname='localhost', + name='', + port='', + test_id='', + uri='', + value='Host access denied.', ) - @patch("ospd_openvas.daemon.BaseDB") + @patch('ospd_openvas.daemon.BaseDB') def test_get_openvas_result_dead_hosts(self, MockDBClass): w = DummyDaemon() target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan("123-456", targets, None, []) + w.create_scan('123-456', targets, None, []) results = [ "DEADHOST||| ||| ||| ||| |||4", @@ -842,21 +841,21 @@ def test_get_openvas_result_dead_hosts(self, MockDBClass): MockDBClass.get_result.return_value = results w.scan_collection.set_amount_dead_hosts = MagicMock() - w.report_openvas_results(MockDBClass, "123-456") + w.report_openvas_results(MockDBClass, '123-456') w.scan_collection.set_amount_dead_hosts.assert_called_with( - "123-456", + '123-456', total_dead=4, ) - @patch("ospd_openvas.daemon.BaseDB") - @patch("ospd_openvas.daemon.ResultList.add_scan_log_to_list") + @patch('ospd_openvas.daemon.BaseDB') + @patch('ospd_openvas.daemon.ResultList.add_scan_log_to_list') def test_get_openvas_result_host_start( self, mock_add_scan_log_to_list, MockDBClass ): w = DummyDaemon() target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan("123-456", targets, None, []) + w.create_scan('123-456', targets, None, []) results = [ "HOST_START|||192.168.10.124||| ||| ||||||today 1", @@ -865,20 +864,20 @@ def test_get_openvas_result_host_start( MockDBClass.get_result.return_value = results mock_add_scan_log_to_list.return_value = None - w.report_openvas_results(MockDBClass, "123-456") + w.report_openvas_results(MockDBClass, '123-456') mock_add_scan_log_to_list.assert_called_with( - host="192.168.10.124", - name="HOST_START", - value="today 1", + host='192.168.10.124', + name='HOST_START', + value='today 1', ) - @patch("ospd_openvas.daemon.BaseDB") + @patch('ospd_openvas.daemon.BaseDB') def test_get_openvas_result_hosts_count(self, MockDBClass): w = DummyDaemon() target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan("123-456", targets, None, []) + w.create_scan('123-456', targets, None, []) results = [ "HOSTS_COUNT||| ||| ||| ||| |||4", @@ -886,14 +885,14 @@ def test_get_openvas_result_hosts_count(self, MockDBClass): MockDBClass.get_result.return_value = results w.set_scan_total_hosts = MagicMock() - w.report_openvas_results(MockDBClass, "123-456") + w.report_openvas_results(MockDBClass, '123-456') w.set_scan_total_hosts.assert_called_with( - "123-456", + '123-456', 4, ) - @patch("ospd_openvas.daemon.BaseDB") - @patch("ospd_openvas.daemon.ResultList.add_scan_alarm_to_list") + @patch('ospd_openvas.daemon.BaseDB') + @patch('ospd_openvas.daemon.ResultList.add_scan_alarm_to_list') def test_result_without_vt_oid( self, mock_add_scan_alarm_to_list, MockDBClass ): @@ -902,38 +901,38 @@ def test_result_without_vt_oid( target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan("123-456", targets, None, []) - w.scan_collection.scans_table["123-456"]["results"] = list() + w.create_scan('123-456', targets, None, []) + w.scan_collection.scans_table['123-456']['results'] = list() results = ["ALARM||| ||| ||| ||| |||some alarm|||path", None] MockDBClass.get_result.return_value = results mock_add_scan_alarm_to_list.return_value = None - w.report_openvas_results(MockDBClass, "123-456") + w.report_openvas_results(MockDBClass, '123-456') assert_called_once(logging.Logger.warning) - @patch("ospd_openvas.db.KbDB") + @patch('ospd_openvas.db.KbDB') def test_openvas_is_alive_already_stopped(self, mock_db): w = DummyDaemon() # mock_psutil = MockPsutil.return_value mock_db.scan_is_stopped.return_value = True - ret = w.is_openvas_process_alive(mock_db, "1234", "a1-b2-c3-d4") + ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d4') self.assertTrue(ret) - @patch("psutil.Process") - @patch("ospd_openvas.db.KbDB") + @patch('psutil.Process') + @patch('ospd_openvas.db.KbDB') def test_openvas_is_alive_still(self, mock_db, mock_psutil): w = DummyDaemon() mock_psutil.side_effect = TypeError mock_db.scan_is_stopped.return_value = False - ret = w.is_openvas_process_alive(mock_db, "1234", "a1-b2-c3-d3") + ret = w.is_openvas_process_alive(mock_db, '1234', 'a1-b2-c3-d3') self.assertFalse(ret) - @patch("ospd_openvas.daemon.OSPDaemon.set_scan_progress_batch") - @patch("ospd_openvas.daemon.OSPDaemon.sort_host_finished") - @patch("ospd_openvas.db.KbDB") + @patch('ospd_openvas.daemon.OSPDaemon.set_scan_progress_batch') + @patch('ospd_openvas.daemon.OSPDaemon.sort_host_finished') + @patch('ospd_openvas.db.KbDB') def test_report_openvas_scan_status( self, mock_db, mock_sort_host_finished, mock_set_scan_progress_batch ): @@ -942,65 +941,65 @@ def test_report_openvas_scan_status( mock_set_scan_progress_batch.return_value = None mock_sort_host_finished.return_value = None mock_db.get_scan_status.return_value = [ - "192.168.0.1/15/1000", - "192.168.0.2/15/0", - "192.168.0.3/15/-1", - "192.168.0.4/1500/1500", + '192.168.0.1/15/1000', + '192.168.0.2/15/0', + '192.168.0.3/15/-1', + '192.168.0.4/1500/1500', ] target_element = w.create_xml_target() targets = OspRequest.process_target_element(target_element) - w.create_scan("123-456", targets, None, []) - w.report_openvas_scan_status(mock_db, "123-456") + w.create_scan('123-456', targets, None, []) + w.report_openvas_scan_status(mock_db, '123-456') mock_set_scan_progress_batch.assert_called_with( - "123-456", + '123-456', host_progress={ - "192.168.0.1": 1, - "192.168.0.3": -1, - "192.168.0.4": 100, + '192.168.0.1': 1, + '192.168.0.3': -1, + '192.168.0.4': 100, }, ) mock_sort_host_finished.assert_called_with( - "123-456", ["192.168.0.3", "192.168.0.4"] + '123-456', ['192.168.0.3', '192.168.0.4'] ) class TestFilters(TestCase): def test_format_vt_modification_time(self): ovformat = OpenVasVtsFilter(None) - td = "1517443741" + td = '1517443741' formatted = ovformat.format_vt_modification_time(td) self.assertEqual(formatted, "20180201000901") def test_get_filtered_vts_false(self): w = DummyDaemon() - vts_collection = ["1234", "1.3.6.1.4.1.25623.1.0.100061"] + vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061'] ovfilter = OpenVasVtsFilter(w.nvti) res = ovfilter.get_filtered_vts_list( vts_collection, "modification_time<10" ) - self.assertNotIn("1.3.6.1.4.1.25623.1.0.100061", res) + self.assertNotIn('1.3.6.1.4.1.25623.1.0.100061', res) def test_get_filtered_vts_true(self): w = DummyDaemon() - vts_collection = ["1234", "1.3.6.1.4.1.25623.1.0.100061"] + vts_collection = ['1234', '1.3.6.1.4.1.25623.1.0.100061'] ovfilter = OpenVasVtsFilter(w.nvti) res = ovfilter.get_filtered_vts_list( vts_collection, "modification_time>10" ) - self.assertIn("1.3.6.1.4.1.25623.1.0.100061", res) + self.assertIn('1.3.6.1.4.1.25623.1.0.100061', res) def test_get_severity_score_v2(self): w = DummyDaemon() vtaux = { - "severities": { - "severity_type": "cvss_base_v2", - "severity_base_vector": "AV:N/AC:L/Au:N/C:P/I:N/A:N", + 'severities': { + 'severity_type': 'cvss_base_v2', + 'severity_base_vector': 'AV:N/AC:L/Au:N/C:P/I:N/A:N', } } @@ -1010,9 +1009,9 @@ def test_get_severity_score_v2(self): def test_get_severity_score_v3(self): w = DummyDaemon() vtaux = { - "severities": { - "severity_type": "cvss_base_v3", - "severity_base_vector": "CVSS:3.0/AV:L/AC:H/PR:H/UI:R/S:U/C:N/I:L/A:L", + 'severities': { + 'severity_type': 'cvss_base_v3', + 'severity_base_vector': 'CVSS:3.0/AV:L/AC:H/PR:H/UI:R/S:U/C:N/I:L/A:L', } }