From cf094df23957cd37648be397e7d2af9f91523b32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Wed, 6 Dec 2023 17:18:04 +0100 Subject: [PATCH 01/17] added unit tests for ssh, telnet and vnc servers --- setup.py | 53 +++++++++++++------------ tests/__init__.py | 0 tests/conftest.py | 21 ++++++++++ tests/test_ssh_server.py | 78 +++++++++++++++++++++++++++++++++++++ tests/test_telnet_server.py | 58 +++++++++++++++++++++++++++ tests/test_vnc_server.py | 59 ++++++++++++++++++++++++++++ tests/utils.py | 25 ++++++++++++ 7 files changed, 270 insertions(+), 24 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_ssh_server.py create mode 100644 tests/test_telnet_server.py create mode 100644 tests/test_vnc_server.py create mode 100644 tests/utils.py diff --git a/setup.py b/setup.py index 859031d..88ccc57 100644 --- a/setup.py +++ b/setup.py @@ -4,37 +4,42 @@ long_description = f.read() setup( - name='honeypots', - author='QeeqBox', - author_email='gigaqeeq@gmail.com', + name="honeypots", + author="QeeqBox", + author_email="gigaqeeq@gmail.com", description=r"30 different honeypots in one package! (dhcp, dns, elastic, ftp, http proxy, https proxy, http, https, imap, ipp, irc, ldap, memcache, mssql, mysql, ntp, oracle, pjl, pop3, postgres, rdp, redis, sip, smb, smtp, snmp, socks5, ssh, telnet, vnc)", long_description=long_description, - version='0.64', + version="0.64", license="AGPL-3.0", - license_files=('LICENSE'), + license_files=("LICENSE"), url="https://github.com/qeeqbox/honeypots", - packages=['honeypots'], - entry_points={ - "console_scripts": [ - 'honeypots=honeypots.__main__:main_logic' - ] - }, + packages=["honeypots"], + entry_points={"console_scripts": ["honeypots=honeypots.__main__:main_logic"]}, include_package_data=True, install_requires=[ - 'twisted==21.7.0', - 'psutil==5.9.0', - 'psycopg2-binary==2.9.3', - 'pycrypto==2.6.1', - 'requests==2.28.2', - 'requests[socks]==2.28.2', - 'impacket==0.9.24', - 'paramiko==3.1.0', - 'scapy==2.4.5', - 'service_identity==21.1.0', - 'netifaces==0.11.0' + "twisted==21.7.0", + "psutil==5.9.0", + "psycopg2-binary==2.9.3", + "pycrypto==2.6.1", + "requests==2.28.2", + "requests[socks]==2.28.2", + "impacket==0.9.24", + "paramiko==3.1.0", + "scapy==2.4.5", + "service_identity==21.1.0", + "netifaces==0.11.0", ], extras_require={ - 'test': ['redis', 'mysql-connector', 'elasticsearch', 'pymssql', 'ldap3', 'pysnmp'] + "test": [ + "redis", + "mysql-connector", + "elasticsearch", + "pymssql", + "ldap3", + "pysnmp", + "pytest", + "vncdotool", + ] }, - python_requires='>=3.5' + python_requires=">=3.8", ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..4acebb5 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import json +from pathlib import Path +from tempfile import TemporaryDirectory + +import pytest + + +@pytest.fixture +def config_for_testing() -> Path: + with TemporaryDirectory() as tmp_dir: + config = Path(tmp_dir) / "config.json" + logs_output_dir = Path(tmp_dir) / "logs" + logs_output_dir.mkdir() + testing_config = { + "logs": "file,terminal,json", + "logs_location": str(logs_output_dir.absolute()), + } + config.write_text(json.dumps(testing_config)) + yield config diff --git a/tests/test_ssh_server.py b/tests/test_ssh_server.py new file mode 100644 index 0000000..e94f3df --- /dev/null +++ b/tests/test_ssh_server.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import json +from multiprocessing import Process +from pathlib import Path +from time import sleep + +import pytest +from honeypots import QSSHServer +from paramiko import SSHClient, AutoAddPolicy + +from .utils import find_free_port, load_logs_from_file + +IP = "127.0.0.1" +PORT = find_free_port() +USERNAME = "testing" +PASSWORD = "testing" +EXPECTED_KEYS = ["action", "dest_ip", "dest_port", "server", "src_ip", "src_port", "timestamp"] +SERVER_CONFIG = { + "honeypots": { + "ssh": { + "backup_count": 10, + "ip": IP, + "log_file_name": "ssh.jsonl", + "max_bytes": 10000, + "options": ["capture_commands"], + "password": PASSWORD, + "port": str(PORT), + "username": USERNAME, + }, + } +} + + +@pytest.fixture +def custom_config(config_for_testing: Path): + config = json.loads(config_for_testing.read_text()) + config.update(SERVER_CONFIG) + config_for_testing.write_text(json.dumps(config)) + yield config_for_testing + + +@pytest.fixture +def server_logs(custom_config: Path): + _server = QSSHServer( + ip=IP, + port=str(PORT), + username=USERNAME, + password=PASSWORD, + options="", + config=str(custom_config.absolute()), + ) + server_process = Process(target=_server.run_server) + server_process.start() + yield custom_config.parent / "logs" + server_process.terminate() + server_process.join() + + +def test_ssh_server(server_logs): + ssh = SSHClient() + ssh.set_missing_host_key_policy(AutoAddPolicy()) + ssh.connect(IP, port=PORT, username=USERNAME, password=PASSWORD) + ssh.close() + sleep(1) # give the server process some time to write logs + + log_files = [f for f in server_logs.iterdir()] + assert len(log_files) == 1 + logs = load_logs_from_file(log_files[0]) + + assert len(logs) == 2 + assert all(k in logs[0] for k in EXPECTED_KEYS) + assert logs[0]["dest_ip"] == IP + assert logs[0]["dest_port"] == str(PORT) + assert logs[0]["action"] == "connection" + + assert logs[1]["action"] == "login" + assert logs[1]["username"] == USERNAME diff --git a/tests/test_telnet_server.py b/tests/test_telnet_server.py new file mode 100644 index 0000000..a9f5169 --- /dev/null +++ b/tests/test_telnet_server.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from multiprocessing import Process +from pathlib import Path +from time import sleep + +import pytest +from honeypots import QTelnetServer +from telnetlib import Telnet + +from .utils import find_free_port, load_logs_from_file + +IP = "127.0.0.1" +PORT = str(find_free_port()) +USERNAME = "testing" +PASSWORD = "testing" +EXPECTED_KEYS = ['action', 'dest_ip', 'dest_port', 'server', 'src_ip', 'src_port', 'timestamp'] + + +@pytest.fixture +def server_logs(config_for_testing: Path): + _server = QTelnetServer( + ip=IP, + port=PORT, + username=USERNAME, + password=PASSWORD, + options="", + config=str(config_for_testing.absolute()), + ) + server_process = Process(target=_server.run_server) + server_process.start() + yield config_for_testing.parent / "logs" + server_process.terminate() + server_process.join() + + +def test_telnet_server(server_logs): + telnet_client = Telnet(IP, int(PORT)) + telnet_client.read_until(b"login: ") + telnet_client.write(USERNAME.encode() + b"\n") + telnet_client.read_until(b"Password: ") + telnet_client.write(PASSWORD.encode() + b"\n") + sleep(1) # give the server process some time to write logs + + log_files = [f for f in server_logs.iterdir()] + assert len(log_files) == 1 + logs = load_logs_from_file(log_files[0]) + + assert len(logs) == 2 + assert all(k in logs[0] for k in EXPECTED_KEYS) + assert logs[0]["dest_ip"] == IP + assert logs[0]["dest_port"] == PORT + assert logs[0]["action"] == "connection" + + assert all(k in logs[1] for k in ("username", "password")) + assert logs[1]["action"] == "login" + assert logs[1]["username"] == USERNAME + assert logs[1]["password"] == PASSWORD diff --git a/tests/test_vnc_server.py b/tests/test_vnc_server.py new file mode 100644 index 0000000..ee2eb85 --- /dev/null +++ b/tests/test_vnc_server.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from multiprocessing import Process +from pathlib import Path +from time import sleep + +import pytest +from vncdotool import api + +from honeypots import QVNCServer +from .utils import find_free_port, load_logs_from_file + +IP = "127.0.0.1" +PORT = str(find_free_port()) +USERNAME = "testing" +PASSWORD = "testing" +EXPECTED_KEYS = ['action', 'dest_ip', 'dest_port', 'server', 'src_ip', 'src_port', 'timestamp'] + + +@pytest.fixture +def server_logs(config_for_testing: Path): + _server = QVNCServer( + ip=IP, + port=PORT, + username=USERNAME, + password=PASSWORD, + options="", + config=str(config_for_testing.absolute()), + ) + server_process = Process(target=_server.run_server) + server_process.start() + yield config_for_testing.parent / "logs" + server_process.terminate() + server_process.join() + + +def _connect_to_vnc(): + client = api.connect('{}::{}'.format(IP, PORT), password=PASSWORD) + client.disconnect() + + +def test_vnc_server(server_logs): + # This VNC API creates a blocking daemon thread that can't be trivially stopped, + # so we just run it in a process and terminate that instead + process = Process(target=_connect_to_vnc) + process.start() + sleep(1) # give the server process some time to write logs + process.terminate() + process.join(timeout=5) + + log_files = [f for f in server_logs.iterdir()] + assert len(log_files) == 1 + logs = load_logs_from_file(log_files[0]) + + assert len(logs) == 1 + assert all(k in logs[0] for k in EXPECTED_KEYS) + assert logs[0]["dest_ip"] == IP + assert logs[0]["dest_port"] == PORT + assert logs[0]["action"] == "connection" diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..fd6cf99 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import json +import socket +from pathlib import Path + + +def find_free_port(start_port: int = 50_000, end_port: int = 60_000, sock_type: int = socket.SOCK_STREAM) -> int: + for port in range(start_port, end_port + 1): + with socket.socket(socket.AF_INET, sock_type) as s: + try: + s.bind(("", port)) + return port + except OSError: + pass + raise Exception("No free port found") + + +def load_logs_from_file(file: Path) -> list[dict]: + logs = [] + for line in file.read_text().splitlines(): + if not line: + continue + logs.append(json.loads(line)) + return logs From 325c3a41c3cb58958b6c644cec6ad8e9491270d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Thu, 7 Dec 2023 11:23:35 +0100 Subject: [PATCH 02/17] unit tests: refactoring --- tests/conftest.py | 29 +++++++++++++++++++ tests/test_ssh_server.py | 55 +++++++++---------------------------- tests/test_telnet_server.py | 52 ++++++++++++----------------------- tests/test_vnc_server.py | 40 +++++++-------------------- tests/utils.py | 28 ++++++++++++------- 5 files changed, 88 insertions(+), 116 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 4acebb5..4514729 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,14 @@ from __future__ import annotations import json +from multiprocessing import Process from pathlib import Path from tempfile import TemporaryDirectory import pytest +from .utils import IP, PASSWORD, USERNAME + @pytest.fixture def config_for_testing() -> Path: @@ -19,3 +22,29 @@ def config_for_testing() -> Path: } config.write_text(json.dumps(testing_config)) yield config + + +def _update_config(custom_config: dict, config_path: Path): + config = json.loads(config_path.read_text()) + config.update(custom_config) + config_path.write_text(json.dumps(config)) + + +@pytest.fixture +def server_logs(request, config_for_testing: Path): + custom_config = request.param.get("custom_config", {}) + if custom_config: + _update_config(custom_config, config_for_testing) + _server = request.param["server"]( + ip=IP, + port=request.param["port"], + username=USERNAME, + password=PASSWORD, + options="", + config=str(config_for_testing.absolute()), + ) + server_process = Process(target=_server.run_server) + server_process.start() + yield config_for_testing.parent / "logs" + server_process.terminate() + server_process.join() diff --git a/tests/test_ssh_server.py b/tests/test_ssh_server.py index e94f3df..cf0fb94 100644 --- a/tests/test_ssh_server.py +++ b/tests/test_ssh_server.py @@ -1,21 +1,14 @@ from __future__ import annotations -import json -from multiprocessing import Process -from pathlib import Path from time import sleep import pytest -from honeypots import QSSHServer -from paramiko import SSHClient, AutoAddPolicy +from paramiko import AutoAddPolicy, SSHClient -from .utils import find_free_port, load_logs_from_file +from honeypots import QSSHServer +from .utils import assert_connect_is_logged, IP, load_logs_from_file, PASSWORD, USERNAME -IP = "127.0.0.1" -PORT = find_free_port() -USERNAME = "testing" -PASSWORD = "testing" -EXPECTED_KEYS = ["action", "dest_ip", "dest_port", "server", "src_ip", "src_port", "timestamp"] +PORT = 50022 SERVER_CONFIG = { "honeypots": { "ssh": { @@ -32,31 +25,11 @@ } -@pytest.fixture -def custom_config(config_for_testing: Path): - config = json.loads(config_for_testing.read_text()) - config.update(SERVER_CONFIG) - config_for_testing.write_text(json.dumps(config)) - yield config_for_testing - - -@pytest.fixture -def server_logs(custom_config: Path): - _server = QSSHServer( - ip=IP, - port=str(PORT), - username=USERNAME, - password=PASSWORD, - options="", - config=str(custom_config.absolute()), - ) - server_process = Process(target=_server.run_server) - server_process.start() - yield custom_config.parent / "logs" - server_process.terminate() - server_process.join() - - +@pytest.mark.parametrize( + "server_logs", + [{"server": QSSHServer, "port": str(PORT), "custom_config": SERVER_CONFIG}], + indirect=True, +) def test_ssh_server(server_logs): ssh = SSHClient() ssh.set_missing_host_key_policy(AutoAddPolicy()) @@ -69,10 +42,8 @@ def test_ssh_server(server_logs): logs = load_logs_from_file(log_files[0]) assert len(logs) == 2 - assert all(k in logs[0] for k in EXPECTED_KEYS) - assert logs[0]["dest_ip"] == IP - assert logs[0]["dest_port"] == str(PORT) - assert logs[0]["action"] == "connection" + connect, login = logs + assert_connect_is_logged(connect, str(PORT)) - assert logs[1]["action"] == "login" - assert logs[1]["username"] == USERNAME + assert login["action"] == "login" + assert login["username"] == USERNAME diff --git a/tests/test_telnet_server.py b/tests/test_telnet_server.py index a9f5169..3fa11d1 100644 --- a/tests/test_telnet_server.py +++ b/tests/test_telnet_server.py @@ -1,45 +1,35 @@ from __future__ import annotations -from multiprocessing import Process -from pathlib import Path from time import sleep import pytest from honeypots import QTelnetServer from telnetlib import Telnet -from .utils import find_free_port, load_logs_from_file +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) -IP = "127.0.0.1" -PORT = str(find_free_port()) -USERNAME = "testing" -PASSWORD = "testing" -EXPECTED_KEYS = ['action', 'dest_ip', 'dest_port', 'server', 'src_ip', 'src_port', 'timestamp'] - - -@pytest.fixture -def server_logs(config_for_testing: Path): - _server = QTelnetServer( - ip=IP, - port=PORT, - username=USERNAME, - password=PASSWORD, - options="", - config=str(config_for_testing.absolute()), - ) - server_process = Process(target=_server.run_server) - server_process.start() - yield config_for_testing.parent / "logs" - server_process.terminate() - server_process.join() +PORT = "50023" +@pytest.mark.parametrize( + "server_logs", + [{"server": QTelnetServer, "port": PORT}], + indirect=True, +) def test_telnet_server(server_logs): telnet_client = Telnet(IP, int(PORT)) telnet_client.read_until(b"login: ") telnet_client.write(USERNAME.encode() + b"\n") telnet_client.read_until(b"Password: ") telnet_client.write(PASSWORD.encode() + b"\n") + sleep(1) # give the server process some time to write logs log_files = [f for f in server_logs.iterdir()] @@ -47,12 +37,6 @@ def test_telnet_server(server_logs): logs = load_logs_from_file(log_files[0]) assert len(logs) == 2 - assert all(k in logs[0] for k in EXPECTED_KEYS) - assert logs[0]["dest_ip"] == IP - assert logs[0]["dest_port"] == PORT - assert logs[0]["action"] == "connection" - - assert all(k in logs[1] for k in ("username", "password")) - assert logs[1]["action"] == "login" - assert logs[1]["username"] == USERNAME - assert logs[1]["password"] == PASSWORD + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/test_vnc_server.py b/tests/test_vnc_server.py index ee2eb85..14f9324 100644 --- a/tests/test_vnc_server.py +++ b/tests/test_vnc_server.py @@ -1,44 +1,27 @@ from __future__ import annotations from multiprocessing import Process -from pathlib import Path from time import sleep import pytest from vncdotool import api from honeypots import QVNCServer -from .utils import find_free_port, load_logs_from_file - -IP = "127.0.0.1" -PORT = str(find_free_port()) -USERNAME = "testing" -PASSWORD = "testing" -EXPECTED_KEYS = ['action', 'dest_ip', 'dest_port', 'server', 'src_ip', 'src_port', 'timestamp'] - - -@pytest.fixture -def server_logs(config_for_testing: Path): - _server = QVNCServer( - ip=IP, - port=PORT, - username=USERNAME, - password=PASSWORD, - options="", - config=str(config_for_testing.absolute()), - ) - server_process = Process(target=_server.run_server) - server_process.start() - yield config_for_testing.parent / "logs" - server_process.terminate() - server_process.join() +from .utils import assert_connect_is_logged, IP, load_logs_from_file, PASSWORD + +PORT = "55900" def _connect_to_vnc(): - client = api.connect('{}::{}'.format(IP, PORT), password=PASSWORD) + client = api.connect(f"{IP}::{PORT}", password=PASSWORD) client.disconnect() +@pytest.mark.parametrize( + "server_logs", + [{"server": QVNCServer, "port": PORT}], + indirect=True, +) def test_vnc_server(server_logs): # This VNC API creates a blocking daemon thread that can't be trivially stopped, # so we just run it in a process and terminate that instead @@ -53,7 +36,4 @@ def test_vnc_server(server_logs): logs = load_logs_from_file(log_files[0]) assert len(logs) == 1 - assert all(k in logs[0] for k in EXPECTED_KEYS) - assert logs[0]["dest_ip"] == IP - assert logs[0]["dest_port"] == PORT - assert logs[0]["action"] == "connection" + assert_connect_is_logged(logs[0], PORT) diff --git a/tests/utils.py b/tests/utils.py index fd6cf99..bd2a0ed 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -4,16 +4,10 @@ import socket from pathlib import Path - -def find_free_port(start_port: int = 50_000, end_port: int = 60_000, sock_type: int = socket.SOCK_STREAM) -> int: - for port in range(start_port, end_port + 1): - with socket.socket(socket.AF_INET, sock_type) as s: - try: - s.bind(("", port)) - return port - except OSError: - pass - raise Exception("No free port found") +IP = "127.0.0.1" +USERNAME = "testing" +PASSWORD = "testing" +EXPECTED_KEYS = ["action", "dest_ip", "dest_port", "server", "src_ip", "src_port", "timestamp"] def load_logs_from_file(file: Path) -> list[dict]: @@ -23,3 +17,17 @@ def load_logs_from_file(file: Path) -> list[dict]: continue logs.append(json.loads(line)) return logs + + +def assert_connect_is_logged(connect: dict[str, str], port: str): + assert all(k in connect for k in EXPECTED_KEYS) + assert connect["dest_ip"] == IP + assert connect["dest_port"] == port + assert connect["action"] == "connection" + + +def assert_login_is_logged(login: dict[str, str]): + assert all(k in login for k in ("username", "password")) + assert login["action"] == "login" + assert login["username"] == USERNAME + assert login["password"] == PASSWORD From 7a915e4b09731fe24b0a3a13e82c452544892ead Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Thu, 7 Dec 2023 11:23:43 +0100 Subject: [PATCH 03/17] added .gitignore file --- .gitignore | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..40b1dff --- /dev/null +++ b/.gitignore @@ -0,0 +1,67 @@ +### Python template +__pycache__/ + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDEs +.spyderproject +.spyproject +.idea/ +.vscode/ +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ From 15d345b92028f017486aab88103d62b9916b602d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Thu, 7 Dec 2023 14:03:42 +0100 Subject: [PATCH 04/17] unit tests: added redis, sip, smb, smtp, snmp and socks5 --- setup.py | 3 +- tests/test_redis_server.py | 43 ++++++++++++++++++++++++++ tests/test_sip_server.py | 61 +++++++++++++++++++++++++++++++++++++ tests/test_smb_server.py | 44 ++++++++++++++++++++++++++ tests/test_smtp_server.py | 44 ++++++++++++++++++++++++++ tests/test_snmp_server.py | 40 ++++++++++++++++++++++++ tests/test_socks5_server.py | 44 ++++++++++++++++++++++++++ tests/utils.py | 10 +++--- 8 files changed, 284 insertions(+), 5 deletions(-) create mode 100644 tests/test_redis_server.py create mode 100644 tests/test_sip_server.py create mode 100644 tests/test_smb_server.py create mode 100644 tests/test_smtp_server.py create mode 100644 tests/test_snmp_server.py create mode 100644 tests/test_socks5_server.py diff --git a/setup.py b/setup.py index 88ccc57..e6477c1 100644 --- a/setup.py +++ b/setup.py @@ -36,8 +36,9 @@ "elasticsearch", "pymssql", "ldap3", - "pysnmp", + "pysnmplib", "pytest", + "redis", "vncdotool", ] }, diff --git a/tests/test_redis_server.py b/tests/test_redis_server.py new file mode 100644 index 0000000..19e0585 --- /dev/null +++ b/tests/test_redis_server.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from time import sleep + +import pytest +from honeypots import QRedisServer +from redis import AuthenticationError, StrictRedis + +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "56379" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QRedisServer, "port": PORT}], + indirect=True, +) +def test_redis_server(server_logs): + try: + redis = StrictRedis.from_url(f"redis://{USERNAME}:{PASSWORD}@{IP}:{PORT}/1") + for _ in redis.scan_iter("user:*"): + pass + except AuthenticationError: + pass + + sleep(1) # give the server process some time to write logs + + log_files = [f for f in server_logs.iterdir()] + assert len(log_files) == 1 + logs = load_logs_from_file(log_files[0]) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/test_sip_server.py b/tests/test_sip_server.py new file mode 100644 index 0000000..e31b940 --- /dev/null +++ b/tests/test_sip_server.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from _socket import IPPROTO_UDP +from socket import AF_INET, SOCK_DGRAM, socket +from time import sleep + +import pytest +from honeypots import QSIPServer + +from .utils import IP, load_logs_from_file + +PORT = "55060" +EXPECTED_KEYS = ("action", "server", "src_ip", "src_port", "timestamp") +CALL_ID = "1@0.0.0.0" +CONTACT = "sip:user_3@test.test.test" +FROM = f"{CONTACT};tag=none" +TO = "" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSIPServer, "port": PORT}], + indirect=True, +) +def test_sip_server(server_logs): + sleep(1) # give the server some time to start + + sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) + sock.sendto( + "INVITE sip:user_1@test.test SIP/2.0\r\n" + f"To: {TO}\r\n" + f"From: {FROM}\r\n" + f"Call-ID: {CALL_ID}\r\n" + "CSeq: 1 INVITE\r\n" + f"Contact: {CONTACT}\r\n" + "Via: SIP/2.0/TCP 0.0.0.0;branch=34uiddhjczqw3mq23\r\n" + "Content-Length: 1\r\n\r\nT".encode(), + (IP, int(PORT)), + ) + sock.close() + + sleep(1) # give the server process some time to write logs + + log_files = [f for f in server_logs.iterdir()] + assert len(log_files) == 1 + logs = load_logs_from_file(log_files[0]) + + assert len(logs) == 2 + connect, request = logs + + assert all(k in connect for k in EXPECTED_KEYS) + assert connect["action"] == "connection" + assert connect["server"] == "sip_server" + + assert request["action"] == "request" + assert request["src_ip"] == IP + assert "data" in request + assert request["data"]["call-id"] == CALL_ID + assert request["data"]["contact"] == CONTACT + assert request["data"]["from"] == FROM + assert request["data"]["to"] == TO diff --git a/tests/test_smb_server.py b/tests/test_smb_server.py new file mode 100644 index 0000000..43d0077 --- /dev/null +++ b/tests/test_smb_server.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from impacket.smbconnection import SMBConnection +from time import sleep + +import pytest + +from honeypots import QSMBServer +from .utils import ( + assert_connect_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "50445" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSMBServer, "port": PORT}], + indirect=True, +) +def test_smb_server(server_logs): + sleep(5) # give the server some time to start + + smb_client = SMBConnection(IP, IP, sess_port=PORT) + smb_client.login(USERNAME, PASSWORD) + smb_client.close() + + sleep(1) # give the server process some time to write logs + + log_files = [f for f in server_logs.iterdir()] + assert len(log_files) == 1 + logs = load_logs_from_file(log_files[0]) + + assert len(logs) == 3 + for entry in logs: + assert_connect_is_logged(entry, PORT) + + assert "Incoming connection" in logs[0]["data"] + assert "AUTHENTICATE_MESSAGE" in logs[1]["data"] + assert "authenticated successfully" in logs[2]["data"] diff --git a/tests/test_smtp_server.py b/tests/test_smtp_server.py new file mode 100644 index 0000000..dbbfdbc --- /dev/null +++ b/tests/test_smtp_server.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from smtplib import SMTP +from time import sleep + +import pytest +from honeypots import QSMTPServer + +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "50025" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSMTPServer, "port": PORT}], + indirect=True, +) +def test_smtp_server(server_logs): + sleep(1) # give server time to start + + client = SMTP(IP, int(PORT)) + client.ehlo() + client.login(USERNAME, PASSWORD) + client.sendmail('fromtest', 'totest', 'Nothing') + client.quit() + + sleep(1) # give the server process some time to write logs + + log_files = [f for f in server_logs.iterdir()] + assert len(log_files) == 1 + logs = load_logs_from_file(log_files[0]) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/test_snmp_server.py b/tests/test_snmp_server.py new file mode 100644 index 0000000..24e8cee --- /dev/null +++ b/tests/test_snmp_server.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import pytest +from pysnmp.hlapi import CommunityData, ContextData, getCmd, ObjectIdentity, ObjectType, SnmpEngine, UdpTransportTarget + +from honeypots import QSNMPServer +from .utils import ( + assert_connect_is_logged, + IP, + load_logs_from_file, +) + +PORT = "50161" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSNMPServer, "port": PORT}], + indirect=True, +) +def test_snmp_server(server_logs): + g = getCmd( + SnmpEngine(), + CommunityData("public"), + UdpTransportTarget((IP, int(PORT))), + ContextData(), + ObjectType(ObjectIdentity("1.3.6.1.4.1.9.9.618.1.4.1.0")), + ) + next(g) + + log_files = [f for f in server_logs.iterdir()] + assert len(log_files) == 1 + logs = load_logs_from_file(log_files[0]) + + assert len(logs) >= 2 + connect, query, *_ = logs + assert_connect_is_logged(connect, PORT) + + assert query["action"] == "query" + assert query["data"] == {"community": "public", "oids": "1.3.6.1.4.1.9.9.618.1.4.1.0", "version": "1"} diff --git a/tests/test_socks5_server.py b/tests/test_socks5_server.py new file mode 100644 index 0000000..95a3474 --- /dev/null +++ b/tests/test_socks5_server.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from time import sleep + +import pytest +import requests + +from honeypots import QSOCKS5Server +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "51080" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QSOCKS5Server, "port": PORT}], + indirect=True, +) +def test_socks5_server(server_logs): + try: + requests.get( + "http://127.0.0.1/", + proxies={"http": f"socks5://{USERNAME}:{PASSWORD}@{IP}:{PORT}"}, + ) + except requests.exceptions.ConnectionError: + pass + + sleep(1) # give the server process some time to write logs + + log_files = [f for f in server_logs.iterdir()] + assert len(log_files) == 1 + logs = load_logs_from_file(log_files[0]) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/utils.py b/tests/utils.py index bd2a0ed..f7d93e6 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,13 +1,12 @@ from __future__ import annotations import json -import socket from pathlib import Path IP = "127.0.0.1" USERNAME = "testing" PASSWORD = "testing" -EXPECTED_KEYS = ["action", "dest_ip", "dest_port", "server", "src_ip", "src_port", "timestamp"] +EXPECTED_KEYS = ("action", "dest_ip", "dest_port", "server", "src_ip", "src_port", "timestamp") def load_logs_from_file(file: Path) -> list[dict]: @@ -19,8 +18,10 @@ def load_logs_from_file(file: Path) -> list[dict]: return logs -def assert_connect_is_logged(connect: dict[str, str], port: str): - assert all(k in connect for k in EXPECTED_KEYS) +def assert_connect_is_logged( + connect: dict[str, str], port: str, expected_keys: list[str] | tuple[str, ...] = EXPECTED_KEYS +): + assert all(k in connect for k in expected_keys) assert connect["dest_ip"] == IP assert connect["dest_port"] == port assert connect["action"] == "connection" @@ -31,3 +32,4 @@ def assert_login_is_logged(login: dict[str, str]): assert login["action"] == "login" assert login["username"] == USERNAME assert login["password"] == PASSWORD + assert login["status"] == "success" From 8118d9c09a1f81160324dfe7acb908400db310f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Thu, 7 Dec 2023 17:39:30 +0100 Subject: [PATCH 05/17] unit tests: more refactoring --- tests/conftest.py | 44 +++++++++++++++++-------------------- tests/test_redis_server.py | 9 +++----- tests/test_sip_server.py | 37 ++++++++++++++----------------- tests/test_smb_server.py | 4 +--- tests/test_smtp_server.py | 4 +--- tests/test_snmp_server.py | 4 +--- tests/test_socks5_server.py | 4 +--- tests/test_ssh_server.py | 4 +--- tests/test_telnet_server.py | 4 +--- tests/test_vnc_server.py | 4 +--- tests/utils.py | 25 +++++++++++++++++++-- 11 files changed, 70 insertions(+), 73 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 4514729..2358328 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,17 +1,19 @@ from __future__ import annotations import json +from contextlib import contextmanager from multiprocessing import Process from pathlib import Path from tempfile import TemporaryDirectory +from typing import Iterator import pytest from .utils import IP, PASSWORD, USERNAME -@pytest.fixture -def config_for_testing() -> Path: +@contextmanager +def config_for_testing(custom_config: dict) -> Iterator[Path]: with TemporaryDirectory() as tmp_dir: config = Path(tmp_dir) / "config.json" logs_output_dir = Path(tmp_dir) / "logs" @@ -19,32 +21,26 @@ def config_for_testing() -> Path: testing_config = { "logs": "file,terminal,json", "logs_location": str(logs_output_dir.absolute()), + **custom_config, } config.write_text(json.dumps(testing_config)) yield config -def _update_config(custom_config: dict, config_path: Path): - config = json.loads(config_path.read_text()) - config.update(custom_config) - config_path.write_text(json.dumps(config)) - - @pytest.fixture -def server_logs(request, config_for_testing: Path): +def server_logs(request): custom_config = request.param.get("custom_config", {}) - if custom_config: - _update_config(custom_config, config_for_testing) - _server = request.param["server"]( - ip=IP, - port=request.param["port"], - username=USERNAME, - password=PASSWORD, - options="", - config=str(config_for_testing.absolute()), - ) - server_process = Process(target=_server.run_server) - server_process.start() - yield config_for_testing.parent / "logs" - server_process.terminate() - server_process.join() + with config_for_testing(custom_config) as config_file: + _server = request.param["server"]( + ip=IP, + port=request.param["port"], + username=USERNAME, + password=PASSWORD, + options="", + config=str(config_file.absolute()), + ) + server_process = Process(target=_server.run_server) + server_process.start() + yield config_file.parent / "logs" + server_process.terminate() + server_process.join() diff --git a/tests/test_redis_server.py b/tests/test_redis_server.py index 19e0585..128a8f9 100644 --- a/tests/test_redis_server.py +++ b/tests/test_redis_server.py @@ -1,5 +1,6 @@ from __future__ import annotations +from contextlib import suppress from time import sleep import pytest @@ -24,18 +25,14 @@ indirect=True, ) def test_redis_server(server_logs): - try: + with suppress(AuthenticationError): redis = StrictRedis.from_url(f"redis://{USERNAME}:{PASSWORD}@{IP}:{PORT}/1") for _ in redis.scan_iter("user:*"): pass - except AuthenticationError: - pass sleep(1) # give the server process some time to write logs - log_files = [f for f in server_logs.iterdir()] - assert len(log_files) == 1 - logs = load_logs_from_file(log_files[0]) + logs = load_logs_from_file(server_logs) assert len(logs) == 2 connect, login = logs diff --git a/tests/test_sip_server.py b/tests/test_sip_server.py index e31b940..111cb6d 100644 --- a/tests/test_sip_server.py +++ b/tests/test_sip_server.py @@ -1,13 +1,11 @@ from __future__ import annotations -from _socket import IPPROTO_UDP -from socket import AF_INET, SOCK_DGRAM, socket from time import sleep import pytest -from honeypots import QSIPServer -from .utils import IP, load_logs_from_file +from honeypots import QSIPServer +from .utils import connect_to, IP, load_logs_from_file PORT = "55060" EXPECTED_KEYS = ("action", "server", "src_ip", "src_port", "timestamp") @@ -25,25 +23,24 @@ def test_sip_server(server_logs): sleep(1) # give the server some time to start - sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) - sock.sendto( - "INVITE sip:user_1@test.test SIP/2.0\r\n" - f"To: {TO}\r\n" - f"From: {FROM}\r\n" - f"Call-ID: {CALL_ID}\r\n" - "CSeq: 1 INVITE\r\n" - f"Contact: {CONTACT}\r\n" - "Via: SIP/2.0/TCP 0.0.0.0;branch=34uiddhjczqw3mq23\r\n" - "Content-Length: 1\r\n\r\nT".encode(), - (IP, int(PORT)), - ) - sock.close() + with connect_to(IP, PORT, udp=True) as connection: + payload = ( + "INVITE sip:user_1@test.test SIP/2.0\r\n" + f"To: {TO}\r\n" + f"From: {FROM}\r\n" + f"Call-ID: {CALL_ID}\r\n" + "CSeq: 1 INVITE\r\n" + f"Contact: {CONTACT}\r\n" + "Via: SIP/2.0/TCP 0.0.0.0;branch=34uiddhjczqw3mq23\r\n" + "Content-Length: 1\r\n" + "\r\n" + "T" + ) + connection.send(payload.encode()) sleep(1) # give the server process some time to write logs - log_files = [f for f in server_logs.iterdir()] - assert len(log_files) == 1 - logs = load_logs_from_file(log_files[0]) + logs = load_logs_from_file(server_logs) assert len(logs) == 2 connect, request = logs diff --git a/tests/test_smb_server.py b/tests/test_smb_server.py index 43d0077..aa88062 100644 --- a/tests/test_smb_server.py +++ b/tests/test_smb_server.py @@ -31,9 +31,7 @@ def test_smb_server(server_logs): sleep(1) # give the server process some time to write logs - log_files = [f for f in server_logs.iterdir()] - assert len(log_files) == 1 - logs = load_logs_from_file(log_files[0]) + logs = load_logs_from_file(server_logs) assert len(logs) == 3 for entry in logs: diff --git a/tests/test_smtp_server.py b/tests/test_smtp_server.py index dbbfdbc..d3caf74 100644 --- a/tests/test_smtp_server.py +++ b/tests/test_smtp_server.py @@ -34,9 +34,7 @@ def test_smtp_server(server_logs): sleep(1) # give the server process some time to write logs - log_files = [f for f in server_logs.iterdir()] - assert len(log_files) == 1 - logs = load_logs_from_file(log_files[0]) + logs = load_logs_from_file(server_logs) assert len(logs) == 2 connect, login = logs diff --git a/tests/test_snmp_server.py b/tests/test_snmp_server.py index 24e8cee..2ad05c7 100644 --- a/tests/test_snmp_server.py +++ b/tests/test_snmp_server.py @@ -28,9 +28,7 @@ def test_snmp_server(server_logs): ) next(g) - log_files = [f for f in server_logs.iterdir()] - assert len(log_files) == 1 - logs = load_logs_from_file(log_files[0]) + logs = load_logs_from_file(server_logs) assert len(logs) >= 2 connect, query, *_ = logs diff --git a/tests/test_socks5_server.py b/tests/test_socks5_server.py index 95a3474..6398469 100644 --- a/tests/test_socks5_server.py +++ b/tests/test_socks5_server.py @@ -34,9 +34,7 @@ def test_socks5_server(server_logs): sleep(1) # give the server process some time to write logs - log_files = [f for f in server_logs.iterdir()] - assert len(log_files) == 1 - logs = load_logs_from_file(log_files[0]) + logs = load_logs_from_file(server_logs) assert len(logs) == 2 connect, login = logs diff --git a/tests/test_ssh_server.py b/tests/test_ssh_server.py index cf0fb94..a1b965a 100644 --- a/tests/test_ssh_server.py +++ b/tests/test_ssh_server.py @@ -37,9 +37,7 @@ def test_ssh_server(server_logs): ssh.close() sleep(1) # give the server process some time to write logs - log_files = [f for f in server_logs.iterdir()] - assert len(log_files) == 1 - logs = load_logs_from_file(log_files[0]) + logs = load_logs_from_file(server_logs) assert len(logs) == 2 connect, login = logs diff --git a/tests/test_telnet_server.py b/tests/test_telnet_server.py index 3fa11d1..f16ce28 100644 --- a/tests/test_telnet_server.py +++ b/tests/test_telnet_server.py @@ -32,9 +32,7 @@ def test_telnet_server(server_logs): sleep(1) # give the server process some time to write logs - log_files = [f for f in server_logs.iterdir()] - assert len(log_files) == 1 - logs = load_logs_from_file(log_files[0]) + logs = load_logs_from_file(server_logs) assert len(logs) == 2 connect, login = logs diff --git a/tests/test_vnc_server.py b/tests/test_vnc_server.py index 14f9324..834f40f 100644 --- a/tests/test_vnc_server.py +++ b/tests/test_vnc_server.py @@ -31,9 +31,7 @@ def test_vnc_server(server_logs): process.terminate() process.join(timeout=5) - log_files = [f for f in server_logs.iterdir()] - assert len(log_files) == 1 - logs = load_logs_from_file(log_files[0]) + logs = load_logs_from_file(server_logs) assert len(logs) == 1 assert_connect_is_logged(logs[0], PORT) diff --git a/tests/utils.py b/tests/utils.py index f7d93e6..7daf915 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,7 +1,10 @@ from __future__ import annotations import json +from _socket import IPPROTO_UDP +from contextlib import contextmanager from pathlib import Path +from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM, socket IP = "127.0.0.1" USERNAME = "testing" @@ -9,15 +12,33 @@ EXPECTED_KEYS = ("action", "dest_ip", "dest_port", "server", "src_ip", "src_port", "timestamp") -def load_logs_from_file(file: Path) -> list[dict]: +def load_logs_from_file(log_folder: Path) -> list[dict]: + log_files = [f for f in log_folder.iterdir()] + assert len(log_files) == 1 + log_file = log_files[0] logs = [] - for line in file.read_text().splitlines(): + for line in log_file.read_text().splitlines(): if not line: continue logs.append(json.loads(line)) return logs +@contextmanager +def connect_to(host: str, port: str, udp: bool = False) -> socket: + client = None + try: + if udp: + client = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) + else: + client = socket(AF_INET, SOCK_STREAM) + client.connect((host, int(port))) + yield client + finally: + if client: + client.close() + + def assert_connect_is_logged( connect: dict[str, str], port: str, expected_keys: list[str] | tuple[str, ...] = EXPECTED_KEYS ): From 681b5b8f1acd84fd42555fc6a1418453290d1d55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Thu, 7 Dec 2023 17:49:16 +0100 Subject: [PATCH 06/17] unit tests: added ntp, oracle, pjl, pop3, postgres and rdp --- tests/test_ntp_server.py | 46 ++++++++++++++++++++++++++++++ tests/test_oracle_server.py | 53 +++++++++++++++++++++++++++++++++++ tests/test_pjl_server.py | 46 ++++++++++++++++++++++++++++++ tests/test_pop3_server.py | 42 +++++++++++++++++++++++++++ tests/test_postgres_server.py | 40 ++++++++++++++++++++++++++ tests/test_rdp_server.py | 42 +++++++++++++++++++++++++++ 6 files changed, 269 insertions(+) create mode 100644 tests/test_ntp_server.py create mode 100644 tests/test_oracle_server.py create mode 100644 tests/test_pjl_server.py create mode 100644 tests/test_pop3_server.py create mode 100644 tests/test_postgres_server.py create mode 100644 tests/test_rdp_server.py diff --git a/tests/test_ntp_server.py b/tests/test_ntp_server.py new file mode 100644 index 0000000..3fbe003 --- /dev/null +++ b/tests/test_ntp_server.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import math +from struct import unpack +from time import sleep, time + +import pytest + +from honeypots import QNTPServer +from .utils import ( + connect_to, + IP, + load_logs_from_file, +) + +PORT = "50123" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QNTPServer, "port": PORT}], + indirect=True, +) +def test_ntp_server(server_logs): + sleep(1) # give the server some time to start + + with connect_to(IP, PORT, udp=True) as connection: + connection.send(b"\x1b" + 47 * b"\0") + data, _ = connection.recvfrom(256) + output_time = unpack("!12I", data)[10] - 2208988800 + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, query = logs + assert all(k in connect for k in ["action", "server", "src_ip", "src_port", "timestamp"]) + assert connect["action"] == "connection" + assert connect["src_ip"] == IP + + assert query["action"] == "query" + assert query["status"] == "success" + assert query["data"] == {"mode": "3", "version": "3"} + + assert math.isclose(output_time, time(), abs_tol=10) diff --git a/tests/test_oracle_server.py b/tests/test_oracle_server.py new file mode 100644 index 0000000..ab9cdd1 --- /dev/null +++ b/tests/test_oracle_server.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from time import sleep + +import pytest + +from honeypots import QOracleServer +from .utils import ( + assert_connect_is_logged, + connect_to, + IP, + load_logs_from_file, + USERNAME, +) + +PORT = "51521" +PROGRAM = "foo" +SERVICE = "bar" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QOracleServer, "port": PORT}], + indirect=True, +) +def test_oracle_server(server_logs): + sleep(1) # give the server some time to start + + payload = ( + "\x00\x00\x03\x04\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x01F\xb9\xd9@\x00@\x06\x81\xd6" + "\x7f\x00\x00\x01\x7f\x00\x00\x01\xbf\xce\x06\x13\xacW\xde\xc0Z\xb5\x0cI\x80\x18\x02\x00\xff:\x00\x00" + "\x01\x01\x08\n\x1bdZ^\x1bdZ^\x01\x12\x00\x00\x01\x00\x00\x00\x01>\x01,\x0cA \x00\xff\xff\x7f\x08\x00" + "\x00\x01\x00\x00\xc8\x00J\x00\x00\x14\x00AA\xa7C\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 \x00\x00 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x01" + f"(DESCRIPTION=(CONNECT_DATA=(SERVICE_NAME={SERVICE})(CID=(PROGRAM={PROGRAM})(HOST=xxxxxxxxxxxxxx)" + f"(USER={USERNAME}))(CONNECTION_ID=xxxxxxxxxxxxxxxxxxxxxxxx))(ADDRESS=(PROTOCOL=tcp)(HOST={IP})(PORT={PORT})))" + ) + with connect_to(IP, PORT) as connection: + connection.send(payload.encode()) + response, _ = connection.recvfrom(10000) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + + assert login["action"] == "login" + assert login["data"] == {"local_user": USERNAME, "program": PROGRAM, "service_name": SERVICE} + + assert response == b"\x00\x08\x00\x00\x04\x00\x00\x00" diff --git a/tests/test_pjl_server.py b/tests/test_pjl_server.py new file mode 100644 index 0000000..e6b902f --- /dev/null +++ b/tests/test_pjl_server.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from socket import AF_INET, SOCK_STREAM, socket +from time import sleep + +import pytest + +from honeypots import QPJLServer +from .utils import ( + assert_connect_is_logged, + connect_to, IP, + load_logs_from_file, +) + +PORT = "59100" +SERVER_CONFIG = { + "honeypots": { + "pjl": { + "options": ["capture_commands"], + }, + } +} + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QPJLServer, "port": PORT, "custom_config": SERVER_CONFIG}], + indirect=True, +) +def test_pjl_server(server_logs): + sleep(1) # give the server some time to start + + with connect_to(IP, PORT) as connection: + connection.send(b'\x1b%-12345X@PJL prodinfo') + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, query = logs + assert_connect_is_logged(connect, PORT) + + assert query["action"] == "query" + assert query["data"] == {"command": "@PJL prodinfo"} + assert query["status"] == "success" diff --git a/tests/test_pop3_server.py b/tests/test_pop3_server.py new file mode 100644 index 0000000..e3723d8 --- /dev/null +++ b/tests/test_pop3_server.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from contextlib import suppress +from poplib import error_proto, POP3 +from time import sleep + +import pytest +from honeypots import QPOP3Server + +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "50110" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QPOP3Server, "port": PORT}], + indirect=True, +) +def test_pop3_server(server_logs): + sleep(1) # give the server some time to start + + with suppress(error_proto): + client = POP3(IP, int(PORT)) + client.user(USERNAME) + client.pass_(PASSWORD) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/test_postgres_server.py b/tests/test_postgres_server.py new file mode 100644 index 0000000..0e84791 --- /dev/null +++ b/tests/test_postgres_server.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from contextlib import suppress +from time import sleep + +import pytest +from psycopg2 import connect, OperationalError + +from honeypots import QPostgresServer +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "55432" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QPostgresServer, "port": PORT}], + indirect=True, +) +def test_postgres_server(server_logs): + sleep(1) # give the server some time to start + + with suppress(OperationalError): + db = connect(host=IP, port=PORT, user=USERNAME, password=PASSWORD) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect_, login = logs + assert_connect_is_logged(connect_, PORT) + assert_login_is_logged(login) diff --git a/tests/test_rdp_server.py b/tests/test_rdp_server.py new file mode 100644 index 0000000..471621a --- /dev/null +++ b/tests/test_rdp_server.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from contextlib import suppress +from socket import AF_INET, SOCK_STREAM, socket +from time import sleep + +import pytest + +from honeypots import QRDPServer +from .utils import ( + assert_connect_is_logged, + connect_to, IP, + load_logs_from_file, +) + +PORT = "53389" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QRDPServer, "port": PORT}], + indirect=True, +) +def test_rdp_server(server_logs): + sleep(1) # give the server some time to start + + with connect_to(IP, PORT) as connection: + connection.send(b"test") + connection.send(b"\x03\x00\x00*%\xe0\x00\x00\x00\x00\x00Cookie: mstshash=foobar\r\n\x01\x00\x08\x00\x03\x00\x00") + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, stshash = logs + assert_connect_is_logged(connect, PORT) + + assert stshash["action"] == "stshash" + assert stshash["mstshash"] == "success" + assert "stshash" in stshash["data"] + assert "foobar" in stshash["data"]["stshash"] From 188ee7d63896650d46d8bcb9a784254971ece1cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 8 Dec 2023 10:28:27 +0100 Subject: [PATCH 07/17] unit tests: added imap, ipp, irc, ldap, memcache, mssql and mysql --- tests/test_imap_server.py | 56 ++++++++++++++++++++++++++++++++ tests/test_ipp_server.py | 61 +++++++++++++++++++++++++++++++++++ tests/test_irc_server.py | 47 +++++++++++++++++++++++++++ tests/test_ldap_server.py | 52 +++++++++++++++++++++++++++++ tests/test_memcache_server.py | 40 +++++++++++++++++++++++ tests/test_mssql_server.py | 47 +++++++++++++++++++++++++++ tests/test_mysql_server.py | 48 +++++++++++++++++++++++++++ tests/utils.py | 14 ++++---- 8 files changed, 358 insertions(+), 7 deletions(-) create mode 100644 tests/test_imap_server.py create mode 100644 tests/test_ipp_server.py create mode 100644 tests/test_irc_server.py create mode 100644 tests/test_ldap_server.py create mode 100644 tests/test_memcache_server.py create mode 100644 tests/test_mssql_server.py create mode 100644 tests/test_mysql_server.py diff --git a/tests/test_imap_server.py b/tests/test_imap_server.py new file mode 100644 index 0000000..f0a74e4 --- /dev/null +++ b/tests/test_imap_server.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from contextlib import suppress +from imaplib import IMAP4 +from time import sleep + +import pytest + +from honeypots import QIMAPServer +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "50143" +SERVER_CONFIG = { + "honeypots": { + "imap": { + "options": ["capture_commands"], + }, + } +} + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QIMAPServer, "port": PORT, "custom_config": SERVER_CONFIG}], + indirect=True, +) +def test_imap_server(server_logs): + sleep(1) # give the server some time to start + + with suppress(IMAP4.error): + imap_test = IMAP4(IP, int(PORT)) + imap_test.login(USERNAME, PASSWORD) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 4 + connect, cmd1, cmd2, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) + + assert cmd1["action"] == "command" + assert cmd1["data"]["cmd"] == "CAPABILITY" + assert cmd1["data"]["data"] == "None" + + assert cmd2["action"] == "command" + assert cmd2["data"]["cmd"] == "LOGIN" + assert cmd2["data"]["data"] == 'testing "testing"' diff --git a/tests/test_ipp_server.py b/tests/test_ipp_server.py new file mode 100644 index 0000000..5481c91 --- /dev/null +++ b/tests/test_ipp_server.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from time import sleep + +import pytest +import requests + +from honeypots import QIPPServer +from .utils import ( + assert_connect_is_logged, + connect_to, + IP, + load_logs_from_file, +) + +PORT = "50631" +SERVER_CONFIG = { + "honeypots": { + "ipp": { + "options": ["capture_commands"], + }, + } +} + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QIPPServer, "port": PORT, "custom_config": SERVER_CONFIG}], + indirect=True, +) +def test_ipp_server(server_logs): + sleep(1) # give the server some time to start + + body = ( + b"\x02\x00\x00\x0b\x00\x01/p\x01G\x00\x12attributes-charset\x00\x05utf-8H\x00\x1b" + b"attributes-natural-language\x00\x02enE\x00\x0bprinter-uri\x00\x15" + b"ipp://127.0.0.1:631/D\x00\x14requested-attributes\x00\x03allD\x00\x00\x00\x12media-col-database\x03" + ) + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Content-Length": f"{len(body)}", + "Host": f"{IP}:{PORT}", + "Connection": "close", + } + requests.post(f"http://{IP}:{PORT}/", data=body, headers=headers) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, query = logs + assert_connect_is_logged(connect, PORT) + + assert query["action"] == "query" + assert query["data"] == { + "request": ( + "VERSION 2.0|REQUEST 0x12f70|OPERATION Get-Printer-Attributes|GROUP operation-attributes-tag|" + "ATTR attributes-charset utf-8|ATTR attributes-natural-language en|ATTR printer-uri ipp://127.0.0.1:631/D" + ) + } diff --git a/tests/test_irc_server.py b/tests/test_irc_server.py new file mode 100644 index 0000000..c7afa8f --- /dev/null +++ b/tests/test_irc_server.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from time import sleep + +import pytest + +from honeypots import QIRCServer +from .utils import ( + assert_connect_is_logged, + connect_to, + IP, + load_logs_from_file, + PASSWORD, +) + +PORT = "56667" +SERVER_CONFIG = { + "honeypots": { + "irc": { + "options": ["capture_commands"], + }, + } +} + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QIRCServer, "port": PORT, "custom_config": SERVER_CONFIG}], + indirect=True, +) +def test_irc_server(server_logs): + sleep(1) # give the server some time to start + + with connect_to(IP, PORT) as connection: + connection.setblocking(False) + connection.send(f"PASS {PASSWORD}\n".encode()) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, command = logs + assert_connect_is_logged(connect, PORT) + + assert command["action"] == "command" + assert command["data"] == {"command": "PASS", "params": "['testing']", "prefix": ""} diff --git a/tests/test_ldap_server.py b/tests/test_ldap_server.py new file mode 100644 index 0000000..a83f47e --- /dev/null +++ b/tests/test_ldap_server.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from contextlib import suppress +from time import sleep + +import pytest +from ldap3 import ALL, Connection, Server +from ldap3.core.exceptions import LDAPInsufficientAccessRightsResult + +from honeypots import QLDAPServer +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "50389" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QLDAPServer, "port": PORT}], + indirect=True, +) +def test_ldap_server(server_logs): + sleep(1) # give the server some time to start + + with suppress(LDAPInsufficientAccessRightsResult): + connection = Connection( + Server(IP, port=int(PORT), get_info=ALL), + authentication="SIMPLE", + user=USERNAME, + password=PASSWORD, + check_names=True, + lazy=False, + client_strategy="SYNC", + raise_exceptions=True, + ) + connection.open() + connection.bind() + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/test_memcache_server.py b/tests/test_memcache_server.py new file mode 100644 index 0000000..688a850 --- /dev/null +++ b/tests/test_memcache_server.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from time import sleep + +import pytest + +from honeypots import QMemcacheServer +from .utils import ( + assert_connect_is_logged, + connect_to, + IP, + load_logs_from_file, +) + +PORT = "61211" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QMemcacheServer, "port": PORT}], + indirect=True, +) +def test_memcache_server(server_logs): + sleep(1) # give the server some time to start + + with connect_to(IP, PORT) as connection: + connection.send(b"stats\r\n") + data, _ = connection.recvfrom(10000) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, stats = logs + assert_connect_is_logged(connect, PORT) + + assert stats["action"] == "stats" + + assert b"STAT libevent 2.1.8-stable" in data diff --git a/tests/test_mssql_server.py b/tests/test_mssql_server.py new file mode 100644 index 0000000..c1a2a53 --- /dev/null +++ b/tests/test_mssql_server.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from contextlib import suppress +from time import sleep + +import pymssql +import pytest + +from honeypots import QMSSQLServer +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "51433" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QMSSQLServer, "port": PORT}], + indirect=True, +) +def test_mssql_server(server_logs): + sleep(1) # give the server some time to start + + with suppress(pymssql.OperationalError): + connection = pymssql.connect( + host=IP, + port=str(PORT), + user=USERNAME, + password=PASSWORD, + database="dbname", + ) + connection.close() + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/test_mysql_server.py b/tests/test_mysql_server.py new file mode 100644 index 0000000..9736add --- /dev/null +++ b/tests/test_mysql_server.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from contextlib import suppress +from time import sleep + +import mysql.connector +import pytest + +from honeypots import QMysqlServer +from .utils import ( + assert_connect_is_logged, + assert_login_is_logged, + IP, + load_logs_from_file, + PASSWORD, + USERNAME, +) + +PORT = "53306" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QMysqlServer, "port": PORT}], + indirect=True, +) +def test_mysql_server(server_logs): + sleep(1) # give the server some time to start + + with suppress(mysql.connector.errors.OperationalError): + connection = mysql.connector.connect( + user=USERNAME, + password=PASSWORD, + host=IP, + port=PORT, + database="test", + connect_timeout=1000, + ) + connection.close() + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, login = logs + assert_connect_is_logged(connect, PORT) + assert_login_is_logged(login) diff --git a/tests/utils.py b/tests/utils.py index 7daf915..31469c3 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -26,17 +26,17 @@ def load_logs_from_file(log_folder: Path) -> list[dict]: @contextmanager def connect_to(host: str, port: str, udp: bool = False) -> socket: - client = None + connection = None try: if udp: - client = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) + connection = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) else: - client = socket(AF_INET, SOCK_STREAM) - client.connect((host, int(port))) - yield client + connection = socket(AF_INET, SOCK_STREAM) + connection.connect((host, int(port))) + yield connection finally: - if client: - client.close() + if connection: + connection.close() def assert_connect_is_logged( From 375190f3c907e6bdfc97a394aac77acd935b1cb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 8 Dec 2023 10:40:13 +0100 Subject: [PATCH 08/17] unit tests: added github action to run the tests --- .github/workflows/tests.yml | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 .github/workflows/tests.yml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..30274dd --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,30 @@ +name: Tests CI +run-name: Tests CI +on: + pull_request: + branches: + - master + push: + branches: + - master + +jobs: + tests: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Display Python version + run: python -c "import sys; print(sys.version)" + - name: Install dependencies + run: python -m pip install --upgrade pip setuptools wheel + - name: Installation + run: python -m pip install . + - name: Unit Tests + run: pytest -v ./tests From 623075eab960daa2f0d9f9aec9232bd17a866e7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 8 Dec 2023 10:42:47 +0100 Subject: [PATCH 09/17] unit tests: added option test action to trigger manually --- .github/workflows/tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 30274dd..a5e768f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -7,6 +7,7 @@ on: push: branches: - master + workflow_dispatch: jobs: tests: From 862576acd2948638d4b549762e8f46a230abd64d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 8 Dec 2023 12:01:38 +0100 Subject: [PATCH 10/17] tests action: replace outdated pycrypto dep and install dev deps --- .github/workflows/tests.yml | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a5e768f..d051fc5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -26,6 +26,6 @@ jobs: - name: Install dependencies run: python -m pip install --upgrade pip setuptools wheel - name: Installation - run: python -m pip install . + run: python -m pip install ".[test]" - name: Unit Tests run: pytest -v ./tests diff --git a/setup.py b/setup.py index e6477c1..4bd7faf 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ "twisted==21.7.0", "psutil==5.9.0", "psycopg2-binary==2.9.3", - "pycrypto==2.6.1", + "pycryptodome==3.19.0", "requests==2.28.2", "requests[socks]==2.28.2", "impacket==0.9.24", From 23661a21d1cce2a175172e37bedb5e17513fc432 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 8 Dec 2023 13:12:48 +0100 Subject: [PATCH 11/17] tests action: don't fail fast and give ssh server time to start --- .github/workflows/tests.yml | 1 + tests/test_ssh_server.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index d051fc5..326f8a6 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -15,6 +15,7 @@ jobs: strategy: matrix: python-version: ["3.8", "3.9", "3.10", "3.11"] + fail-fast: false steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} diff --git a/tests/test_ssh_server.py b/tests/test_ssh_server.py index a1b965a..0606247 100644 --- a/tests/test_ssh_server.py +++ b/tests/test_ssh_server.py @@ -31,10 +31,13 @@ indirect=True, ) def test_ssh_server(server_logs): + sleep(1) # give the server some time to start + ssh = SSHClient() ssh.set_missing_host_key_policy(AutoAddPolicy()) ssh.connect(IP, port=PORT, username=USERNAME, password=PASSWORD) ssh.close() + sleep(1) # give the server process some time to write logs logs = load_logs_from_file(server_logs) From 30e39f2f668b5305aba9078fdc33b6b6171c2fbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 8 Dec 2023 13:13:49 +0100 Subject: [PATCH 12/17] tests action: main branch name fix --- .github/workflows/tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 326f8a6..b84285a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -3,10 +3,10 @@ run-name: Tests CI on: pull_request: branches: - - master + - main push: branches: - - master + - main workflow_dispatch: jobs: From 0b2ff6c14b4264768fe945e8324fbab682989e2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 8 Dec 2023 14:17:54 +0100 Subject: [PATCH 13/17] https server: client IP bug fix --- honeypots/https_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/honeypots/https_server.py b/honeypots/https_server.py index 05c3a72..30b4d85 100644 --- a/honeypots/https_server.py +++ b/honeypots/https_server.py @@ -177,7 +177,7 @@ def check_bytes(string): request.responseHeaders.addRawHeader('Server', _q_s.mocking_server) if request.method == b'GET' or request.method == b'POST': - _q_s.logs.info({'server': 'https_server', 'action': request.method.decode(), 'src_ip': request.client_ip, 'src_port': request.getClientAddress().port, 'dest_ip': _q_s.ip, 'dest_port': _q_s.port}) + _q_s.logs.info({'server': 'https_server', 'action': request.method.decode(), 'src_ip': client_ip, 'src_port': request.getClientAddress().port, 'dest_ip': _q_s.ip, 'dest_port': _q_s.port}) if request.method == b'GET': if request.uri == b'/login.html': From e49cfe55bbc6974fdf10fbb0c7f9a34e40ecd525 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 8 Dec 2023 15:25:33 +0100 Subject: [PATCH 14/17] dhcp server: too short query fix --- honeypots/dhcp_server.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/honeypots/dhcp_server.py b/honeypots/dhcp_server.py index 759d8d9..89d0fdb 100644 --- a/honeypots/dhcp_server.py +++ b/honeypots/dhcp_server.py @@ -9,22 +9,19 @@ // contributors list qeeqbox/honeypots/graphs/contributors // ------------------------------------------------------------- ''' - from warnings import filterwarnings filterwarnings(action='ignore', module='.*OpenSSL.*') filterwarnings(action='ignore', module='.*socket.*') from twisted.internet.protocol import DatagramProtocol from twisted.internet import reactor -from time import time from twisted.python import log as tlog -from struct import unpack +from struct import unpack, error as StructError from socket import inet_aton from subprocess import Popen from os import path, getenv from honeypots.helper import close_port_wrapper, get_free_port, kill_server_wrapper, server_arguments, setup_logger, disable_logger, set_local_vars, check_if_server_is_running from uuid import uuid4 -from contextlib import suppress class QDHCPServer(): @@ -95,7 +92,10 @@ def parse_options(self, raw): return options def datagramReceived(self, data, addr): - mac_address = unpack('!28x6s', data[:34])[0].hex(':') + try: + mac_address = unpack('!28x6s', data[:34])[0].hex(':') + except StructError: + mac_address = "None" data = self.parse_options(data[240:]) data.update({'mac_address': mac_address}) _q_s.logs.info({'server': 'dhcp_server', 'action': 'query', 'status': 'success', 'src_ip': addr[0], 'src_port': addr[1], 'dest_ip': _q_s.ip, 'dest_port': _q_s.port, 'data': data}) From e21cf84f674eaec439552729a5ce096d43d77b20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 8 Dec 2023 15:26:27 +0100 Subject: [PATCH 15/17] unit tests: added dhcp, dns, elastic, ftp, http proxy, http and https --- setup.py | 1 + tests/test_dhcp_server.py | 38 +++++++++++++++++++++++ tests/test_dns_server.py | 44 +++++++++++++++++++++++++++ tests/test_elastic_server.py | 45 +++++++++++++++++++++++++++ tests/test_ftp_server.py | 53 ++++++++++++++++++++++++++++++++ tests/test_http_proxy_server.py | 44 +++++++++++++++++++++++++++ tests/test_http_server.py | 54 +++++++++++++++++++++++++++++++++ tests/test_https_server.py | 54 +++++++++++++++++++++++++++++++++ tests/utils.py | 3 +- 9 files changed, 334 insertions(+), 2 deletions(-) create mode 100644 tests/test_dhcp_server.py create mode 100644 tests/test_dns_server.py create mode 100644 tests/test_elastic_server.py create mode 100644 tests/test_ftp_server.py create mode 100644 tests/test_http_proxy_server.py create mode 100644 tests/test_http_server.py create mode 100644 tests/test_https_server.py diff --git a/setup.py b/setup.py index 4bd7faf..4d7afde 100644 --- a/setup.py +++ b/setup.py @@ -33,6 +33,7 @@ "test": [ "redis", "mysql-connector", + "dnspython==2.4.2", "elasticsearch", "pymssql", "ldap3", diff --git a/tests/test_dhcp_server.py b/tests/test_dhcp_server.py new file mode 100644 index 0000000..c43511f --- /dev/null +++ b/tests/test_dhcp_server.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from time import sleep + +import pytest + +from honeypots import QDHCPServer +from .utils import ( + connect_to, + EXPECTED_KEYS, + IP, + load_logs_from_file, +) + +PORT = "50067" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QDHCPServer, "port": PORT}], + indirect=True, +) +def test_dhcp_server(server_logs): + sleep(1) # give the server some time to start + + with connect_to(IP, PORT, udp=True) as connection: + connection.send(b"\x03" * 240) + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 1 + (query,) = logs + assert all(k in query for k in EXPECTED_KEYS) + assert query["action"] == "query" + assert query["status"] == "success" + assert query["data"] == {"mac_address": "03:03:03:03:03:03"} diff --git a/tests/test_dns_server.py b/tests/test_dns_server.py new file mode 100644 index 0000000..6ecde93 --- /dev/null +++ b/tests/test_dns_server.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from time import sleep + +import pytest +from dns.resolver import Resolver + +from honeypots import QDNSServer +from .utils import ( + assert_connect_is_logged, + IP, + load_logs_from_file, +) + +PORT = "50053" + + +@pytest.mark.parametrize( + "server_logs", + [{"server": QDNSServer, "port": PORT}], + indirect=True, +) +def test_dns_server(server_logs): + sleep(1) # give the server some time to start + + resolver = Resolver(configure=False) + resolver.nameservers = [IP] + resolver.port = int(PORT) + domain = "example.org" + response = resolver.resolve(domain, "a") + + sleep(1) # give the server process some time to write logs + + logs = load_logs_from_file(server_logs) + + assert len(logs) == 2 + connect, query = logs + assert_connect_is_logged(connect, PORT) + + assert query["action"] == "query" + assert "data" in query + assert " Date: Fri, 8 Dec 2023 15:56:35 +0100 Subject: [PATCH 16/17] unit tests: refactoring --- tests/test_ftp_server.py | 5 +++-- tests/test_http_server.py | 3 +-- tests/test_https_server.py | 3 +-- tests/test_ipp_server.py | 1 - tests/test_pjl_server.py | 6 +++--- tests/test_pop3_server.py | 2 +- tests/test_rdp_server.py | 9 +++++---- tests/test_redis_server.py | 2 +- tests/test_smb_server.py | 2 +- tests/test_smtp_server.py | 4 ++-- tests/test_socks5_server.py | 5 ++--- tests/test_telnet_server.py | 4 ++-- 12 files changed, 22 insertions(+), 24 deletions(-) diff --git a/tests/test_ftp_server.py b/tests/test_ftp_server.py index 2e62d1c..afeb808 100644 --- a/tests/test_ftp_server.py +++ b/tests/test_ftp_server.py @@ -8,7 +8,8 @@ from honeypots import QFTPServer from .utils import ( assert_connect_is_logged, - assert_login_is_logged, IP, + assert_login_is_logged, + IP, load_logs_from_file, PASSWORD, USERNAME, @@ -29,7 +30,7 @@ [{"server": QFTPServer, "port": PORT, "custom_config": SERVER_CONFIG}], indirect=True, ) -def test_http_proxy_server(server_logs): +def test_ftp_server(server_logs): sleep(1) # give the server some time to start client = FTP() diff --git a/tests/test_http_server.py b/tests/test_http_server.py index 6ee57a2..1064248 100644 --- a/tests/test_http_server.py +++ b/tests/test_http_server.py @@ -6,7 +6,6 @@ import requests from honeypots import QHTTPServer - from .utils import ( assert_connect_is_logged, assert_login_is_logged, @@ -35,7 +34,7 @@ def test_http_server(server_logs): sleep(1) # give the server some time to start url = f"http://{IP}:{PORT}" - data = {'username': USERNAME, 'password': PASSWORD} + data = {"username": USERNAME, "password": PASSWORD} requests.post(f"{url}/login.html", verify=False, data=data) sleep(1) # give the server process some time to write logs diff --git a/tests/test_https_server.py b/tests/test_https_server.py index a20a17b..18a97ba 100644 --- a/tests/test_https_server.py +++ b/tests/test_https_server.py @@ -6,7 +6,6 @@ import requests from honeypots import QHTTPSServer - from .utils import ( assert_connect_is_logged, assert_login_is_logged, @@ -35,7 +34,7 @@ def test_https_server(server_logs): sleep(1) # give the server some time to start url = f"https://{IP}:{PORT}" - data = {'username': USERNAME, 'password': PASSWORD} + data = {"username": USERNAME, "password": PASSWORD} requests.post(f"{url}/login.html", verify=False, data=data) sleep(1) # give the server process some time to write logs diff --git a/tests/test_ipp_server.py b/tests/test_ipp_server.py index 5481c91..b50ce16 100644 --- a/tests/test_ipp_server.py +++ b/tests/test_ipp_server.py @@ -8,7 +8,6 @@ from honeypots import QIPPServer from .utils import ( assert_connect_is_logged, - connect_to, IP, load_logs_from_file, ) diff --git a/tests/test_pjl_server.py b/tests/test_pjl_server.py index e6b902f..7940060 100644 --- a/tests/test_pjl_server.py +++ b/tests/test_pjl_server.py @@ -1,6 +1,5 @@ from __future__ import annotations -from socket import AF_INET, SOCK_STREAM, socket from time import sleep import pytest @@ -8,7 +7,8 @@ from honeypots import QPJLServer from .utils import ( assert_connect_is_logged, - connect_to, IP, + connect_to, + IP, load_logs_from_file, ) @@ -31,7 +31,7 @@ def test_pjl_server(server_logs): sleep(1) # give the server some time to start with connect_to(IP, PORT) as connection: - connection.send(b'\x1b%-12345X@PJL prodinfo') + connection.send(b"\x1b%-12345X@PJL prodinfo") sleep(1) # give the server process some time to write logs diff --git a/tests/test_pop3_server.py b/tests/test_pop3_server.py index e3723d8..d9532e6 100644 --- a/tests/test_pop3_server.py +++ b/tests/test_pop3_server.py @@ -5,8 +5,8 @@ from time import sleep import pytest -from honeypots import QPOP3Server +from honeypots import QPOP3Server from .utils import ( assert_connect_is_logged, assert_login_is_logged, diff --git a/tests/test_rdp_server.py b/tests/test_rdp_server.py index 471621a..774c878 100644 --- a/tests/test_rdp_server.py +++ b/tests/test_rdp_server.py @@ -1,7 +1,5 @@ from __future__ import annotations -from contextlib import suppress -from socket import AF_INET, SOCK_STREAM, socket from time import sleep import pytest @@ -9,7 +7,8 @@ from honeypots import QRDPServer from .utils import ( assert_connect_is_logged, - connect_to, IP, + connect_to, + IP, load_logs_from_file, ) @@ -26,7 +25,9 @@ def test_rdp_server(server_logs): with connect_to(IP, PORT) as connection: connection.send(b"test") - connection.send(b"\x03\x00\x00*%\xe0\x00\x00\x00\x00\x00Cookie: mstshash=foobar\r\n\x01\x00\x08\x00\x03\x00\x00") + connection.send( + b"\x03\x00\x00*%\xe0\x00\x00\x00\x00\x00Cookie: mstshash=foobar\r\n\x01\x00\x08\x00\x03\x00\x00" + ) sleep(1) # give the server process some time to write logs diff --git a/tests/test_redis_server.py b/tests/test_redis_server.py index 128a8f9..4ab3294 100644 --- a/tests/test_redis_server.py +++ b/tests/test_redis_server.py @@ -4,9 +4,9 @@ from time import sleep import pytest -from honeypots import QRedisServer from redis import AuthenticationError, StrictRedis +from honeypots import QRedisServer from .utils import ( assert_connect_is_logged, assert_login_is_logged, diff --git a/tests/test_smb_server.py b/tests/test_smb_server.py index aa88062..883871e 100644 --- a/tests/test_smb_server.py +++ b/tests/test_smb_server.py @@ -1,9 +1,9 @@ from __future__ import annotations -from impacket.smbconnection import SMBConnection from time import sleep import pytest +from impacket.smbconnection import SMBConnection from honeypots import QSMBServer from .utils import ( diff --git a/tests/test_smtp_server.py b/tests/test_smtp_server.py index d3caf74..4dbe58c 100644 --- a/tests/test_smtp_server.py +++ b/tests/test_smtp_server.py @@ -4,8 +4,8 @@ from time import sleep import pytest -from honeypots import QSMTPServer +from honeypots import QSMTPServer from .utils import ( assert_connect_is_logged, assert_login_is_logged, @@ -29,7 +29,7 @@ def test_smtp_server(server_logs): client = SMTP(IP, int(PORT)) client.ehlo() client.login(USERNAME, PASSWORD) - client.sendmail('fromtest', 'totest', 'Nothing') + client.sendmail("fromtest", "totest", "Nothing") client.quit() sleep(1) # give the server process some time to write logs diff --git a/tests/test_socks5_server.py b/tests/test_socks5_server.py index 6398469..b8de9a9 100644 --- a/tests/test_socks5_server.py +++ b/tests/test_socks5_server.py @@ -1,5 +1,6 @@ from __future__ import annotations +from contextlib import suppress from time import sleep import pytest @@ -24,13 +25,11 @@ indirect=True, ) def test_socks5_server(server_logs): - try: + with suppress(requests.exceptions.ConnectionError): requests.get( "http://127.0.0.1/", proxies={"http": f"socks5://{USERNAME}:{PASSWORD}@{IP}:{PORT}"}, ) - except requests.exceptions.ConnectionError: - pass sleep(1) # give the server process some time to write logs diff --git a/tests/test_telnet_server.py b/tests/test_telnet_server.py index f16ce28..a21d339 100644 --- a/tests/test_telnet_server.py +++ b/tests/test_telnet_server.py @@ -1,11 +1,11 @@ from __future__ import annotations +from telnetlib import Telnet from time import sleep import pytest -from honeypots import QTelnetServer -from telnetlib import Telnet +from honeypots import QTelnetServer from .utils import ( assert_connect_is_logged, assert_login_is_logged, From 156c8e1504f6c433f4c4da914175bd716f113fd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 8 Dec 2023 16:31:03 +0100 Subject: [PATCH 17/17] unit tests: smtp and pop3: test for capture_commands --- tests/test_pop3_server.py | 18 +++++++++++++++--- tests/test_smtp_server.py | 27 ++++++++++++++++++++++++--- tests/test_ssh_server.py | 7 ------- 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/tests/test_pop3_server.py b/tests/test_pop3_server.py index d9532e6..220a02c 100644 --- a/tests/test_pop3_server.py +++ b/tests/test_pop3_server.py @@ -17,11 +17,18 @@ ) PORT = "50110" +SERVER_CONFIG = { + "honeypots": { + "pop3": { + "options": ["capture_commands"], + }, + } +} @pytest.mark.parametrize( "server_logs", - [{"server": QPOP3Server, "port": PORT}], + [{"server": QPOP3Server, "port": PORT, "custom_config": SERVER_CONFIG}], indirect=True, ) def test_pop3_server(server_logs): @@ -36,7 +43,12 @@ def test_pop3_server(server_logs): logs = load_logs_from_file(server_logs) - assert len(logs) == 2 - connect, login = logs + assert len(logs) == 4 + connect, cmd1, cmd2, login = logs assert_connect_is_logged(connect, PORT) assert_login_is_logged(login) + + assert cmd1["action"] == "command" + assert cmd1["data"] == {"args": "testing", "cmd": "USER"} + assert cmd2["action"] == "command" + assert cmd2["data"] == {"args": "testing", "cmd": "PASS"} diff --git a/tests/test_smtp_server.py b/tests/test_smtp_server.py index 4dbe58c..ed041c0 100644 --- a/tests/test_smtp_server.py +++ b/tests/test_smtp_server.py @@ -1,5 +1,6 @@ from __future__ import annotations +from base64 import b64decode from smtplib import SMTP from time import sleep @@ -16,11 +17,25 @@ ) PORT = "50025" +SERVER_CONFIG = { + "honeypots": { + "smtp": { + "options": ["capture_commands"], + }, + } +} +EXPECTED_DATA = [ + {"arg": "FROM:", "command": "MAIL", "data": "None"}, + {"arg": "TO:", "command": "RCPT", "data": "None"}, + {"arg": "None", "command": "DATA", "data": "None"}, + {"arg": "None", "command": "NOTHING", "data": "None"}, + {"arg": "None", "command": "QUIT", "data": "None"}, +] @pytest.mark.parametrize( "server_logs", - [{"server": QSMTPServer, "port": PORT}], + [{"server": QSMTPServer, "port": PORT, "custom_config": SERVER_CONFIG}], indirect=True, ) def test_smtp_server(server_logs): @@ -36,7 +51,13 @@ def test_smtp_server(server_logs): logs = load_logs_from_file(server_logs) - assert len(logs) == 2 - connect, login = logs + assert len(logs) == 8 + connect, auth, login, *additional = logs assert_connect_is_logged(connect, PORT) assert_login_is_logged(login) + + assert auth["data"]["command"] == "AUTH" + assert b64decode(auth["data"]["data"]).decode() == f"\x00{USERNAME}\x00{PASSWORD}" + + for entry, expected in zip(additional, EXPECTED_DATA): + assert entry diff --git a/tests/test_ssh_server.py b/tests/test_ssh_server.py index 0606247..5f32cfc 100644 --- a/tests/test_ssh_server.py +++ b/tests/test_ssh_server.py @@ -12,14 +12,7 @@ SERVER_CONFIG = { "honeypots": { "ssh": { - "backup_count": 10, - "ip": IP, - "log_file_name": "ssh.jsonl", - "max_bytes": 10000, "options": ["capture_commands"], - "password": PASSWORD, - "port": str(PORT), - "username": USERNAME, }, } }