From 3eaa853f566bbcbea2708db15ec3c542efdc8ded Mon Sep 17 00:00:00 2001 From: Tim Otten Date: Wed, 2 Feb 2022 00:29:13 -0800 Subject: [PATCH] CRM_Queue_TaskRunner - Add background-friendly handler for `CRM_Queue_Task`s --- CRM/Queue/BAO/Queue.php | 22 ++++++- CRM/Queue/Runner.php | 4 +- CRM/Queue/TaskRunner.php | 132 +++++++++++++++++++++++++++++++++++++++ CRM/Utils/Hook.php | 56 +++++++++++++++++ 4 files changed, 211 insertions(+), 3 deletions(-) create mode 100644 CRM/Queue/TaskRunner.php diff --git a/CRM/Queue/BAO/Queue.php b/CRM/Queue/BAO/Queue.php index 55175e32bd94..35f8be2b9d5e 100644 --- a/CRM/Queue/BAO/Queue.php +++ b/CRM/Queue/BAO/Queue.php @@ -18,7 +18,7 @@ /** * Track a list of known queues. */ -class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue { +class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue implements \Civi\Core\HookInterface { /** * Get a list of valid statuses. @@ -76,4 +76,24 @@ public static function getTypes($context = NULL) { ]; } + /** + * Queues which contain `CRM_Queue_Task` records should use the `task` runner to evaluate them. + * + * @code + * $q = Civi::queue('do-stuff', ['type' => 'Sql', 'runner' => 'task']); + * $q->createItem(new CRM_Queue_Task('my_callback_func', [1,2,3])); + * @endCode + * + * @param \CRM_Queue_Queue $queue + * @param array $items + * @param array $outcomes + * @throws \API_Exception + * @see CRM_Utils_Hook::queueRun() + */ + public static function hook_civicrm_queueRun_task(CRM_Queue_Queue $queue, array $items, array &$outcomes) { + foreach ($items as $itemPos => $item) { + $outcomes[$itemPos] = (new \CRM_Queue_TaskRunner())->run($queue, $item); + } + } + } diff --git a/CRM/Queue/Runner.php b/CRM/Queue/Runner.php index 4985bda7680e..ee69cb63106b 100644 --- a/CRM/Queue/Runner.php +++ b/CRM/Queue/Runner.php @@ -26,9 +26,9 @@ * This is used by some CLI upgrades. * * This runner is not appropriate for all queues or workloads, so you might choose or create - * a different runner. For example, `CRM_Queue_Autorunner` is geared toward background task lists. + * a different runner. For example, `CRM_Queue_TaskRunner` is geared toward background task lists. * - * @see CRM_Queue_Autorunner + * @see CRM_Queue_TaskRunner */ class CRM_Queue_Runner { diff --git a/CRM/Queue/TaskRunner.php b/CRM/Queue/TaskRunner.php new file mode 100644 index 000000000000..002e93b2ae35 --- /dev/null +++ b/CRM/Queue/TaskRunner.php @@ -0,0 +1,132 @@ +assertType($item->data, ['CRM_Queue_Task'], 'Cannot run. Invalid task given.'); + + /** @var \CRM_Queue_Task $task */ + $task = $item->data; + + /** @var string $outcome One of 'ok', 'retry', 'delete', 'abort' */ + + if (is_numeric($queue->getSpec('retry_limit')) && $item->run_count > 1 + $queue->getSpec('retry_limit')) { + \Civi::log()->debug("Skipping exhausted task: " . $task->title); + $outcome = $queue->getSpec('error'); + $exception = new \API_Exception(sprintf('Skipping exhausted task after %d tries: %s', $item->run_count, print_r($task, 1)), 'queue_retry_exhausted'); + } + else { + \Civi::log()->debug("Running task: " . $task->title); + try { + $runResult = $task->run($this->createContext($queue)); + $outcome = $runResult ? 'ok' : $queue->getSpec('error'); + $exception = ($outcome === 'ok') ? NULL : new \API_Exception('Queue task returned false', 'queue_false'); + } + catch (\Exception $e) { + $outcome = $queue->getSpec('error'); + $exception = $e; + } + + if (in_array($outcome, ['delete', 'abort']) && $this->isRetriable($queue, $item)) { + $outcome = 'retry'; + } + } + + if ($outcome !== 'ok') { + \CRM_Utils_Hook::queueTaskError($queue, $item, $outcome, $exception); + } + + if ($outcome === 'ok') { + $queue->deleteItem($item); + return $outcome; + } + + $logDetails = [ + 'id' => $queue->getName() . '#' . $item->id, + 'task' => CRM_Utils_Array::subset((array) $task, ['title', 'callback', 'arguments']), + 'outcome' => $outcome, + 'message' => $exception ? $exception->getMessage() : NULL, + 'exception' => $exception, + ]; + + switch ($outcome) { + case 'retry': + \Civi::log('queue')->error('Task "{id}" failed and should be retried. {message}', $logDetails); + $queue->releaseItem($item); + break; + + case 'delete': + \Civi::log('queue')->error('Task "{id}" failed and will be deleted. {message}', $logDetails); + $queue->deleteItem($item); + break; + + case 'abort': + \Civi::log('queue')->error('Task "{id}" failed. Queue processing aborted. {message}', $logDetails); + $queue->setStatus('aborted'); + $queue->releaseItem($item); /* Sysadmin might inspect, fix, and then resume. Item should be accessible. */ + break; + + default: + \Civi::log('queue')->critical('Unrecognized outcome for task "{id}": {outcome}', $logDetails); + break; + } + + return $outcome; + } + + /** + * @param \CRM_Queue_Queue $queue + * return CRM_Queue_TaskContext; + */ + private function createContext(\CRM_Queue_Queue $queue): \CRM_Queue_TaskContext { + $taskCtx = new \CRM_Queue_TaskContext(); + $taskCtx->queue = $queue; + $taskCtx->log = \CRM_Core_Error::createDebugLogger(); + return $taskCtx; + } + + private function assertType($object, array $types, string $message) { + foreach ($types as $type) { + if ($object instanceof $type) { + return; + } + } + throw new \Exception($message); + } + + private function isRetriable(\CRM_Queue_Queue $queue, $item): bool { + return property_exists($item, 'run_count') + && is_numeric($queue->getSpec('retry_limit')) + && $queue->getSpec('retry_limit') + 1 > $item->run_count; + } + +} diff --git a/CRM/Utils/Hook.php b/CRM/Utils/Hook.php index 76c3cd004751..d2fb744c8843 100644 --- a/CRM/Utils/Hook.php +++ b/CRM/Utils/Hook.php @@ -2727,6 +2727,62 @@ public static function alterEntityRefParams(&$params, $formName) { ); } + /** + * Fire `hook_civicrm_queueRun_{$runner}`. + * + * This event only fires if these conditions are met: + * + * 1. The `$queue` has been persisted in `civicrm_queue`. + * 2. The `$queue` has a `runner` property. + * 3. The `$queue` has some pending tasks. + * 4. The system has a queue-running agent. + * + * @param \CRM_Queue_Queue $queue + * @param array $items + * List of claimed items which we may evaluate. + * @param array $outcomes + * The outcomes of each task. One of 'ok', 'retry', 'fail'. + * Keys should match the keys in $items. + */ + public static function queueRun(CRM_Queue_Queue $queue, array $items, &$outcomes) { + $runner = $queue->getSpec('runner'); + if (empty($runner) || !preg_match(';^[A-Za-z0-9_]+$;', $runner)) { + throw new \CRM_Core_Exception("Cannot autorun queue: " . $queue->getName()); + } + return self::singleton()->invoke(['queue', 'items', 'outcomes'], $queue, $items, + $outcomes, $exception, self::$_nullObject, self::$_nullObject, + 'civicrm_queueRun_' . $runner + ); + } + + /** + * This is called if automatic execution of a queue-task fails. + * + * The `$outcome` may be modified. For example, you might inspect the $item and $exception -- and then + * decide whether to 'retry', 'delete', or 'abort'. + * + * @param \CRM_Queue_Queue $queue + * @param \CRM_Queue_DAO_QueueItem|\stdClass $item + * The enqueued item $item. + * In principle, this is the $item format determined by the queue, which includes `id` and `data`. + * In practice, it is typically an instance of `CRM_Queue_DAO_QueueItem`. + * @param string $outcome + * The outcome of the task. Legal values: + * - 'retry': The task encountered a problem, and it should be retried. + * - 'delete': The task encountered a non-recoverable problem, and it should be deleted. + * - 'abort': The task encountered a non-recoverable problem, and the queue should be stopped. + * - 'ok': The task finished normally. (You won't generally see this, but it could be useful in some customizations.) + * The default outcome for task-errors is determined by the queue settings (`civicrm_queue.error`). + * @param \Throwable|null $exception + * If the task failed, this is the cause of the failure. + */ + public static function queueTaskError(CRM_Queue_Queue $queue, $item, &$outcome, ?Throwable $exception) { + return self::singleton()->invoke(['job', 'params'], $queue, $item, + $outcome, $exception, self::$_nullObject, self::$_nullObject, + 'civicrm_queueTaskError' + ); + } + /** * This hook is called before a scheduled job is executed *