Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Regex and full query for definition files with additional parameter mappings and output fields for SentinelOne #87

Merged
merged 13 commits into from
Feb 27, 2023
44 changes: 37 additions & 7 deletions products/sentinel_one.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Tuple, Callable
import re

import requests
from requests.adapters import HTTPAdapter
Expand All @@ -26,12 +27,17 @@ class Query:


PARAMETER_MAPPING: dict[str, str] = {
'query': 'query', # non-existent field to specify a fully defined query string in a definition file.
'process_name': 'ProcessName',
'ipaddr': 'IP',
'cmdline': 'CmdLine',
'digsig_publisher': 'SrcProcPublisher',
'domain': 'Url',
'internal_name': 'TgtFileInternalName'
'digsig_publisher': 'Publisher',
'domain': 'DNS',
'internal_name': 'TgtFileInternalName',
'url': 'Url',
'filemod': 'FilePath',
'modload': 'ModulePath',
'process_file_description': 'SrcProcDisplayName'
}


Expand Down Expand Up @@ -422,9 +428,18 @@ def nested_process_search(self, tag: Tag, criteria: dict, base_query: dict):
parameter = PARAMETER_MAPPING[search_field]
search_value = all_terms

if len(terms) > 1:
if parameter == 'query':
# Formats queries as (a) OR (b) OR (c) OR (d)
if len(terms) > 1:
search_value = ') OR ('.join(terms)
else:
search_value = terms[0]
operator = 'raw'
elif len(terms) > 1:
search_value = f'({all_terms})'
operator = 'in contains anycase'
elif not re.findall(r'\w+\.\w+', search_value):
operator = 'regexp'
else:
operator = 'containscis'

Expand Down Expand Up @@ -469,6 +484,9 @@ def _process_queries(self):
combined_queries[key].append((tag, query.search_value))
elif query.full_query is not None:
query_text.append((tag, query.full_query))
elif query.operator == 'raw':
full_query = f'({query.search_value})'
query_text.append((tag, full_query))
else:
full_query = query.parameter + ' ' + query.operator + ' ' + query.search_value
query_text.append((tag, full_query))
Expand Down Expand Up @@ -552,9 +570,21 @@ def _process_queries(self):
for event in events:
hostname = event['endpointName']
username = event['srcProcUser']
path = event['processImagePath']
path = event['srcProcImagePath']
srcprocstorylineid = event['srcProcStorylineId'] if 'srcProcStorylineId' in event else 'None'
srcprocdisplayname = event['srcProcDisplayName'] if 'srcProcDisplayName' in event else 'None'
tgtprocdisplayname = event['tgtProcDisplayName'] if 'tgtProcDisplayName' in event else 'None'
tgtfilepath = event['tgtFilePath'] if 'tgtFilePath' in event else 'None'
tgtfilesha1 = event['fileSha1'] if 'fileSha1' in event else 'None'
tgtfilesha256 = event['fileSha256'] if 'fileSha256' in event else 'None'
scrprocparentimagepath = event['srcProcParentImagePath'] if 'srcProcParentImagePath' in event else 'None'
tgtprocimagepath = event['tgtProcImagePath'] if 'tgtProcImagePath' in event else 'None'
url = event['networkUrl'] if 'networkUrl' in event else 'None'
srcip = event['srcIp'] if 'srcIp' in event else 'None'
dstip = event['dstIp'] if 'dstIp' in event else 'None'
dnsrequest = event['dnsRequest'] if 'dnsRequest' in event else 'None'
command_line = event['srcProcCmdLine']
additional_data = (event['eventTime'], event['siteId'], event['siteName'])
additional_data = (srcprocstorylineid, srcprocdisplayname, scrprocparentimagepath, tgtprocdisplayname, tgtprocimagepath, tgtfilepath, tgtfilesha1, tgtfilesha256, url, srcip, dstip, dnsrequest, event['eventType'], event['eventTime'], event['siteId'], event['siteName'])

result = Result(hostname, username, path, command_line, additional_data)
self._results[merged_tag].append(result)
Expand All @@ -574,4 +604,4 @@ def get_results(self, final_call: bool = True) -> dict[Tag, list[Result]]:
return self._results

def get_other_row_headers(self) -> list[str]:
return ['Event Time', 'Site ID', 'Site Name']
return ['SrcProcStorylineId', 'SrcProcDisplayName', 'SrcProcParentImagePath', 'TgtProcDisplayName', 'TgtProcPath', 'TgtFilePath', 'TgtFileSHA1', 'TgtFileSHA256', 'Network URL', 'Source IP', 'Dest IP', 'DNS Request', 'EventType', 'Event Time', 'Site ID', 'Site Name']