-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmonothreaded-example.py
172 lines (161 loc) · 7.05 KB
/
monothreaded-example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""
HLI-toDICOM application entrypoint. Contains the business logic to iterate through all the instances of the DICOM study to be exported.
SPDX-License-Identifier: MIT-0
"""
import getopt
import pydicom
from pydicom.sequence import Sequence
from pydicom import Dataset , DataElement
from pydicom.dataset import FileDataset, FileMetaDataset
from pydicom.uid import UID
from pydicom.pixel_data_handlers.util import convert_color_space , apply_color_lut
import json
import logging
import boto3
from openjpeg import decode
import io
import sys
import time
import os
from PIL import Image
import gzip
import base64
# Configure the root logger so the INFO-level progress and timing messages
# emitted throughout this script are shown.
logging.basicConfig( level="INFO" )
# NOTE(review): neither of the two module-level names below is read by any
# code visible in this file. They look like leftovers from a multithreaded
# variant of this example - confirm before removing.
ThreadCount = 10
HLIFrameFetcherThreadList = []
def main():
    """Export every instance of one image set as a DICOM file plus a PNG preview.

    Command-line options (read from ``sys.argv``):
        -d / --datastoreId  data store identifier (required)
        -s / --studyId      image set identifier (required)
        -t / --thread       integer; accepted for command-line compatibility
                            but not used by this single-threaded example

    For each instance found in the image set metadata, a dataset is built
    from the patient, study, series and instance level tags, the first
    image frame is fetched and decoded, and the result is written to
    ``./out/<studyId>/<instance>.dcm`` and ``./out/<studyId>/<instance>.png``.

    Returns None. When a required option is missing (or the command line
    cannot be parsed) a message is printed and nothing is exported.
    """
    datastoreId = None
    studyId = None
    thread_count = 20  # parsed below but never read in this example
    argumentList = sys.argv[1:]
    short_options = "d:s:t:"
    # A trailing "=" tells getopt that the long option takes a value. Without
    # it "--datastoreId X" is parsed as an empty value and "--datastoreId=X"
    # is rejected. "series" takes no value and is ignored; it is kept only so
    # command lines that already pass it keep parsing.
    long_options = ["datastoreId=", "studyId=", "thread=", "series"]
    try:
        arguments, _values = getopt.getopt(argumentList, short_options, long_options)
        for currentArgument, currentValue in arguments:
            if currentArgument in ("-s", "--studyId"):
                studyId = currentValue
            elif currentArgument in ("-d", "--datastoreId"):
                datastoreId = currentValue
            elif currentArgument in ("-t", "--thread"):
                thread_count = int(currentValue)
    except getopt.error as err:
        # Report the parsing problem; both identifiers are still None here,
        # so the check below prints the usage hint and returns.
        print(str(err))
    if datastoreId is None or studyId is None:
        print("DatstoreId (-d or --datastoreId) and studyId (-s or --studyId) must be provided.")
        return

    client = boto3.client('medical-imaging')
    logging.info("Reading the JSON metadata file")
    json_dicom_header = hliGetMetadata(datastoreId, studyId, client)
    logging.info("Parsing the Header Tags.")
    vrlist = []
    DICOMStudyId = json_dicom_header["ImageSetID"]
    patient_tags = json_dicom_header["Patient"]["DICOM"]
    study_tags = json_dicom_header["Study"]["DICOM"]
    all_series = json_dicom_header["Study"]["Series"]

    for series in all_series:
        series_entry = all_series[series]
        for instance in series_entry["Instances"]:
            instance_entry = series_entry["Instances"][instance]
            logging.info(f"Converting instance {instance}")

            # Build a fresh dataset for every instance and re-apply the
            # patient, study and series tags to it. Reusing one dataset and
            # replacing it with an empty one after each save left every file
            # after the first without those higher-level tags.
            file_meta = FileMetaDataset()
            ds = FileDataset(None, {}, file_meta=file_meta, preamble=b"\0" * 128)
            getTags(patient_tags, ds, vrlist)
            getTags(study_tags, ds, vrlist)
            getTags(series_entry["DICOM"], ds, vrlist)

            # The instance's own VR table must be loaded before its tags so
            # that keys missing from the pydicom dictionary can be resolved.
            getDICOMVRs(instance_entry["DICOMVRs"], vrlist)
            # Only the first image frame of the instance is exported.
            frameId = instance_entry["ImageFrames"][0]["ID"]
            getTags(instance_entry["DICOM"], ds, vrlist)

            ds.file_meta.TransferSyntaxUID = pydicom.uid.ExplicitVRLittleEndian
            ds.is_little_endian = True
            ds.is_implicit_VR = False
            file_meta.MediaStorageSOPInstanceUID = UID(instance)

            pixels = HLIGetFramePixels(datastoreId, DICOMStudyId, frameId, client)
            start_time = time.time()
            ds.PixelData = pixels.tobytes()
            os.makedirs(f"./out/{studyId}", mode=0o775, exist_ok=True)
            ds.save_as(f"./out/{studyId}/{instance}.dcm")

            # MONOCHROME1 frames are inverted for the PNG preview.
            invert = False
            if 'PhotometricInterpretation' in ds and ds.PhotometricInterpretation == "MONOCHROME1":
                invert = True
            saveAsPngPIL(ds, f"./out/{studyId}/{instance}.png", invert=invert)

            # The VR table is per instance; empty it before the next one.
            vrlist.clear()
            end_time = time.time()
            logging.info(f"Outpout save : {stopwatch(start_time,end_time)} ms")
def getDICOMVRs(taglevel, vrlist):
    """Append every ``[key, VR]`` pair of *taglevel* to *vrlist*.

    Args:
        taglevel: mapping of tag key to its VR string.
        vrlist: list extended in place, one two-item list per key, in the
            iteration order of *taglevel*. Existing entries are kept.

    Returns None.
    """
    vrlist.extend([theKey, taglevel[theKey]] for theKey in taglevel)
    # Lazy %-style arguments: the (possibly long) list is only rendered when
    # DEBUG logging is actually enabled.
    logging.debug("[getDICOMVRs] - List of private tags VRs: %s\r\n", vrlist)
def getTags(tagLevel, ds , vrlist):
    """Copy one level of JSON metadata tags into a pydicom dataset.

    Args:
        tagLevel: mapping of tag key to value for one metadata level.
        ds: dataset that receives the elements (mutated in place).
        vrlist: list of ``[key, VR]`` pairs consulted when the pydicom
            dictionary has no VR for a key; the last matching pair wins.

    Values whose VR is ``SQ`` are converted recursively into a ``Sequence``
    of datasets. Values whose VR is one of OB/OD/OF/OL/OW/UN are
    base64-decoded to bytes. Elements in group 0x0002 are not added to *ds*.
    A key that cannot be converted is logged as a warning and skipped, so
    one bad tag does not abort the whole level.

    Returns None.
    """
    start_time = time.time()
    for theKey in tagLevel:
        try:
            try:
                tagvr = pydicom.datadict.dictionary_VR(theKey)
            except Exception:
                # The VR is not in the pydicom dictionary; it might be a
                # private tag listed in vrlist. "except Exception" rather
                # than a bare except, so Ctrl-C / SystemExit still propagate.
                tagvr = None
                for vr in vrlist:
                    if theKey == vr[0]:
                        tagvr = vr[1]
            datavalue = tagLevel[theKey]
            if tagvr == 'SQ':
                logging.debug("%s : %s , %s", theKey, tagLevel[theKey], vrlist)
                seqs = []
                for underSeq in tagLevel[theKey]:
                    seqds = Dataset()
                    getTags(underSeq, seqds, vrlist)
                    seqs.append(seqds)
                # Fall through so the sequence is added to ds below; skipping
                # to the next key here would throw away the value just built.
                datavalue = Sequence(seqs)
            if tagvr in ('OB', 'OD', 'OF', 'OL', 'OW', 'UN'):
                # Binary values arrive as base64 text.
                base64_str = tagLevel[theKey]
                base64_bytes = base64_str.encode('utf-8')
                datavalue = base64.decodebytes(base64_bytes)
            data_element = DataElement(theKey, tagvr, datavalue)
            # Group 0x0002 elements are left out of the main dataset.
            if data_element.tag.group != 2:
                ds.add(data_element)
        except Exception as err:
            logging.warning(f"[getTags] - {err}")
            continue
    end_time = time.time()
    logging.info(f"Dataset build : {stopwatch(start_time,end_time)} ms")
def hliGetMetadata(datastoreId, studyId , client):
    """Fetch and decode the JSON metadata of one image set.

    Calls ``client.get_image_set_metadata`` with *datastoreId* and *studyId*
    (passed as ``imageSetId``), reads the ``imageSetMetadataBlob`` stream of
    the response, gunzips it and parses it as JSON. The elapsed time in
    seconds is printed to stdout.

    Returns the parsed JSON document.
    """
    fetch_started = time.time()
    response = client.get_image_set_metadata(
        datastoreId=datastoreId,
        imageSetId=studyId,
    )
    compressed_blob = response["imageSetMetadataBlob"].read()
    metadata = json.loads(gzip.decompress(compressed_blob))
    elapsed = time.time() - fetch_started
    print(f"Metadata fetch : {elapsed}")
    return metadata
def HLIGetFramePixels(datastoreId, studyId, imageFrameId, client):
    """Fetch one image frame and decode it.

    Calls ``client.get_image_frame`` with *datastoreId*, *studyId* (passed
    as ``imageSetId``) and *imageFrameId*, then hands the bytes of the
    response's ``imageFrameBlob`` stream to ``decode`` (imported from
    ``openjpeg``). The fetch and decode durations are logged separately at
    INFO level, in milliseconds.

    Returns whatever ``decode`` returns for the frame.
    """
    fetch_began = time.time()
    response = client.get_image_frame(
        datastoreId=datastoreId,
        imageSetId=studyId,
        imageFrameId=imageFrameId,
    )
    fetch_ended = time.time()
    logging.info(f"Frame fetch : {stopwatch(fetch_began, fetch_ended)} ms")

    decode_began = time.time()
    # An in-memory stream positioned at offset 0, holding the whole blob.
    encoded_frame = io.BytesIO(response['imageFrameBlob'].read())
    pixels = decode(encoded_frame)
    decode_ended = time.time()
    logging.info(f"Frame decode : {stopwatch(decode_began, decode_ended)} ms")
    return pixels
def stopwatch( start_time, end_time ):
    """Return the interval from *start_time* to *end_time* multiplied by 1000.

    With ``time.time()`` readings (seconds) the result is in milliseconds.
    """
    return (end_time - start_time) * 1000
def saveAsPngPIL(ds: Dataset , destination : str , invert : bool = False):
    """Write the pixel data of *ds* to *destination* as an 8-bit PNG.

    Args:
        ds: dataset whose ``pixel_array`` is rendered.
        destination: path of the PNG file to write.
        invert: when true, each scaled value ``v`` is replaced by
            ``max - v``, where ``max`` is the largest scaled value.

    Negative samples are clipped to 0 and the rest is scaled linearly so the
    largest sample maps to 255. A frame whose largest sample is not positive
    (for example an all-zero frame) is rendered as all zeros instead of
    dividing by zero.

    Returns None.
    """
    import numpy as np

    image_2d = ds.pixel_array.astype(float)
    peak = image_2d.max()
    if peak > 0:
        image_2d_scaled = (np.maximum(image_2d, 0) / peak) * 255.0
    else:
        # Nothing above zero to scale: dividing by the peak would give
        # NaN/inf (peak == 0) or a meaningless result (peak < 0).
        image_2d_scaled = np.zeros_like(image_2d)
    image_2d_scaled = np.uint8(image_2d_scaled)
    if invert:
        image_2d_scaled = np.max(image_2d_scaled) - image_2d_scaled
    img = Image.fromarray(image_2d_scaled)
    img.save(destination, 'png')
# Run the export only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()