'''Running subprocesses asynchronously.'''
from __future__ import print_function
import fcntl
import os
import os.path
import subprocess
import functools
import datetime
import pty
import signal
import atexit

import tornado.ioloop
from tornado.ioloop import IOLoop, PeriodicCallback
import tornado.process

from seesaw.event import Event
from seesaw.task import Task
from seesaw.config import realize

import time
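

# Module-level registry of live subprocess handles. The atexit hook below
# walks it and terminates (then kills) any children still running at exit.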
_all_procs = set()


@atexit.register
def cleanup():
    if _all_procs:
        print('Subprocess did not exit properly!')

        for proc in _all_procs:
            print('Killing', proc)

            try:
                if hasattr(proc, 'proc'):
                    proc.proc.terminate()
                else:
                    proc.terminate()
            except Exception as error:
                print(error)

            time.sleep(0.1)

            try:
                if hasattr(proc, 'proc'):
                    proc.proc.kill()
                else:
                    proc.kill()
            except Exception as error:
                print(error)


class AsyncPopen(object):
    '''Asynchronous version of :class:`subprocess.Popen`.

    Deprecated.
    '''
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.ioloop = None
        self.master_fd = None
        self.master = None
        self.pipe = None
        self.stdin = None
        self.on_output = Event()
        self.on_end = Event()

    @classmethod
    def ignore_sigint(cls):
        # http://stackoverflow.com/q/5045771/1524507
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        os.setpgrp()

    def run(self):
        self.ioloop = IOLoop.instance()
        (master_fd, slave_fd) = pty.openpty()

        # make stdout, stderr non-blocking
        fcntl.fcntl(master_fd, fcntl.F_SETFL,
                    fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)

        self.master_fd = master_fd
        self.master = os.fdopen(master_fd)

        # listen to stdout, stderr
        self.ioloop.add_handler(master_fd, self._handle_subprocess_stdout,
                                self.ioloop.READ)

        slave = os.fdopen(slave_fd)
        self.kwargs["stdout"] = slave
        self.kwargs["stderr"] = slave
        self.kwargs["close_fds"] = True
        self.kwargs["preexec_fn"] = self.ignore_sigint
        self.pipe = subprocess.Popen(*self.args, **self.kwargs)

        self.stdin = self.pipe.stdin

        # check for process exit
        self.wait_callback = PeriodicCallback(self._wait_for_end, 250)
        self.wait_callback.start()

        _all_procs.add(self.pipe)

    def _handle_subprocess_stdout(self, fd, events):
        if not self.master.closed and (events & IOLoop._EPOLLIN) != 0:
            data = self.master.read()
            self.on_output(data)

        self._wait_for_end(events)

    def _wait_for_end(self, events=0):
        self.pipe.poll()
        if self.pipe.returncode is not None or \
                (events & tornado.ioloop.IOLoop._EPOLLHUP) > 0:
            self.wait_callback.stop()
            self.master.close()
            self.ioloop.remove_handler(self.master_fd)
            self.on_end(self.pipe.returncode)
            _all_procs.remove(self.pipe)


class AsyncPopen2(object):
    '''Adapter for the legacy AsyncPopen.

    Provides the same ``on_output``/``on_end`` events and ``stdin`` attribute,
    but is built on :class:`tornado.process.Subprocess` streams instead of a
    pty and a polling callback.
    '''
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.on_output = Event()
        self.on_end = Event()
        self.pipe = None

    def run(self):
        self.kwargs["stdout"] = tornado.process.Subprocess.STREAM
        self.kwargs["stderr"] = tornado.process.Subprocess.STREAM
        self.kwargs["preexec_fn"] = AsyncPopen.ignore_sigint

        self.pipe = tornado.process.Subprocess(*self.args, **self.kwargs)

        self.pipe.stdout.read_until_close(
            callback=self._handle_subprocess_stdout,
            streaming_callback=self._handle_subprocess_stdout)
        self.pipe.stderr.read_until_close(
            callback=self._handle_subprocess_stdout,
            streaming_callback=self._handle_subprocess_stdout)

        self.pipe.set_exit_callback(self._end_callback)

        _all_procs.add(self.pipe)

    def _handle_subprocess_stdout(self, data):
        self.on_output(data)

    def _end_callback(self, return_code):
        self.on_end(return_code)
        _all_procs.remove(self.pipe)

    @property
    def stdin(self):
        return self.pipe.stdin
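

# Example sketch of driving AsyncPopen2 directly; ExternalProcess.process()
# below does essentially this. Assumes a running tornado IOLoop and uses
# illustrative arguments:
#
#     proc = AsyncPopen2(args=["wget", "--version"], stdin=subprocess.PIPE)
#     proc.on_output += lambda data: print(data)
#     proc.on_end += lambda returncode: print("exit code:", returncode)
#     proc.run()
#     proc.stdin.close()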


class ExternalProcess(Task):
    '''External subprocess runner.

    Runs ``args`` as a child process and completes or fails the item based on
    the exit code, making up to ``max_tries`` attempts (``None`` means
    unlimited) with ``retry_delay`` seconds between attempts.
    '''
    def __init__(self, name, args, max_tries=1, retry_delay=2,
                 accept_on_exit_code=None, retry_on_exit_code=None, env=None):
        Task.__init__(self, name)
        self.args = args
        self.max_tries = max_tries
        self.retry_delay = retry_delay
        if accept_on_exit_code is not None:
            self.accept_on_exit_code = accept_on_exit_code
        else:
            self.accept_on_exit_code = [0]
        self.retry_on_exit_code = retry_on_exit_code
        self.env = env or {}

        if 'PYTHONIOENCODING' not in self.env:
            self.env['PYTHONIOENCODING'] = 'utf8:replace'

    def enqueue(self, item):
        self.start_item(item)
        item.log_output("Starting %s for %s\n" % (self, item.description()))
        item["tries"] = 0
        item["ExternalProcess.stdin_write_error"] = False
        item["ExternalProcess.running"] = False
        self.process(item)
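
    # Subclasses override stdin_data() to supply bytes that are written to the
    # child's stdin once it has started (see WgetDownload and RsyncUpload).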
    def stdin_data(self, item):
        return b""

    def process(self, item):
        with self.task_cwd():
            p = AsyncPopen2(
                args=realize(self.args, item),
                env=realize(self.env, item),
                stdin=subprocess.PIPE,
                close_fds=True
            )

            p.on_output += functools.partial(self.on_subprocess_stdout, p,
                                             item)
            p.on_end += functools.partial(self.on_subprocess_end, item)

            p.run()

            item["ExternalProcess.running"] = True

            try:
                p.stdin.write(self.stdin_data(item))
            except Exception as error:
                # FIXME: We need to properly propagate errors
                item.log_output("Error writing to process: %s" % str(error))
                item["ExternalProcess.stdin_write_error"] = True

            p.stdin.close()
    def fail_item(self, item):
        # Don't allow the item to fail until the external process completes
        if item["ExternalProcess.running"]:
            return

        Task.fail_item(self, item)

    def on_subprocess_stdout(self, pipe, item, data):
        item.log_output(data, full_line=False)

    def on_subprocess_end(self, item, returncode):
        item["ExternalProcess.running"] = False
        if returncode in self.accept_on_exit_code and \
                not item["ExternalProcess.stdin_write_error"]:
            self.handle_process_result(returncode, item)
        else:
            self.handle_process_error(returncode, item)

    def handle_process_result(self, exit_code, item):
        item.log_output("Finished %s for %s\n" % (self, item.description()))
        self.complete_item(item)

    def handle_process_error(self, exit_code, item):
        item["tries"] += 1

        item.log_output(
            "Process %s returned exit code %d for %s\n" %
            (self, exit_code, item.description())
        )
        item.log_error(self, exit_code)
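
        # Retry when attempts remain (max_tries=None means unlimited) and
        # either no retry_on_exit_code filter was given, the exit code is in
        # that list, or writing to the child's stdin failed.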
        retry_acceptable = self.max_tries is None or \
            item["tries"] < self.max_tries
        exit_status_indicates_retry = self.retry_on_exit_code is None or \
            exit_code in self.retry_on_exit_code or \
            item["ExternalProcess.stdin_write_error"]

        if retry_acceptable and exit_status_indicates_retry:
            item.log_output(
                "Retrying %s for %s after %d seconds...\n" %
                (self, item.description(), self.retry_delay)
            )
            IOLoop.instance().add_timeout(
                datetime.timedelta(seconds=self.retry_delay),
                functools.partial(self.process, item)
            )
        else:
            item.log_output("Failed %s for %s\n" % (self, item.description()))
            self.fail_item(item)


class WgetDownload(ExternalProcess):
    '''Download with Wget process runner.'''
    def __init__(self, args, max_tries=1, accept_on_exit_code=None,
                 retry_on_exit_code=None, env=None, stdin_data_function=None):
        ExternalProcess.__init__(
            self, "WgetDownload",
            args=args, max_tries=max_tries,
            accept_on_exit_code=(accept_on_exit_code
                                 if accept_on_exit_code is not None else [0]),
            retry_on_exit_code=retry_on_exit_code,
            env=env)

        self.stdin_data_function = stdin_data_function

    def stdin_data(self, item):
        if self.stdin_data_function:
            return self.stdin_data_function(item)
        else:
            return b""


class RsyncUpload(ExternalProcess):
    '''Upload with Rsync process runner.'''
    def __init__(self, target, files, target_source_path="./", bwlimit="0",
                 max_tries=None, extra_args=None):
        args = [
            "rsync",
            "-rltv",
            "--timeout=300",
            "--contimeout=300",
            "--progress",
            "--bwlimit", bwlimit
        ]

        if extra_args is not None:
            args.extend(extra_args)

        args.extend([
            "--files-from=-",
            target_source_path,
            target
        ])

        ExternalProcess.__init__(self, "RsyncUpload",
                                 args=args,
                                 max_tries=max_tries)

        self.files = files
        self.target_source_path = target_source_path

    def stdin_data(self, item):
        return "".join(
            [
                "%s\n" % os.path.relpath(
                    realize(f, item),
                    realize(self.target_source_path, item)
                )
                for f in realize(self.files, item)
            ]).encode('utf-8')


class CurlUpload(ExternalProcess):
    '''Upload with Curl process runner.'''
    def __init__(self, target, filename, connect_timeout="60", speed_limit="1",
                 speed_time="900", max_tries=None):
        args = [
            "curl",
            "--fail",
            "--output", "/dev/null",
            "--connect-timeout", str(connect_timeout),
            "--speed-limit", str(speed_limit),  # minimum upload speed 1B/s
            # stop if speed < speed-limit for 900 seconds
            "--speed-time", str(speed_time),
            "--header", "X-Curl-Limits: inf,%s,%s" % (str(speed_limit),
                                                      str(speed_time)),
            "--write-out", "Upload server: %{url_effective}\\n",
            "--location",
            "--upload-file", filename,
            target
        ]

        ExternalProcess.__init__(self, "CurlUpload",
                                 args=args,
                                 max_tries=max_tries)
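

# Example sketch of wiring these runners into a seesaw pipeline; it assumes
# seesaw.pipeline.Pipeline and seesaw.config.ItemInterpolation, and the item
# keys and URLs below are illustrative only:
#
#     from seesaw.pipeline import Pipeline
#     from seesaw.config import ItemInterpolation
#
#     pipeline = Pipeline(
#         WgetDownload(
#             ["wget", "-nv",
#              "-O", ItemInterpolation("%(item_dir)s/data.warc.gz"),
#              ItemInterpolation("%(url)s")],
#             max_tries=3),
#         RsyncUpload(
#             "rsync://example.org/target/",
#             [ItemInterpolation("%(item_dir)s/data.warc.gz")]),
#     )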