-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path eventloop.py
46 lines (33 loc) · 820 Bytes
/
eventloop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import asyncio
# Module-wide event loop: created once at import time and registered as the
# current asyncio event loop. run(), stop() and AsyncApp.run() all operate on
# this single loop object.
_loop = asyncio.new_event_loop()
asyncio.set_event_loop(_loop)
async def sleep(interval):
    """Suspend the calling coroutine for ``interval`` seconds.

    Thin wrapper around ``asyncio.sleep``; nothing is returned.
    """
    await asyncio.sleep(delay=interval)
def run(future):
    """Schedule ``future`` as a task on the module-level loop and return it.

    If the loop is already running, the task is created directly. Otherwise
    the loop is spun just long enough for a callback to create the task and
    stop the loop again; the task is not driven to completion here, so the
    caller is responsible for running the loop afterwards.

    Args:
        future: The coroutine to wrap in a task (passed straight to
            ``_loop.create_task``).

    Returns:
        The task created for ``future``.

    Raises:
        Exception: Whatever ``_loop.create_task`` raises for ``future`` (for
            example when it is not a coroutine) is re-raised to the caller.
        AssertionError: If the loop returned without the task being created.
    """
    if _loop.is_running():
        return _loop.create_task(future)

    task = None
    error = None

    def schedule():
        nonlocal task, error
        try:
            task = _loop.create_task(future)
        except Exception as exc:
            # An exception escaping a call_soon callback is routed to the
            # loop's exception handler, not to our caller, so keep it and
            # re-raise it once run_forever() has returned.
            error = exc
        finally:
            # Always stop the loop: if this were skipped after a failure,
            # run_forever() below would never return.
            _loop.stop()

    _loop.call_soon(schedule)
    _loop.run_forever()

    if error is not None:
        raise error
    assert task, "Task was not scheduled"
    return task
def stop():
    """Stop the module-level event loop and close it, on a best-effort basis.

    A stop is always requested. The loop is closed only when it is not
    currently running, because a running loop cannot be closed. Any error
    raised along the way is logged at debug level and otherwise ignored, so
    this function never propagates an ``Exception`` to the caller.
    """
    # Local import: logging is not among the imports at the top of the file.
    import logging

    try:
        _loop.stop()
        # close() raises on a loop that is still running; in that case only
        # the stop request above takes effect.
        if not _loop.is_running():
            _loop.close()
    except Exception:
        logging.getLogger(__name__).debug(
            "Ignoring error while stopping the event loop", exc_info=True
        )
class AsyncApp:
    """Blocking front end that runs a coroutine on the module-level loop."""

    def __init__(self):
        """Create the app; there is no per-instance state to set up."""

    def run(self, coro):
        """Run ``coro`` to completion and return its result.

        The coroutine is scheduled through the module-level ``run()``; the
        shared loop is then driven in 0.02-second slices until the resulting
        task reports that it is done.

        Args:
            coro: The coroutine to execute.

        Returns:
            The value of ``task.result()`` for the scheduled task.
        """
        pending = run(coro)
        while True:
            if pending.done():
                return pending.result()
            # Drive the loop for one short slice, then check the task again.
            _loop.run_until_complete(sleep(0.02))