forked from SeattleTestbed/seattlelib_v2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadvertisepipe.r2py
272 lines (209 loc) · 8.05 KB
/
advertisepipe.r2py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
"""
<Author>
advertise_object.r2py:
Eric Kimbrel kimbrl@cs.washington.edu
Monzur Muhammad
This library:
albert.rafetseder@univie.ac.at
<Start Date>
2014-03-25
<Purpose>
Stores a list of (key, value) tuples and uses a single thread to
advertise each tuple in the list on our advertise servers. This
prevents a program from using multiple threads to repeatedly
advertise values.
Usage:
Call add(key, value) to add an item to the list of tuples to be
advertised. This call returns an advertise handle which can be
used to remove the advertised item from the advertise list via
the remove(advertise_handle) function.
"""
advertise = dy_import_module('advertise.r2py')
# XXX Used for singlethreadedadvertise_announce
centralizedadvertise = dy_import_module("centralizedadvertise.r2py")
centralizedadvertise_v2 = dy_import_module("centralizedadvertise_v2.r2py")
udp_centralizedadvertise = dy_import_module("udpcentralizedadvertise.r2py")
uniqueid = dy_import_module('uniqueid.r2py')
# Store info to be advertised. This dict is organized as
# { key1: {
# value1: list_of_handles_advertising_this,
# value2: list_of_handles_advertising_that
# },
# key2: {
# ...
# }
# }
#
# This layout makes adding/removing items a little more difficult,
# but saves lots of redundant advertising on the advertise services.
advertise_dict = {}
# This is to preclude looping over the dict in one thread, and adding
# new keys in another
advertise_dict_lock = createlock()
# We want to run efficiently and not spawn more than one thread at once!
advertise_thread_lock = createlock()
# Time to live we set on items we announce
TTL = 240
# Re-advertise items after this many seconds
REDO_INTERVAL = 120
# Pause between checks for (re-)advertising items
CHECK_INTERVAL = 1
# Store errors that occur in the advertise_thread as tuples of
# (getruntime(), repr(e)) where e is the exception you caught
mycontext['advertisepipe_last_error'] = None
# Pause for this long if the advertise_thread encounters errors
ERROR_RETRY_INTERVAL = 5
def advertise_thread():
"""
The core advertise function. If the REDO_INTERVAL has passed,
advertise everything that's in the advertise_dict.
Any unexpected errors that are raised here are caught in the
wrapper function error_safe_advertise_thread().
"""
# Preinitialize the start time
start = getruntime()
# Keep going as long as there are items in the dict
while len(advertise_dict) > 0:
# Make sure the advertise_dict doesn't change while we loop through it
advertise_dict_lock.acquire(True)
# Remember when we started the current advertisement pass
start = getruntime()
# Advertise each key-value pair in the dict
# XXX This can pretty much monopolize the vessel's link!
for key, value_dict in advertise_dict.items():
for value in value_dict.keys():
try:
advertise.advertise_announce(key, value, TTL)
except (advertise.AdvertiseError, TimeoutError), e:
# Let's assume this was a temporary problem and continue
error = (getruntime(),
"Error advertising advertise_dict " + repr(e))
mycontext["advertisepipe_last_error"] = error
sleep(ERROR_RETRY_INTERVAL)
except Exception, e:
# Hu? We didn't expect *that* exception. Log and retry later.
error = (getruntime(),
"Unexpected error advertising advertise_dict " + repr(e))
mycontext["advertisepipe_last_error"] = error
advertise_dict_lock.release()
raise
# If we reach this, advertising our items went just fine.
advertise_dict_lock.release()
# Now wait until the REDO_INTERVAL expires before repeating.
while getruntime() - start < REDO_INTERVAL:
sleep(CHECK_INTERVAL)
def error_safe_advertise_thread():
"""
Wrapper function to catch errors that might occur in advertise_thread,
and retry by starting a new thread.
No exception is considered fatal. The latest exception is logged in the
global variable mycontext['advertisepipe_last_error'].
"""
while True:
try:
advertise_thread()
except Exception, e:
# The current thread has died. Log the error and retry.
mycontext['advertisepipe_last_error'] = (getruntime(),
"Advertise thread died with " + repr(e))
sleep(ERROR_RETRY_INTERVAL)
else:
# The thread has returned, indicating that it has detected it
# should stop. Release the thread lock so that a new advertise
# thread can be spawned later on demand.
try:
advertise_thread_lock.release()
except LockDoubleReleaseError:
pass
break
def start_error_safe_advertise_thread():
"""
Helper function to check the thread lock (non-blocking)
and start an advertise thread if none is running.
"""
if advertise_thread_lock.acquire(False):
createthread(error_safe_advertise_thread)
def add_to_pipe(key, value):
"""
<Purpose>
Add the key-value pair to the advertise pipe, and advertise
it instantly.
<Arguments>
key, value:
The item to advertise. value will be advertised under key.
<Returns>
A handle that can be used to remove the key-value pair
<Excpetions>
None
"""
# create a unique handle
handle = uniqueid.uniqueid_getid()
# Make sure the advertise_dict is not in use before modifying it
advertise_dict_lock.acquire(True)
if key not in advertise_dict:
# The key is new to us
advertise_dict[key] = {}
if value not in advertise_dict[key]:
# We know the key (or have just generated it), but the value is new
advertise_dict[key][value] = []
if handle not in advertise_dict[key][value]:
# We know (or have generated) key and value, now add our handle
advertise_dict[key][value].append(handle)
# Done adding the item, make the dict freely accessible again.
advertise_dict_lock.release()
# Advertise this item right away
singlethreadedadvertise_announce(key, value, TTL)
# Signal to the advertise thread that it should continuously
# advertise us, and start a thread to do so (if required).
start_error_safe_advertise_thread()
# return the handle
return handle
def remove_from_pipe(handle):
"""
<Purpose>
Removes the key-value pair corresponding to the handle from
the advertise pipe. Also clean up otherwise empty keys/values.
Note that other handles might be still advertising the same key
and the same (or another) value.
<Arguments>
A handle returned from add_to_pipe
<Returns>
None
<Excpetions>
None.
"""
# Make sure the advertise_dict doesn't change while we loop through it
advertise_dict_lock.acquire(True)
# Go through the advertise dict and remove handle whereever it appears
for key, value_dict in advertise_dict.items():
for value, handle_list in value_dict.items():
if handle in handle_list:
handle_list.remove(handle)
# Additionally, remove values without handles ...
if handle_list == []:
del advertise_dict[key][value]
# ... and keys without values
if advertise_dict[key] == {}:
del advertise_dict[key]
# Done with our changes.
advertise_dict_lock.release()
def singlethreadedadvertise_announce(key, value, ttl):
"""
This is a deliberately single-threaded version of advertise's
advertise_announce. We avoid "runaway" conditions such as when
the node was offline and spawned very many advertise threads
during that period.
"""
announce_functions = [centralizedadvertise.centralizedadvertise_announce,
centralizedadvertise_v2.v2centralizedadvertise_announce,
udp_centralizedadvertise.udpcentralizedadvertise_announce]
for announce_function in announce_functions:
try:
announce_function(key, value, ttl)
except:
# We don't care if we fail advertising the first time --- the
# advertise_thread will retry for us and announce when possible,
# i.e. within REDO_INTERVAL seconds of the node coming back online,
# or every ERROR_RETRY_INTERVAL seconds if the advertise services
# had an error.
pass