Skip to content

Commit

Permalink
Fixed PST parsing (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
zbalkan authored Dec 16, 2024
1 parent 340e0e4 commit 224f295
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 21 deletions.
29 changes: 14 additions & 15 deletions src/formats/pst.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ def fetch_all_block_data(self, bid: BID) -> list[bytes]:
raise PANHuntException(
'Expecting data block, got block type %s' % xblock.block_type)
data_list.append(xblock.data_block)
elif block.block_type == Block.btypeXXBLOCK:
elif block.block_type == BlockType.XXBLOCK:
for xxbid in block.rgbid:
xxblock: Block = self.fetch_block(xxbid)
if xxblock.block_type != BlockType.XBLOCK:
Expand Down Expand Up @@ -1220,7 +1220,7 @@ def get_row_cell_value(self, data_bytes: Optional[bytes], tcoldesc: TCOLDESC) ->

if not ptype.is_variable and not ptype.is_multi:
if ptype.byte_count <= 8:
return ptype.value(DATA_bytes)
return ptype.value(data_bytes)

hid = HID(data_bytes)
return ptype.value(self.hn.get_hid_data(hid))
Expand Down Expand Up @@ -1805,9 +1805,9 @@ class Attachment:
DisplayName: str
AttachMethod: int
AttachmentSize: int
AttachFilename: str
AttachLongFilename: str
Filename: str
__AttachFilename: Optional[str] = None
__AttachLongFilename: Optional[str] = None
Filename: Optional[str] = None
BinaryData: Optional[bytes] = None
AttachMimeTag: Optional[str]
AttachExtension: str
Expand Down Expand Up @@ -1836,21 +1836,20 @@ def __init__(self, ltp: LTP, slentry: SLENTRY) -> None:
afn: Optional[PCBTHData] = self.pc.get_raw_data(
PropIdEnum.PidTagAttachFilename.value)
if afn:
self.AttachFilename = panutils.as_str(afn.value) # 8.3 short name
self.__AttachFilename = panutils.as_str(afn.value) # 8.3 short name

alfn: Optional[PCBTHData] = self.pc.get_raw_data(
PropIdEnum.PidTagAttachLongFilename.value)
if alfn:
self.AttachLongFilename = panutils.as_str(alfn.value)
self.__AttachLongFilename = panutils.as_str(alfn.value)

if self.__AttachLongFilename:
self.Filename = self.__AttachLongFilename
elif self.__AttachFilename:
self.Filename = self.__AttachFilename

if self.AttachLongFilename:
self.Filename = self.AttachLongFilename
else:
self.Filename = self.AttachFilename
if self.Filename:
self.Filename = os.path.basename(self.Filename)
else:
self.Filename = '[NoFilename_Method%s]' % self.AttachMethod

if self.AttachMethod == Message.afByValue:
atm: Optional[PCBTHData] = self.pc.get_raw_data(
Expand Down Expand Up @@ -2466,7 +2465,7 @@ def __init__(self, pst_file: str) -> None:
except PermissionError:
self.fd.close()
raise PANHuntException(
f'The PST file is in use (probably by Outlook application).')
'The PST file is in use (probably by Outlook application).')

self.header = Header(self.fd)
if not self.header.validPST:
Expand Down Expand Up @@ -2531,7 +2530,7 @@ def export_all_attachments(self, path: str = '', overwrite: bool = True) -> Gene
if attachment:
if attachment.BinaryData and len(attachment.BinaryData) != 0:
filepath: str = os.path.join(
path, attachment.Filename)
path, attachment.Filename or '')
if overwrite:
if os.path.exists(filepath):
os.remove(filepath)
Expand Down
4 changes: 3 additions & 1 deletion src/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from archive import Archive, GzipArchive, TarArchive, XzArchive, ZipArchive
from enums import FileTypeEnum
from scanner import PlainTextFileScanner, EmlScanner, MboxScanner, MsgScanner, PdfScanner, PstScanner, ScannerBase
from scanner import EmlScanner, MboxScanner, MsgScanner, PdfScanner, PlainTextFileScanner, PstScanner, ScannerBase

# This dictionary is defined at the module level to ensure that only one instance
# of internal_map exists throughout the program's runtime. This prevents unnecessary
Expand Down Expand Up @@ -74,6 +74,8 @@ def __get_filetype(mime_type_text: str, extension: str) -> FileTypeEnum:
if mime_subtype in ["octet-stream"]:
if extension in [".mbox"]:
return FileTypeEnum.Mbox
if extension in [".pst"]:
return FileTypeEnum.MsPst
else:
return FileTypeEnum.Unknown
elif mime_subtype in ['vnd.openxmlformats-officedocument.wordprocessingml.document']:
Expand Down
20 changes: 15 additions & 5 deletions src/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
''' If file size is 30MB or bigger, read line by line for better memory management '''
BLOCK_SIZE: int = 31_457_280 # 30MB

MIN_PAN_LENGTH = 15


class ScannerBase(ABC):

Expand All @@ -32,18 +34,24 @@ def scan(self, job: Job, encoding: str = 'utf8') -> list[PAN]:

text: str
if job.payload:
# Nested attachments may have placeholder data that is just bytes
# It is better to fall back to utf8 encoding in such cases
if encoding == 'binary':
encoding = 'utf8'

before = len(job.payload)
text = job.payload.decode(
encoding=encoding, errors='backslashreplace')
after = len(text)

if len(text) < MIN_PAN_LENGTH:
return []

finder = PanFinder()
matches.extend(finder.find(text))
else:
s: os.stat_result = os.stat(path=job.abspath)
file_size: int = s.st_size

if file_size == 0:
if file_size < MIN_PAN_LENGTH:
return []

if 0 < file_size < BLOCK_SIZE:
Expand Down Expand Up @@ -164,6 +172,7 @@ def scan(self, job: Job, encoding: str = 'utf8') -> list[PAN]:
matches: list[PAN] = []

if self.pst.header.validPST:
pst_path: str = os.path.abspath(job.abspath)
for folder in self.pst.folder_generator():
for message in self.pst.message_generator(folder=folder):
if message.Subject:
Expand All @@ -182,15 +191,16 @@ def scan(self, job: Job, encoding: str = 'utf8') -> list[PAN]:
matches.extend(body_matches)

if message.HasAttachments:
basename = ':'.join([pst_path, message_path])
for _, subattachment in enumerate(message.subattachments):
if subattachment.Filename:
att: Optional[pstAttachment] = message.get_attachment(
subattachment=subattachment)
if att:
if att and att.Filename:
# Create a job for the attachment and add it to the JobQueue
job = Job(
basename=att.Filename, # Use the attachment filename
dirname=job.basename, # The parent filename
dirname=basename, # The parent filename
payload=att.BinaryData # Pass the binary content directly
)
JobQueue().enqueue(job)
Expand Down

0 comments on commit 224f295

Please sign in to comment.