Skip to content

Commit

Permalink
Add ZIP file handling draft, although be aware there is currently no …
Browse files Browse the repository at this point in the history
…support for encrypted ZIP files.

Trying to extract encrypted ZIP files will just hang with the current implementation.

So in the case that we find an encrypted ZIP file, we just fall back to an UnknownChunk with a defined end_offset.

In the future, we should be able to handle an encrypted ZIP chunk, carve it out, but not necessarily extract it if we don't know the password.

Perhaps at some point we may want to attempt to brute-force encrypted ZIP files with a password list.
  • Loading branch information
Takeshi Shiomitsu authored and takeshi committed Nov 23, 2021
1 parent e1d368c commit f4e05ce
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 53 deletions.
17 changes: 16 additions & 1 deletion unblob/file_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import math

import io, os

def snull(content: bytes):
"""Strip null bytes from the end of the string."""
Expand All @@ -9,3 +9,18 @@ def snull(content: bytes):
def round_up(size: int, alignment: int):
    """Round up size to the alignment boundary.

    Uses integer ceiling division instead of ``math.ceil(size / alignment)``:
    true division goes through a float, which silently loses precision for
    values above 2**53 and then returns a wrong (too small) result.
    """
    # -(-a // b) is ceil(a / b) computed purely with integers.
    return alignment * -(-size // alignment)


def find_first(file: io.BufferedReader, pattern: bytes) -> int:
    """Find the first occurrence of ``pattern`` in ``file``.

    The search starts at the current file position and reads the stream in
    fixed-size chunks, so the whole file is never held in memory.

    Returns the offset of the match relative to the position the file had
    when this function was called, or -1 if the pattern does not occur
    before the end of the stream.
    """
    # The chunk must be longer than the overlap we step back by, otherwise
    # the search would never move forward for very long patterns.
    chunk_size = max(0x1000, 2 * len(pattern))
    compensation = len(pattern) - 1
    bytes_searched = 0
    while True:
        data = file.read(chunk_size)
        marker = data.find(pattern)
        if marker != -1:
            return marker + bytes_searched
        if len(data) < chunk_size:
            # A short read means we hit the end of the stream: nothing left
            # to search, so stop instead of re-reading the tail forever.
            return -1
        # Step back by len(pattern) - 1 bytes so that the next chunk starts
        # with the tail of this one; this makes sure we find the pattern even
        # if it straddles the chunk boundary.
        file.seek(-compensation, os.SEEK_CUR)
        bytes_searched += chunk_size - compensation
106 changes: 54 additions & 52 deletions unblob/handlers/archive/zip.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import io
import logging
from typing import List, Union
from zipfile import ZipFile

from dissect.cstruct import cstruct
from structlog import get_logger
from typing import List, Set, Tuple, Union

from ...models import UnknownChunk, ValidChunk
from ...models import ValidChunk, UnknownChunk
from ...file_utils import find_first

logger = get_logger()

Expand Down Expand Up @@ -97,78 +96,81 @@
MAXIMUM_VERSION = 0xFF


def _find_end_of_zip(file: io.BufferedReader, start_offset: int) -> int:
    """Locate the end of the ZIP archive that starts at ``start_offset``.

    Reads the rest of the stream, looks for the first End of Central
    Directory signature, parses the record found there and returns the
    absolute offset just past it. A candidate whose comment field is not
    valid UTF-8 is rejected and the search restarts 22 bytes after it.
    Returns 0 when no signature is left in the stream.
    """
    eocd_signature = b"\x50\x4b\x05\x06"

    file.seek(start_offset)
    relative_pos = file.read().find(eocd_signature)
    if relative_pos == -1:
        logging.debug(
            f"ZIP (0x{start_offset:x}): No End of Central Directory headers in the rest of the stream."
        )
        return 0

    eocd_offset = start_offset + relative_pos
    file.seek(eocd_offset)
    eocd = cparser.end_of_central_directory(file)

    try:
        eocd.zip_file_comment.decode("utf-8")
    except UnicodeDecodeError:
        # Undecodable comment: treat this hit as a false positive and keep
        # looking further on (22 is presumably the fixed size of the record
        # without its comment - confirm against the struct definition).
        return _find_end_of_zip(file, eocd_offset + 22)

    return eocd_offset + len(eocd)
def _calculate_zipfile_end(file: io.BufferedReader, start_offset: int) -> int:
    """Return the absolute offset just past the first EOCD record found at or
    after ``start_offset``.

    Raises ValueError if the search reports a negative position.
    """
    # If we just pass a firmware blob with multiple ZIP files in it to zipfile.ZipFile, it seems
    # that it will basically scan for the final EOCD record header, and assume that that's where
    # the file ends.
    # E.g. in the case our firmware image looks like this:
    # | ZIPFILE | SOMETHING ELSE | ZIPFILE |
    # zipfile.ZipFile() will assume:
    # | THIS IS ALL THE SAME ZIPFILE |
    # For obvious reasons, this is not helpful in our case. We need to try to guess the length of
    # the ZIP file chunk within our firmware image, independently, and then carve that chunk out.
    file.seek(start_offset)

    # In our case, we want to find the first instance of the EOCD record header, not the last!
    marker = find_first(file, b"\x50\x4b\x05\x06")
    if marker < 0:
        # Never seek to a position before the start of this chunk.
        raise ValueError(
            f"ZIP (0x{start_offset:x}): No End of Central Directory record found."
        )

    file.seek(start_offset + marker)
    # Parsing the record moves the file position past it; only that position
    # is needed, not the parsed structure.
    cparser.end_of_central_directory(file)
    return file.tell()


def _guess_zip_size(file: io.BufferedReader, start_offset: int):
    """Return the size in bytes of the ZIP chunk starting at ``start_offset``
    and log the names of the files it contains.
    """
    # If we just pass a full firmware blob to zipfile.ZipFile, somehow,
    # the way that it is parsed means that only the final zipfile in the
    # blob is recognised, if at all. Sometimes, if the firmware is just
    # a big blob of lots of other things, then ZipFile will just throw an
    # error. Basically, ZipFile is really bad at dealing with anything
    # which isn't actually a ZIP.
    #
    # For this reason, we need to try to guess the length of the ZIP file
    # chunk within our firmware image, and then carve that chunk out.
    # Then, we make this a BytesIO stream, so we can just pass this
    # stream to ZipFile.
    zip_end = _find_end_of_zip(file, start_offset)
    size = zip_end - start_offset

    file.seek(start_offset)
    content = io.BytesIO(file.read(size))

    with ZipFile(content) as z:
        logger.info("Found ZIP filenames", filenames=[x.filename for x in z.infolist()])

    return size


def _enumerate_files(file: io.BufferedReader, start_offset: int) -> Tuple[Set, Set]:
    """Return ``(normal_files, encrypted_files)``: two disjoint sets with the
    names of the entries in the ZIP chunk starting at ``start_offset``.
    """
    # Local import: only the ZipFile class, not the module, is imported at
    # file level.
    import zipfile

    # TODO: the note that stood here in the original was left unfinished
    # ("When we").
    normal_files = set()
    encrypted_files = set()

    # Carve out just this ZIP chunk; handing the whole stream to ZipFile
    # would make it pick up the last EOCD record in the blob instead.
    zip_end = _calculate_zipfile_end(file, start_offset)
    file.seek(start_offset)
    content = io.BytesIO(file.read(zip_end - start_offset))

    with zipfile.ZipFile(content) as zip_file:
        for zip_info in zip_file.infolist():
            # Lowest flag bit set: the entry is treated as encrypted.
            if zip_info.flag_bits & 0b0001:
                encrypted_files.add(zip_info.filename)
            else:
                normal_files.add(zip_info.filename)

    return (normal_files, encrypted_files)


def calculate_chunk(
    file: io.BufferedReader, start_offset: int
) -> Union[ValidChunk, UnknownChunk]:
    """Work out the extent of the ZIP archive starting at ``start_offset``.

    Returns an UnknownChunk (without an end offset) when the local file
    header asks for a version above MAXIMUM_VERSION, an UnknownChunk with a
    defined end offset when any entry is encrypted, and a ValidChunk
    spanning the archive otherwise.

    NOTE(review): the local file header is parsed from the current file
    position; presumably the caller has already seeked to ``start_offset``
    - confirm.
    """
    # Local import: only the ZipFile class, not the module, is imported at
    # file level.
    import zipfile

    header = cparser.local_file_header(file)
    if header.version_needed_to_extract > MAXIMUM_VERSION:
        return UnknownChunk(
            start_offset=start_offset,
            reason=f"ZIP (0x{start_offset:x}): Version too high!",
        )

    end_of_zip = _calculate_zipfile_end(file, start_offset)

    # Hand ZipFile only the bytes of this archive, not the whole stream.
    file.seek(start_offset)
    this_zip_chunk = io.BytesIO(file.read(end_of_zip - start_offset))
    with zipfile.ZipFile(this_zip_chunk) as zip_file:
        # Lowest flag bit set: the entry is treated as encrypted.
        has_encrypted_files = any(
            zipinfo.flag_bits & 0b0001 for zipinfo in zip_file.infolist()
        )

    if has_encrypted_files:
        # TODO: We can't handle encrypted ZIP files yet, so we fall back to the UnknownChunk in the
        # cases where there are encrypted files in the ZIP.
        return UnknownChunk(
            start_offset=start_offset,
            end_offset=end_of_zip,
            reason="ZIP contains encrypted files.",
        )

    return ValidChunk(
        start_offset=start_offset,
        end_offset=end_of_zip,
    )


def make_extract_command(inpath: str, outdir: str) -> List[str]:
    """Build the ``unzip`` argument list that extracts ``inpath`` into ``outdir``."""
    # TODO: This will just hang waiting for user input if the ZIP is encrypted.
    command = ["unzip", inpath]
    command += ["-d", outdir]
    return command

0 comments on commit f4e05ce

Please sign in to comment.