-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathparser.py
105 lines (96 loc) · 3.42 KB
/
parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#encoding:utf8
from constants import *
def absort_space(lexer_obj):
    """Drop every leading SPACE token from the lexer.

    Keeps calling ``get_next_token()`` for as long as the token returned by
    ``look_ahead()`` has ``SPACE`` as its type (index 0 of the token).
    """
    while lexer_obj.look_ahead()[0] == SPACE:
        lexer_obj.get_next_token()
def expecting(lexer_obj, expect_token_type):
    """Take the next token and require it to have the given type.

    Args:
        lexer_obj: Token source providing ``get_next_token()``.
        expect_token_type: Type the taken token must carry at index 0.

    Returns:
        The token that was taken from the lexer.

    Raises:
        Exception: If the token's type is not ``expect_token_type``. The
            token has already been taken from the lexer at that point.
    """
    consumed = lexer_obj.get_next_token()
    actual_type = consumed[0]
    if actual_type == expect_token_type:
        return consumed
    raise Exception("expecting {}, got {} instead".format(expect_token_type, actual_type))
def optional(lexer_obj, expect_token_type):
    """Take the next token only when it has the given type.

    Args:
        lexer_obj: Token source providing ``look_ahead()`` and
            ``get_next_token()``.
        expect_token_type: Type to compare against index 0 of the token
            returned by ``look_ahead()``.

    Returns:
        A one-element list holding the taken token when the type matches;
        ``None`` otherwise, in which case ``get_next_token()`` is not called.
    """
    upcoming_type = lexer_obj.look_ahead()[0]
    return [lexer_obj.get_next_token()] if upcoming_type == expect_token_type else None
def field(lexer_obj):
    """Parse one field: WORD NUMBER COLON [STAR] WORD.

    Returns:
        A dict whose values are the tokens themselves: ``"name"`` (first
        WORD), ``"id"`` (NUMBER) and ``"type"`` (trailing WORD). The key
        ``"array"`` is present (set to ``True``) only when a STAR token
        precedes the type.

    Raises:
        Exception: Propagated from ``expecting`` when a token has an
            unexpected type.
    """
    name_token = expecting(lexer_obj, WORD)
    id_token = expecting(lexer_obj, NUMBER)
    parsed = {"name": name_token, "id": id_token}

    expecting(lexer_obj, COLON)
    if optional(lexer_obj, STAR):
        parsed["array"] = True

    parsed["type"] = expecting(lexer_obj, WORD)
    return parsed
def struct(lexer_obj):
    """Parse a delimited body of fields and nested type definitions.

    Expects a LEFT_PARENTHESE token, then reads members until the upcoming
    token is RIGHT_PARENTHESE, which is consumed as well. A member that
    starts with a POINT token is parsed by ``sproto_type``; anything else is
    parsed by ``field``.

    Returns:
        A list of the parsed members, in source order.

    Raises:
        Exception: Propagated from ``expecting`` on an unexpected token.
    """
    members = []
    expecting(lexer_obj, LEFT_PARENTHESE)
    while True:
        upcoming = lexer_obj.look_ahead()
        if upcoming[0] == RIGHT_PARENTHESE:
            break
        parse_member = sproto_type if upcoming[0] == POINT else field
        members.append(parse_member(lexer_obj))
    expecting(lexer_obj, RIGHT_PARENTHESE)
    return members
def sproto_type(lexer_obj):
    """Parse a type definition: POINT WORD followed by a struct body.

    Returns:
        A dict with ``"type"`` set to ``"sproto_type"``, ``"name"`` holding
        the WORD token, and ``"fields"`` holding the list from ``struct``.

    Raises:
        Exception: Propagated from ``expecting`` on an unexpected token.
    """
    expecting(lexer_obj, POINT)
    name_token = expecting(lexer_obj, WORD)
    return {
        "type": "sproto_type",
        "name": name_token,
        "fields": struct(lexer_obj),
    }
def sproto_protocol(lexer_obj):
    """Parse a protocol definition: WORD NUMBER and a delimited body.

    The body (between LEFT_PARENTHESE and RIGHT_PARENTHESE) holds zero or
    more entries. Each entry is a WORD token whose text (index 1) is
    ``request`` or ``response``, followed either by an inline struct (when
    the upcoming token is LEFT_PARENTHESE) or by a single WORD token.

    Returns:
        A dict with ``"type"`` set to ``"sproto_protocol"``, an entry under
        ``"request"`` and/or ``"response"`` for each keyword seen (a list
        from ``struct`` or a WORD token; a repeated keyword overwrites the
        earlier entry), and the ``"name"`` and ``"id"`` tokens.

    Raises:
        Exception: If a body entry is not ``request``/``response``, if the
            keyword is followed by neither a struct nor a WORD, or when
            ``expecting`` rejects a token.
    """
    result = {"type": "sproto_protocol"}
    protocol_name = expecting(lexer_obj, WORD)
    protocol_id = expecting(lexer_obj, NUMBER)
    expecting(lexer_obj, LEFT_PARENTHESE)

    while lexer_obj.look_ahead()[0] != RIGHT_PARENTHESE:
        keyword = expecting(lexer_obj, WORD)
        if keyword[1] not in ("request", "response"):
            # Report the token that was just read. The look-ahead token is
            # not bound yet on this path, so formatting it would raise
            # UnboundLocalError (or show a stale token from an earlier
            # iteration) instead of this message.
            raise Exception("expecting request or response, got {} instead".format(keyword))

        next_token = lexer_obj.look_ahead()
        if next_token[0] == LEFT_PARENTHESE:
            packet = struct(lexer_obj)
        elif next_token[0] == WORD:
            packet = expecting(lexer_obj, WORD)
        else:
            raise Exception("expecting WORD or Struct, got {} instead".format(next_token))
        result[keyword[1]] = packet

    expecting(lexer_obj, RIGHT_PARENTHESE)
    result["name"] = protocol_name
    result["id"] = protocol_id
    return result
def sproto_group(lexer_obj):
    """Parse the whole token stream into type and protocol definitions.

    Reads until the upcoming token is EOF. At the top level, SPACE tokens
    are skipped, a POINT token starts a type definition (``sproto_type``)
    and a WORD token starts a protocol definition (``sproto_protocol``).

    Returns:
        A dict with ``"type"`` holding the list of parsed type definitions
        and ``"protocol"`` holding the list of parsed protocols, both in
        source order.

    Raises:
        Exception: If a top-level token is none of SPACE, POINT, WORD or
            EOF, or when a nested parser rejects a token.
    """
    all_sproto_types = []
    protocols = []
    token = lexer_obj.look_ahead()
    while token[0] != EOF:
        if token[0] == SPACE:
            absort_space(lexer_obj)
        elif token[0] == POINT:
            all_sproto_types.append(sproto_type(lexer_obj))
        elif token[0] == WORD:
            protocols.append(sproto_protocol(lexer_obj))
        else:
            # No branch above consumes this token, so without an explicit
            # error the loop would look at the same token forever.
            raise Exception("expecting type or protocol, got {} instead".format(token[0]))
        token = lexer_obj.look_ahead()
    return {"type": all_sproto_types, "protocol": protocols}