-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathkv.py
142 lines (121 loc) · 4.3 KB
/
kv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python
# -*- coding: utf8 -*-
import sys
import json
import socket
import raft
# When true, diagnostic messages are printed and node.Debug() is called at
# startup.
debug = True
# UDP socket shared by the request loop and command_exec; created and bound in
# the __main__ section.
udp_socket = None
# The key/value store that executed commands are applied to.
kv = {}
# Last sequence number assigned to a client request appended on this node.
g_seq = 0
# Pending client requests: sequence number -> address to send the reply to.
session = {}


def _send_reply(payload, addr):
    """Serialize *payload* as JSON and send it to *addr* on the shared socket."""
    udp_socket.sendto(json.dumps(payload), addr)


def command_exec(command):
    """Apply one command to the key/value store and answer the waiting client.

    This function is registered with the raft node via RegisterExecFunc.

    command -- JSON text of an object with 'cmd' ('get', 'set' or 'del'),
               'key', 'seq', and (for 'set') 'val'.

    'set' stores the value, 'del' removes the key if it exists, and 'get'
    leaves the store untouched.  If this node recorded a client address for
    the command's 'seq' in ``session``, the entry is removed and a reply is
    sent: the value (or a 'Not Found' error) for 'get', a bare success for
    everything else.  Commands with no matching session entry are applied
    silently.
    """
    msg = json.loads(command)
    cmd = msg['cmd']
    if cmd == 'set':
        kv[msg['key']] = msg['val']
    elif cmd == 'del':
        # Delete by *key*.  'del' requests are not required to carry a 'val',
        # so the key is the only field that may be consulted here.
        kv.pop(msg['key'], None)

    seq = msg['seq']
    if seq not in session:
        # No client is waiting on this node for this command.
        return
    addr = session.pop(seq)

    if cmd != 'get':
        _send_reply({
            'ret': 0,
        }, addr)
        return

    # A 'get' only reaches this point when it was appended as a command
    # instead of being answered directly from the local store.
    if debug:
        print('Special strong consistency read')
    if msg['key'] in kv:
        _send_reply({
            'ret': 0,
            'val': kv[msg['key']],
        }, addr)
    else:
        _send_reply({
            'ret': -1,
            'err': 'Not Found',
        }, addr)
if __name__ == '__main__':
    # Example three-node cluster, one command per node:
    #   python kv.py 127.0.0.1:9901 127.0.0.1:9902 127.0.0.1:9903
    #   python kv.py 127.0.0.1:9902 127.0.0.1:9901 127.0.0.1:9903
    #   python kv.py 127.0.0.1:9903 127.0.0.1:9902 127.0.0.1:9901
    if len(sys.argv) < 4 or '-h' in sys.argv or '--help' in sys.argv:
        print('Usage: python kv.py selfHost:port partner1Host:port partner2Host:port ...')
        sys.exit()

    # Winsock error number for "connection reset by peer".
    WSAECONNRESET = 10054

    def get_addr_by_str(s):
        """Parse a 'host:port' string into a (host, port) tuple."""
        hps = s.split(':')
        return (hps[0], int(hps[1]))

    # First argument is this node's own address; the rest are its partners.
    self_addr = get_addr_by_str(sys.argv[1])
    partners = [get_addr_by_str(arg) for arg in sys.argv[2:]]

    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.bind(self_addr)
    # Short timeout so the loop below calls node._onTick() even when no
    # datagram arrives.
    udp_socket.settimeout(0.1)

    def send_to(msg, addr):
        """Send a raft message, tagged with '_raft' so receivers can tell it
        apart from client requests."""
        msg['_raft'] = 1
        buff = json.dumps(msg)
        udp_socket.sendto(buff, addr)

    def reply(payload, addr):
        """Send a JSON-encoded response to a client."""
        udp_socket.sendto(json.dumps(payload), addr)

    def is_valid_request(msg):
        """Return True if *msg* is a well-formed get/set/del client request."""
        if msg.get('cmd') not in ('get', 'set', 'del'):
            return False
        if not isinstance(msg.get('key'), basestring):
            return False
        if msg['cmd'] == 'set' and not isinstance(msg.get('val'), basestring):
            return False
        return True

    def handle_client_request(msg, addr):
        """Answer a client request directly or append it as a raft command.

        Malformed requests are dropped without a reply.  A non-leader answers
        with ret -999 and the current leader as 'redirect'.  A 'get' is served
        from the local store unless node.IsReadOnlyNeedAppendCommand() says it
        must be appended; every other request is given a fresh sequence
        number, remembered in ``session`` and appended, and the reply is sent
        later by command_exec.
        """
        global g_seq
        if not is_valid_request(msg):
            return
        if debug and msg['cmd'] != 'get':
            print('New Request %s' % msg)
        if not node.IsLeader():
            reply({
                'ret': -999,
                'err': 'Not Leader',
                'redirect': node.GetLeader(),
            }, addr)
            return
        if msg['cmd'] == 'get' and not node.IsReadOnlyNeedAppendCommand():
            if msg['key'] in kv:
                reply({
                    'ret': 0,
                    'val': kv[msg['key']],
                }, addr)
            else:
                reply({
                    'ret': -1,
                    'err': 'Not Found',
                }, addr)
            return
        g_seq += 1
        seq = g_seq
        session[seq] = addr
        msg['seq'] = seq
        node.AppendCommand(json.dumps(msg))

    node = raft.Node(self_addr, partners)
    node.RegisterSendFunc(send_to)
    node.RegisterExecFunc(command_exec)
    if debug:
        node.Debug()

    while True:
        try:
            buff, addr = udp_socket.recvfrom(65536)
            try:
                msg = json.loads(buff)
            except ValueError:
                msg = {}
            # Valid JSON that is not an object (a number, string, list, ...)
            # would make the key lookups below raise; treat it like garbage.
            if not isinstance(msg, dict):
                msg = {}
            if '_raft' in msg:
                del msg['_raft']
                node._onMsgRecv(addr, msg)
            else:
                handle_client_request(msg, addr)
        except socket.timeout:
            pass
        except socket.error as e:
            # Windows quirk: a UDP peer whose port is not open surfaces here
            # as a connection-reset error.  Match on the error number rather
            # than the message text, which varies with locale.
            if getattr(e, 'errno', None) != WSAECONNRESET:
                raise
        node._onTick()