-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathonug_csnf_mapping_load.py
executable file
·153 lines (129 loc) · 5.28 KB
/
onug_csnf_mapping_load.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# Copyright (c) 2021 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
import sys
import argparse
import csv
import json
def convert_onug_csv_to_json(input_file, output_file, has_header=True):
    """Convert an ONUG CSNF mapping CSV file into a nested JSON mapping file.

    Each non-blank CSV row must have exactly nine columns, in this order:
    provider name, provider type, provider id, source name, alert id/name,
    CSNF path, provider path, static value, entity type.  Rows are grouped
    as provider -> source -> alert -> CSNF path.

    Args:
        input_file: Path of the CSV file to read.
        output_file: Path of the JSON file to write (overwritten if present).
        has_header: When true (the default), the first CSV line is treated
            as a header and skipped.

    Returns:
        The nested dict that was written to ``output_file``, keyed by
        provider name.

    Raises:
        RuntimeError: If the CSV cannot be read or parsed, or the JSON file
            cannot be written.  The underlying error is chained as the cause.
    """
    expected_columns = 9
    all_providers = {}
    try:
        # newline="" is what the csv module expects so that it can handle
        # embedded newlines in quoted fields itself.
        with open(input_file, newline="") as f:
            csvreader = csv.reader(f)
            if has_header:
                # The None default avoids StopIteration on an empty file.
                next(csvreader, None)
            for row in csvreader:
                if not row:
                    # csv.reader yields an empty list for blank lines.
                    continue
                if len(row) != expected_columns:
                    raise ValueError(
                        f"line {csvreader.line_num}: expected "
                        f"{expected_columns} columns, got {len(row)}"
                    )
                (provider_name, provider_type, provider_id, source_name,
                 alert_id_name, csnf_path, provider_path, static_value,
                 entity_type) = row

                # The first row seen for a provider fixes its type and id.
                provider = all_providers.setdefault(provider_name, {
                    "provider": provider_name,
                    "providerType": provider_type,
                    "providerId": provider_id,
                    "source": {}})
                # Create the source entry the first time it appears for
                # this provider.
                source = provider["source"].setdefault(source_name, {
                    "sourceName": source_name,
                    "sourceId": "None",
                    "alerts": {}})
                # Create the alert entry the first time it appears for
                # this source.
                alert = source["alerts"].setdefault(
                    alert_id_name, {"alertMapping": {}})
                # A later row with the same CSNF path replaces the earlier one.
                alert["alertMapping"][csnf_path] = {
                    "path": provider_path,
                    "entityType": entity_type,
                    # A non-empty static value marks the field as mapped.
                    "mappedValue": bool(static_value),
                    "value": static_value,
                }
    except (OSError, ValueError, csv.Error) as e:
        raise RuntimeError(
            "Failed to convert ONUG parse CSV file \n " + str(e)) from e

    try:
        with open(output_file, "w") as outfile:
            json.dump(all_providers, outfile)
    except (OSError, TypeError, ValueError) as e:
        raise RuntimeError("Failed to write JSON to file \n" + str(e)) from e

    return all_providers
def convert_onug_csv_to_trigger(input_file, output_file, has_header=True):
    """Convert an ONUG CSNF mapping CSV file into a nested mapping document.

    The output currently has the same provider -> source -> alert ->
    CSNF path structure and is written as JSON.  Each non-blank CSV row must
    have exactly nine columns, in this order: provider name, provider type,
    provider id, source name, alert id/name, CSNF path, provider path,
    static value, entity type.

    Args:
        input_file: Path of the CSV file to read.
        output_file: Path of the JSON file to write (overwritten if present).
        has_header: When true (the default), the first CSV line is treated
            as a header and skipped.

    Returns:
        The nested dict that was written to ``output_file``, keyed by
        provider name.

    Raises:
        RuntimeError: If the CSV cannot be read or parsed, or the JSON file
            cannot be written.  The underlying error is chained as the cause.
    """
    expected_columns = 9
    all_providers = {}
    try:
        # newline="" is what the csv module expects so that it can handle
        # embedded newlines in quoted fields itself.
        with open(input_file, newline="") as f:
            csvreader = csv.reader(f)
            if has_header:
                # The None default avoids StopIteration on an empty file.
                next(csvreader, None)
            for row in csvreader:
                if not row:
                    # csv.reader yields an empty list for blank lines.
                    continue
                if len(row) != expected_columns:
                    raise ValueError(
                        f"line {csvreader.line_num}: expected "
                        f"{expected_columns} columns, got {len(row)}"
                    )
                (provider_name, provider_type, provider_id, source_name,
                 alert_id_name, csnf_path, provider_path, static_value,
                 entity_type) = row

                # The first row seen for a provider fixes its type and id.
                provider = all_providers.setdefault(provider_name, {
                    "provider": provider_name,
                    "providerType": provider_type,
                    "providerId": provider_id,
                    "source": {}})
                # Create the source entry the first time it appears for
                # this provider.
                source = provider["source"].setdefault(source_name, {
                    "sourceName": source_name,
                    "sourceId": "None",
                    "alerts": {}})
                # Create the alert entry the first time it appears for
                # this source.
                alert = source["alerts"].setdefault(
                    alert_id_name, {"alertMapping": {}})
                # A later row with the same CSNF path replaces the earlier one.
                alert["alertMapping"][csnf_path] = {
                    "path": provider_path,
                    "entityType": entity_type,
                    # A non-empty static value marks the field as mapped.
                    "mappedValue": bool(static_value),
                    "value": static_value,
                }
    except (OSError, ValueError, csv.Error) as e:
        raise RuntimeError(
            "Failed to convert ONUG parse CSV file \n " + str(e)) from e

    try:
        with open(output_file, "w") as outfile:
            json.dump(all_providers, outfile)
    except (OSError, TypeError, ValueError) as e:
        raise RuntimeError("Failed to write JSON to file \n" + str(e)) from e

    return all_providers
##########################################################################
# Arg Parsing function to be updated
##########################################################################
def set_parser_arguments(argv=None):
    """Parse the command-line options for the CSV-to-JSON conversion.

    ``-i`` names the input CSV file and ``-o`` names the JSON output file.
    Both are kept as plain path strings, so parsing never opens, creates or
    truncates a file.

    Args:
        argv: Optional list of argument strings to parse.  When ``None``
            (the default) argparse reads ``sys.argv[1:]``.

    Returns:
        The parsed ``argparse.Namespace`` with ``input_csv`` and
        ``output_json`` attributes, or ``None`` (after printing the help
        text) when either option is missing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-i',
        dest='input_csv',
        help="Input CSV File"
    )
    parser.add_argument(
        '-o',
        dest='output_json',
        help="JSON Output prefix")
    result = parser.parse_args(argv)
    # Check the parsed values rather than len(sys.argv): argparse also accepts
    # forms such as "-iin.csv" that change the raw argument count.
    if result.input_csv is None or result.output_json is None:
        parser.print_help()
        return None
    return result
##########################################################################
# execute_conversion
##########################################################################
def execute_conversion(argv=None):
    """Parse the command line and run the CSV-to-JSON conversion.

    Expects ``-i <input csv>`` and ``-o <output json>``.  If either option is
    missing the help text is printed and nothing is converted.  The paths are
    handled as plain strings so the output file is not created or truncated
    until the conversion actually writes it, and no file handles are left
    open by argument parsing.

    Args:
        argv: Optional list of argument strings to parse.  When ``None``
            (the default) argparse reads ``sys.argv[1:]``.

    Returns:
        None.

    Raises:
        SystemExit: Raised by argparse (status 2) for unrecognised options or
            when the input file cannot be opened for reading.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-i',
        dest='input_csv',
        help="Input CSV File"
    )
    parser.add_argument(
        '-o',
        dest='output_json',
        help="JSON Output prefix")
    result = parser.parse_args(argv)
    # Check the parsed values rather than len(sys.argv): argparse also accepts
    # forms such as "-iin.csv" that change the raw argument count.
    if result.input_csv is None or result.output_json is None:
        parser.print_help()
        return None

    # Fail early with a normal argparse usage error if the input is not
    # readable, and close the probe handle straight away.
    try:
        with open(result.input_csv):
            pass
    except OSError as e:
        parser.error(f"can't open '{result.input_csv}': {e}")

    print(f'Input file is: {result.input_csv}')
    print(f'Output file is: {result.output_json}')
    convert_onug_csv_to_json(result.input_csv, result.output_json)
    return None
##########################################################################
# Main
##########################################################################
# Run the conversion only when this file is executed as a script, so that
# importing the module does not parse the importer's command line.
if __name__ == "__main__":
    execute_conversion()