diff --git a/common/scheduler.py b/common/scheduler.py index 21dd4ab..9791e50 100644 --- a/common/scheduler.py +++ b/common/scheduler.py @@ -10,12 +10,13 @@ # 一个简单的循环任务调度器 import time -import threading -from .variable import running +import asyncio +import traceback +from .utils import timestamp_format from . import log logger = log.log("scheduler") - +running_event = asyncio.Event() global tasks tasks = [] @@ -29,12 +30,14 @@ def __init__(self, name, function, interval = 86400, latest_execute = 0): def check_available(self): return (time.time() - self.latest_execute) >= self.interval - def run(self): + async def run(self): try: logger.info(f"task {self.name} run start") - self.function() + await self.function() + logger.info(f'task {self.name} run success, next execute: {timestamp_format(self.interval + self.latest_execute)}') except Exception as e: logger.error(f"task {self.name} run failed, waiting for next execute...") + logger.error(traceback.format_exc()) def append(name, task, interval = 86400): global tasks @@ -42,18 +45,17 @@ def append(name, task, interval = 86400): wrapper = taskWrapper(name, task, interval) return tasks.append(wrapper) -def thread_runner(): - global tasks - while True: - if not running: - return +# 在 thread_runner 函数中修改循环逻辑 +async def thread_runner(): + global tasks, running_event + while not running_event.is_set(): for t in tasks: - if t.check_available(): + if t.check_available() and not running_event.is_set(): t.latest_execute = int(time.time()) - threading.Thread(target = t.run).start() - time.sleep(1) + await t.run() # 等待异步任务完成 + await asyncio.sleep(1) -def run(): +async def run(): logger.debug("scheduler thread starting...") - threading.Thread(target = thread_runner).start() + task = asyncio.create_task(thread_runner()) logger.debug("schedluer thread load success") \ No newline at end of file diff --git a/main.py b/main.py index 1b593ac..2ddec04 100644 --- a/main.py +++ b/main.py @@ -14,6 +14,7 @@ from common import log from common import Httpx from common import variable +from common import scheduler from aiohttp.web import Response import ujson as json import threading @@ -130,6 +131,7 @@ async def run_app(): logger.info(f"监听 -> http://{host}:{port}") async def initMain(): + await scheduler.run() variable.aioSession = aiohttp.ClientSession() try: await run_app() @@ -147,7 +149,11 @@ async def initMain(): logger.error("遇到未知错误,请查看日志") logger.error(traceback.format_exc()) finally: - await variable.aioSession.close() + logger.info('wating for sessions to complete...') + if variable.aioSession: + await variable.aioSession.close() + + variable.running = False logger.info("Server stopped") if __name__ == "__main__":