diff --git a/lib/cuckoo/common/cape_utils.py b/lib/cuckoo/common/cape_utils.py index 38c8ef99400..f3796f72e70 100644 --- a/lib/cuckoo/common/cape_utils.py +++ b/lib/cuckoo/common/cape_utils.py @@ -116,7 +116,7 @@ BUFSIZE = int(cfg.processing.analysis_size_limit) -def hash_file(method, path): +def hash_file(method, path:str) -> str: """Calculates an hash on a file by path. @param method: callable hashing method @param path: file path @@ -143,6 +143,17 @@ def convert(data): def is_duplicated_binary(file_info: dict, cape_file: dict, append_file: bool) -> bool: + """ + Determines if a binary file is a duplicate based on various criteria. + + Args: + file_info (dict): Information about the file being checked. + cape_file (dict): Information about the existing CAPE file. + append_file (bool): Flag indicating whether to append the file. + + Returns: + bool: False if the file is determined to be a duplicate, otherwise returns the value of append_file. + """ if HAVE_PYDEEP: ssdeep_grade = pydeep.compare(file_info["ssdeep"].encode(), cape_file["ssdeep"].encode()) if ssdeep_grade >= ssdeep_threshold: @@ -162,9 +173,25 @@ def is_duplicated_binary(file_info: dict, cape_file: dict, append_file: bool) -> return append_file -def static_config_parsers(cape_name, file_path, file_data): +def static_config_parsers(cape_name: str, file_path:str, file_data: bytes) -> dict: + """ + Process CAPE Yara hits and extract configuration data using various parsers. + + This function attempts to extract configuration data from a given file using different parsers + such as CAPE extractors, DC3-MWCP, and Malwareconfigs. The function returns a dictionary containing + the extracted configuration data. + + Args: + cape_name (str): The name of the CAPE parser to use. + file_path (str): The path to the file being analyzed. + file_data (bytes): The binary data of the file being analyzed. + + Returns: + dict: A dictionary containing the extracted configuration data. If no configuration data is + extracted, an empty dictionary is returned. + """ """Process CAPE Yara hits""" - cape_config = {cape_name: {}} + cape_config = {} parser_loaded = False # CAPE - pure python parsers # MWCP @@ -184,14 +211,14 @@ def static_config_parsers(cape_name, file_path, file_data): # python3 map object returns iterator by default, not list and not serializeable in JSON. if isinstance(value, map): value = list(value) - cape_config[cape_name].update({key: [value]}) + cape_config.setdefault(cape_name, {}).update({key: [value]}) parser_loaded = True elif isinstance(cape_configraw, dict): for key, value in cape_configraw.items(): # python3 map object returns iterator by default, not list and not serializeable in JSON. if isinstance(value, map): value = list(value) - cape_config[cape_name].update({key: [value]}) + cape_config.setdefault(cape_name, {}).update({key: [value]}) parser_loaded = True except Exception as e: log.error("CAPE: parsing error on %s with %s: %s", file_path, cape_name, e, exc_info=True) @@ -215,7 +242,7 @@ def static_config_parsers(cape_name, file_path, file_data): del reportmeta["other"] tmp_dict.update(reportmeta) - cape_config[cape_name] = convert(tmp_dict) + cape_config.setdefault(cape_name, {}).update(convert(tmp_dict)) log.debug("CAPE: DC3-MWCP parser for %s completed", cape_name) else: error_lines = report.errors[0].split("\n") @@ -252,10 +279,10 @@ def static_config_parsers(cape_name, file_path, file_data): # ToDo remove if isinstance(malwareconfig_config, list): for key, value in malwareconfig_config[0].items(): - cape_config[cape_name].update({key: [value]}) + cape_config.setdefault(cape_name, {}).update({key: [value]}) elif isinstance(malwareconfig_config, dict): for key, value in malwareconfig_config.items(): - cape_config[cape_name].update({key: [value]}) + cape_config.setdefault(cape_name, {}).update({key: [value]}) except Exception as e: if "rules" in str(e): log.warning("You probably need to compile yara-python with dotnet support") @@ -267,9 +294,6 @@ def static_config_parsers(cape_name, file_path, file_data): cape_name, str(e), ) - - if cape_config.get(cape_name) == {}: - return {} """ elif HAVE_MALDUCK and not parser_loaded and cape_name.lower() in malduck_modules_names: log.debug("Running Malduck on %s", file_path) @@ -290,14 +314,26 @@ def static_config_parsers(cape_name, file_path, file_data): del ext if tmp_config: for key, value in tmp_config[0].items(): - cape_config[cape_name].update({key: [value]}) + cape_config.setdefault(cape_name, {}).update({key: [value]}) """ - if not cape_config[cape_name]: - return {} + return cape_config -def static_config_lookup(file_path, sha256=False): +def static_config_lookup(file_path: str, sha256: str=False) -> dict: + """ + Look up static configuration information for a given file based on its SHA-256 hash. + + This function calculates the SHA-256 hash of the file at the specified path if not provided, + and then queries either a MongoDB or Elasticsearch database to retrieve configuration information. + + Args: + file_path (str): The path to the file for which to look up configuration information. + sha256 (str, optional): The SHA-256 hash of the file. If not provided, it will be calculated. + + Returns: + dict or None: A dictionary containing the configuration information if found, otherwise None. + """ if not sha256: sha256 = hashlib.sha256(open(file_path, "rb").read()).hexdigest() @@ -327,13 +363,26 @@ def static_config_lookup(file_path, sha256=False): named_static_extractors = [] -def static_extraction(path): - config = False +def static_extraction(path:str) -> dict: + """ + Extracts static configuration from a file using YARA rules and named static extractors. + + Args: + path (str): The file path to be analyzed. + + Returns: + dict or bool: The extracted configuration as a dictionary if successful, + False if no configuration is found or an error occurs. + + Raises: + Exception: Logs any exceptions that occur during the extraction process. + """ + config = {} try: hits = File(path).get_yara(category="CAPE") path_name = Path(path).name if not hits and path_name not in named_static_extractors: - return False + return config file_data = path_read_file(path) if path_name in named_static_extractors: config = static_config_parsers(path_name, path, file_data) @@ -349,7 +398,18 @@ def static_extraction(path): return config -def cape_name_from_yara(details, pid, results): +def cape_name_from_yara(details: dict, pid: int, results: dict) -> str: + """ + Extracts the CAPE name from YARA hit details and associates it with a process ID (pid) in the results dictionary. + + Args: + details (dict): A dictionary containing YARA hit details, expected to have a key "cape_yara" with a list of hits. + pid (int): The process ID to associate the CAPE name with. + results (dict): A dictionary to store the association between detections and process IDs. + + Returns: + str: The CAPE name extracted from the YARA hit, or None if no CAPE name is found. + """ for hit in details.get("cape_yara", []) or []: if File.yara_hit_provides_detection(hit): if "detections2pid" not in results: diff --git a/lib/cuckoo/common/dotnet_utils.py b/lib/cuckoo/common/dotnet_utils.py index d2ba9be05a9..f053eb31e1a 100644 --- a/lib/cuckoo/common/dotnet_utils.py +++ b/lib/cuckoo/common/dotnet_utils.py @@ -13,7 +13,21 @@ log = logging.getLogger("dotnet_utils") -def dotnet_user_strings(file: str = False, data: bytes = False, dn_whitelisting: list = []): +def dotnet_user_strings(file: str = False, data: bytes = False, dn_whitelisting: list = []) -> list: + """ + Extracts user strings from a .NET file or data blob using dnfile. + + Args: + file (str): Path to the .NET file. Default is False. + data (bytes): Byte data of the .NET file. Default is False. + dn_whitelisting (list): List of string patterns to whitelist. Default is an empty list. + + Returns: + list: A list of extracted user strings that are not in the whitelist. + + Raises: + Exception: If there is an error processing the .NET file or data. + """ if not HAVE_DNFILE: return [] diff --git a/lib/cuckoo/common/extractor_utils.py b/lib/cuckoo/common/extractor_utils.py index 69ae3428a00..1b21d7066c8 100644 --- a/lib/cuckoo/common/extractor_utils.py +++ b/lib/cuckoo/common/extractor_utils.py @@ -5,6 +5,19 @@ # dotnet def get_mdtoken(data: bytes) -> int: + """ + Extracts a metadata token from the given byte data. + + The function interprets the first 4 bytes of the input data as an unsigned + integer in little-endian format and then masks it with 0xFFFFFF to obtain + the metadata token. + + Args: + data (bytes): The byte data from which to extract the metadata token. + + Returns: + int: The extracted metadata token. + """ return struct.unpack_from(" int: def calc_section_alignment(pe: pefile.PE, offset: int, addr: int) -> int: """ + Calculate the alignment between two sections in a PE file. + + Args: + pe (pefile.PE): The PE file object. + offset (int): The offset value, typically calculated as + struct.unpack("i", blob[0x43:0x47])[0] + 0x47. + addr (int): The address where data starts, which can be a YARA address match. + + Returns: + int: The calculated alignment between the sections. Returns 0 if sections are not found or an error occurs. + + Raises: + Exception: If an error occurs during the calculation, it will be caught and printed. + offset is: Ex struct.unpack("i", blob[0x43:0x47])[0] + 0x47 addr is where data starts, can be YARA address match """ @@ -31,7 +58,20 @@ def calc_section_alignment(pe: pefile.PE, offset: int, addr: int) -> int: return alignment -def function_offset_from_VA(addr, blob, pe): +def function_offset_from_VA(addr: int, blob:bytes, pe:pefile.PE): + """ + Calculate the function offset from a given virtual address (VA) in a PE file. + + Args: + addr (int): The virtual address to start from. + blob (bytes): The binary data blob containing the instructions. + pe (PE): The PE file object, typically from the pefile module. + + Returns: + tuple: A tuple containing: + - function_addr (int): The calculated function address. + - offset (int): The offset of the next instruction after the function call. + """ shift_pos = blob.find(b"\xE8") + 1 function_addr = pe.get_rva_from_offset(addr + shift_pos) + pe.OPTIONAL_HEADER.ImageBase # print(f"Getting offset for function: {hex(function_addr)}") @@ -41,6 +81,19 @@ def function_offset_from_VA(addr, blob, pe): def function_offset_from_offset(addr: int, binary: bytes, pe: pefile.PE): + """ + Calculates the virtual address and file offset of a subfunction call within a binary. + + Args: + addr (int): The starting address to search for the CALL instruction. + binary (bytes): The binary data of the executable. + pe (pefile.PE): The PE file object representing the executable. + + Returns: + tuple: A tuple containing: + - call_virtual_address (int): The virtual address of the CALL instruction. + - subfunc_file_offset (int): The file offset of the subfunction being called. + """ # where our subcall starts - example: 8 shift_pos = binary[addr:].find(b"\xE8") call_file_offset = addr + shift_pos @@ -56,6 +109,18 @@ def function_offset_from_offset(addr: int, binary: bytes, pe: pefile.PE): def find_function_xrefs(data, start, end): + """ + Finds function cross-references (xrefs) within a specified range in the given binary data. + + Args: + data (bytes): The binary data to search for function xrefs. + start (int): The starting address (inclusive) of the range to search. + end (int): The ending address (exclusive) of the range to search. + + Returns: + dict: A dictionary where keys are target addresses of CALL instructions and values are lists of addresses + where these CALL instructions are located. + """ function_xrefs = {} # The re.finditer function only finds *non-overlapping* matches, which fails to find some CALL instructions for rva in range(start, end): diff --git a/lib/cuckoo/common/fraunhofer_helper.py b/lib/cuckoo/common/fraunhofer_helper.py index 57f1f25e7fb..c9eabe18165 100644 --- a/lib/cuckoo/common/fraunhofer_helper.py +++ b/lib/cuckoo/common/fraunhofer_helper.py @@ -22,6 +22,16 @@ def get_dga_lookup_dict(): + """ + Retrieves the DGA (Domain Generation Algorithm) lookup dictionary from a gzipped JSON file. + + The function constructs the file path to the DGA lookup dictionary, checks if the file exists, + and if it does, reads and decompresses the file, then loads its contents as a JSON object. + If the file does not exist, it returns an empty dictionary. + + Returns: + dict: The DGA lookup dictionary if the file exists, otherwise an empty dictionary. + """ dga_lookup_path = os.path.join(CUCKOO_ROOT, "data", "dga_lookup_dict.json.gz") if path_exists(dga_lookup_path): with gzip.GzipFile(dga_lookup_path, "r") as fin: diff --git a/lib/cuckoo/common/hypervisor_config.py b/lib/cuckoo/common/hypervisor_config.py index 1f649177d76..53a36949bf5 100644 --- a/lib/cuckoo/common/hypervisor_config.py +++ b/lib/cuckoo/common/hypervisor_config.py @@ -8,6 +8,24 @@ def proxmox_shutdown_vm(machineName: str): + """ + Shuts down a virtual machine on a Proxmox server. + + Args: + machineName (str): The name of the virtual machine to shut down. + + Raises: + Exception: If there is an error during the shutdown process. + + Notes: + - This function does not support multiple Proxmox servers. + - The Proxmox server configuration is expected to be available in the `proxmox_conf` object. + - The function retrieves the VM ID from the `proxmox_conf.Node_1` configuration using the provided machine name. + - The function sends a POST request to the Proxmox API to obtain an authentication ticket and CSRF prevention token. + - The function then sends another POST request to shut down the specified virtual machine. + - If the shutdown is successful, a message is printed to indicate success. + - If an error occurs, it is caught and printed. + """ proxmox_server = proxmox_conf.proxmox.hostname # Not supporting multiple servers diff --git a/lib/cuckoo/common/irc.py b/lib/cuckoo/common/irc.py index 5345829aefa..542e1099f0f 100644 --- a/lib/cuckoo/common/irc.py +++ b/lib/cuckoo/common/irc.py @@ -10,10 +10,7 @@ from lib.cuckoo.common.utils import convert_to_printable -try: - import re2 as re -except ImportError: - import re +import re log = logging.getLogger("Processing.Pcap.irc.protocol") diff --git a/lib/cuckoo/common/load_extra_modules.py b/lib/cuckoo/common/load_extra_modules.py index b2d920f8628..a3d02ad76b5 100644 --- a/lib/cuckoo/common/load_extra_modules.py +++ b/lib/cuckoo/common/load_extra_modules.py @@ -11,6 +11,25 @@ def ratdecodedr_load_decoders(path: str): + """ + Loads and returns a dictionary of RAT decoder modules from the specified path. + + This function walks recursively through all modules and packages in the given path, + imports them, and collects classes that are subclasses of the `Decoder` class from + the `malwareconfig.common` module. It skips packages and handles import errors gracefully. + + Args: + path (str): The path to the directory containing the RAT decoder modules. + + Returns: + dict: A dictionary where the keys are decoder names and the values are dictionaries + containing the following information about each decoder: + - obj: The decoder class object. + - decoder_name: The name of the decoder. + - decoder_description: A description of the decoder. + - decoder_version: The version of the decoder. + - decoder_author: The author of the decoder. + """ from malwareconfig.common import Decoder dec_modules = {} @@ -39,6 +58,28 @@ def ratdecodedr_load_decoders(path: str): def cape_load_custom_decoders(CUCKOO_ROOT: str): + """ + Loads custom decoders for CAPE from specified directories within the CUCKOO_ROOT path. + + This function searches for Python modules in the "modules/processing/parsers/CAPE" and + "custom/parsers" directories within the CUCKOO_ROOT path. It imports these modules and + stores them in a dictionary where the keys are the module names with spaces replaced by + underscores, and the values are the imported modules. + + Args: + CUCKOO_ROOT (str): The root directory of the CUCKOO installation. + + Returns: + dict: A dictionary where the keys are the names of the decoders and the values are + the imported modules. + + Raises: + ImportError: If a module cannot be imported. + IndexError: If there is an indexing error during module import. + AttributeError: If there is an attribute error during module import. + SyntaxError: If there is a syntax error in the module code. + Exception: For any other exceptions that occur during module import. + """ cape_modules = {} cape_decoders = os.path.join(CUCKOO_ROOT, "modules", "processing", "parsers", "CAPE") @@ -72,6 +113,19 @@ def cape_load_custom_decoders(CUCKOO_ROOT: str): def malduck_load_decoders(CUCKOO_ROOT: str): + """ + Loads and imports malduck decoder modules from the specified CUCKOO_ROOT directory. + + Args: + CUCKOO_ROOT (str): The root directory of the CUCKOO installation. + + Returns: + dict: A dictionary where the keys are the names of the decoder modules and the values are the imported module objects. + + Raises: + ImportError: If a module cannot be imported. + IndexError: If there is an issue with the module name. + """ malduck_modules = {} malduck_decoders = os.path.join(CUCKOO_ROOT, "modules", "processing", "parsers", "malduck") @@ -87,6 +141,25 @@ def malduck_load_decoders(CUCKOO_ROOT: str): def file_extra_info_load_modules(CUCKOO_ROOT: str): + """ + Loads extra file information modules from the specified CUCKOO_ROOT directory. + + This function searches for Python modules in the "file_extra_info_modules" directory + within the given CUCKOO_ROOT path. It imports and returns a list of modules that are + enabled based on their internal configuration or the selfextract_conf settings. + + Args: + CUCKOO_ROOT (str): The root directory of the CUCKOO installation. + + Returns: + list: A list of imported modules that are enabled. If the directory does not exist, + an empty list is returned. + + Raises: + ImportError: If a module cannot be imported. + IndexError: If there is an indexing error during module import. + AttributeError: If an attribute is missing during module import. + """ file_extra_modules = [] extra_modules = os.path.join(CUCKOO_ROOT, "lib", "cuckoo", "common", "integrations", "file_extra_info_modules") if not Path(extra_modules).exists(): diff --git a/lib/cuckoo/common/mapTTPs.py b/lib/cuckoo/common/mapTTPs.py index 0aecf218413..186f8b94e9b 100644 --- a/lib/cuckoo/common/mapTTPs.py +++ b/lib/cuckoo/common/mapTTPs.py @@ -15,6 +15,19 @@ # Read the config file def mapTTP(oldTTPs: list, mbcs: list): + """ + Maps old TTPs (Tactics, Techniques, and Procedures) to a new format and groups them by signature. + + Args: + oldTTPs (list): A list of dictionaries containing old TTPs. Each dictionary should have a "ttp" key. + mbcs (list): A list of MBCs (Malware Behavior Catalog) mapped by signature. + + Returns: + list: A list of dictionaries where each dictionary contains: + - "signature" (str): The signature of the TTP. + - "ttps" (list): A list of unique TTPs associated with the signature. + - "mbcs" (list): A list of MBCs associated with the signature. + """ ttpsList = [] grouped_ttps = defaultdict(list) diff --git a/lib/cuckoo/common/quarantine.py b/lib/cuckoo/common/quarantine.py index 93c31e503c5..1d39dadcb06 100644 --- a/lib/cuckoo/common/quarantine.py +++ b/lib/cuckoo/common/quarantine.py @@ -20,7 +20,7 @@ HAVE_OLEFILE = True except ImportError: HAVE_OLEFILE = False - print("Missed olefile dependency: pip3 install olefile") + print("Missed olefile dependency: poetry run pip install olefile") def bytearray_xor(data, key): diff --git a/lib/cuckoo/common/replace_patterns_utils.py b/lib/cuckoo/common/replace_patterns_utils.py index b2f3cd30fb0..bc83ed1fbd5 100644 --- a/lib/cuckoo/common/replace_patterns_utils.py +++ b/lib/cuckoo/common/replace_patterns_utils.py @@ -59,7 +59,6 @@ def check_deny_pattern(container: list, pattern: str): return if any(deny_file in pattern for deny_file in FILES_DENYLIST): return - # import code;code.interact(local=dict(locals(), **globals())) if pattern.endswith(FILES_ENDING_DENYLIST): return if not _is_regkey_ok(pattern): diff --git a/lib/cuckoo/common/scoring.py b/lib/cuckoo/common/scoring.py index 00f18926abd..e014ae81923 100644 --- a/lib/cuckoo/common/scoring.py +++ b/lib/cuckoo/common/scoring.py @@ -1,4 +1,28 @@ def calc_scoring(results: dict, matched: list): + """ + Calculate the final malware score and status based on the analysis results and matched signatures. + + The scoring is determined by the type of file and the categories of signatures it triggers. The methodology is as follows: + 1. Malicious-Known: The file is detected by YARA. + - Score: 10/10 (Malicious) + 2. Malicious-Unknown: The file triggers signatures with specific malicious categories. + - Categories: ["malware", "ransomware", "infostealer", "rat", "trojan", "rootkit", "bootkit", "wiper", "banker", "bypass", "anti-sandbox", "keylogger"] + - Score: 7-9/10 (Malicious) + 3. Suspicious-Unknown: The file triggers signatures with specific suspicious categories. + - Categories: ["network", "encryption", "anti-vm", "anti-analysis", "anti-av", "anti-debug", "anti-emulation", "persistence", "stealth", "discovery", "injection", "generic", "account", "bot", "browser", "allocation", "command", "execution"] + - Score: 4-6/10 (Suspicious) + 4. Benign: The file is likely trusted and digitally signed. + - Score: 0-3/10 (Benign) + 5. Undetected/Failed: The file does not trigger any signatures. + - Score: 0/10 (Undetected/Failed) + + Parameters: + results (dict): The analysis results containing details about the file and its behavior. + matched (list): A list of matched signatures with their categories, severity, confidence, and weight. + + Returns: + tuple: A tuple containing the final malware score (float) and the status (str). + """ finalMalscore = 0.0 status = None fileType = results.get("target", {}).get("file", {}).get("type") diff --git a/lib/cuckoo/common/socket_utils.py b/lib/cuckoo/common/socket_utils.py index 73613aa9122..610a4e4c706 100644 --- a/lib/cuckoo/common/socket_utils.py +++ b/lib/cuckoo/common/socket_utils.py @@ -12,6 +12,22 @@ def send_socket_command(socket_path: str, command: str, *args, **kwargs): + """ + Sends a command via a Unix domain socket to a root-executed component. + + Args: + socket_path (str): The path to the Unix domain socket. + command (str): The command to send. + args: Additional positional arguments to include in the command. + kwargs: Additional keyword arguments to include in the command. + + Returns: + dict: The response from the socket, parsed from JSON. If there is a timeout or connection error, + a dictionary with an "exception" key will be returned. + + Logs: + Critical errors if the socket path does not exist or if unable to connect to the Unix socket. + """ """Aux function to send commands via socket to root executed components""" if not path_exists(socket_path): log.critical("Unable to passthrough root command (%s) as the rooter unix socket: %s doesn't exist", socket_path, command) diff --git a/lib/cuckoo/common/suricata_detection.py b/lib/cuckoo/common/suricata_detection.py index fd5c7521e37..661e40c92f1 100644 --- a/lib/cuckoo/common/suricata_detection.py +++ b/lib/cuckoo/common/suricata_detection.py @@ -1,7 +1,4 @@ -try: - import re2 as re -except ImportError: - import re +import re suricata_passlist = ( "agenttesla", @@ -83,6 +80,7 @@ def get_suricata_family(signature): """ + Extracts the family name from a Suricata alert string. Args: signature: suricata alert string Return diff --git a/lib/cuckoo/common/web_utils.py b/lib/cuckoo/common/web_utils.py index ea68e5db0e5..aaee95d7aa3 100644 --- a/lib/cuckoo/common/web_utils.py +++ b/lib/cuckoo/common/web_utils.py @@ -172,7 +172,23 @@ def my_rate_minutes(group, request): _load_vms_exits_lock = threading.Lock() -def load_vms_exits(force=False): +def load_vms_exits(force:bool = False): + """ + Load the VM exits information. + + This function loads the VM exit nodes information and stores it in the global + variable `_all_nodes_exits`. If the information is already loaded and the + `force` parameter is not set to True, it returns the cached information. + Otherwise, it reloads the information. + + Args: + force (bool): If set to True, forces the reloading of the VM exits + information even if it is already loaded. Default is False. + + Returns: + dict: A dictionary where the keys are exit node names and the values are + lists of node names associated with each exit node. + """ global _all_nodes_exits with _load_vms_exits_lock: if _all_nodes_exits is not None and not force: @@ -196,7 +212,22 @@ def load_vms_exits(force=False): _load_vms_tags_lock = threading.Lock() -def load_vms_tags(force=False): +def load_vms_tags(force:bool=False): + """ + Load and return the tags associated with all virtual machines (VMs). + + This function retrieves tags from both a distributed database (if enabled) + and a local database, combines them, and returns a sorted list of unique tags. + The result is cached globally and can be forced to refresh by setting the + `force` parameter to True. + + Args: + force (bool): If True, forces the function to reload the tags from the + databases even if they are already cached. Default is False. + + Returns: + list: A sorted list of unique tags associated with all VMs. + """ global _all_vms_tags with _load_vms_tags_lock: if _all_vms_tags is not None and not force: @@ -220,6 +251,19 @@ def load_vms_tags(force=False): def top_asn(date_since: datetime = False, results_limit: int = 20) -> dict: + """ + Retrieves the top Autonomous System Numbers (ASNs) based on the number of occurrences in the database. + + This function queries a MongoDB collection to aggregate and count the occurrences of ASNs in the network hosts. + The results are cached for 10 minutes to improve performance. + + Args: + date_since (datetime, optional): A datetime object to filter results starting from this date. Defaults to False. + results_limit (int, optional): The maximum number of ASNs to return. Defaults to 20. + + Returns: + dict: A dictionary containing the top ASNs and their counts. Returns False if the MongoDB is not enabled or if the "top_asn" configuration is disabled. + """ if web_cfg.general.get("top_asn", False) is False: return False @@ -334,7 +378,7 @@ def top_detections(date_since: datetime = False, results_limit: int = 20) -> dic # ToDo extend this to directly extract per day -def get_stats_per_category(category: str, date_since): +def get_stats_per_category(category: str, date_since: datetime) -> List[Dict[str, int]]: """ Retrieves statistical data for a given category from the MongoDB collection "analysis" starting from a specified date. @@ -531,7 +575,7 @@ def statistics(s_days: int) -> dict: # Same jsonize function from api.py except we can now return Django # HttpResponse objects as well. (Shortcut to return errors) -def jsonize(data, response=False): +def jsonize(data: dict, response: bool=False): """Converts data dict to JSON. @param data: data dict @return: JSON formatted data or HttpResponse object with json data @@ -542,7 +586,16 @@ def jsonize(data, response=False): return json.dumps(data, sort_keys=False, indent=4) -def get_file_content(paths): +def get_file_content(paths: list) -> bytes: + """ + Retrieves the content of the first existing file from a list of file paths. + + Args: + paths (str or list of str): A single file path or a list of file paths to check. + + Returns: + bytes or bool: The content of the first existing file as bytes, or False if no file exists. + """ content = False if not isinstance(paths, list): paths = [paths] @@ -554,7 +607,7 @@ def get_file_content(paths): return content -def fix_section_permission(path): +def fix_section_permission(path: str): """ Adjusts the permissions of the .rdata section in a PE file to include write permissions. @@ -583,7 +636,22 @@ def fix_section_permission(path): log.info(e) -def get_magic_type(data): +def get_magic_type(data: bytes) -> str: + """ + Determine the MIME type of the given data using the `magic` library. + + This function attempts to identify the MIME type of the provided data. If the data + represents a file path and the file exists, it uses `magic.from_file` to determine + the MIME type. Otherwise, it uses `magic.from_buffer` to determine the MIME type + from the data buffer. + + Args: + data (bytes): The data to analyze, which can be a file path or a data buffer. + + Returns: + str: The MIME type of the data if successfully determined. + bool: False if an error occurs during MIME type determination. + """ try: if path_exists(data): return magic.from_file(data) @@ -596,7 +664,30 @@ def get_magic_type(data): def download_file(**kwargs): - """Example of kwargs + """ + Downloads a file based on the provided arguments and handles various conditions and errors. + + Keyword Arguments: + errors (list): List to store error messages. + content (bytes): Content of the file to be downloaded. + request (object): Request object containing details of the request. + task_id (list): List to store task IDs. + url (str): URL to download the file from. + params (dict): Parameters to be sent in the request. + headers (dict): Headers to be sent in the request. + service (str): Name of the service to download the file from. + path (str): Path to save the downloaded file. + fhash (str): Expected hash of the file to verify integrity. + options (str): Additional options for the download. + only_extraction (bool): Flag to indicate if only extraction is needed. + user_id (int): ID of the user requesting the download. + source_url (str): Source URL of the file. + + Returns: + tuple: A tuple containing the status ("ok" or "error") and a dictionary with task IDs and errors. + """ + """ + Example of kwargs { "errors": [], "content": content, @@ -818,7 +909,7 @@ def download_file(**kwargs): return "ok", {"task_ids": kwargs["task_ids"], "errors": extra_details.get("errors", [])} -def save_script_to_storage(task_ids, kwargs): +def save_script_to_storage(task_ids: list, kwargs): """ Save pre_script and during_script contents to a temporary storage. @@ -857,14 +948,46 @@ def save_script_to_storage(task_ids, kwargs): _ = Path(os.path.join(script_temp_path, f"during_script{file_ext}")).write_bytes(kwargs["during_script_content"]) -def url_defang(url): +def url_defang(url: str): + """ + Defangs a given URL by replacing common defanged components with their original counterparts. + + This function performs the following replacements: + - "[.]" with "." + - "[." with "." + - ".]" with "." + - "hxxp" with "http" + - "hxtp" with "http" + + Additionally, if the URL does not start with "http", it prepends "http://" to the URL. + + Args: + url (str): The defanged URL to be processed. + + Returns: + str: The refanged URL. + """ url = url.replace("[.]", ".").replace("[.", ".").replace(".]", ".").replace("hxxp", "http").replace("hxtp", "http") if not url.startswith("http"): url = f"http://{url}" return url -def _download_file(route, url, options): +def _download_file(route: str, url: str, options: str): + """ + Downloads a file from the specified URL using optional proxy settings and custom headers. + + Args: + route (str): The route to determine proxy settings. If "tor", uses Tor network. + If in socks5s, uses the specified SOCKS5 proxy settings. + url (str): The URL of the file to download. + options (str): Comma-separated string of options to customize headers. + Options starting with "dne_" will be added to headers. + + Returns: + bytes: The content of the downloaded file if the request is successful. + bool: False if the request fails or an exception occurs. + """ socks5s = _load_socks5_operational() proxies = {} response = False @@ -901,7 +1024,23 @@ def _download_file(route, url, options): return response -def category_all_files(task_id, category, base_path): +def category_all_files(task_id: str, category:str, base_path:str): + """ + Retrieve all file paths for a given task and category. + + Args: + task_id (str): The ID of the task to retrieve files for. + category (str): The category of files to retrieve. Special handling for "CAPE" category. + base_path (str): The base path to prepend to the file paths. + + Returns: + list: A list of file paths corresponding to the given task and category. + + Notes: + - If the category is "CAPE", it will be internally mapped to "CAPE.payloads". + - The function currently supports MongoDB as the database backend. + - Elasticsearch support is mentioned but not implemented. + """ analysis = False query_category = category if category == "CAPE": @@ -1107,7 +1246,22 @@ def validate_task_by_path(tid): ) -def perform_search(term, value, search_limit=False, user_id=False, privs=False, web=True, projection=None): +def perform_search(term: str, value: str, search_limit: int=0, user_id: int=0, privs:bool=False, web:bool=True, projection: dict=None): + """ + Perform a search based on the provided term and value. + + Args: + term (str): The search term to use. + value (str): The value to search for. + search_limit (int, optional): The maximum number of search results to return. Defaults to 0. + user_id (int, optional): The user ID to filter tasks by. Defaults to 0. + privs (bool, optional): Indicates if the user has privileges. Defaults to False. + web (bool, optional): Indicates if the search is performed via the web interface. Defaults to True. + projection (dict, optional): Fields to include or exclude in the search results. Defaults to None. + + Returns: + list: A list of search results matching the criteria. + """ if repconf.mongodb.enabled and repconf.elasticsearchdb.enabled and essearch and not term: multi_match_search = {"query": {"multi_match": {"query": value, "fields": ["*"]}}} numhits = es.search(index=get_analysis_index(), body=multi_match_search, size=0)["hits"]["total"] @@ -1239,6 +1393,20 @@ def force_int(value): def force_bool(value): + """ + Converts a given value to a boolean. + + Args: + value: The value to be converted. It can be of any type. + + Returns: + bool: The boolean representation of the input value. Returns True if the value is one of + ("true", "yes", "on", "1") (case insensitive). Returns False if the value is one of + ("false", "no", "off", "0") (case insensitive), or if the value is None or empty. + + Logs: + A warning is logged if the value cannot be converted from string to bool. + """ if isinstance(value, bool): return value @@ -1255,6 +1423,38 @@ def force_bool(value): def parse_request_arguments(request, keyword="POST"): + """ + Parses request arguments from a Django or API request object. + + Args: + request (HttpRequest): The request object containing the arguments. + keyword (str, optional): The attribute of the request object to extract arguments from. Defaults to "POST". + + Returns: + tuple: A tuple containing the following parsed arguments: + - static (str): Static argument. + - package (str): Package argument. + - timeout (int): Timeout argument. + - priority (int): Priority argument. + - options (str): Options argument. + - machine (str): Machine argument. + - platform (str): Platform argument. + - tags (str): Tags argument. + - custom (str): Custom argument. + - memory (bool): Memory argument. + - clock (str): Clock argument. + - enforce_timeout (bool): Enforce timeout argument. + - shrike_url (str): Shrike URL argument. + - shrike_msg (str): Shrike message argument. + - shrike_sid (str): Shrike SID argument. + - shrike_refer (str): Shrike refer argument. + - unique (bool): Unique argument. + - referrer (str): Referrer argument. + - tlp (str): TLP argument. + - tags_tasks (str): Tags tasks argument. + - route (str): Route argument. + - cape (str): CAPE argument. + """ # Django uses request.POST and API uses request.data static = getattr(request, keyword).get("static", "") referrer = validate_referrer(getattr(request, keyword).get("referrer")) @@ -1319,7 +1519,16 @@ def parse_request_arguments(request, keyword="POST"): ) -def get_hash_list(hashes): +def get_hash_list(hashes: str) -> list: + """ + Parses a string of hashes separated by commas or spaces and returns a list of cleaned hash values. + + Args: + hashes (str): A string containing hash values separated by commas or spaces. + + Returns: + list: A list of cleaned hash values. If a hash value is a URL ending with a slash, the hash is extracted from the URL. + """ hashlist = [] if "," in hashes: hashlist = list(filter(None, hashes.replace(" ", "").strip().split(","))) @@ -1342,28 +1551,61 @@ def get_hash_list(hashes): } -def _malwarebazaar_dl(hash): - sample = None +def _malwarebazaar_dl(hash: str) -> bytes: + """ + Downloads a malware sample from MalwareBazaar using the provided hash. + + Args: + hash (str): The hash of the malware sample to download. The hash can be an MD5, SHA1, or SHA256. + Returns: + bytes: The downloaded malware sample as bytes, or None if the sample could not be downloaded. + + Raises: + Exception: If there is an error during the download or extraction process, it will be logged. + """ + sample = b"" try: data = requests.post( "https://mb-api.abuse.ch/api/v1/", data={"query": "get_file", _bazaar_map[len(hash)]: hash}, headers={"API-KEY": web_cfg.download_services.malwarebazaar_api_key, "User-Agent": "CAPE Sandbox"}, ) - if data.ok and b"file_not_found" not in data.content: + if data.ok: try: - with pyzipper.AESZipFile(io.BytesIO(data.content)) as zf: + if isinstance(data.content, bytes): + if b"file_not_found" not in data.content[:50]: + return sample + tmp_sample = io.BytesIO(data.content) + elif isinstance(data.content, io.BytesIO): + tmp_sample = data.content + else: + return sample + with pyzipper.AESZipFile(tmp_sample) as zf: zf.setpassword(b"infected") sample = zf.read(zf.namelist()[0]) except pyzipper.zipfile.BadZipFile: - print(data.content) + print("_malwarebazaar_dl", data.content[:100]) except Exception as e: logging.error(e, exc_info=True) return sample -def thirdpart_aux(samples, prefix, opt_filename, details, settings): +def thirdpart_aux(samples: str, prefix: str, opt_filename:str, details: dict, settings) -> dict: + """ + Processes a list of samples by downloading or retrieving their content from local storage, + and updates the details dictionary with the file path, hash, and other relevant information. + + Args: + samples (list): A list of sample hashes to process. + prefix (str): A prefix indicating the source of the samples (e.g., "vt" for VirusTotal, "bazaar" for MalwareBazaar). + opt_filename (str): An optional filename to use for the downloaded files. If not provided, the hash will be used as the filename. + details (dict): A dictionary to store details about the processed samples, including path, hash, content, errors, etc. + settings (object): An object containing configuration settings, including the temporary path. + + Returns: + dict: The updated details dictionary with information about the processed samples, including any errors encountered. + """ folder = os.path.join(settings.TEMP_PATH, "cape-external") if not path_exists(folder): path_mkdir(folder, exist_ok=True) @@ -1411,7 +1653,7 @@ def thirdpart_aux(samples, prefix, opt_filename, details, settings): return details -def download_from_vt(samples, details, opt_filename, settings): +def download_from_vt(samples:str, details:dict, opt_filename:str, settings) -> dict: """ Downloads samples from VirusTotal using the provided API key. @@ -1438,7 +1680,7 @@ def download_from_vt(samples, details, opt_filename, settings): return thirdpart_aux(samples, "vt", opt_filename, details, settings) -def download_from_bazaar(samples, details, opt_filename, settings): +def download_from_bazaar(samples:str, details:dict, opt_filename:str, settings): """ Downloads samples from MalwareBazaar. @@ -1462,7 +1704,31 @@ def download_from_bazaar(samples, details, opt_filename, settings): return thirdpart_aux(samples, "bazaar", opt_filename, details, settings) -def process_new_task_files(request, samples, details, opt_filename, unique): +def process_new_task_files(request, samples:list, details:dict, opt_filename:str, unique:bool=False) -> tuple: + """ + Processes new task files by validating and storing them. + + Args: + request: The HTTP request object containing user information. + samples (list): A list of sample files to be processed. + details (dict): A dictionary to store error messages and other details. + opt_filename (str): An optional filename to use for the stored files. + unique (bool, optional): A flag to enforce unique file submission. Defaults to False. + + Returns: + tuple: A tuple containing a list of processed files and the updated details dictionary. + + The function performs the following steps: + 1. Checks if each sample file is empty and logs an error if so. + 2. Validates the size of each sample file against the configured maximum size. + 3. Reads the data from each sample file. + 4. Sanitizes the filename or uses the optional filename provided. + 5. Stores the sample file in temporary storage and calculates its SHA-256 hash. + 6. Checks for duplicate file submissions if the unique flag is set. + 7. Appends the processed file data, path, and SHA-256 hash to the list of files. + + Errors encountered during processing are appended to the details dictionary. + """ list_of_files = [] for sample in samples: # Error if there was only one submitted sample, and it's empty. @@ -1516,7 +1782,26 @@ def process_new_task_files(request, samples, details, opt_filename, unique): return list_of_files, details -def process_new_dlnexec_task(url, route, options, custom): +def process_new_dlnexec_task(url:str, route:str, options:str, custom:str): + """ + Processes a new download and execute task by downloading a file from a given URL, + sanitizing the URL, and storing the file temporarily. + + Args: + url (str): The URL of the file to download. The URL may use obfuscation techniques + such as "hxxp" instead of "http" and "[.]" instead of ".". + route (str): The route or path where the file should be downloaded. + options (dict): Additional options for downloading the file. + custom (str): Custom parameters or settings for the task. + + Returns: + tuple: A tuple containing: + - path (str): The temporary file path where the downloaded file is stored. + - response (bytes): The content of the downloaded file. + - str: An empty string (reserved for future use or additional information). + + If the download fails, returns (False, False, False). + """ url = url.replace("hxxps://", "https://").replace("hxxp://", "http://").replace("[.]", ".") response = _download_file(route, url, options) if not response: @@ -1596,7 +1881,7 @@ def submit_task( filename=filename, ) if not task_id: - log.warn("Error adding CAPE task to database: %s", package) + log.warning("Error adding CAPE task to database: %s", package) return task_id log.info('CAPE detection on file "%s": %s - added as CAPE task with ID %s', target, package, task_id) @@ -1605,6 +1890,16 @@ def submit_task( # https://stackoverflow.com/questions/14989858/get-the-current-git-hash-in-a-python-script/68215738#68215738 def get_running_commit() -> str: + """ + Retrieves the current Git commit hash of the repository. + + This function reads the HEAD file in the .git directory to determine the + current branch or commit reference, then reads the corresponding file to + get the commit hash. + + Returns: + str: The current Git commit hash as a string. + """ git_folder = Path(CUCKOO_ROOT, ".git") head_name = Path(git_folder, "HEAD").read_text().split("\n")[0].split(" ")[-1] return Path(git_folder, head_name).read_text().replace("\n", "") diff --git a/lib/cuckoo/common/webadmin_utils.py b/lib/cuckoo/common/webadmin_utils.py index 308a56bb65e..9b27dfb1692 100644 --- a/lib/cuckoo/common/webadmin_utils.py +++ b/lib/cuckoo/common/webadmin_utils.py @@ -7,6 +7,15 @@ # admin utils def disable_user(user_id: int) -> bool: + """ + Disables a user by setting their 'is_active' status to False. + + Args: + user_id (int): The ID of the user to disable. + + Returns: + bool: True if the user was successfully disabled, False otherwise. + """ user = User.objects.get(id=user_id) if user: user.is_active = False diff --git a/tests/test_cape_utils.py b/tests/test_cape_utils.py new file mode 100644 index 00000000000..e5d528d8041 --- /dev/null +++ b/tests/test_cape_utils.py @@ -0,0 +1,80 @@ +import unittest +from unittest.mock import patch, MagicMock +from lib.cuckoo.common.cape_utils import cape_name_from_yara +from lib.cuckoo.common.cape_utils import static_config_parsers + +class TestCapeUtils(unittest.TestCase): + @patch("lib.cuckoo.common.cape_utils.File.yara_hit_provides_detection") + @patch("lib.cuckoo.common.cape_utils.File.get_cape_name_from_yara_hit") + def test_cape_name_from_yara(self, mock_get_cape_name_from_yara_hit, mock_yara_hit_provides_detection): + details = { + "cape_yara": [ + {"rule": "test_rule_1"}, + {"rule": "test_rule_2"} + ] + } + pid = 1234 + results = {} + + mock_yara_hit_provides_detection.side_effect = [False, True] + mock_get_cape_name_from_yara_hit.return_value = "test_name" + + name = cape_name_from_yara(details, pid, results) + + self.assertEqual(name, "test_name") + self.assertIn("detections2pid", results) + self.assertIn(str(pid), results["detections2pid"]) + self.assertIn("test_name", results["detections2pid"][str(pid)]) + + @patch("lib.cuckoo.common.cape_utils.File.yara_hit_provides_detection") + def test_cape_name_from_yara_no_detection(self, mock_yara_hit_provides_detection): + details = { + "cape_yara": [ + {"rule": "test_rule_1"} + ] + } + pid = 1234 + results = {} + + mock_yara_hit_provides_detection.return_value = False + + name = cape_name_from_yara(details, pid, results) + + self.assertIsNone(name) + self.assertNotIn("detections2pid", results) + + def test_cape_name_from_yara_no_cape_yara(self): + details = {} + pid = 1234 + results = {} + + name = cape_name_from_yara(details, pid, results) + + self.assertIsNone(name) + self.assertNotIn("detections2pid", results) + +class TestStaticConfigParsers(unittest.TestCase): + @patch("lib.cuckoo.common.cape_utils.HAVE_CAPE_EXTRACTORS", True) + @patch("lib.cuckoo.common.cape_utils.cape_malware_parsers") + def test_static_config_parsers_cape_extractors(self, mock_cape_malware_parsers): + cape_name = "test_cape" + file_path = "/path/to/file" + file_data = b"test data" + mock_parser = MagicMock() + mock_parser.extract_config.return_value = {"key": "value"} + mock_cape_malware_parsers.__contains__.return_value = True + mock_cape_malware_parsers.__getitem__.return_value = mock_parser + result = static_config_parsers(cape_name, file_path, file_data) + self.assertIn(cape_name, result) + self.assertIn("key", result[cape_name]) + self.assertEqual(result[cape_name]["key"], ["value"]) + + def test_static_config_parsers_no_extractors(self): + cape_name = "test_none" + file_path = "/path/to/file" + file_data = b"test data" + result = static_config_parsers(cape_name, file_path, file_data) + self.assertEqual(result, {}) + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_web_utils.py b/tests/test_web_utils.py index b52c5fd5d56..de690235e1e 100644 --- a/tests/test_web_utils.py +++ b/tests/test_web_utils.py @@ -1,14 +1,24 @@ # Copyright (C) 2010-2015 Cuckoo Foundation. # This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org # See the file 'docs/LICENSE' for copying permission. - +import io import tempfile +import unittest +from unittest.mock import MagicMock, patch import httpretty import pytest - +import pyzipper +import requests from lib.cuckoo.common.path_utils import path_delete, path_write_file -from lib.cuckoo.common.web_utils import _download_file, force_int, get_file_content, parse_request_arguments +from lib.cuckoo.common.web_utils import ( + _download_file, + _malwarebazaar_dl, + download_from_vt, + force_int, + get_file_content, + parse_request_arguments, +) @pytest.fixture @@ -90,3 +100,103 @@ def test_parse_request_arguments(mock_request): def test_force_int(): assert force_int(value="1") == 1 assert force_int(value="$") == 0 + + +class TestMalwareBazaarDownload(unittest.TestCase): + @patch('requests.post') + def test_malwarebazaar_dl_success(self, mock_post): + # Mock the response from requests.post + mock_response = MagicMock() + mock_response.ok = True + mock_response.content = io.BytesIO() + with pyzipper.AESZipFile(mock_response.content, 'w', encryption=pyzipper.WZ_AES) as zf: + zf.setpassword(b"infected") + zf.writestr('sample.txt', 'sample content') + mock_post.return_value = mock_response + + # Call the function + result = _malwarebazaar_dl('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') + + # Check the result + self.assertEqual(result, b'sample content') + + @patch('requests.post') + def test_malwarebazaar_dl_file_not_found(self, mock_post): + # Mock the response from requests.post + mock_response = MagicMock() + mock_response.ok = True + mock_response.content = b'file_not_found' + mock_post.return_value = mock_response + + # Call the function + result = _malwarebazaar_dl('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') + + # Check the result + assert not result + + @patch('requests.post') + def test_malwarebazaar_dl_bad_zip_file(self, mock_post): + # Mock the response from requests.post + mock_response = MagicMock() + mock_response.ok = True + mock_response.content = b'not a zip file' + mock_post.return_value = mock_response + + # Call the function + result = _malwarebazaar_dl('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') + + # Check the result + assert not result + + @patch('requests.post') + def test_malwarebazaar_dl_exception(self, mock_post): + # Mock the response from requests.post to raise an exception + mock_post.side_effect = requests.exceptions.RequestException + + # Call the function + result = _malwarebazaar_dl('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') + + # Check the result + assert not result + +class TestDownloadFromVT(unittest.TestCase): + @patch('lib.cuckoo.common.web_utils.thirdpart_aux') + def test_download_from_vt_with_vtdl_key(self, mock_thirdpart_aux): + settings = MagicMock() + settings.VTDL_KEY = 'dummy_vtdl_key' + details = {"errors": []} + samples = ['sample1'] + opt_filename = 'sample.txt' + mock_thirdpart_aux.return_value = details + result = download_from_vt(samples, details, opt_filename, settings) + self.assertEqual(result["headers"]["x-apikey"], 'dummy_vtdl_key') + self.assertEqual(result["service"], "VirusTotal") + mock_thirdpart_aux.assert_called_once_with(samples, "vt", opt_filename, details, settings) + + @patch('lib.cuckoo.common.web_utils.thirdpart_aux') + def test_download_from_vt_with_apikey_in_details(self, mock_thirdpart_aux): + settings = MagicMock() + settings.VTDL_KEY = None + details = {"apikey": "dummy_apikey", "errors": []} + samples = ['sample1'] + opt_filename = 'sample.txt' + mock_thirdpart_aux.return_value = details + result = download_from_vt(samples, details, opt_filename, settings) + self.assertEqual(result["headers"]["x-apikey"], 'dummy_apikey') + self.assertEqual(result["service"], "VirusTotal") + mock_thirdpart_aux.assert_called_once_with(samples, "vt", opt_filename, details, settings) + + def test_download_from_vt_no_apikey(self): + settings = MagicMock() + settings.VTDL_KEY = None + details = {"errors": []} + samples = ['sample1'] + opt_filename = 'sample.txt' + result = download_from_vt(samples, details, opt_filename, settings) + self.assertIn({"error": "Apikey not configured, neither passed as opt_apikey"}, result["errors"]) + self.assertNotIn("headers", result) + self.assertNotIn("service", result) + + +if __name__ == '__main__': + unittest.main()