taskkit.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Copyright (c) 2012, Rui Carmo
Description: In-process job management
License: MIT (see LICENSE.md for details)
"""

from Queue import Empty, Queue, PriorityQueue
from collections import defaultdict
from functools import partial
from signal import signal, SIGINT, SIGTERM
import sys, logging
from threading import Semaphore, Thread, current_thread
import time, traceback, ctypes
from uuid import uuid4
from cPickle import dumps, loads
import multiprocessing

log = logging.getLogger(__name__)

default_priority = 0
max_workers = multiprocessing.cpu_count() * 2

# channel registry and closed-channel markers, keyed by channel uuid
channels = {}
closed = {}


class Pool:
    """Represents a thread pool"""

    def __init__(self, workers=max_workers, rate_limit=1000):
        self.max_workers = workers
        self.mutex = Semaphore()
        self.results = {}
        self.retries = defaultdict(int)
        self.queue = PriorityQueue()
        self.threads = []
        self.rate_limit = rate_limit
        self.running = True

    def _tick(self):
        time.sleep(1.0 / self.rate_limit)
        # clean up finished threads
        self.threads = [t for t in self.threads if t.isAlive()]
        return (not self.queue.empty()) or (len(self.threads) > 0)
    def _loop(self):
        """Handle task submissions"""

        def run_task(priority, f, uuid, retries, args, kwargs):
            """Run a single task"""
            try:
                # label the worker thread after the task for easier debugging
                current_thread().name = getattr(f, '__name__', None)
                result = f(*args, **kwargs)
            except Exception as e:
                if log:
                    log.error(traceback.format_exc())
                # Retry the task if applicable
                if retries > 0:
                    with self.mutex:
                        self.retries[uuid] += 1
                    # re-queue the task with a lower (i.e., higher-valued) priority
                    self.queue.put((priority + 1, dumps((f, uuid, retries - 1, args, kwargs))))
                    self.queue.task_done()
                    return
                # retries exhausted: store the exception as the result
                result = e
            with self.mutex:
                self.results[uuid] = dumps(result)
                self.retries[uuid] += 1
            self.queue.task_done()

        while self._tick():
            # spawn more threads to fill free slots
            log.debug("Running %d/%d threads" % (len(self.threads), self.max_workers))
            if self.running and len(self.threads) < self.max_workers:
                log.debug("Queue length: %d" % self.queue.qsize())
                try:
                    priority, data = self.queue.get(True, 1.0 / self.rate_limit)
                except Empty:
                    continue
                f, uuid, retries, args, kwargs = loads(data)
                log.debug(f)
                t = Thread(target=run_task, args=[priority, f, uuid, retries, args, kwargs])
                t.setDaemon(True)
                self.threads.append(t)
                t.start()
        log.debug("Exited loop.")
        for t in self.threads:
            t.join()
    def kill_all(self):
        """Very hacky way to kill threads by tossing an exception into their state"""
        for t in self.threads:
            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(t.ident),
                                                       ctypes.py_object(SystemExit))

    def stop(self):
        """Flush the job queue"""
        self.running = False
        self.queue = PriorityQueue()
    def start(self, daemonize=False):
        """Pool entry point"""
        self.results = {}
        self.retries = defaultdict(int)
        if daemonize:
            # _loop is a bound method, so it takes no extra arguments
            t = Thread(target=self._loop)
            t.setDaemon(True)
            t.start()
        else:
            self._loop()


default_pool = Pool()


class Deferred(object):
    """Allows lookup of task results and status"""

    def __init__(self, pool, uuid):
        self.uuid = uuid
        self.pool = pool
        self._result = None

    @property
    def result(self):
        if self._result is None:
            with self.pool.mutex:
                if self.uuid in self.pool.results:
                    self._result = loads(self.pool.results[self.uuid])
        return self._result

    @property
    def retries(self):
        return self.pool.retries[self.uuid]


def task(func=None, pool=None, max_retries=0, priority=default_priority):
    """Task decorator - sets up a .delay() attribute on the task function"""
    if func is None:
        # invoked with arguments, e.g. @task(max_retries=3)
        return partial(task, pool=pool, max_retries=max_retries, priority=priority)
    if pool is None:
        pool = default_pool

    def delay(*args, **kwargs):
        uuid = str(uuid4())  # one for each task
        pool.queue.put((priority, dumps((func, uuid, max_retries, args, kwargs))))
        return Deferred(pool, uuid)

    func.delay = delay
    func.pool = pool
    return func
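
# Illustrative usage sketch (hypothetical function, kept as a comment so the
# module stays import-safe): decorate, queue with .delay(), poll the Deferred.
#
#   @task(max_retries=3)
#   def fetch(url):
#       return len(url)
#
#   d = fetch.delay("http://example.com")
#   default_pool.start(daemonize=True)
#   while d.result is None:
#       time.sleep(0.1)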


def go(*args, **kwargs):
    """Queue up a function, Go-style"""
    uuid = str(uuid4())  # one for each task
    default_pool.queue.put((default_priority, dumps((args[0], uuid, 0, args[1:], kwargs))))
    return Deferred(default_pool, uuid)
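
# Illustrative sketch (hypothetical function): go() queues any callable on the
# default pool with default priority and no retries.
#
#   def add(a, b):
#       return a + b
#
#   d = go(add, 2, 3)  # queues add(2, 3); d.result holds 5 once it has run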


class Channel:
    """A serializable shim that proxies to a Queue object"""

    def __init__(self, size):
        self.uuid = str(uuid4())  # one for each channel
        channels[self.uuid] = Queue(size)

    def recv(self):
        return channels[self.uuid].get()

    def send(self, item):
        if self.uuid in closed:
            raise RuntimeError("Channel is closed.")
        channels[self.uuid].put(item)

    def close(self):
        closed[self.uuid] = True

    def __iter__(self):
        while True:
            try:
                yield channels[self.uuid].get(True, 1.0 / default_pool.rate_limit)
            except Empty:
                # check channel again and end iteration if closed
                if (self.uuid in closed) and channels[self.uuid].empty():
                    return


def chan(size=0):
    """Return a shim that acts like a Go channel"""
    return Channel(size)
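
# Illustrative sketch (hypothetical producer): send into the channel from a
# pooled task, iterate on the consumer side until close() ends the loop.
#
#   c = chan()
#
#   def produce(c):
#       for i in range(10):
#           c.send(i)
#       c.close()
#
#   go(produce, c)
#   for item in c:
#       print item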


def halt(signum, frame):
    """Signal handler: flush the queue, kill running workers and exit"""
    default_pool.stop()
    default_pool.kill_all()
    sys.exit()


def start(daemonize=False):
    signal(SIGINT, halt)
    signal(SIGTERM, halt)
    default_pool.start(daemonize=daemonize)
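

if __name__ == '__main__':
    # Minimal smoke test (an assumption, not part of the original module):
    # queue a few go() calls, run the pool in a background thread, then
    # print the results.
    logging.basicConfig(level=logging.INFO)

    def square(x):
        return x * x

    deferreds = [go(square, i) for i in range(4)]
    start(daemonize=True)
    time.sleep(1)
    for d in deferreds:
        print d.result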