Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extended dicom server to accept queries and return files #26

Merged
merged 1 commit into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 74 additions & 20 deletions honeypots/dicom_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import TYPE_CHECKING
from unittest.mock import patch

from pydicom import dcmread
from pydicom.filewriter import write_file_meta_info
from pynetdicom import (
ae,
Expand All @@ -25,6 +26,7 @@
RelevantPatientInformationPresentationContexts,
VerificationPresentationContexts,
QueryRetrievePresentationContexts,
tests,
)
from pynetdicom.dul import DULServiceProvider

Expand All @@ -41,6 +43,7 @@
"EVT_SOP_EXTENDED",
"EVT_SOP_COMMON",
}
GET_REQUEST_DS = dcmread(Path(tests.__file__).parent / "dicom_files" / "CTImageStorage.dcm")


class UserIdType(Enum):
Expand All @@ -52,7 +55,9 @@ class UserIdType(Enum):


SUCCESS = 0x0000
CANNOT_UNDERSTAND = 0xC000
FAILURE = 0xC000
CANCEL = 0xFE00
PENDING = 0xFF00


class QDicomServer(BaseServer):
Expand Down Expand Up @@ -80,12 +85,14 @@ def _send(self, pdu) -> None:
def _decode_pdu(self, bytestream: bytearray):
pdu, event = super()._decode_pdu(bytestream)
try:
_q_s.log(
{
"action": type(pdu).__name__.replace("_", "-"),
"data": _dicom_obj_to_dict(pdu),
}
)
pdu_type = type(pdu).__name__.replace("_", "-")
if pdu_type != "P-DATA-TF":
_q_s.log(
{
"action": pdu_type,
"data": _dicom_obj_to_dict(pdu),
}
)
except Exception as error:
_q_s.logger.debug(f"Error while decoding PDU: {error}")
return pdu, event
Expand All @@ -111,7 +118,7 @@ def process_request(
super().process_request(request, client_address)

def handle_store(event: evt.Event) -> int:
handle_event(event) # for logging
_log_event(event)
try:
output_file = _q_s.storage_dir / event.request.AffectedSOPInstanceUID
with output_file.open("wb") as fp:
Expand All @@ -129,10 +136,45 @@ def handle_store(event: evt.Event) -> int:
return SUCCESS
except Exception as error:
_q_s.logger.critical(f"Exception occurred during store event: {error}")
return CANNOT_UNDERSTAND
return FAILURE

def handle_get(event):
# C-GET event
# see docs: https://pydicom.github.io/pynetdicom/stable/reference/generated/pynetdicom._handlers.doc_handle_c_get.html#pynetdicom._handlers.doc_handle_c_get
dataset = event.identifier
log_data = {
key: getattr(dataset, key, None)
for key in (
"QueryRetrieveLevel",
"PatientID",
"StudyInstanceUID",
"SeriesInstanceUID",
)
}
_log_event(event, log_data)

if "QueryRetrieveLevel" not in dataset:
# if this is a valid GET request, there should be a retrieve level
yield FAILURE, None
return

# we simply always return the same demo dataset instead of
# checking if anything actually matched the IDs in the request
instances = [GET_REQUEST_DS, GET_REQUEST_DS]

# first yield the number of operations
total = len(instances)
yield total

# then yield the "matching" instance
for instance in instances:
if event.is_cancelled:
yield CANCEL, None
return
yield PENDING, instance

def handle_login(event: evt.Event) -> tuple[bool, bytes | None]:
# EVT_USER_ID event
# USER-ID event
# see https://pydicom.github.io/pynetdicom/stable/reference/generated/pynetdicom._handlers.doc_handle_userid.html
user_id_type = UserIdType(event.user_id_type)
if user_id_type == UserIdType.username_and_passcode:
Expand Down Expand Up @@ -178,12 +220,14 @@ def _log_id_event(data_type: str, event: evt.Event):

def handle_event(event: evt.Event, *_):
# generic event handler
_log_event(event)
return SUCCESS

def _log_event(event, additional_data: dict | None = None):
additional_data = additional_data or {}
try:
data = {
"description": event.event.description,
}
if hasattr(event, "context"):
data.update(
additional_data.update(
{
"abstract_syntax": event.context.abstract_syntax,
"transfer_syntax": event.context.transfer_syntax,
Expand All @@ -194,16 +238,21 @@ def handle_event(event: evt.Event, *_):
"action": event.event.name.replace("EVT_", "").replace("_", "-"),
"src_ip": event.assoc.requestor.address,
"src_port": event.assoc.requestor.port,
"data": data,
"data": {
"description": event.event.description,
**additional_data,
},
}
)
except Exception as error:
_q_s.logger.debug(f"exception during event logging: {error}")
return SUCCESS

special_handlers = {"EVT_USER_ID": handle_login}
special_handlers = {
evt.EVT_USER_ID.name: handle_login,
evt.EVT_C_GET.name: handle_get,
}
if _q_s.store_images:
special_handlers["EVT_C_STORE"] = handle_store
special_handlers[evt.EVT_C_STORE.name] = handle_store
handlers = [
(event_, special_handlers.get(event_.name, handle_event))
for event_ in evt._INTERVENTION_EVENTS
Expand All @@ -224,8 +273,13 @@ def handle_event(event: evt.Event, *_):

for context in app_entity.supported_contexts:
# only play the server role, not the client
context.scp_role = True
context.scu_role = False
if context in AllStoragePresentationContexts:
# except when presenting things (get request) then the server is also the SCU
context.scp_role = True
context.scu_role = True
else:
context.scp_role = True
context.scu_role = False

with patch("pynetdicom.association.DULServiceProvider", CustomDUL), patch(
"pynetdicom.ae.AssociationServer", CustomAssociationServer
Expand Down
76 changes: 66 additions & 10 deletions tests/test_dicom_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from multiprocessing import Event
from pathlib import Path
from shlex import split
from subprocess import run
Expand All @@ -8,11 +9,15 @@
import pytest
from pydicom import dcmread, Dataset
from pydicom.uid import CTImageStorage
from pynetdicom.sop_class import PatientRootQueryRetrieveInformationModelFind
from pynetdicom.sop_class import (
PatientRootQueryRetrieveInformationModelFind,
PatientRootQueryRetrieveInformationModelGet,
)
from pynetdicom.pdu_primitives import UserIdentityNegotiation
from pynetdicom import AE, tests
from pynetdicom import AE, tests, build_role, evt

from honeypots import QDicomServer
from honeypots.dicom_server import SUCCESS
from .utils import (
assert_connect_is_logged,
assert_login_is_logged,
Expand Down Expand Up @@ -41,14 +46,13 @@ def test_dicom_echo(server_logs):

logs = load_logs_from_file(server_logs)

assert len(logs) == 5
assert len(logs) == 4
connect, *events = logs
assert_connect_is_logged(connect, PORT)

assert events[0]["action"] == "A-ASSOCIATE-RQ"
assert events[1]["action"] == "P-DATA-TF"
assert events[2]["action"] == "C-ECHO"
assert events[3]["action"] == "A-RELEASE-RQ"
assert events[1]["action"] == "C-ECHO"
assert events[2]["action"] == "A-RELEASE-RQ"


@pytest.mark.parametrize(
Expand Down Expand Up @@ -80,10 +84,10 @@ def test_login(server_logs):
assert release["action"] == "A-RELEASE-RQ"


def _retry_association(ae: AE, port: int, ext_neg=None):
def _retry_association(ae: AE, port: int, ext_neg=None, handlers=None):
for _ in range(RETRIES):
# this is somehow a bit flaky so we retry here
association = ae.associate(IP, port, ext_neg=ext_neg)
association = ae.associate(IP, port, ext_neg=ext_neg, evt_handlers=handlers)
if association.is_established:
return association
sleep(1)
Expand Down Expand Up @@ -116,10 +120,62 @@ def test_store(server_logs):
association.release()

assert isinstance(response, Dataset)
assert response.Status == 0x0000 # success
assert response.Status == SUCCESS

logs = load_logs_from_file(server_logs)
assert len(logs) > 1
logs_by_action = {entry["action"]: entry for entry in logs}
logs_by_action = _get_logs_by_action(logs)
assert "store_image" in logs_by_action
assert logs_by_action["store_image"]["data"]["size"] == "39102"


def _get_logs_by_action(logs):
return {entry["action"]: entry for entry in logs}


_event = Event()


def _handle_store(event):
dataset = event.dataset
dataset.file_meta = event.file_meta
assert dataset.PatientID == "1CT1"
_event.set()
return SUCCESS


@pytest.mark.parametrize(
"server_logs",
[{"server": QDicomServer, "port": PORT + 3}],
indirect=True,
)
def test_get(server_logs):
handlers = [(evt.EVT_C_STORE, _handle_store)]

ae = AE()
ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet)
ae.add_requested_context(CTImageStorage)
# the server sends us back the requested images so we become the SCP here
role = build_role(CTImageStorage, scp_role=True)

ds = Dataset()
ds.QueryRetrieveLevel = "SERIES"
ds.PatientID = "1234567"
ds.StudyInstanceUID = "1.2.3"
ds.SeriesInstanceUID = "1.2.3.4"

with wait_for_server(PORT + 3):
association = _retry_association(ae, PORT + 3, ext_neg=[role], handlers=handlers)
responses = list(association.send_c_get(ds, PatientRootQueryRetrieveInformationModelGet))
_event.wait(timeout=2)
association.release()

logs = load_logs_from_file(server_logs)

assert len(logs) > 3
logs_by_action = _get_logs_by_action(logs)
assert "C-GET" in logs_by_action
assert logs_by_action["C-GET"]["data"]["PatientID"] == ds.PatientID

assert len(responses) == 3
assert _event.is_set()
Loading