Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Sigma Rules #106

Merged
merged 16 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install flake8 pytest pytest-mock mock
python -m pip install flake8 pytest pytest-mock mock mypy
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
if [ -f requirements_sigma.txt ]; then pip install -r requirements_sigma.txt; fi
if [ -f requirements.types.txt ]; then pip install -r requirements.types.txt; fi
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
Expand All @@ -35,4 +37,7 @@ jobs:
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pytest
pytest -vv
- name: Type check with mypy
run: |
mypy ./ --exclude 'setup\.py' --exclude 'tests/'
44 changes: 44 additions & 0 deletions common.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,47 @@ def _echo(self, message: str, level: int = logging.DEBUG):
Write a message to STDOUT and the debug log stream.
"""
log_echo(message, self.log, level, use_tqdm=self._tqdm_echo)

def sigma_translation(product: str, sigma_rules: list) -> dict:
    """
    Translate Sigma rule files into product-specific queries using pySigma.

    Args:
        product: Target product identifier; one of 'cbr', 'cbc', 's1', 'dfe'.
        sigma_rules: Paths to Sigma rule files (YAML) to load and translate.

    Returns:
        A dict of the form ``{"queries": [...]}`` where each entry holds the
        translated 'query' plus the source rule's 'id', 'title' and
        'description'.

    Raises:
        ImportError: If the optional pySigma dependency is not installed
            (see requirements_sigma.txt).
        ValueError: If ``product`` is not a supported translation target.
    """
    # Some backends cannot emit the JSON output format directly; those are
    # converted rule-by-rule below instead.
    supports_json_output = True

    # pySigma is an optional dependency; let the ImportError propagate
    # unchanged so the caller sees the real cause. (The previous
    # `except Exception as e: raise e` wrapper added nothing.)
    from sigma.collection import SigmaCollection  # type: ignore
    from sigma.plugins import SigmaPluginDirectory  # type: ignore
    plugins = SigmaPluginDirectory.default_plugin_directory()

    if product in ('cbr', 'cbc'):
        plugins.get_plugin_by_id('carbonblack').install()
        from sigma.backends.carbonblack import CarbonBlackBackend  # type: ignore

        # Response and Cloud share a backend but use different pipelines.
        if product == 'cbr':
            from sigma.pipelines.carbonblack import CarbonBlackResponse_pipeline as cb_pipeline  # type: ignore
        else:
            from sigma.pipelines.carbonblack import CarbonBlack_pipeline as cb_pipeline  # type: ignore

        backend = CarbonBlackBackend(cb_pipeline())
    elif product == 's1':
        plugins.get_plugin_by_id('sentinelone').install()
        from sigma.backends.sentinel_one import SentinelOneBackend  # type: ignore
        backend = SentinelOneBackend()
    elif product == 'dfe':
        # The Microsoft 365 Defender backend has no JSON output format.
        supports_json_output = False
        plugins.get_plugin_by_id('microsoft365defender').install()
        from sigma.backends.microsoft365defender import Microsoft365DefenderBackend  # type: ignore
        backend = Microsoft365DefenderBackend()
    else:
        # Previously an unrecognized product fell through every branch and
        # crashed with an UnboundLocalError on `backend`; fail fast instead.
        raise ValueError(f'Sigma rule translation is not supported for product "{product}"')

    rule_collection = SigmaCollection.load_ruleset(sigma_rules)
    if supports_json_output:
        return backend.convert(rule_collection, "json")

    # Build the same {"queries": [...]} shape by hand for backends without
    # native JSON output.
    results: dict = {"queries": []}
    for rule in rule_collection:
        results['queries'].append({
            'query': backend.convert_rule(rule)[0],
            'id': rule.id,
            'title': rule.title,
            'description': rule.description
        })
    return results
8 changes: 4 additions & 4 deletions products/vmware_cb_enterprise_edr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import logging

from typing import Generator
import cbc_sdk.errors
from cbc_sdk.rest_api import CBCloudAPI
from cbc_sdk.platform import Process
from cbc_sdk.base import QueryBuilder
import cbc_sdk.errors # type: ignore
from cbc_sdk.rest_api import CBCloudAPI # type: ignore
from cbc_sdk.platform import Process # type: ignore
from cbc_sdk.base import QueryBuilder # type: ignore

from common import Product, Result, Tag

Expand Down
4 changes: 2 additions & 2 deletions products/vmware_cb_response.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging

from cbapi.response import CbEnterpriseResponseAPI
from cbapi.response.models import Process
from cbapi.response import CbEnterpriseResponseAPI # type: ignore
from cbapi.response.models import Process # type: ignore

from common import Product, Tag, Result

Expand Down
2 changes: 2 additions & 0 deletions requirements.types.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
types-requests
types-tqdm
1 change: 1 addition & 0 deletions requirements_sigma.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pySigma>=0.9.5
9 changes: 7 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ def find_scripts():
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
],
],
install_requires=[
'cbapi==1.7.0', 'click', 'requests', 'tqdm', 'carbon-black-cloud-sdk'
],
extras_require={
"sigma": [
"pysigma>=0.9.5"
]
)
}
)
71 changes: 63 additions & 8 deletions surveyor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import json
import logging
import os
from typing import Optional, Tuple, Callable
from typing import Optional, Tuple, Callable, Any

import click
from tqdm import tqdm

from common import Tag, Result
from common import Tag, Result, sigma_translation
from help import log_echo
from load import get_product_instance, get_products

Expand Down Expand Up @@ -46,13 +46,13 @@ def _list_products(ctx, _, value) -> None:
f'{{:<{table_template[3]}}}'


def _write_results(output: Optional[csv.writer], results: list[Result], program: str, source: str,
def _write_results(output: Optional[Any], results: list[Result], program: str, source: str,
tag: Tag, log: logging.Logger, use_tqdm: bool = False) -> None:
"""
Helper function for writing search results to CSV or STDOUT.
"""
if output:
if isinstance(tag, Tuple):
if isinstance(tag, tuple):
tag = tag[0]

if len(results) > 0:
Expand Down Expand Up @@ -91,6 +91,8 @@ class ExecutionOptions:
output: Optional[str]
def_dir: Optional[str]
def_file: Optional[str]
sigma_rule: Optional[str]
sigma_dir: Optional[str]
no_file: bool
no_progress: bool
log_dir: str
Expand All @@ -112,6 +114,8 @@ class ExecutionOptions:
@click.option("--query", help="A single query to execute.")
@click.option("--iocfile", 'ioc_file', help="IOC file to process. One IOC per line. REQUIRES --ioctype")
@click.option("--ioctype", 'ioc_type', help="One of: ipaddr, domain, md5")
@click.option("--sigmarule", 'sigma_rule', help="Sigma rule file to process (must be in YAML format).", type=click.STRING)
@click.option("--sigmadir", 'sigma_dir', help='Directory containing multiple sigma rule files.', type=click.STRING)
# optional output
@click.option("--output", "--o", help="Specify the output file for the results. "
"The default is create survey.csv in the current directory.")
Expand All @@ -126,17 +130,18 @@ def cli(ctx, prefix: Optional[str], hostname: Optional[str], profile: str, days:
username: Optional[str],
ioc_file: Optional[str], ioc_type: Optional[str], query: Optional[str], output: Optional[str],
def_dir: Optional[str], def_file: Optional[str], no_file: bool, no_progress: bool,
sigma_rule: Optional[str], sigma_dir: Optional[str],
log_dir: str) -> None:

ctx.ensure_object(dict)
ctx.obj = ExecutionOptions(prefix, hostname, profile, days, minutes, username, ioc_file, ioc_type, query, output,
def_dir, def_file, no_file, no_progress, log_dir, dict())
def_dir, def_file, sigma_rule, sigma_dir, no_file, no_progress, log_dir, dict())

if ctx.invoked_subcommand is None:
survey(ctx, 'cbr')


# S1 options
# Cortex options
@cli.command('cortex', help="Query Cortex XDR")
@click.option("--creds", 'creds', help="Path to credential file", type=click.Path(exists=True), required=True)
@click.pass_context
Expand All @@ -147,6 +152,7 @@ def cortex(ctx, creds: Optional[str]) -> None:

survey(ctx, 'cortex')


# S1 options
@cli.command('s1', help="Query SentinelOne")
@click.option("--site-id", help="ID of SentinelOne site to query", multiple=True, default=None)
Expand Down Expand Up @@ -193,6 +199,7 @@ def cbr(ctx, sensor_group: Optional[Tuple]) -> None:
survey(ctx, 'cbr')


# DFE options
@cli.command('dfe', help="Query Microsoft Defender for Endpoints")
@click.option("--creds", 'creds', help="Path to credential file", type=click.Path(exists=True), required=True)
@click.pass_context
Expand All @@ -209,14 +216,26 @@ def survey(ctx, product_str: str = 'cbr') -> None:
ctx.fail("--iocfile requires --ioctype")

if opt.ioc_file and not os.path.isfile(opt.ioc_file):
ctx.fail(f'Supplied --iocfile is not a file')
ctx.fail('Supplied --iocfile is not a file')

if (opt.output or opt.prefix) and opt.no_file:
ctx.fail('--output and --prefix cannot be used with --no-file')

if opt.days and opt.minutes:
ctx.fail('--days and --minutes are mutually exclusive')

if (opt.sigma_rule or opt.sigma_dir) and product_str == 'cortex':
ctx.fail('Neither --sigmarule nor --sigmadir are supported by product "cortex"')

if (opt.sigma_rule or opt.sigma_dir) and product_str == 's1' and opt.product_args['pq']:
ctx.fail('Neither --sigmarule nor --sigmadir are supported by SentinelOne PowerQuery')

if opt.sigma_rule and not os.path.isfile(opt.sigma_rule):
ctx.fail('Supplied --sigmarule is not a file')

if opt.sigma_dir and not os.path.isdir(opt.sigma_dir):
ctx.fail('Supplied --sigmadir is not a directory')

# instantiate a logger
log = logging.getLogger('surveyor')
logging.debug(f'Product: {product_str}')
Expand Down Expand Up @@ -262,6 +281,9 @@ def survey(ctx, product_str: str = 'cbr') -> None:
# initial query is retrieved from product instance
base_query = product.base_query()

# placeholder for sigma rules if --sigmarule or --sigmadir is selected
sigma_rules = list()

# add filters specified by user
if opt.username is not None:
base_query.update({"username": opt.username})
Expand All @@ -284,7 +306,7 @@ def survey(ctx, product_str: str = 'cbr') -> None:
if not opt.no_file:
# determine output file name
if opt.output and opt.prefix:
log_echo(f"Output arg takes precendence so prefix arg will be ignored", log)
log_echo("Output arg takes precendence so prefix arg will be ignored", log)
if opt.output:
file_name = opt.output
elif opt.prefix:
Expand Down Expand Up @@ -329,6 +351,10 @@ def survey(ctx, product_str: str = 'cbr') -> None:
ctx.fail("The deffile doesn't exist. Please try again.")
definition_files.append(opt.def_file)

# add sigma_rule to list
if opt.sigma_rule:
sigma_rules.append(opt.sigma_rule)

# if --defdir add all files to list
if opt.def_dir:
if not os.path.exists(opt.def_dir):
Expand All @@ -339,6 +365,13 @@ def survey(ctx, product_str: str = 'cbr') -> None:
if os.path.splitext(filename)[1] == '.json':
definition_files.append(os.path.join(root_dir, filename))

# if --sigma_dir, add all files to sigma_rules list
if opt.sigma_dir:
for root_dir, dirs, files in os.walk(opt.sigma_dir):
for filename in files:
if os.path.splitext(filename)[1] == '.yml':
sigma_rules.append(os.path.join(root_dir, filename))

# run search based on IOC file
if opt.ioc_file:
with open(opt.ioc_file) as ioc_file:
Expand Down Expand Up @@ -377,6 +410,28 @@ def survey(ctx, product_str: str = 'cbr') -> None:
for tag, nested_results in product.get_results().items():
_write_results(writer, nested_results, tag.tag, str(tag.data), tag, log)

# if there's sigma rules to be processed
if len(sigma_rules) > 0:
translated_rules = sigma_translation(product_str, sigma_rules)
for rule in tqdm(translated_rules['queries'], desc="Processing sigma rules", disable=opt.no_progress):
program = f"{rule['title']} - {rule['id']}"
source = 'Sigma Rule'

product.nested_process_search(Tag(program, data=source), {'query': [rule['query']]}, base_query)

if product.has_results():
# write results as they become available
for tag, nested_results in product.get_results(final_call=False).items():
_write_results(writer, nested_results, program, str(tag.data), tag, log,
use_tqdm=True)

# ensure results are only written once
product.clear_results()

# write any remaining results
for tag, nested_results in product.get_results().items():
_write_results(writer, nested_results, tag.tag, str(tag.data), tag, log)

if output_file:
log_echo(f"\033[95mResults saved: {output_file.name}\033[0m", log)
except KeyboardInterrupt:
Expand Down
Loading