Skip to content

Commit

Permalink
feat:optimize cron schedule code
Browse files Browse the repository at this point in the history
  • Loading branch information
bingcool committed Jan 23, 2025
1 parent 292bac9 commit 60f4b33
Show file tree
Hide file tree
Showing 17 changed files with 270 additions and 88 deletions.
6 changes: 4 additions & 2 deletions Test/WorkerCron/CurlQuery/RemoteUrl.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
namespace Test\WorkerCron\CurlQuery;

use Common\Library\HttpClient\RawResponse;
use Swoolefy\Worker\Dto\CronUrlTaskMetaDto;

class RemoteUrl {

Expand All @@ -11,8 +12,9 @@ class RemoteUrl {
* @param RawResponse $response
* @return void
*/
public function handle(RawResponse $response) {
public function handle(RawResponse $response, CronUrlTaskMetaDto $taskMetaDto)
{
// 做简单的响应处理
var_dump($response->getHeaders());
var_dump($response->getDecodeBody());
}
}
8 changes: 3 additions & 5 deletions Test/WorkerCron/LocalOrder/LocalOrderHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ public function doCronTask($cron, string $cronName)
// var_dump(env('HOST_PASSWORD'));
// var_dump(Swfy::getConf()['bjg']);
SystemEnv::clearEnvRepository();
RunLog::info("this is a cron test log");
RunLog::error("this is a cron test log");
// RunLog::info("this is a cron test log");
// RunLog::error("this is a cron test log");
var_dump("cron start");
sleep(3);
sleep(20);
var_dump(SystemEnv::get('WEB_SITE_HOST'));


sleep(60);
//AbstractBaseWorker::getProcessInstance()->reboot(3);
var_dump("cron end");

Expand Down
File renamed without changes.
14 changes: 10 additions & 4 deletions Test/WorkerCron/conf/remote_task.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@
[
'cron_name' => 'send message', // 发送短信
'cron_expression' => 10, // 10s执行一次
'url' => 'http://www.baidu.com',
//'cron_expression' => '*/1 * * * *', // 每分钟执行一次
'url' => 'http://127.0.0.1:9501/index/index',
'method' => 'get',
'connect_time_out' => 10, //连接对方主机最长等待时间
'curl_time_out' => 15, // 整个请求最长等待总时间,要比connection_time_out大
'request_time_out' => 15, // 整个请求最长等待总时间,要比connection_time_out大
'options' => [], // curl option
'headers' => [], // 请求头
'params' => [], // post参数
// 'callback' => function(RawResponse $response) {
// (new \Test\WorkerCron\CurlQuery\RemoteUrl())->handle($response);
// },
'callback' => [\Test\WorkerCron\CurlQuery\RemoteUrl::class, 'handle'],
//'cron_expression' => '*/1 * * * *', // 每分钟执行一次
'before_callback' => function() {
var_dump('before_callback');
},
'response_callback' => [\Test\WorkerCron\CurlQuery\RemoteUrl::class, 'handle'],
'after_callback' => function() {
var_dump('after_callback');
},
]
];
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

// 动态定时任务列表,可以存在数据库中
'task_list' => function () {
//$list1 = include __DIR__ . '/schedule_task.php';
//$list1 = include __DIR__ . '/fork_task.php';
$list2 = Kernel::buildScheduleTaskList(Kernel::schedule());
return array_merge($list1 ?? [], $list2 ?? []);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@
// ],

// 独立进程本地处理任务
// [
// 'process_name' => 'test-local-cron-worker11', // 进程名称
// 'handler' => \Swoolefy\Worker\Cron\CronLocalProcess::class,
// 'worker_num' => 2, // 默认动态进程数量
// 'max_handle' => 100, //消费达到10000后reboot进程
// 'life_time' => 3600, // 每隔3600s重启进程
// 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
// 'extend_data' => [],
// 'args' => [
// 'cron_name' => 'cancel-order', // 定时任务名称
// 'handler_class' => \Test\WorkerCron\LocalOrder\LocalOrderHandle::class, //处理类
// //'cron_expression' => '*/1 * * * *', // 每分钟执行一次
// 'cron_expression' => 10, // 10s执行一次
// ],
// ],
[
'process_name' => 'test-local-cron-worker11', // 进程名称
'handler' => \Swoolefy\Worker\Cron\CronLocalProcess::class,
'worker_num' => 2, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 3600, // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => [
'cron_name' => 'cancel-order', // 定时任务名称
'handler_class' => \Test\WorkerCron\LocalOrder\LocalOrderHandle::class, //处理类
//'cron_expression' => '*/1 * * * *', // 每分钟执行一次
'with_block_lapping' => 1, // with_block_lapping = 1,表示每轮任务只能阻塞执行,必须等上一轮任务执行完毕,下一轮才能执行; with_block_lapping = 0, 表示每轮任务时间到了,都可执行,是并发非租塞的
// 'run_in_background' => 1, // 后台运行,不受主进程的退出影响
'cron_expression' => 10, // 10s执行一次
],
],
];
File renamed without changes.
6 changes: 3 additions & 3 deletions Test/WorkerCron/worker_cron_conf.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

return array_merge(
include __DIR__ . "/conf/schedule_conf.php",
//include __DIR__."/conf/order_conf.php",
//include __DIR__."/conf/product_conf.php",
//include __DIR__ . "/conf/schedule_local_conf.php",
include __DIR__."/conf/schedule_url_conf.php",
//include __DIR__."/conf/schedule_fork_conf.php",
);
6 changes: 6 additions & 0 deletions Test/WorkerDaemon/PipeWorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
class PipeWorkerProcess extends \Swoolefy\Worker\AbstractWorkerProcess
{

/**
* loopHandle 将循环处理
*
* @return void
* @throws \Common\Library\Exception\DbException
*/
public function loopHandle()
{
$a = 1;
Expand Down
41 changes: 21 additions & 20 deletions src/Script/MainCliScript.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ private function generateTraceId()
*/
private function saveCronScriptPidFile()
{
$cronScriptPidFile = str_replace("-","", AbstractKernel::OPTION_SCHEDULE_CRON_SCRIPT_PID_FILE);
$pidFile = $this->getOption($cronScriptPidFile);
if (SystemEnv::cronScheduleScriptModel()) {
$cronScriptPidFile = str_replace("-","", AbstractKernel::OPTION_SCHEDULE_CRON_SCRIPT_PID_FILE);
$pidFile = $this->getOption($cronScriptPidFile);
file_put_contents($pidFile, Swfy::getMasterPid());
}
}
Expand Down Expand Up @@ -219,30 +219,31 @@ public static function parseClass()
throw new SystemException("【Error】Missing cli command param. eg: --c=fixed:user:name --name=xxxx");
}

if (defined('ROOT_NAMESPACE')) {
$rootNamespace = ROOT_NAMESPACE;
$nameSpace = $rootNamespace[APP_NAME];
$nameSpace = str_replace('\\', '/', $nameSpace);
$nameSpaceArr = explode('/', trim($nameSpace, '/'));

$kernelNameSpace = array_merge($nameSpaceArr, ['Kernel']);
/**
* @var \Swoolefy\Script\AbstractKernel $kernelClass
*/
$kernelClass = implode('\\', $kernelNameSpace);
$commands = $kernelClass::getCommands() ?? [];
if (!isset($commands[$command])) {
throw new SystemException("【Error】 Kernel::commands property not defined command={$command}.");
}
$class = $commands[$command][0];
$action = $commands[$command][1];
}else {
if (!defined('ROOT_NAMESPACE')) {
throw new SystemException("【Error】script.php not defined const ROOT_NAMESPACE.");
}

$rootNamespace = ROOT_NAMESPACE;
$nameSpace = $rootNamespace[APP_NAME];
$nameSpace = str_replace('\\', '/', $nameSpace);
$nameSpaceArr = explode('/', trim($nameSpace, '/'));

$kernelNameSpace = array_merge($nameSpaceArr, ['Kernel']);
/**
* @var \Swoolefy\Script\AbstractKernel $kernelClass
*/
$kernelClass = implode('\\', $kernelNameSpace);
$commands = $kernelClass::getCommands() ?? [];
if (!isset($commands[$command])) {
throw new SystemException("【Error】 Kernel::commands property not defined command={$command}.");
}
$class = $commands[$command][0];
$action = $commands[$command][1];

if(!is_subclass_of($class, __CLASS__)) {
throw new SystemException("【Error】class={$class} bust be extended \Swoolefy\Script\MainCliScript");
}

putenv("handle_class={$class}");
putenv("a={$action}");
return $class;
Expand Down
10 changes: 8 additions & 2 deletions src/Worker/AbstractMainProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ protected function parseWorkerConf()
// 指定只启动某一个进程,开发,调试使用
// php daemon.php start Test --only=order-sync
// php cron.php start Test --only=order-sync
if(defined('WORKER_CONF_FILE')) {

if (function_exists('customLoadWorkerConf')) {
$workerConfList = customLoadWorkerConf();
}else if(defined('WORKER_CONF_FILE')) {
$mainManager = \Swoolefy\Worker\MainManager::getInstance();
$workerConfListNew = [];
$workerConfList = \Swoolefy\Worker\MainManager::loadWorkerConf(WORKER_CONF_FILE);
}

$workerConfListNew = [];
if(!empty($workerConfList)) {
// Specify Process to Run When dev or test to debug, Avoid the impact of other processes
$onlyProcess = Helper::getCliParams('only');
if ($onlyProcess) {
Expand Down
24 changes: 16 additions & 8 deletions src/Worker/Cron/CronForkProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,36 @@ class CronForkProcess extends CronProcess
*/
protected $forkType = self::FORK_TYPE_PROC_OPEN;

/**
* @var array
*/
protected $params = [];

/**
* onInit
* @return void
*/
public function onInit()
{
parent::onInit();
$this->params = $this->getArgs()['params'] ?? [];
}

/**
* run
*/
public function run()
{
parent::run();
$this->runCronTask();
try {
parent::run();
$this->runCronTask();
} catch (\Throwable $throwable) {
$context = [
'file' => $throwable->getFile(),
'line' => $throwable->getLine(),
'message' => $throwable->getMessage(),
'code' => $throwable->getCode(),
"reboot_count" => $this->getRebootCount(),
'trace' => $throwable->getTraceAsString(),
];
parent::onHandleException($throwable, $context);
sleep(2);
$this->reboot();
}
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/Worker/Cron/CronLocalProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ public function run()
function (): bool {
// 上一个任务未执行完,下一个任务到来时不执行,返回false结束
if ($this->withBlockLapping && $this->handing) {
$this->fmtWriteInfo("{$this->getProcessName()}】进程定时任务还在处理中,暂时不再处理下一个任务");
$this->fmtWriteNote("{$this->getProcessName()}】进程定时任务还在处理中,暂时不再处理下一个任务");
return false;
}

if (!$this->isDue()) {
$this->fmtWriteInfo("{$this->getProcessName()}】定时任务进程退出|重启中,暂时不再处理任务");
$this->fmtWriteNote("{$this->getProcessName()}】定时任务进程退出|重启中,暂时不再处理任务");
return false;
}

Expand Down
73 changes: 54 additions & 19 deletions src/Worker/Cron/CronUrlProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Swoolefy\Core\Crontab\CrontabManager;
use Common\Library\HttpClient\CurlHttpClient;
use Swoolefy\Worker\Dto\CronUrlTaskMetaDto;

class CronUrlProcess extends CronProcess
{
Expand All @@ -31,8 +32,22 @@ public function onInit()
*/
public function run()
{
parent::run();
$this->runCronTask();
try {
parent::run();
$this->runCronTask();
}catch (\Throwable $throwable) {
$context = [
'file' => $throwable->getFile(),
'line' => $throwable->getLine(),
'message' => $throwable->getMessage(),
'code' => $throwable->getCode(),
"reboot_count" => $this->getRebootCount(),
'trace' => $throwable->getTraceAsString(),
];
parent::onHandleException($throwable, $context);
sleep(2);
$this->reboot();
}
}

/**
Expand All @@ -44,35 +59,55 @@ protected function registerCronTask(array $taskList)
if(!empty($taskList)) {
foreach($taskList as $task) {
try {
$isNewAddFlag = $this->isNewAddTask($task['cron_name']);
$scheduleUrlTask = CronUrlTaskMetaDto::load($task);
$isNewAddFlag = $this->isNewAddTask($scheduleUrlTask->cron_name);
if ($isNewAddFlag) {
CrontabManager::getInstance()->addRule($task['cron_name'], $task['cron_expression'], function ($expression, $cron_name) use($task) {
CrontabManager::getInstance()->addRule($task['cron_name'], $task['cron_expression'], function ($expression, $cron_name) use($scheduleUrlTask) {
if (is_array($scheduleUrlTask->before_callback) && count($scheduleUrlTask->before_callback) == 2) {
list($class, $action) = $scheduleUrlTask->before_callback;
(new $class)->{$action}($scheduleUrlTask);
}else if ($scheduleUrlTask->before_callback instanceof \Closure) {
$res = call_user_func($scheduleUrlTask->before_callback, $scheduleUrlTask);
if ($res === false) {
$this->fmtWriteNote("cron_name=$cron_name task meta of before_callback return false, stop cron task");
return false;
}
}

$httpClient = new CurlHttpClient();
$httpClient->setOptionArray($task['options'] ?? []);
$httpClient->setHeaderArray($task['headers'] ?? []);
$method = strtolower($task['method']);
$httpClient->setOptionArray($scheduleUrlTask->options ?? []);
$httpClient->setHeaderArray($scheduleUrlTask->headers ?? []);
$method = strtolower($scheduleUrlTask->method);
$rawResponse = $httpClient->{$method}(
$task['url'],
$task['params'] ?? [],
$task['connect_time_out'] ?? 5,
$task['curl_time_out'] ?? 10,
$scheduleUrlTask->url,
$scheduleUrlTask->params ?? [],
$scheduleUrlTask->connect_time_out ?? 30,
$scheduleUrlTask->request_time_out ?? 60,
);

if (isset($task['callback']) && is_array($task['callback']) && count($task['callback']) == 2) {
list($class, $action) = $task['callback'];
(new $class)->{$action}($rawResponse, $task);
} else if (isset($task['callback']) && $task['callback'] instanceof \Closure) {
call_user_func($task['callback'], $rawResponse, $task);
if (is_array($scheduleUrlTask->response_callback) && count($scheduleUrlTask->response_callback) == 2) {
list($class, $action) = $scheduleUrlTask->response_callback;
(new $class)->{$action}($rawResponse, $scheduleUrlTask);
}else if ($scheduleUrlTask->response_callback instanceof \Closure) {
call_user_func($scheduleUrlTask->response_callback, $rawResponse, $scheduleUrlTask);
}


if (is_array($scheduleUrlTask->after_callback) && count($scheduleUrlTask->after_callback) == 2) {
list($class, $action) = $scheduleUrlTask->after_callback;
(new $class)->{$action}($scheduleUrlTask);
}else if ($scheduleUrlTask->after_callback instanceof \Closure) {
call_user_func($scheduleUrlTask->after_callback, $scheduleUrlTask);
}
});
}
}catch (\Throwable $throwable) {
$this->onHandleException($throwable, $task);
}
}

// 解除已暂停的定时任务
$this->unregisterCronTask($taskList);
}

// 解除已暂停的定时任务
$this->unregisterCronTask($taskList);
}
}
Loading

0 comments on commit 60f4b33

Please sign in to comment.