Skip to content

Commit

Permalink
CRM_Queue_TaskRunner - Add background-friendly handler for `CRM_Queue…
Browse files Browse the repository at this point in the history
…_Task`s
  • Loading branch information
totten committed Jun 2, 2022
1 parent 0374a8c commit 3eaa853
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 3 deletions.
22 changes: 21 additions & 1 deletion CRM/Queue/BAO/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
/**
* Track a list of known queues.
*/
class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue {
class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue implements \Civi\Core\HookInterface {

/**
* Get a list of valid statuses.
Expand Down Expand Up @@ -76,4 +76,24 @@ public static function getTypes($context = NULL) {
];
}

/**
* Queues which contain `CRM_Queue_Task` records should use the `task` runner to evaluate them.
*
* @code
* $q = Civi::queue('do-stuff', ['type' => 'Sql', 'runner' => 'task']);
* $q->createItem(new CRM_Queue_Task('my_callback_func', [1,2,3]));
* @endCode
*
* @param \CRM_Queue_Queue $queue
* @param array $items
* @param array $outcomes
* @throws \API_Exception
* @see CRM_Utils_Hook::queueRun()
*/
public static function hook_civicrm_queueRun_task(CRM_Queue_Queue $queue, array $items, array &$outcomes) {
foreach ($items as $itemPos => $item) {
$outcomes[$itemPos] = (new \CRM_Queue_TaskRunner())->run($queue, $item);
}
}

}
4 changes: 2 additions & 2 deletions CRM/Queue/Runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
* This is used by some CLI upgrades.
*
* This runner is not appropriate for all queues or workloads, so you might choose or create
* a different runner. For example, `CRM_Queue_Autorunner` is geared toward background task lists.
* a different runner. For example, `CRM_Queue_TaskRunner` is geared toward background task lists.
*
* @see CRM_Queue_Autorunner
* @see CRM_Queue_TaskRunner
*/
class CRM_Queue_Runner {

Expand Down
132 changes: 132 additions & 0 deletions CRM/Queue/TaskRunner.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
<?php
/*
+--------------------------------------------------------------------+
| Copyright CiviCRM LLC. All rights reserved. |
| |
| This work is published under the GNU AGPLv3 license with some |
| permitted exceptions and without any warranty. For full license |
| and copyright information, see https://civicrm.org/licensing |
+--------------------------------------------------------------------+
*/

/**
* `CRM_Queue_TaskRunner` a list tasks from a queue. It is designed to supported background
* tasks which run automatically.
*
* This runner is not appropriate for all queues or workloads, so you might choose or create
* a different runner. For example, `CRM_Queue_Runner` is geared toward background task lists.
*
* @see CRM_Queue_Runner
*/
class CRM_Queue_TaskRunner {

/**
* @param \CRM_Queue_Queue $queue
* @param $item
* @return string
* One of the following:
* - 'ok': Task executed normally. Removed from queue.
* - 'retry': Task encountered an error. Will try again later.
* - 'delete': Task encountered an error. Will not try again later. Removed from queue.
* - 'abort': Task encountered an error. Will not try again later. Stopped the queue.
* @throws \API_Exception
*/
public function run(CRM_Queue_Queue $queue, $item): string {
$this->assertType($item->data, ['CRM_Queue_Task'], 'Cannot run. Invalid task given.');

/** @var \CRM_Queue_Task $task */
$task = $item->data;

/** @var string $outcome One of 'ok', 'retry', 'delete', 'abort' */

if (is_numeric($queue->getSpec('retry_limit')) && $item->run_count > 1 + $queue->getSpec('retry_limit')) {
\Civi::log()->debug("Skipping exhausted task: " . $task->title);
$outcome = $queue->getSpec('error');
$exception = new \API_Exception(sprintf('Skipping exhausted task after %d tries: %s', $item->run_count, print_r($task, 1)), 'queue_retry_exhausted');
}
else {
\Civi::log()->debug("Running task: " . $task->title);
try {
$runResult = $task->run($this->createContext($queue));
$outcome = $runResult ? 'ok' : $queue->getSpec('error');
$exception = ($outcome === 'ok') ? NULL : new \API_Exception('Queue task returned false', 'queue_false');
}
catch (\Exception $e) {
$outcome = $queue->getSpec('error');
$exception = $e;
}

if (in_array($outcome, ['delete', 'abort']) && $this->isRetriable($queue, $item)) {
$outcome = 'retry';
}
}

if ($outcome !== 'ok') {
\CRM_Utils_Hook::queueTaskError($queue, $item, $outcome, $exception);
}

if ($outcome === 'ok') {
$queue->deleteItem($item);
return $outcome;
}

$logDetails = [
'id' => $queue->getName() . '#' . $item->id,
'task' => CRM_Utils_Array::subset((array) $task, ['title', 'callback', 'arguments']),
'outcome' => $outcome,
'message' => $exception ? $exception->getMessage() : NULL,
'exception' => $exception,
];

switch ($outcome) {
case 'retry':
\Civi::log('queue')->error('Task "{id}" failed and should be retried. {message}', $logDetails);
$queue->releaseItem($item);
break;

case 'delete':
\Civi::log('queue')->error('Task "{id}" failed and will be deleted. {message}', $logDetails);
$queue->deleteItem($item);
break;

case 'abort':
\Civi::log('queue')->error('Task "{id}" failed. Queue processing aborted. {message}', $logDetails);
$queue->setStatus('aborted');
$queue->releaseItem($item); /* Sysadmin might inspect, fix, and then resume. Item should be accessible. */
break;

default:
\Civi::log('queue')->critical('Unrecognized outcome for task "{id}": {outcome}', $logDetails);
break;
}

return $outcome;
}

/**
* @param \CRM_Queue_Queue $queue
* return CRM_Queue_TaskContext;
*/
private function createContext(\CRM_Queue_Queue $queue): \CRM_Queue_TaskContext {
$taskCtx = new \CRM_Queue_TaskContext();
$taskCtx->queue = $queue;
$taskCtx->log = \CRM_Core_Error::createDebugLogger();
return $taskCtx;
}

private function assertType($object, array $types, string $message) {
foreach ($types as $type) {
if ($object instanceof $type) {
return;
}
}
throw new \Exception($message);
}

private function isRetriable(\CRM_Queue_Queue $queue, $item): bool {
return property_exists($item, 'run_count')
&& is_numeric($queue->getSpec('retry_limit'))
&& $queue->getSpec('retry_limit') + 1 > $item->run_count;
}

}
56 changes: 56 additions & 0 deletions CRM/Utils/Hook.php
Original file line number Diff line number Diff line change
Expand Up @@ -2727,6 +2727,62 @@ public static function alterEntityRefParams(&$params, $formName) {
);
}

/**
* Fire `hook_civicrm_queueRun_{$runner}`.
*
* This event only fires if these conditions are met:
*
* 1. The `$queue` has been persisted in `civicrm_queue`.
* 2. The `$queue` has a `runner` property.
* 3. The `$queue` has some pending tasks.
* 4. The system has a queue-running agent.
*
* @param \CRM_Queue_Queue $queue
* @param array $items
* List of claimed items which we may evaluate.
* @param array $outcomes
* The outcomes of each task. One of 'ok', 'retry', 'fail'.
* Keys should match the keys in $items.
*/
public static function queueRun(CRM_Queue_Queue $queue, array $items, &$outcomes) {
$runner = $queue->getSpec('runner');
if (empty($runner) || !preg_match(';^[A-Za-z0-9_]+$;', $runner)) {
throw new \CRM_Core_Exception("Cannot autorun queue: " . $queue->getName());
}
return self::singleton()->invoke(['queue', 'items', 'outcomes'], $queue, $items,
$outcomes, $exception, self::$_nullObject, self::$_nullObject,
'civicrm_queueRun_' . $runner
);
}

/**
* This is called if automatic execution of a queue-task fails.
*
* The `$outcome` may be modified. For example, you might inspect the $item and $exception -- and then
* decide whether to 'retry', 'delete', or 'abort'.
*
* @param \CRM_Queue_Queue $queue
* @param \CRM_Queue_DAO_QueueItem|\stdClass $item
* The enqueued item $item.
* In principle, this is the $item format determined by the queue, which includes `id` and `data`.
* In practice, it is typically an instance of `CRM_Queue_DAO_QueueItem`.
* @param string $outcome
* The outcome of the task. Legal values:
* - 'retry': The task encountered a problem, and it should be retried.
* - 'delete': The task encountered a non-recoverable problem, and it should be deleted.
* - 'abort': The task encountered a non-recoverable problem, and the queue should be stopped.
* - 'ok': The task finished normally. (You won't generally see this, but it could be useful in some customizations.)
* The default outcome for task-errors is determined by the queue settings (`civicrm_queue.error`).
* @param \Throwable|null $exception
* If the task failed, this is the cause of the failure.
*/
public static function queueTaskError(CRM_Queue_Queue $queue, $item, &$outcome, ?Throwable $exception) {
return self::singleton()->invoke(['job', 'params'], $queue, $item,
$outcome, $exception, self::$_nullObject, self::$_nullObject,
'civicrm_queueTaskError'
);
}

/**
* This hook is called before a scheduled job is executed
*
Expand Down

0 comments on commit 3eaa853

Please sign in to comment.