Skip to content

Commit

Permalink
playing with copilot for docs + tests (#2488)
Browse files Browse the repository at this point in the history
  • Loading branch information
doomedraven authored Feb 9, 2025
1 parent db1ab36 commit 53c3106
Show file tree
Hide file tree
Showing 17 changed files with 840 additions and 59 deletions.
98 changes: 79 additions & 19 deletions lib/cuckoo/common/cape_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
BUFSIZE = int(cfg.processing.analysis_size_limit)


def hash_file(method, path):
def hash_file(method, path:str) -> str:
"""Calculates an hash on a file by path.
@param method: callable hashing method
@param path: file path
Expand All @@ -143,6 +143,17 @@ def convert(data):


def is_duplicated_binary(file_info: dict, cape_file: dict, append_file: bool) -> bool:
"""
Determines if a binary file is a duplicate based on various criteria.
Args:
file_info (dict): Information about the file being checked.
cape_file (dict): Information about the existing CAPE file.
append_file (bool): Flag indicating whether to append the file.
Returns:
bool: False if the file is determined to be a duplicate, otherwise returns the value of append_file.
"""
if HAVE_PYDEEP:
ssdeep_grade = pydeep.compare(file_info["ssdeep"].encode(), cape_file["ssdeep"].encode())
if ssdeep_grade >= ssdeep_threshold:
Expand All @@ -162,9 +173,25 @@ def is_duplicated_binary(file_info: dict, cape_file: dict, append_file: bool) ->
return append_file


def static_config_parsers(cape_name, file_path, file_data):
def static_config_parsers(cape_name: str, file_path:str, file_data: bytes) -> dict:
"""
Process CAPE Yara hits and extract configuration data using various parsers.
This function attempts to extract configuration data from a given file using different parsers
such as CAPE extractors, DC3-MWCP, and Malwareconfigs. The function returns a dictionary containing
the extracted configuration data.
Args:
cape_name (str): The name of the CAPE parser to use.
file_path (str): The path to the file being analyzed.
file_data (bytes): The binary data of the file being analyzed.
Returns:
dict: A dictionary containing the extracted configuration data. If no configuration data is
extracted, an empty dictionary is returned.
"""
"""Process CAPE Yara hits"""
cape_config = {cape_name: {}}
cape_config = {}
parser_loaded = False
# CAPE - pure python parsers
# MWCP
Expand All @@ -184,14 +211,14 @@ def static_config_parsers(cape_name, file_path, file_data):
# python3 map object returns iterator by default, not list and not serializeable in JSON.
if isinstance(value, map):
value = list(value)
cape_config[cape_name].update({key: [value]})
cape_config.setdefault(cape_name, {}).update({key: [value]})
parser_loaded = True
elif isinstance(cape_configraw, dict):
for key, value in cape_configraw.items():
# python3 map object returns iterator by default, not list and not serializeable in JSON.
if isinstance(value, map):
value = list(value)
cape_config[cape_name].update({key: [value]})
cape_config.setdefault(cape_name, {}).update({key: [value]})
parser_loaded = True
except Exception as e:
log.error("CAPE: parsing error on %s with %s: %s", file_path, cape_name, e, exc_info=True)
Expand All @@ -215,7 +242,7 @@ def static_config_parsers(cape_name, file_path, file_data):
del reportmeta["other"]

tmp_dict.update(reportmeta)
cape_config[cape_name] = convert(tmp_dict)
cape_config.setdefault(cape_name, {}).update(convert(tmp_dict))
log.debug("CAPE: DC3-MWCP parser for %s completed", cape_name)
else:
error_lines = report.errors[0].split("\n")
Expand Down Expand Up @@ -252,10 +279,10 @@ def static_config_parsers(cape_name, file_path, file_data):
# ToDo remove
if isinstance(malwareconfig_config, list):
for key, value in malwareconfig_config[0].items():
cape_config[cape_name].update({key: [value]})
cape_config.setdefault(cape_name, {}).update({key: [value]})
elif isinstance(malwareconfig_config, dict):
for key, value in malwareconfig_config.items():
cape_config[cape_name].update({key: [value]})
cape_config.setdefault(cape_name, {}).update({key: [value]})
except Exception as e:
if "rules" in str(e):
log.warning("You probably need to compile yara-python with dotnet support")
Expand All @@ -267,9 +294,6 @@ def static_config_parsers(cape_name, file_path, file_data):
cape_name,
str(e),
)

if cape_config.get(cape_name) == {}:
return {}
"""
elif HAVE_MALDUCK and not parser_loaded and cape_name.lower() in malduck_modules_names:
log.debug("Running Malduck on %s", file_path)
Expand All @@ -290,14 +314,26 @@ def static_config_parsers(cape_name, file_path, file_data):
del ext
if tmp_config:
for key, value in tmp_config[0].items():
cape_config[cape_name].update({key: [value]})
cape_config.setdefault(cape_name, {}).update({key: [value]})
"""
if not cape_config[cape_name]:
return {}

return cape_config


def static_config_lookup(file_path, sha256=False):
def static_config_lookup(file_path: str, sha256: str=False) -> dict:
"""
Look up static configuration information for a given file based on its SHA-256 hash.
This function calculates the SHA-256 hash of the file at the specified path if not provided,
and then queries either a MongoDB or Elasticsearch database to retrieve configuration information.
Args:
file_path (str): The path to the file for which to look up configuration information.
sha256 (str, optional): The SHA-256 hash of the file. If not provided, it will be calculated.
Returns:
dict or None: A dictionary containing the configuration information if found, otherwise None.
"""
if not sha256:
sha256 = hashlib.sha256(open(file_path, "rb").read()).hexdigest()

Expand Down Expand Up @@ -327,13 +363,26 @@ def static_config_lookup(file_path, sha256=False):
named_static_extractors = []


def static_extraction(path):
config = False
def static_extraction(path:str) -> dict:
"""
Extracts static configuration from a file using YARA rules and named static extractors.
Args:
path (str): The file path to be analyzed.
Returns:
dict: The extracted configuration as a dictionary if successful. An empty
dictionary is returned when there are no CAPE YARA hits and the file name is not a named static extractor.
Note:
Exceptions that occur during the extraction process are logged.
"""
config = {}
try:
hits = File(path).get_yara(category="CAPE")
path_name = Path(path).name
if not hits and path_name not in named_static_extractors:
return False
return config
file_data = path_read_file(path)
if path_name in named_static_extractors:
config = static_config_parsers(path_name, path, file_data)
Expand All @@ -349,7 +398,18 @@ def static_extraction(path):
return config


def cape_name_from_yara(details, pid, results):
def cape_name_from_yara(details: dict, pid: int, results: dict) -> str:
"""
Extracts the CAPE name from YARA hit details and associates it with a process ID (pid) in the results dictionary.
Args:
details (dict): A dictionary containing YARA hit details, expected to have a key "cape_yara" with a list of hits.
pid (int): The process ID to associate the CAPE name with.
results (dict): A dictionary to store the association between detections and process IDs.
Returns:
str: The CAPE name extracted from the YARA hit, or None if no CAPE name is found.
"""
for hit in details.get("cape_yara", []) or []:
if File.yara_hit_provides_detection(hit):
if "detections2pid" not in results:
Expand Down
16 changes: 15 additions & 1 deletion lib/cuckoo/common/dotnet_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,21 @@
log = logging.getLogger("dotnet_utils")


def dotnet_user_strings(file: str = False, data: bytes = False, dn_whitelisting: list = []):
def dotnet_user_strings(file: str = False, data: bytes = False, dn_whitelisting: list = []) -> list:
"""
Extracts user strings from a .NET file or data blob using dnfile.
Args:
file (str): Path to the .NET file. Default is False.
data (bytes): Byte data of the .NET file. Default is False.
dn_whitelisting (list): List of string patterns to whitelist. Default is an empty list.
Returns:
list: A list of extracted user strings that are not in the whitelist.
Raises:
Exception: If there is an error processing the .NET file or data.
"""

if not HAVE_DNFILE:
return []
Expand Down
67 changes: 66 additions & 1 deletion lib/cuckoo/common/extractor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@

# dotnet
def get_mdtoken(data: bytes) -> int:
    """Return the metadata token stored at the start of *data*.

    The first 4 bytes of *data* are read as an unsigned little-endian
    32-bit integer, and the result is masked with 0xFFFFFF so that only the
    low 24 bits are kept.

    Args:
        data (bytes): Buffer holding at least 4 bytes; any bytes after the
            first 4 are ignored.

    Returns:
        int: The low 24 bits of the decoded 32-bit value.

    Raises:
        struct.error: If *data* is shorter than 4 bytes.
    """
    # "<I" = little-endian unsigned 32-bit; unpack_from reads from offset 0.
    (raw_value,) = struct.unpack_from("<I", data)
    return raw_value & 0xFFFFFF


Expand All @@ -15,6 +28,20 @@ def get_data_offset(pe: pefile.PE, string_offset: int, addr: int) -> int:

def calc_section_alignment(pe: pefile.PE, offset: int, addr: int) -> int:
"""
Calculate the alignment between two sections in a PE file.
Args:
pe (pefile.PE): The PE file object.
offset (int): The offset value, typically calculated as
struct.unpack("i", blob[0x43:0x47])[0] + 0x47.
addr (int): The address where data starts, which can be a YARA address match.
Returns:
int: The calculated alignment between the sections. Returns 0 if sections are not found or an error occurs.
Note:
Exceptions that occur during the calculation are caught and printed rather than propagated to the caller.
Example:
offset: struct.unpack("i", blob[0x43:0x47])[0] + 0x47
addr: where the data starts; can be a YARA address match
"""
Expand All @@ -31,7 +58,20 @@ def calc_section_alignment(pe: pefile.PE, offset: int, addr: int) -> int:
return alignment


def function_offset_from_VA(addr, blob, pe):
def function_offset_from_VA(addr: int, blob:bytes, pe:pefile.PE):
"""
Calculate the function offset from a given virtual address (VA) in a PE file.
Args:
addr (int): The virtual address to start from.
blob (bytes): The binary data blob containing the instructions.
pe (PE): The PE file object, typically from the pefile module.
Returns:
tuple: A tuple containing:
- function_addr (int): The calculated function address.
- offset (int): The offset of the next instruction after the function call.
"""
shift_pos = blob.find(b"\xE8") + 1
function_addr = pe.get_rva_from_offset(addr + shift_pos) + pe.OPTIONAL_HEADER.ImageBase
# print(f"Getting offset for function: {hex(function_addr)}")
Expand All @@ -41,6 +81,19 @@ def function_offset_from_VA(addr, blob, pe):


def function_offset_from_offset(addr: int, binary: bytes, pe: pefile.PE):
"""
Calculates the virtual address and file offset of a subfunction call within a binary.
Args:
addr (int): The starting address to search for the CALL instruction.
binary (bytes): The binary data of the executable.
pe (pefile.PE): The PE file object representing the executable.
Returns:
tuple: A tuple containing:
- call_virtual_address (int): The virtual address of the CALL instruction.
- subfunc_file_offset (int): The file offset of the subfunction being called.
"""
# where our subcall starts - example: 8
shift_pos = binary[addr:].find(b"\xE8")
call_file_offset = addr + shift_pos
Expand All @@ -56,6 +109,18 @@ def function_offset_from_offset(addr: int, binary: bytes, pe: pefile.PE):


def find_function_xrefs(data, start, end):
"""
Finds function cross-references (xrefs) within a specified range in the given binary data.
Args:
data (bytes): The binary data to search for function xrefs.
start (int): The starting address (inclusive) of the range to search.
end (int): The ending address (exclusive) of the range to search.
Returns:
dict: A dictionary where keys are target addresses of CALL instructions and values are lists of addresses
where these CALL instructions are located.
"""
function_xrefs = {}
# The re.finditer function only finds *non-overlapping* matches, which fails to find some CALL instructions
for rva in range(start, end):
Expand Down
10 changes: 10 additions & 0 deletions lib/cuckoo/common/fraunhofer_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@


def get_dga_lookup_dict():
"""
Retrieves the DGA (Domain Generation Algorithm) lookup dictionary from a gzipped JSON file.
The function constructs the file path to the DGA lookup dictionary, checks if the file exists,
and if it does, reads and decompresses the file, then loads its contents as a JSON object.
If the file does not exist, it returns an empty dictionary.
Returns:
dict: The DGA lookup dictionary if the file exists, otherwise an empty dictionary.
"""
dga_lookup_path = os.path.join(CUCKOO_ROOT, "data", "dga_lookup_dict.json.gz")
if path_exists(dga_lookup_path):
with gzip.GzipFile(dga_lookup_path, "r") as fin:
Expand Down
18 changes: 18 additions & 0 deletions lib/cuckoo/common/hypervisor_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@


def proxmox_shutdown_vm(machineName: str):
"""
Shuts down a virtual machine on a Proxmox server.
Args:
machineName (str): The name of the virtual machine to shut down.
Raises:
None expected: according to the notes below, errors during the shutdown process are caught and printed rather than raised (confirm against the function body).
Notes:
- This function does not support multiple Proxmox servers.
- The Proxmox server configuration is expected to be available in the `proxmox_conf` object.
- The function retrieves the VM ID from the `proxmox_conf.Node_1` configuration using the provided machine name.
- The function sends a POST request to the Proxmox API to obtain an authentication ticket and CSRF prevention token.
- The function then sends another POST request to shut down the specified virtual machine.
- If the shutdown is successful, a message is printed to indicate success.
- If an error occurs, it is caught and printed.
"""

proxmox_server = proxmox_conf.proxmox.hostname
# Not supporting multiple servers
Expand Down
5 changes: 1 addition & 4 deletions lib/cuckoo/common/irc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@

from lib.cuckoo.common.utils import convert_to_printable

try:
import re2 as re
except ImportError:
import re
import re

log = logging.getLogger("Processing.Pcap.irc.protocol")

Expand Down
Loading

0 comments on commit 53c3106

Please sign in to comment.