-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathextract_packets.py
88 lines (72 loc) · 2.43 KB
/
extract_packets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import json
import os.path
import sys
def create_jsons(testcase, outdir, name):
channels = [
("channel_ref_a_", "RefreshA"),
("channel_ref_b_", "RefreshB"),
("channel_snap_a_", "SnapshotA"),
("channel_snap_b_", "SnapshotB"),
]
outfile = "internal_" + name + ".json"
outfile = os.path.join(outdir, outfile)
with open(outfile, "w") as jsonFile:
json.dump(testcase['output'], jsonFile)
packets = testcase['input']['packets']
for fchannel, channel in channels:
data = [p for p in packets if p['Channel'] == channel]
data = { "packets": data }
outfile = fchannel + name + ".json"
outfile = os.path.join(outdir, outfile)
with open(outfile, "w") as jsonFile:
json.dump(data, jsonFile)
create_jsons.tests.append({
"name" : "test " + str(len(create_jsons.tests)), "file":name
})
create_jsons.tests = []
def processStream(f, outdir):
tcs = []
names = {}
for line in f.readlines():
if line.strip(" \n") == "":
continue
dic = json.loads(line)
if dic['status_code'] != 200:
continue
results = dic['results'][0]
if 'testcases' not in results:
continue
if 'name' not in results:
continue
name = results['name']
if name in names:
names[name] += 1
else:
names[name] = 1
name += str(names[name])
testcases = results['testcases']
if testcases == []:
continue
tcs.append( (name, testcases) )
for testcase in testcases:
point = str(testcase['point'])
tcname = name.replace("'","x") + "_" + point
testcase = json.loads(testcase['text'].decode('string_escape'))
create_jsons(testcase, outdir, tcname)
with open(os.path.join(outdir, "tests.json"), "w") as jsonTests:
json.dump({"tests": create_jsons.tests}, jsonTests)
def main():
if len(sys.argv) == 2:
infile = sys.stdin
outdir = sys.argv[1]
elif len(sys.argv) == 3:
infile = open(sys.argv[2], 'rb')
outdir = sys.argv[1]
else:
raise SystemExit(sys.argv[0] + " outdir [infile]")
if not os.path.exists(outdir):
os.makedirs(outdir)
processStream(infile, outdir)
infile.close()
if __name__ == '__main__':
main()