From f4e05ce23eebfc7fc03cf992532d69fd66c7b38b Mon Sep 17 00:00:00 2001 From: Takeshi Shiomitsu Date: Thu, 18 Nov 2021 17:01:00 +0100 Subject: [PATCH] Add ZIP file handling draft, although be aware there is currently no support for encrypted ZIP files. Trying to extract encrypted ZIP files will just hang with the current implementation. So in the case what we find an encrypted ZIP file, we just fall back to an UnknownChunk, with a defined end_offset. In the future, we should be able to handle an encrypted ZIP chunk, carve it out, but not necessarily extract it if we don't know the password. Perhaps at some point we may want to attempt to brute-force encrypted ZIP files with a password list. --- unblob/file_utils.py | 17 +++++- unblob/handlers/archive/zip.py | 106 +++++++++++++++++---------------- 2 files changed, 70 insertions(+), 53 deletions(-) diff --git a/unblob/file_utils.py b/unblob/file_utils.py index 8b8a4b3d69..c3a91a1295 100644 --- a/unblob/file_utils.py +++ b/unblob/file_utils.py @@ -1,5 +1,5 @@ import math - +import io, os def snull(content: bytes): """Strip null bytes from the end of the string.""" @@ -9,3 +9,18 @@ def snull(content: bytes): def round_up(size: int, alignment: int): """Round up size to the alignment boundary.""" return alignment * math.ceil(size / alignment) + + +def find_first(file: io.BufferedReader, pattern: bytes) -> int: + chunk_size = 0x1000 + compensation = len(pattern) - 1 + bytes_searched = 0 + while True: + # Prepend the padding from the last chunk, to make sure that we find the pattern, even if + # it straddles the chunk boundary. + data = file.read(chunk_size) + marker = data.find(pattern) + if marker != -1: + return marker + bytes_searched + file.seek(-compensation, os.SEEK_CUR) + bytes_searched += chunk_size - compensation diff --git a/unblob/handlers/archive/zip.py b/unblob/handlers/archive/zip.py index 1d3384d04f..e5bc799792 100644 --- a/unblob/handlers/archive/zip.py +++ b/unblob/handlers/archive/zip.py @@ -1,12 +1,11 @@ import io -import logging -from typing import List, Union -from zipfile import ZipFile from dissect.cstruct import cstruct from structlog import get_logger +from typing import List, Set, Tuple, Union -from ...models import UnknownChunk, ValidChunk +from ...models import ValidChunk, UnknownChunk +from ...file_utils import find_first logger = get_logger() @@ -97,65 +96,45 @@ MAXIMUM_VERSION = 0xFF -def _find_end_of_zip(file: io.BufferedReader, start_offset: int) -> int: - """Find the end of the zip file - by looking for the end of central directory header bytes, verifying, then - returning the end of the end of central directory header structure. - """ - file.seek(start_offset) - content = file.read() - end_marker = content.find(b"\x50\x4b\x05\x06") - if end_marker == -1: - logging.debug( - f"ZIP (0x{start_offset:x}): No End of Central Directory headers in the rest of the stream." - ) - return 0 - - file.seek(start_offset + end_marker) - header = cparser.end_of_central_directory(file) - - try: - header.zip_file_comment.decode("utf-8") - except UnicodeDecodeError: - return _find_end_of_zip(file, start_offset + end_marker + 22) - - return start_offset + end_marker + len(header) +def _calculate_zipfile_end(file: io.BufferedReader, start_offset: int) -> int: + # If we just pass a firmware blob with multiple ZIP files in it to zipfile.ZipFile, it seems + # that it will basically scan for the final EOCD record header, and assume that that's where + # the file ends. + # E.g. in the case our firmware image looks like this: + # | ZIPFILE | SOMETHING ELSE | ZIPFILE | + # zipfile.ZipFile() will assume: + # | THIS IS ALL THE SAME ZIPFILE | + # For obvious reasons, this is not helpful in our case. We need to try to guess the length of + # the ZIP file chunk within our firmware image, independently, and then carve that chunk out. + file.seek(start_offset) -def _guess_zip_size(file: io.BufferedReader, start_offset: int): - # If we just pass a full firmware blob to zipfile.ZipFile, somehow, - # the way that it is parsed means that only the final zipfile in the - # blob is recognised, if at all. Sometimes, if the firmware is just - # a big blob of lots of other things, then ZipFile will just throw an - # error. Basically, ZipFile is really bad at dealing with anything - # which isn't actually a ZIP. + # In our case, we want to find the first instance of the EOCD record header, not the last! + zip_end = find_first(file, b"\x50\x4b\x05\x06") + start_offset + file.seek(zip_end) + eocd = cparser.end_of_central_directory(file) + return file.tell() - # For this reason, we need to try to guess the length of the ZIP file - # chunk within our firmware image, and then carve that chunk out. - # Then, we make this is a BytesIO stream, so we can just pass this - # stream to ZipFile. - file_names = set() +def _enumerate_files(file: io.BufferedReader, start_offset: int) -> Tuple[Set, Set]: + # TODO: When we + normal_files = set() encrypted_files = set() - zip_end = _find_end_of_zip(file, start_offset) file.seek(start_offset) - content = io.BytesIO(file.read(zip_end - start_offset)) + with zipfile.ZipFile(file) as zip_file: + for zip_info in zip_file.infolist(): + if zip_info.flag_bits & 0b0001: + encrypted_files.add(zip_info.filename) + normal_files.add(zip_info.filename) - with ZipFile(content) as z: - logger.info("Found ZIP filenames", filenames=[x.filename for x in z.infolist()]) - for g in z.infolist(): - if g.flag_bits & 0b0001: - encrypted_files.add(g.filename) - file_names.add(g.filename) - - size = zip_end - start_offset - return size + return (normal_files, encrypted_files) def calculate_chunk( file: io.BufferedReader, start_offset: int ) -> Union[ValidChunk, UnknownChunk]: + header = cparser.local_file_header(file) if header.version_needed_to_extract > MAXIMUM_VERSION: return UnknownChunk( @@ -163,12 +142,35 @@ def calculate_chunk( reason=f"ZIP (0x{start_offset:x}): Version too high!", ) - size = _guess_zip_size(file, start_offset) + end_of_zip = _calculate_zipfile_end(file, start_offset) + + file.seek(start_offset) + + encrypted_files = set() + all_files = set() + + this_zip_chunk = io.BytesIO(file.read(end_of_zip - start_offset)) + with zipfile.ZipFile(this_zip_chunk) as zip: + for zipinfo in zip.infolist(): + if zipinfo.flag_bits & 0b0001: + encrypted_files.add(zipinfo.filename) + all_files.add(zipinfo.filename) + + if len(encrypted_files) > 0: + # TODO: We can't handle encrypted ZIP files yet, so we fall back to the UnknownChunk in the + # cases where there are encrypted files in the ZIP. + return UnknownChunk( + start_offset=start_offset, + end_offset=end_of_zip, + reason=f"ZIP contains encrypted files.", + ) + return ValidChunk( start_offset=start_offset, - end_offset=start_offset + size, + end_offset=end_of_zip, ) def make_extract_command(inpath: str, outdir: str) -> List[str]: + # TODO: This will just hang waiting for user input if any the ZIP is encrypted. return ["unzip", inpath, "-d", outdir]