Skip to content

Commit

Permalink
CRM_Queue_TaskRunner - Add background-friendly handler for `CRM_Queue…
Browse files Browse the repository at this point in the history
…_Task`s
  • Loading branch information
totten committed Mar 2, 2022
1 parent a7e1ad7 commit 211e387
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 3 deletions.
22 changes: 21 additions & 1 deletion CRM/Queue/BAO/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
/**
* Track a list of known queues.
*/
class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue {
class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue implements \Civi\Test\HookInterface {

/**
* Get a list of valid queue types.
Expand All @@ -33,4 +33,24 @@ public static function getTypes($context = NULL) {
];
}

/**
* Queues which contain `CRM_Queue_Task` records should use the `task` runner to evaluate them.
*
* @code
* $q = Civi::queue('do-stuff', ['type' => 'Sql', 'runner' => 'task']);
* $q->createItem(new CRM_Queue_Task('my_callback_func', [1,2,3]));
* @endCode
*
* @param \CRM_Queue_Queue $queue
* @param array $items
* @param array $outcomes
* @throws \API_Exception
* @see CRM_Utils_Hook::queueRun()
*/
public static function hook_civicrm_queueRun_task(CRM_Queue_Queue $queue, array $items, array &$outcomes) {
foreach ($items as $itemPos => $item) {
$outcomes[$itemPos] = (new \CRM_Queue_TaskRunner())->run($queue, $item);
}
}

}
4 changes: 2 additions & 2 deletions CRM/Queue/Runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
* This is used by some CLI upgrades.
*
* This runner is not appropriate for all queues or workloads, so you might choose or create
* a different runner. For example, `CRM_Queue_Autorunner` is geared toward background task lists.
* a different runner. For example, `CRM_Queue_TaskRunner` is geared toward background task lists.
*
* @see CRM_Queue_Autorunner
* @see CRM_Queue_TaskRunner
*/
class CRM_Queue_Runner {

Expand Down
108 changes: 108 additions & 0 deletions CRM/Queue/TaskRunner.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<?php
/*
+--------------------------------------------------------------------+
| Copyright CiviCRM LLC. All rights reserved. |
| |
| This work is published under the GNU AGPLv3 license with some |
| permitted exceptions and without any warranty. For full license |
| and copyright information, see https://civicrm.org/licensing |
+--------------------------------------------------------------------+
*/

/**
* `CRM_Queue_TaskRunner` a list tasks from a queue. It is designed to supported background
* tasks which run automatically.
*
* This runner is not appropriate for all queues or workloads, so you might choose or create
* a different runner. For example, `CRM_Queue_Runner` is geared toward background task lists.
*
* @see CRM_Queue_Runner
*/
class CRM_Queue_TaskRunner {

/**
* @param \CRM_Queue_Queue $queue
* @param $item
* @return string
* One of the following:
* - 'ok': Task executed normally. Removed from queue.
* - 'retry': Task encountered an error. Will try again later.
* - 'fail': Task encountered an error. Will not try again later. Removed from queue.
* @throws \API_Exception
*/
public function run(CRM_Queue_Queue $queue, $item): string {
$this->assertType($item->data, ['CRM_Queue_Task'], 'Cannot run. Invalid task given.');

/** @var \CRM_Queue_Task $task */
$task = $item->data;

if (is_numeric($queue->getSpec('retry_limit')) && $item->run_count > 1 + $queue->getSpec('retry_limit')) {
\Civi::log()->debug("Skipping exhausted task: " . $task->title);
$outcome = 'fail';
$exception = new \API_Exception(sprintf('Skipping exhausted task after %d tries: %s', $item->run_count, print_r($task, 1)));
}
else {
\Civi::log()->debug("Running task: " . $task->title);
try {
$runResult = $task->run($this->createContext($queue));
$outcome = $runResult ? 'ok' : 'fail';
$exception = ($outcome === 'ok') ? NULL : new \API_Exception('Queue task returned false');
}
catch (\Exception $e) {
$outcome = 'fail';
$exception = $e;
}

if ($outcome === 'fail' && $this->isRetriable($queue, $item)) {
$outcome = 'retry';
}
}

if ($outcome !== 'ok') {
\CRM_Utils_Hook::queueTaskError($queue, $item, $outcome, $exception);
}

switch ($outcome) {
case 'ok':
$queue->deleteItem($item);
break;

case 'retry':
$queue->releaseItem($item);
break;

case 'fail':
$queue->deleteItem($item);
break;
}

return $outcome;
}

/**
* @param \CRM_Queue_Queue $queue
* return CRM_Queue_TaskContext;
*/
private function createContext(\CRM_Queue_Queue $queue): \CRM_Queue_TaskContext {
$taskCtx = new \CRM_Queue_TaskContext();
$taskCtx->queue = $queue;
$taskCtx->log = \CRM_Core_Error::createDebugLogger();
return $taskCtx;
}

private function assertType($object, array $types, string $message) {
foreach ($types as $type) {
if ($object instanceof $type) {
return;
}
}
throw new \Exception($message);
}

private function isRetriable(\CRM_Queue_Queue $queue, $item): bool {
return property_exists($item, 'run_count')
&& is_numeric($queue->getSpec('retry_limit'))
&& $queue->getSpec('retry_limit') + 1 > $item->run_count;
}

}
49 changes: 49 additions & 0 deletions CRM/Utils/Hook.php
Original file line number Diff line number Diff line change
Expand Up @@ -2701,6 +2701,55 @@ public static function alterEntityRefParams(&$params, $formName) {
);
}

/**
* Fire `hook_civicrm_queueRun_{$runner}`.
*
* This event only fires if these conditions are met:
*
* 1. The `$queue` has been persisted in `civicrm_queue`.
* 2. The `$queue` has a `runner` property.
* 3. The `$queue` has some pending tasks.
* 4. The system has a queue-running agent.
*
* @param \CRM_Queue_Queue $queue
* @param array $items
* List of claimed items which we may evaluate.
* @param array $outcomes
* The outcomes of each task. One of 'ok', 'retry', 'fail'.
* Keys should match the keys in $items.
*/
public static function queueRun(CRM_Queue_Queue $queue, array $items, &$outcomes) {
$runner = $queue->getSpec('runner');
if (empty($runner) || !preg_match(';^[A-Za-z0-9_]+$;', $runner)) {
throw new \CRM_Core_Exception("Cannot autorun queue: " . $queue->getName());
}
return self::singleton()->invoke(['queue', 'items', 'outcomes'], $queue, $items,
$outcomes, $exception, self::$_nullObject, self::$_nullObject,
'civicrm_queueRun_' . $runner
);
}

/**
* This is called if automatic execution of a queue-task fails.
*
* @param \CRM_Queue_Queue $queue
* @param \CRM_Queue_DAO_QueueItem|\stdClass $item
* The enqeued item $item.
* In principle, this is the $item format determined by the queue, which includes `id` and `data`.
* In practice, it is typically an instance of `CRM_Queue_DAO_QueueItem`.
* @param string $outcome
* The outcome of the task. One of 'ok', 'retry', 'fail'.
* The outcome may be altered - eg to flip between 'retry' or 'fail'.
* @param \Throwable|null $exception
* If the task failed, this is the cause of the failure.
*/
public static function queueTaskError(CRM_Queue_Queue $queue, $item, &$outcome, ?Throwable $exception) {
return self::singleton()->invoke(['job', 'params'], $queue, $item,
$outcome, $exception, self::$_nullObject, self::$_nullObject,
'civicrm_queueTaskError'
);
}

/**
* This hook is called before a scheduled job is executed
*
Expand Down

0 comments on commit 211e387

Please sign in to comment.