From 6656061059c87d8396bc2696864da855fdefa878 Mon Sep 17 00:00:00 2001 From: bingcool <2437667702@qq.com> Date: Thu, 23 Jan 2025 17:57:02 +0800 Subject: [PATCH] feat: optimize script cron code --- Test/WorkerCron/CurlQuery/RemoteUrl.php | 6 +- .../LocalOrder/LocalOrderHandle.php | 8 +- .../conf/{schedule_task.php => fork_task.php} | 0 Test/WorkerCron/conf/remote_task.php | 14 ++- ...hedule_conf.php => schedule_fork_conf.php} | 2 +- ...oduct_conf.php => schedule_local_conf.php} | 32 +++--- .../{order_conf.php => schedule_url_conf.php} | 0 Test/WorkerCron/worker_cron_conf.php | 6 +- Test/WorkerDaemon/PipeWorkerProcess.php | 6 ++ src/Script/MainCliScript.php | 41 ++++---- src/Worker/AbstractMainProcess.php | 10 +- src/Worker/Cron/CronForkProcess.php | 24 +++-- src/Worker/Cron/CronLocalProcess.php | 4 +- src/Worker/Cron/CronUrlProcess.php | 73 ++++++++++---- src/Worker/Dto/CronUrlTaskMetaDto.php | 98 +++++++++++++++++++ src/Worker/MainManager.php | 6 +- src/Worker/Traits/SystemTrait.php | 28 +++++- 17 files changed, 270 insertions(+), 88 deletions(-) rename Test/WorkerCron/conf/{schedule_task.php => fork_task.php} (100%) rename Test/WorkerCron/conf/{schedule_conf.php => schedule_fork_conf.php} (94%) rename Test/WorkerCron/conf/{product_conf.php => schedule_local_conf.php} (52%) rename Test/WorkerCron/conf/{order_conf.php => schedule_url_conf.php} (100%) create mode 100644 src/Worker/Dto/CronUrlTaskMetaDto.php diff --git a/Test/WorkerCron/CurlQuery/RemoteUrl.php b/Test/WorkerCron/CurlQuery/RemoteUrl.php index 65f37acd..61058a12 100644 --- a/Test/WorkerCron/CurlQuery/RemoteUrl.php +++ b/Test/WorkerCron/CurlQuery/RemoteUrl.php @@ -2,6 +2,7 @@ namespace Test\WorkerCron\CurlQuery; use Common\Library\HttpClient\RawResponse; +use Swoolefy\Worker\Dto\CronUrlTaskMetaDto; class RemoteUrl { @@ -11,8 +12,9 @@ class RemoteUrl { * @param RawResponse $response * @return void */ - public function handle(RawResponse $response) { + public function handle(RawResponse $response, CronUrlTaskMetaDto $taskMetaDto) + { // 做简单的响应处理 - var_dump($response->getHeaders()); + var_dump($response->getDecodeBody()); } } \ No newline at end of file diff --git a/Test/WorkerCron/LocalOrder/LocalOrderHandle.php b/Test/WorkerCron/LocalOrder/LocalOrderHandle.php index 51e9cc85..7db2e651 100644 --- a/Test/WorkerCron/LocalOrder/LocalOrderHandle.php +++ b/Test/WorkerCron/LocalOrder/LocalOrderHandle.php @@ -30,14 +30,12 @@ public function doCronTask($cron, string $cronName) // var_dump(env('HOST_PASSWORD')); // var_dump(Swfy::getConf()['bjg']); SystemEnv::clearEnvRepository(); - RunLog::info("this is a cron test log"); - RunLog::error("this is a cron test log"); +// RunLog::info("this is a cron test log"); +// RunLog::error("this is a cron test log"); var_dump("cron start"); - sleep(3); + sleep(20); var_dump(SystemEnv::get('WEB_SITE_HOST')); - - sleep(60); //AbstractBaseWorker::getProcessInstance()->reboot(3); var_dump("cron end"); diff --git a/Test/WorkerCron/conf/schedule_task.php b/Test/WorkerCron/conf/fork_task.php similarity index 100% rename from Test/WorkerCron/conf/schedule_task.php rename to Test/WorkerCron/conf/fork_task.php diff --git a/Test/WorkerCron/conf/remote_task.php b/Test/WorkerCron/conf/remote_task.php index d04f4ca8..5a0019a4 100644 --- a/Test/WorkerCron/conf/remote_task.php +++ b/Test/WorkerCron/conf/remote_task.php @@ -5,17 +5,23 @@ [ 'cron_name' => 'send message', // 发送短信 'cron_expression' => 10, // 10s执行一次 - 'url' => 'http://www.baidu.com', + //'cron_expression' => '*/1 * * * *', // 每分钟执行一次 + 'url' => 'http://127.0.0.1:9501/index/index', 'method' => 'get', 'connect_time_out' => 10, //连接对方主机最长等待时间 - 'curl_time_out' => 15, // 整个请求最长等待总时间,要比connection_time_out大 + 'request_time_out' => 15, // 整个请求最长等待总时间,要比connection_time_out大 'options' => [], // curl option 'headers' => [], // 请求头 'params' => [], // post参数 // 'callback' => function(RawResponse $response) { // (new \Test\WorkerCron\CurlQuery\RemoteUrl())->handle($response); // }, - 'callback' => [\Test\WorkerCron\CurlQuery\RemoteUrl::class, 'handle'], - //'cron_expression' => '*/1 * * * *', // 每分钟执行一次 + 'before_callback' => function() { + var_dump('before_callback'); + }, + 'response_callback' => [\Test\WorkerCron\CurlQuery\RemoteUrl::class, 'handle'], + 'after_callback' => function() { + var_dump('after_callback'); + }, ] ]; \ No newline at end of file diff --git a/Test/WorkerCron/conf/schedule_conf.php b/Test/WorkerCron/conf/schedule_fork_conf.php similarity index 94% rename from Test/WorkerCron/conf/schedule_conf.php rename to Test/WorkerCron/conf/schedule_fork_conf.php index 37c3b821..ce92d152 100644 --- a/Test/WorkerCron/conf/schedule_conf.php +++ b/Test/WorkerCron/conf/schedule_fork_conf.php @@ -20,7 +20,7 @@ // 动态定时任务列表,可以存在数据库中 'task_list' => function () { - //$list1 = include __DIR__ . '/schedule_task.php'; + //$list1 = include __DIR__ . '/fork_task.php'; $list2 = Kernel::buildScheduleTaskList(Kernel::schedule()); return array_merge($list1 ?? [], $list2 ?? []); } diff --git a/Test/WorkerCron/conf/product_conf.php b/Test/WorkerCron/conf/schedule_local_conf.php similarity index 52% rename from Test/WorkerCron/conf/product_conf.php rename to Test/WorkerCron/conf/schedule_local_conf.php index 848aee28..53d77916 100644 --- a/Test/WorkerCron/conf/product_conf.php +++ b/Test/WorkerCron/conf/schedule_local_conf.php @@ -21,19 +21,21 @@ // ], // 独立进程本地处理任务 -// [ -// 'process_name' => 'test-local-cron-worker11', // 进程名称 -// 'handler' => \Swoolefy\Worker\Cron\CronLocalProcess::class, -// 'worker_num' => 2, // 默认动态进程数量 -// 'max_handle' => 100, //消费达到10000后reboot进程 -// 'life_time' => 3600, // 每隔3600s重启进程 -// 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待 -// 'extend_data' => [], -// 'args' => [ -// 'cron_name' => 'cancel-order', // 定时任务名称 -// 'handler_class' => \Test\WorkerCron\LocalOrder\LocalOrderHandle::class, //处理类 -// //'cron_expression' => '*/1 * * * *', // 每分钟执行一次 -// 'cron_expression' => 10, // 10s执行一次 -// ], -// ], + [ + 'process_name' => 'test-local-cron-worker11', // 进程名称 + 'handler' => \Swoolefy\Worker\Cron\CronLocalProcess::class, + 'worker_num' => 2, // 默认动态进程数量 + 'max_handle' => 100, //消费达到10000后reboot进程 + 'life_time' => 3600, // 每隔3600s重启进程 + 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待 + 'extend_data' => [], + 'args' => [ + 'cron_name' => 'cancel-order', // 定时任务名称 + 'handler_class' => \Test\WorkerCron\LocalOrder\LocalOrderHandle::class, //处理类 + //'cron_expression' => '*/1 * * * *', // 每分钟执行一次 + 'with_block_lapping' => 1, // with_block_lapping = 1,表示每轮任务只能阻塞执行,必须等上一轮任务执行完毕,下一轮才能执行; with_block_lapping = 0, 表示每轮任务时间到了,都可执行,是并发非租塞的 +// 'run_in_background' => 1, // 后台运行,不受主进程的退出影响 + 'cron_expression' => 10, // 10s执行一次 + ], + ], ]; \ No newline at end of file diff --git a/Test/WorkerCron/conf/order_conf.php b/Test/WorkerCron/conf/schedule_url_conf.php similarity index 100% rename from Test/WorkerCron/conf/order_conf.php rename to Test/WorkerCron/conf/schedule_url_conf.php diff --git a/Test/WorkerCron/worker_cron_conf.php b/Test/WorkerCron/worker_cron_conf.php index 02884563..eace4623 100644 --- a/Test/WorkerCron/worker_cron_conf.php +++ b/Test/WorkerCron/worker_cron_conf.php @@ -1,7 +1,7 @@ getOption($cronScriptPidFile); if (SystemEnv::cronScheduleScriptModel()) { + $cronScriptPidFile = str_replace("-","", AbstractKernel::OPTION_SCHEDULE_CRON_SCRIPT_PID_FILE); + $pidFile = $this->getOption($cronScriptPidFile); file_put_contents($pidFile, Swfy::getMasterPid()); } } @@ -219,30 +219,31 @@ public static function parseClass() throw new SystemException("【Error】Missing cli command param. eg: --c=fixed:user:name --name=xxxx"); } - if (defined('ROOT_NAMESPACE')) { - $rootNamespace = ROOT_NAMESPACE; - $nameSpace = $rootNamespace[APP_NAME]; - $nameSpace = str_replace('\\', '/', $nameSpace); - $nameSpaceArr = explode('/', trim($nameSpace, '/')); - - $kernelNameSpace = array_merge($nameSpaceArr, ['Kernel']); - /** - * @var \Swoolefy\Script\AbstractKernel $kernelClass - */ - $kernelClass = implode('\\', $kernelNameSpace); - $commands = $kernelClass::getCommands() ?? []; - if (!isset($commands[$command])) { - throw new SystemException("【Error】 Kernel::commands property not defined command={$command}."); - } - $class = $commands[$command][0]; - $action = $commands[$command][1]; - }else { + if (!defined('ROOT_NAMESPACE')) { throw new SystemException("【Error】script.php not defined const ROOT_NAMESPACE."); } + $rootNamespace = ROOT_NAMESPACE; + $nameSpace = $rootNamespace[APP_NAME]; + $nameSpace = str_replace('\\', '/', $nameSpace); + $nameSpaceArr = explode('/', trim($nameSpace, '/')); + + $kernelNameSpace = array_merge($nameSpaceArr, ['Kernel']); + /** + * @var \Swoolefy\Script\AbstractKernel $kernelClass + */ + $kernelClass = implode('\\', $kernelNameSpace); + $commands = $kernelClass::getCommands() ?? []; + if (!isset($commands[$command])) { + throw new SystemException("【Error】 Kernel::commands property not defined command={$command}."); + } + $class = $commands[$command][0]; + $action = $commands[$command][1]; + if(!is_subclass_of($class, __CLASS__)) { throw new SystemException("【Error】class={$class} bust be extended \Swoolefy\Script\MainCliScript"); } + putenv("handle_class={$class}"); putenv("a={$action}"); return $class; diff --git a/src/Worker/AbstractMainProcess.php b/src/Worker/AbstractMainProcess.php index 7f12b8e3..4a959d9e 100644 --- a/src/Worker/AbstractMainProcess.php +++ b/src/Worker/AbstractMainProcess.php @@ -42,10 +42,16 @@ protected function parseWorkerConf() // 指定只启动某一个进程,开发,调试使用 // php daemon.php start Test --only=order-sync // php cron.php start Test --only=order-sync - if(defined('WORKER_CONF_FILE')) { + + if (function_exists('customLoadWorkerConf')) { + $workerConfList = customLoadWorkerConf(); + }else if(defined('WORKER_CONF_FILE')) { $mainManager = \Swoolefy\Worker\MainManager::getInstance(); - $workerConfListNew = []; $workerConfList = \Swoolefy\Worker\MainManager::loadWorkerConf(WORKER_CONF_FILE); + } + + $workerConfListNew = []; + if(!empty($workerConfList)) { // Specify Process to Run When dev or test to debug, Avoid the impact of other processes $onlyProcess = Helper::getCliParams('only'); if ($onlyProcess) { diff --git a/src/Worker/Cron/CronForkProcess.php b/src/Worker/Cron/CronForkProcess.php index b75bcd30..e576e9c0 100644 --- a/src/Worker/Cron/CronForkProcess.php +++ b/src/Worker/Cron/CronForkProcess.php @@ -29,11 +29,6 @@ class CronForkProcess extends CronProcess */ protected $forkType = self::FORK_TYPE_PROC_OPEN; - /** - * @var array - */ - protected $params = []; - /** * onInit * @return void @@ -41,7 +36,6 @@ class CronForkProcess extends CronProcess public function onInit() { parent::onInit(); - $this->params = $this->getArgs()['params'] ?? []; } /** @@ -49,8 +43,22 @@ public function onInit() */ public function run() { - parent::run(); - $this->runCronTask(); + try { + parent::run(); + $this->runCronTask(); + } catch (\Throwable $throwable) { + $context = [ + 'file' => $throwable->getFile(), + 'line' => $throwable->getLine(), + 'message' => $throwable->getMessage(), + 'code' => $throwable->getCode(), + "reboot_count" => $this->getRebootCount(), + 'trace' => $throwable->getTraceAsString(), + ]; + parent::onHandleException($throwable, $context); + sleep(2); + $this->reboot(); + } } /** diff --git a/src/Worker/Cron/CronLocalProcess.php b/src/Worker/Cron/CronLocalProcess.php index a918cfa7..f3bc9d9d 100644 --- a/src/Worker/Cron/CronLocalProcess.php +++ b/src/Worker/Cron/CronLocalProcess.php @@ -63,12 +63,12 @@ public function run() function (): bool { // 上一个任务未执行完,下一个任务到来时不执行,返回false结束 if ($this->withBlockLapping && $this->handing) { - $this->fmtWriteInfo("【{$this->getProcessName()}】进程定时任务还在处理中,暂时不再处理下一个任务"); + $this->fmtWriteNote("【{$this->getProcessName()}】进程定时任务还在处理中,暂时不再处理下一个任务"); return false; } if (!$this->isDue()) { - $this->fmtWriteInfo("【{$this->getProcessName()}】定时任务进程退出|重启中,暂时不再处理任务"); + $this->fmtWriteNote("【{$this->getProcessName()}】定时任务进程退出|重启中,暂时不再处理任务"); return false; } diff --git a/src/Worker/Cron/CronUrlProcess.php b/src/Worker/Cron/CronUrlProcess.php index a31a76d6..4a509c2f 100644 --- a/src/Worker/Cron/CronUrlProcess.php +++ b/src/Worker/Cron/CronUrlProcess.php @@ -13,6 +13,7 @@ use Swoolefy\Core\Crontab\CrontabManager; use Common\Library\HttpClient\CurlHttpClient; +use Swoolefy\Worker\Dto\CronUrlTaskMetaDto; class CronUrlProcess extends CronProcess { @@ -31,8 +32,22 @@ public function onInit() */ public function run() { - parent::run(); - $this->runCronTask(); + try { + parent::run(); + $this->runCronTask(); + }catch (\Throwable $throwable) { + $context = [ + 'file' => $throwable->getFile(), + 'line' => $throwable->getLine(), + 'message' => $throwable->getMessage(), + 'code' => $throwable->getCode(), + "reboot_count" => $this->getRebootCount(), + 'trace' => $throwable->getTraceAsString(), + ]; + parent::onHandleException($throwable, $context); + sleep(2); + $this->reboot(); + } } /** @@ -44,25 +59,45 @@ protected function registerCronTask(array $taskList) if(!empty($taskList)) { foreach($taskList as $task) { try { - $isNewAddFlag = $this->isNewAddTask($task['cron_name']); + $scheduleUrlTask = CronUrlTaskMetaDto::load($task); + $isNewAddFlag = $this->isNewAddTask($scheduleUrlTask->cron_name); if ($isNewAddFlag) { - CrontabManager::getInstance()->addRule($task['cron_name'], $task['cron_expression'], function ($expression, $cron_name) use($task) { + CrontabManager::getInstance()->addRule($task['cron_name'], $task['cron_expression'], function ($expression, $cron_name) use($scheduleUrlTask) { + if (is_array($scheduleUrlTask->before_callback) && count($scheduleUrlTask->before_callback) == 2) { + list($class, $action) = $scheduleUrlTask->before_callback; + (new $class)->{$action}($scheduleUrlTask); + }else if ($scheduleUrlTask->before_callback instanceof \Closure) { + $res = call_user_func($scheduleUrlTask->before_callback, $scheduleUrlTask); + if ($res === false) { + $this->fmtWriteNote("cron_name=$cron_name task meta of before_callback return false, stop cron task"); + return false; + } + } + $httpClient = new CurlHttpClient(); - $httpClient->setOptionArray($task['options'] ?? []); - $httpClient->setHeaderArray($task['headers'] ?? []); - $method = strtolower($task['method']); + $httpClient->setOptionArray($scheduleUrlTask->options ?? []); + $httpClient->setHeaderArray($scheduleUrlTask->headers ?? []); + $method = strtolower($scheduleUrlTask->method); $rawResponse = $httpClient->{$method}( - $task['url'], - $task['params'] ?? [], - $task['connect_time_out'] ?? 5, - $task['curl_time_out'] ?? 10, + $scheduleUrlTask->url, + $scheduleUrlTask->params ?? [], + $scheduleUrlTask->connect_time_out ?? 30, + $scheduleUrlTask->request_time_out ?? 60, ); - if (isset($task['callback']) && is_array($task['callback']) && count($task['callback']) == 2) { - list($class, $action) = $task['callback']; - (new $class)->{$action}($rawResponse, $task); - } else if (isset($task['callback']) && $task['callback'] instanceof \Closure) { - call_user_func($task['callback'], $rawResponse, $task); + if (is_array($scheduleUrlTask->response_callback) && count($scheduleUrlTask->response_callback) == 2) { + list($class, $action) = $scheduleUrlTask->response_callback; + (new $class)->{$action}($rawResponse, $scheduleUrlTask); + }else if ($scheduleUrlTask->response_callback instanceof \Closure) { + call_user_func($scheduleUrlTask->response_callback, $rawResponse, $scheduleUrlTask); + } + + + if (is_array($scheduleUrlTask->after_callback) && count($scheduleUrlTask->after_callback) == 2) { + list($class, $action) = $scheduleUrlTask->after_callback; + (new $class)->{$action}($scheduleUrlTask); + }else if ($scheduleUrlTask->after_callback instanceof \Closure) { + call_user_func($scheduleUrlTask->after_callback, $scheduleUrlTask); } }); } @@ -70,9 +105,9 @@ protected function registerCronTask(array $taskList) $this->onHandleException($throwable, $task); } } - - // 解除已暂停的定时任务 - $this->unregisterCronTask($taskList); } + + // 解除已暂停的定时任务 + $this->unregisterCronTask($taskList); } } \ No newline at end of file diff --git a/src/Worker/Dto/CronUrlTaskMetaDto.php b/src/Worker/Dto/CronUrlTaskMetaDto.php new file mode 100644 index 00000000..6ce4658c --- /dev/null +++ b/src/Worker/Dto/CronUrlTaskMetaDto.php @@ -0,0 +1,98 @@ + $value) { + $scheduleTask->$property = $value; + } + return $scheduleTask; + } +} \ No newline at end of file diff --git a/src/Worker/MainManager.php b/src/Worker/MainManager.php index da270aba..f441fc0e 100644 --- a/src/Worker/MainManager.php +++ b/src/Worker/MainManager.php @@ -1704,11 +1704,7 @@ private function getMaxProcessNum() */ public static function loadWorkerConf(string $confPath) { - if (function_exists('customLoadWorkerConf')) { - $conf = customLoadWorkerConf($confPath); - }else { - $conf = self::defaultLoadWorkerConf($confPath); - } + $conf = self::defaultLoadWorkerConf($confPath); return $conf; } diff --git a/src/Worker/Traits/SystemTrait.php b/src/Worker/Traits/SystemTrait.php index 1cd5477c..7635db61 100644 --- a/src/Worker/Traits/SystemTrait.php +++ b/src/Worker/Traits/SystemTrait.php @@ -89,7 +89,31 @@ protected function inChildrenProcessEnv(): bool */ protected function fmtWriteInfo($msg) { - initConsoleStyleIo()->write("$msg", true); + initConsoleStyleIo()->info($msg); + $this->writeLog($msg); + } + + /** + * 系统内部日志 + * + * @param $msg + * @return void + */ + protected function fmtWriteNote($msg) + { + initConsoleStyleIo()->note($msg); + $this->writeLog($msg); + } + + /** + * 系统内部日志 + * + * @param $msg + * @return void + */ + protected function fmtWriteWarning($msg) + { + initConsoleStyleIo()->warning($msg); $this->writeLog($msg); } @@ -101,7 +125,7 @@ protected function fmtWriteInfo($msg) */ protected function fmtWriteError($msg) { - initConsoleStyleIo()->write("$msg", true); + initConsoleStyleIo()->error($msg); $this->writeLog($msg); }