diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 690c677..604a37f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -25,8 +25,10 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install flake8 pytest pytest-mock mock + python -m pip install flake8 pytest pytest-mock mock mypy if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + if [ -f requirements_sigma.txt ]; then pip install -r requirements_sigma.txt; fi + if [ -f requirements.types.txt ]; then pip install -r requirements.types.txt; fi - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names @@ -35,4 +37,7 @@ jobs: flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | - pytest + pytest -vv + - name: Type check with mypy + run: | + mypy ./ --exclude 'setup\.py' --exclude 'tests/' \ No newline at end of file diff --git a/README.md b/README.md index 8bc1087..296b21c 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Surveyor is a Python utility that queries Endpoint Detection and Response (EDR) products and summarizes the results. Security and IT teams can use Surveyor to baseline their environments and identify abnormal activity. -## Current Version: 2.4 +## Current Version: 2.4.1 Version 2.0 introduced breaking changes to the command line interface and support for SentinelOne. If you are looking for the prior version of Surveyor, see [past releases](https://github.com/redcanaryco/surveyor/releases). @@ -43,12 +43,18 @@ page of the wiki. #### Running the `sysinternals` definition file using the `cbr` product: -``` +```bash surveyor.py --deffile sysinternals cbr ``` #### Running the `sysinternals` definition file using the `dfe` product: -``` +```bash surveyor.py --deffile sysinternals dfe --creds dfe_creds.ini +``` + +#### Running a Sigma rule file using the `cbc` product: + +```bash +surveyor.py --sigmarule /path/to/sigma/rule.yml cbc ``` \ No newline at end of file diff --git a/common.py b/common.py index 40cba2a..8d18f44 100644 --- a/common.py +++ b/common.py @@ -136,3 +136,47 @@ def _echo(self, message: str, level: int = logging.DEBUG): Write a message to STDOUT and the debug log stream. """ log_echo(message, self.log, level, use_tqdm=self._tqdm_echo) + +def sigma_translation(product: str, sigma_rules: list) -> dict: + supports_json_ouput = True + + try: + from sigma.collection import SigmaCollection # type: ignore + from sigma.plugins import SigmaPluginDirectory # type: ignore + plugins = SigmaPluginDirectory.default_plugin_directory() + except Exception as e: + raise e + + if product in ('cbr','cbc'): + plugins.get_plugin_by_id('carbonblack').install() + from sigma.backends.carbonblack import CarbonBlackBackend # type: ignore + + if product == 'cbr': + from sigma.pipelines.carbonblack import CarbonBlackResponse_pipeline as cb_pipeline # type: ignore + else: + from sigma.pipelines.carbonblack import CarbonBlack_pipeline as cb_pipeline # type: ignore + + backend = CarbonBlackBackend(cb_pipeline()) + elif product == 's1': + plugins.get_plugin_by_id('sentinelone').install() + from sigma.backends.sentinel_one import SentinelOneBackend # type: ignore + backend = SentinelOneBackend() + elif product == 'dfe': + supports_json_ouput = False + plugins.get_plugin_by_id('microsoft365defender').install() + from sigma.backends.microsoft365defender import Microsoft365DefenderBackend # type: ignore + backend = Microsoft365DefenderBackend() + + rule_collection = SigmaCollection.load_ruleset(sigma_rules) + if supports_json_ouput: + return backend.convert(rule_collection, "json") + else: + results: dict = {"queries":[]} + for r in rule_collection: + results['queries'].append({ + 'query': backend.convert_rule(r)[0], + 'id': r.id, + 'title': r.title, + 'description': r.description + }) + return results diff --git a/products/vmware_cb_enterprise_edr.py b/products/vmware_cb_enterprise_edr.py index 23d6412..44e33c5 100644 --- a/products/vmware_cb_enterprise_edr.py +++ b/products/vmware_cb_enterprise_edr.py @@ -2,10 +2,10 @@ import logging from typing import Generator -import cbc_sdk.errors -from cbc_sdk.rest_api import CBCloudAPI -from cbc_sdk.platform import Process -from cbc_sdk.base import QueryBuilder +import cbc_sdk.errors # type: ignore +from cbc_sdk.rest_api import CBCloudAPI # type: ignore +from cbc_sdk.platform import Process # type: ignore +from cbc_sdk.base import QueryBuilder # type: ignore from common import Product, Result, Tag diff --git a/products/vmware_cb_response.py b/products/vmware_cb_response.py index cbc2fb6..b901a16 100644 --- a/products/vmware_cb_response.py +++ b/products/vmware_cb_response.py @@ -1,7 +1,7 @@ import logging -from cbapi.response import CbEnterpriseResponseAPI -from cbapi.response.models import Process +from cbapi.response import CbEnterpriseResponseAPI # type: ignore +from cbapi.response.models import Process # type: ignore from common import Product, Tag, Result diff --git a/requirements.types.txt b/requirements.types.txt new file mode 100644 index 0000000..cc2aa60 --- /dev/null +++ b/requirements.types.txt @@ -0,0 +1,2 @@ +types-requests +types-tqdm \ No newline at end of file diff --git a/requirements_sigma.txt b/requirements_sigma.txt new file mode 100644 index 0000000..39d4461 --- /dev/null +++ b/requirements_sigma.txt @@ -0,0 +1 @@ +pySigma>=0.9.5 \ No newline at end of file diff --git a/setup.py b/setup.py index 80daa0e..98dc934 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ def find_scripts(): packages=find_packages(), scripts=find_scripts(), description='Extracts summarized process data from EDR platforms', - version='2.4.0', + version='2.4.1', classifiers=[ 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Developers', @@ -34,8 +34,13 @@ def find_scripts(): 'License :: OSI Approved :: MIT License', 'Operating System :: OS Independent', 'Programming Language :: Python', - ], + ], install_requires=[ 'cbapi==1.7.0', 'click', 'requests', 'tqdm', 'carbon-black-cloud-sdk' + ], + extras_require={ + "sigma": [ + "pysigma>=0.9.5" ] - ) + } +) \ No newline at end of file diff --git a/surveyor.py b/surveyor.py index 2fc0c0a..3413e0e 100644 --- a/surveyor.py +++ b/surveyor.py @@ -11,12 +11,12 @@ import json import logging import os -from typing import Optional, Tuple, Callable +from typing import Optional, Tuple, Callable, Any import click from tqdm import tqdm -from common import Tag, Result +from common import Tag, Result, sigma_translation from help import log_echo from load import get_product_instance, get_products @@ -24,7 +24,7 @@ CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help", "-what-am-i-doing"]) # Application version -current_version = "2.1.0" +current_version = "2.4.1" def _list_products(ctx, _, value) -> None: @@ -46,13 +46,13 @@ def _list_products(ctx, _, value) -> None: f'{{:<{table_template[3]}}}' -def _write_results(output: Optional[csv.writer], results: list[Result], program: str, source: str, +def _write_results(output: Optional[Any], results: list[Result], program: str, source: str, tag: Tag, log: logging.Logger, use_tqdm: bool = False) -> None: """ Helper function for writing search results to CSV or STDOUT. """ if output: - if isinstance(tag, Tuple): + if isinstance(tag, tuple): tag = tag[0] if len(results) > 0: @@ -91,6 +91,8 @@ class ExecutionOptions: output: Optional[str] def_dir: Optional[str] def_file: Optional[str] + sigma_rule: Optional[str] + sigma_dir: Optional[str] no_file: bool no_progress: bool log_dir: str @@ -112,6 +114,8 @@ class ExecutionOptions: @click.option("--query", help="A single query to execute.") @click.option("--iocfile", 'ioc_file', help="IOC file to process. One IOC per line. REQUIRES --ioctype") @click.option("--ioctype", 'ioc_type', help="One of: ipaddr, domain, md5") +@click.option("--sigmarule", 'sigma_rule', help="Sigma rule file to process (must be in YAML format).", type=click.STRING) +@click.option("--sigmadir", 'sigma_dir', help='Directory containing multiple sigma rule files.', type=click.STRING) # optional output @click.option("--output", "--o", help="Specify the output file for the results. " "The default is create survey.csv in the current directory.") @@ -126,17 +130,18 @@ def cli(ctx, prefix: Optional[str], hostname: Optional[str], profile: str, days: username: Optional[str], ioc_file: Optional[str], ioc_type: Optional[str], query: Optional[str], output: Optional[str], def_dir: Optional[str], def_file: Optional[str], no_file: bool, no_progress: bool, + sigma_rule: Optional[str], sigma_dir: Optional[str], log_dir: str) -> None: ctx.ensure_object(dict) ctx.obj = ExecutionOptions(prefix, hostname, profile, days, minutes, username, ioc_file, ioc_type, query, output, - def_dir, def_file, no_file, no_progress, log_dir, dict()) + def_dir, def_file, sigma_rule, sigma_dir, no_file, no_progress, log_dir, dict()) if ctx.invoked_subcommand is None: survey(ctx, 'cbr') -# S1 options +# Cortex options @cli.command('cortex', help="Query Cortex XDR") @click.option("--creds", 'creds', help="Path to credential file", type=click.Path(exists=True), required=True) @click.pass_context @@ -147,6 +152,7 @@ def cortex(ctx, creds: Optional[str]) -> None: survey(ctx, 'cortex') + # S1 options @cli.command('s1', help="Query SentinelOne") @click.option("--site-id", help="ID of SentinelOne site to query", multiple=True, default=None) @@ -193,6 +199,7 @@ def cbr(ctx, sensor_group: Optional[Tuple]) -> None: survey(ctx, 'cbr') +# DFE options @cli.command('dfe', help="Query Microsoft Defender for Endpoints") @click.option("--creds", 'creds', help="Path to credential file", type=click.Path(exists=True), required=True) @click.pass_context @@ -209,7 +216,7 @@ def survey(ctx, product_str: str = 'cbr') -> None: ctx.fail("--iocfile requires --ioctype") if opt.ioc_file and not os.path.isfile(opt.ioc_file): - ctx.fail(f'Supplied --iocfile is not a file') + ctx.fail('Supplied --iocfile is not a file') if (opt.output or opt.prefix) and opt.no_file: ctx.fail('--output and --prefix cannot be used with --no-file') @@ -217,6 +224,18 @@ def survey(ctx, product_str: str = 'cbr') -> None: if opt.days and opt.minutes: ctx.fail('--days and --minutes are mutually exclusive') + if (opt.sigma_rule or opt.sigma_dir) and product_str == 'cortex': + ctx.fail('Neither --sigmarule nor --sigmadir are supported by product "cortex"') + + if (opt.sigma_rule or opt.sigma_dir) and product_str == 's1' and opt.product_args['pq']: + ctx.fail('Neither --sigmarule nor --sigmadir are supported by SentinelOne PowerQuery') + + if opt.sigma_rule and not os.path.isfile(opt.sigma_rule): + ctx.fail('Supplied --sigmarule is not a file') + + if opt.sigma_dir and not os.path.isdir(opt.sigma_dir): + ctx.fail('Supplied --sigmadir is not a directory') + # instantiate a logger log = logging.getLogger('surveyor') logging.debug(f'Product: {product_str}') @@ -262,6 +281,9 @@ def survey(ctx, product_str: str = 'cbr') -> None: # initial query is retrieved from product instance base_query = product.base_query() + # placeholder for sigma rules if --sigmarule or --sigmadir is selected + sigma_rules = list() + # add filters specified by user if opt.username is not None: base_query.update({"username": opt.username}) @@ -284,7 +306,7 @@ def survey(ctx, product_str: str = 'cbr') -> None: if not opt.no_file: # determine output file name if opt.output and opt.prefix: - log_echo(f"Output arg takes precendence so prefix arg will be ignored", log) + log_echo("Output arg takes precendence so prefix arg will be ignored", log) if opt.output: file_name = opt.output elif opt.prefix: @@ -329,6 +351,10 @@ def survey(ctx, product_str: str = 'cbr') -> None: ctx.fail("The deffile doesn't exist. Please try again.") definition_files.append(opt.def_file) + # add sigma_rule to list + if opt.sigma_rule: + sigma_rules.append(opt.sigma_rule) + # if --defdir add all files to list if opt.def_dir: if not os.path.exists(opt.def_dir): @@ -339,6 +365,13 @@ def survey(ctx, product_str: str = 'cbr') -> None: if os.path.splitext(filename)[1] == '.json': definition_files.append(os.path.join(root_dir, filename)) + # if --sigma_dir, add all files to sigma_rules list + if opt.sigma_dir: + for root_dir, dirs, files in os.walk(opt.sigma_dir): + for filename in files: + if os.path.splitext(filename)[1] == '.yml': + sigma_rules.append(os.path.join(root_dir, filename)) + # run search based on IOC file if opt.ioc_file: with open(opt.ioc_file) as ioc_file: @@ -377,6 +410,28 @@ def survey(ctx, product_str: str = 'cbr') -> None: for tag, nested_results in product.get_results().items(): _write_results(writer, nested_results, tag.tag, str(tag.data), tag, log) + # if there's sigma rules to be processed + if len(sigma_rules) > 0: + translated_rules = sigma_translation(product_str, sigma_rules) + for rule in tqdm(translated_rules['queries'], desc="Processing sigma rules", disable=opt.no_progress): + program = f"{rule['title']} - {rule['id']}" + source = 'Sigma Rule' + + product.nested_process_search(Tag(program, data=source), {'query': [rule['query']]}, base_query) + + if product.has_results(): + # write results as they become available + for tag, nested_results in product.get_results(final_call=False).items(): + _write_results(writer, nested_results, program, str(tag.data), tag, log, + use_tqdm=True) + + # ensure results are only written once + product.clear_results() + + # write any remaining results + for tag, nested_results in product.get_results().items(): + _write_results(writer, nested_results, tag.tag, str(tag.data), tag, log) + if output_file: log_echo(f"\033[95mResults saved: {output_file.name}\033[0m", log) except KeyboardInterrupt: diff --git a/tests/test_surveyor.py b/tests/test_surveyor.py index c882e4b..6b5b432 100644 --- a/tests/test_surveyor.py +++ b/tests/test_surveyor.py @@ -165,6 +165,22 @@ def test_invalid_def_file(runner, mocker): mocked_nested_process_search.assert_not_called() +def test_invalid_sigma_rule(runner, mocker): + mocker.patch('products.vmware_cb_response.CbResponse._authenticate') + mocked_nested_process_search = mocker.patch('products.vmware_cb_response.CbResponse.nested_process_search') + result = runner.invoke(cli, ["--sigmarule", "nonexistent.yml"]) + assert "Supplied --sigmarule is not a file" in result.output + mocked_nested_process_search.assert_not_called() + + +def test_invalid_sigma_dir(runner, mocker): + mocker.patch('products.vmware_cb_response.CbResponse._authenticate') + mocked_nested_process_search = mocker.patch('products.vmware_cb_response.CbResponse.nested_process_search') + result = runner.invoke(cli, ["--sigmadir", "./nonexistent_dir"]) + assert "Supplied --sigmadir is not a directory" in result.output + mocked_nested_process_search.assert_not_called() + + def test_ioc_file(runner, mocker): """ Verify if an IOC file is passed, it is logged and an EDR product is called @@ -265,16 +281,196 @@ def test_mutually_exclusive_output_prefix(runner, mocker): result = runner.invoke(cli, ['--prefix', 'test_prefix', '--output', 'test_output.csv']) assert "Output arg takes precendence so prefix arg will be ignored" in result.output + def test_no_file_output(runner, mocker): mocker.patch('products.vmware_cb_response.CbResponse._authenticate') default_output = 'surveyor.csv' runner.invoke(cli, ['--no_file']) assert not os.path.exists(default_output) + def test_base_query_filters_with_query(runner, mocker): mocker.patch('products.vmware_cb_response.CbResponse._authenticate') mocked_process_search = mocker.patch('products.vmware_cb_response.CbResponse.process_search') filter_args = ['--days', '5', '--hostname', 'workstation1', '--username', 'admin'] result = runner.invoke(cli, ["--query", "SELECT * FROM processes"] + filter_args) assert "Running Custom Query: SELECT * FROM processes" in result.output - mocked_process_search.assert_called_once_with(Tag('query'), {'days':5, 'hostname':'workstation1','username':'admin'}, 'SELECT * FROM processes') \ No newline at end of file + mocked_process_search.assert_called_once_with(Tag('query'), {'days':5, 'hostname':'workstation1','username':'admin'}, 'SELECT * FROM processes') + + +def test_sigma_rule(runner, mocker): + mocker.patch('products.vmware_cb_response.CbResponse._authenticate') + mocked_nested_process_search = mocker.patch('products.vmware_cb_response.CbResponse.nested_process_search') + with runner.isolated_filesystem() as temp_dir: + sigma_file_path = os.path.join(temp_dir, "test_sigma_rule.yml") + with open(sigma_file_path, 'w') as sigmafile: + sigmafile.write("""title: Test sigma rule +id: 5fd18e43-749c-4bae-93b6-d46e1f27062e +description: Test sigma rule +logsource: + category: process_creation +detection: + selection: + - Image: 'curl.exe' + condition: selection +fields: + - CommandLine + - ParentCommandLine""") + result = runner.invoke(cli, ["--sigmarule", sigma_file_path]) + assert "Processing sigma rules" in result.output + mocked_nested_process_search.assert_called_once_with(Tag('Test sigma rule - 5fd18e43-749c-4bae-93b6-d46e1f27062e', 'Sigma Rule'), {"query":["process_name:curl.exe"]}, {}) + + +def test_sigma_rule_with_base_query(runner, mocker): + mocker.patch('products.vmware_cb_response.CbResponse._authenticate') + mocked_nested_process_search = mocker.patch('products.vmware_cb_response.CbResponse.nested_process_search') + filter_args = ['--days', '5', '--hostname', 'workstation1', '--username', 'admin'] + with runner.isolated_filesystem() as temp_dir: + sigma_file_path = os.path.join(temp_dir, "test_sigma_rule.yml") + with open(sigma_file_path, 'w') as sigmafile: + sigmafile.write("""title: Test sigma rule +id: 5fd18e43-749c-4bae-93b6-d46e1f27062e +description: Test sigma rule +logsource: + category: process_creation +detection: + selection: + - Image: 'curl.exe' + condition: selection +fields: + - CommandLine + - ParentCommandLine""") + result = runner.invoke(cli, ["--sigmarule", sigma_file_path] + filter_args) + assert "Processing sigma rules" in result.output + mocked_nested_process_search.assert_called_once_with(Tag('Test sigma rule - 5fd18e43-749c-4bae-93b6-d46e1f27062e', 'Sigma Rule'), {"query":["process_name:curl.exe"]}, {'username':'admin', 'hostname':'workstation1','days':5 }) + + +def test_sigma_dir(runner, mocker): + mocker.patch('products.vmware_cb_response.CbResponse._authenticate') + mocked_nested_process_search = mocker.patch('products.vmware_cb_response.CbResponse.nested_process_search') + with runner.isolated_filesystem() as temp_dir: + sigma_file_path1 = os.path.join(temp_dir, "test_sigma_rule1.yml") + with open(sigma_file_path1, 'w') as sigmafile: + sigmafile.write("""title: Test sigma rule +id: 5fd18e43-749c-4bae-93b6-d46e1f27062e +description: Test sigma rule +logsource: + category: process_creation +detection: + selection: + - Image: 'curl.exe' + condition: selection +fields: + - CommandLine + - ParentCommandLine""") + + sigma_file_path2 = os.path.join(temp_dir, "test_sigma_rule2.yml") + with open(sigma_file_path2, 'w') as sigmafile: + sigmafile.write("""title: Test sigma rule 2 +id: 15ecb82d-b7c0-4e53-9bf3-deedb4c9908c +description: Test sigma rule 2 +logsource: + category: process_creation +detection: + selection: + - Image: 'powershell.exe' + condition: selection +fields: + - CommandLine + - ParentCommandLine""") + result = runner.invoke(cli, ["--sigmadir", temp_dir]) + + expected_calls = [mocker.call(Tag('Test sigma rule - 5fd18e43-749c-4bae-93b6-d46e1f27062e', 'Sigma Rule'), {"query":["process_name:curl.exe"]}, {}), + mocker.call(Tag('Test sigma rule 2 - 15ecb82d-b7c0-4e53-9bf3-deedb4c9908c', 'Sigma Rule'), {"query":["process_name:powershell.exe"]}, {})] + assert "Processing sigma rules" in result.output + mocked_nested_process_search.assert_has_calls(expected_calls, any_order=True) + + +def test_sigma_dir_with_base_query(runner, mocker): + mocker.patch('products.vmware_cb_response.CbResponse._authenticate') + mocked_nested_process_search = mocker.patch('products.vmware_cb_response.CbResponse.nested_process_search') + filter_args = ['--days', '5', '--hostname', 'workstation1', '--username', 'admin'] + with runner.isolated_filesystem() as temp_dir: + sigma_file_path1 = os.path.join(temp_dir, "test_sigma_rule1.yml") + with open(sigma_file_path1, 'w') as sigmafile: + sigmafile.write("""title: Test sigma rule +id: 5fd18e43-749c-4bae-93b6-d46e1f27062e +description: Test sigma rule +logsource: + category: process_creation +detection: + selection: + - Image: 'curl.exe' + condition: selection +fields: + - CommandLine + - ParentCommandLine""") + + sigma_file_path2 = os.path.join(temp_dir, "test_sigma_rule2.yml") + with open(sigma_file_path2, 'w') as sigmafile: + sigmafile.write("""title: Test sigma rule 2 +id: 15ecb82d-b7c0-4e53-9bf3-deedb4c9908c +description: Test sigma rule 2 +logsource: + category: process_creation +detection: + selection: + - Image: 'powershell.exe' + condition: selection +fields: + - CommandLine + - ParentCommandLine""") + result = runner.invoke(cli, ["--sigmadir", temp_dir] + filter_args) + + expected_calls = [mocker.call(Tag('Test sigma rule - 5fd18e43-749c-4bae-93b6-d46e1f27062e', 'Sigma Rule'), {"query":["process_name:curl.exe"]}, {'username':'admin', 'hostname':'workstation1','days':5 }), + mocker.call(Tag('Test sigma rule 2 - 15ecb82d-b7c0-4e53-9bf3-deedb4c9908c', 'Sigma Rule'), {"query":["process_name:powershell.exe"]}, {'username':'admin', 'hostname':'workstation1','days':5 })] + assert "Processing sigma rules" in result.output + mocked_nested_process_search.assert_has_calls(expected_calls, any_order=True) + + +def test_sigma_rule_with_cortex(runner, mocker): + mocker.patch('products.vmware_cb_response.CbResponse._authenticate') + with runner.isolated_filesystem() as temp_dir: + cred_file = os.path.join(temp_dir, "test.ini") + with open(cred_file, 'w') as cred_file_output: + cred_file_output.write("testing123") + + result = runner.invoke(cli, ['--sigmarule', 'test.yml', 'cortex', '--creds', cred_file]) + assert 'Neither --sigmarule nor --sigmadir are supported by product "cortex"' in result.output + assert result.exit_code != 0 + + +def test_sigma_dir_with_cortex(runner, mocker): + mocker.patch('products.vmware_cb_response.CbResponse._authenticate') + with runner.isolated_filesystem() as temp_dir: + cred_file = os.path.join(temp_dir, "test.ini") + with open(cred_file, 'w') as cred_file_output: + cred_file_output.write("testing123") + + result = runner.invoke(cli, ['--sigmadir', './sigma_dir', 'cortex', '--creds', cred_file]) + assert 'Neither --sigmarule nor --sigmadir are supported by product "cortex"' in result.output + assert result.exit_code != 0 + + +def test_sigma_rule_with_s1_pq(runner, mocker): + mocker.patch('products.vmware_cb_response.CbResponse._authenticate') + with runner.isolated_filesystem() as temp_dir: + cred_file = os.path.join(temp_dir, "test.ini") + with open(cred_file, 'w') as cred_file_output: + cred_file_output.write("testing123") + + result = runner.invoke(cli, ['--sigmarule', 'test.yml', 's1', '--creds', cred_file]) + assert 'Neither --sigmarule nor --sigmadir are supported by SentinelOne PowerQuery' in result.output + assert result.exit_code != 0 + + +def test_sigma_dir_with_s1_pq(runner, mocker): + mocker.patch('products.vmware_cb_response.CbResponse._authenticate') + with runner.isolated_filesystem() as temp_dir: + cred_file = os.path.join(temp_dir, "test.ini") + with open(cred_file, 'w') as cred_file_output: + cred_file_output.write("testing123") + + result = runner.invoke(cli, ['--sigmadir', './sigma_dir', 's1', '--creds', cred_file]) + assert 'Neither --sigmarule nor --sigmadir are supported by SentinelOne PowerQuery' in result.output + assert result.exit_code != 0 \ No newline at end of file