diff --git a/lib/traixroute/detector/detection_rules.py b/lib/traixroute/detector/detection_rules.py index 91634ca..d75dc54 100755 --- a/lib/traixroute/detector/detection_rules.py +++ b/lib/traixroute/detector/detection_rules.py @@ -44,15 +44,6 @@ def __init__(self): self.asmt = [] # remote_peering: An instance of the remote peering class. self.remote_peering = remote_peering() - - - def load_files(self, filename): - try: - with open(filename) as f: - return [line.strip() for line in f.readlines()] - except: - print(filename + ' not found. Exiting.') - sys.exit(0) def rules_extract(self, mypath): ''' @@ -63,15 +54,14 @@ def rules_extract(self, mypath): file = '/configuration/rules.txt' try: - f = open(mypath + '/configuration/rules.txt', 'r') + with open(mypath + '/configuration/rules.txt', 'r') as f: + rules = [line.strip() for line in f.readlines()] except: print(file + ' does not exist. Exiting.') sys.exit(0) [delimeters1, expressions] = self.load_syntax_rules( 'configuration/expressions.txt', 'configuration/delimeters.txt', mypath) - rules = [line.strip() for line in f.readlines()] - for i in range(0, len(rules)): temp = (rules[i].split('#')) temp[0] = temp[0].replace(' ', '') @@ -196,27 +186,23 @@ def resolve_path(self, path, output, path_info_extract, db_extract, traIXparser) # Check if the condition part of a candidate rule # is satisfied in order to proceed with the # assessment part. - if len(cur_rule) > 0: + if len(cur_rule): current_hop = 1 if i < len(path) - 1: cur_path_asn = temp_path_asn[i - 1:i + 2] temp_ixp_long = ixp_long[i - 1:i + 2] temp_ixp_short = ixp_short[i - 1:i + 2] - cur_encounter_type = encounter_type[ - i - 1:i + 2] + cur_encounter_type = encounter_type[i - 1:i + 2] cur_path = path[i - 1:i + 2] else: cur_path_asn = temp_path_asn[i - 1:i + 1] temp_ixp_long = ixp_long[i - 1:i + 1] temp_ixp_short = ixp_short[i - 1:i + 1] - cur_encounter_type = encounter_type[ - i - 1:i + 1] + cur_encounter_type = encounter_type[i - 1:i + 1] cur_path = path[i - 1:i + 1] - set_ixp_short = list( - itertools.product(*temp_ixp_short)) - set_ixp_long = list( - itertools.product(*temp_ixp_long)) + set_ixp_short = list(itertools.product(*temp_ixp_short)) + set_ixp_long = list(itertools.product(*temp_ixp_long)) for ixp in range(0, len(set_ixp_short)): cur_ixp_long = list(set_ixp_long[ixp]) cur_ixp_short = list(set_ixp_short[ixp]) @@ -224,8 +210,7 @@ def resolve_path(self, path, output, path_info_extract, db_extract, traIXparser) cur_path, cur_rule, cur_path_asn, current_hop, cur_ixp_long, cur_ixp_short, asn2names, cur_encounter_type) if rule_check: if cur_asmt != '?': - self.rule_hits[ - j] = self.rule_hits[j] + 1 + self.rule_hits[j] = self.rule_hits[j] + 1 if self.remote_peering.rule_hit(j): self.remote_peering.temp_index = j @@ -253,25 +238,24 @@ def check_rules(self, path, rule, path_asn, path_cur, ixp_long, ixp_short, asn2n if len(rule) > len(path_asn): return False - for i in range(0, len(rule)): + for i,item in enumerate(rule): if len(path_asn) > path_cur + i - 1: - if ('IXP_IP' in rule[i] and '!AS_M' in rule[i]) and 'IXP prefix' not in encounter_type[path_cur + i - 1] and 'IXP IP' not in encounter_type[path_cur + i - 1]: + if ('IXP_IP' in item and '!AS_M' in item) and 'IXP prefix' not in encounter_type[path_cur + i - 1] and 'IXP IP' not in encounter_type[path_cur + i - 1]: return False - elif 'IXP_IP' in rule[i] and 'AS_M' in rule[i] and '!' not in rule[i] and 'IXP IP' not in encounter_type[path_cur + i - 1]: + elif 'IXP_IP' in item and 'AS_M' in item and '!' not in item and 'IXP IP' not in encounter_type[path_cur + i - 1]: return False - elif ('IXP_IP' not in rule[i] or '!AS_M' not in rule[i]) and 'IXP prefix' in encounter_type[path_cur + i - 1]: + elif ('IXP_IP' not in item or '!AS_M' not in item) and 'IXP prefix' in encounter_type[path_cur + i - 1]: return False - elif ('IXP_IP' not in rule[i] or 'AS_M' not in rule[i]) and 'IXP IP' in encounter_type[path_cur + i - 1]: + elif ('IXP_IP' not in item or 'AS_M' not in item) and 'IXP IP' in encounter_type[path_cur + i - 1]: return False - # Applies each condition of the condition part of the candidate rule - # onto the path. + # Applies each condition of the condition part of the candidate rule onto the path. string_h = string_handler.string_handler() check = 0 - for i in range(0, len(rule)): + for i,expression in enumerate(rule): + # The current condition of the condition part of the rule. current = path_cur + i - 1 - expression = rule[i] # Checking for IXP membership based on a non-IXP IP. if '!AS_M' in expression and 'and' not in expression and path_cur != current: @@ -333,13 +317,13 @@ def check_rules(self, path, rule, path_asn, path_cur, ixp_long, ixp_short, asn2n # Checking for IXP IP or Prefix based on either IXP membership or # Prefixes data. if 'IXP_IP' in expression and '!AS_M' in expression: - check = check + 1 + check += 1 if not self.check_names(rule, expression, current, i, encounter_type, 'IXP_IP', ixp_long, ixp_short): return False elif not self.check_number(rule, expression, path_asn, current, i, encounter_type, '!AS_M'): return False elif 'IXP_IP' in expression and 'AS_M' in expression: - check = check + 1 + check += 1 if not self.check_names(rule, expression, current, i, encounter_type, 'IXP_IP', ixp_long, ixp_short): return False elif not self.check_number(rule, expression, path_asn, current, i, encounter_type, 'AS_M'): @@ -402,8 +386,7 @@ def check_number(self, rule, expression, path_asn, current, i, encounter_type, s if len(rule) > i + 1 and len(path_asn) > current + 1: - [final1, final2] = self.find_numbers( - rule, str_to_chk, current, True) + [final1, final2] = self.find_numbers(rule, str_to_chk, current, True) if final1 == '' or final2 == '': return True if self.is_int(final1) and self.is_int(final2): @@ -492,9 +475,17 @@ def load_syntax_rules(self, filename1, filename2, mypath): a) delimeters1: A list containing the delimeters that separate the keywords. b) expressions: A list containing the allowed keywords. ''' - - expression_dump = self.load_files(mypath + '/' + filename1) - delimiter_dump = self.load_files(mypath + '/' + filename2) + + def load_files(filename): + try: + with open(filename) as f: + return [line.strip() for line in f.readlines()] + except: + print(filename + ' not found. Exiting.') + sys.exit(0) + + expression_dump = load_files(mypath + '/' + filename1) + delimiter_dump = load_files(mypath + '/' + filename2) candidate_delimiters = [del_node.split('#')[0] for del_node in delimiter_dump if del_node.split('#')[0]] if len(candidate_delimiters) != 1: