Skip to content

Commit

Permalink
Civi\Api4\Queue - Add APIs for claiming and running enqueued tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
totten committed Jun 2, 2022
1 parent 3eaa853 commit 8369e17
Show file tree
Hide file tree
Showing 4 changed files with 648 additions and 0 deletions.
90 changes: 90 additions & 0 deletions Civi/Api4/Action/Queue/ClaimItems.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

/*
+--------------------------------------------------------------------+
| Copyright CiviCRM LLC. All rights reserved. |
| |
| This work is published under the GNU AGPLv3 license with some |
| permitted exceptions and without any warranty. For full license |
| and copyright information, see https://civicrm.org/licensing |
+--------------------------------------------------------------------+
*/

namespace Civi\Api4\Action\Queue;

use Civi\Api4\Generic\Traits\SelectParamTrait;

/**
* Claim an item from the queue. Returns zero or one items.
*
* @method ?string setQueue
* @method $this setQueue(?string $queue)
*/
class ClaimItems extends \Civi\Api4\Generic\AbstractAction {

use SelectParamTrait;

/**
* Name of the target queue.
*
* @var string|null
*/
protected $queue;

public function _run(\Civi\Api4\Generic\Result $result) {
$this->select = empty($this->select) ? ['id', 'data', 'queue'] : $this->select;
$queue = $this->queue();
if (!$queue->isActive()) {
return;
}

$isBatch = $queue instanceof \CRM_Queue_Queue_BatchQueueInterface;
$limit = $queue->getSpec('batch_limit') ?: 1;
if ($limit > 1 && !$isBatch) {
throw new \API_Exception(sprintf('Queue "%s" (%s) does not support batching.', $queue->getName(), get_class($queue)));
// Note 1: Simply looping over `claimItem()` is unlikley to help the consumer b/c
// drivers like Sql+Memory are linear+blocking.
// Note 2: The default is batch_limit=1. So someone has specifically chosen an invalid configuration...
}
$items = $isBatch ? $queue->claimItems($limit) : [$queue->claimItem()];

foreach ($items as $item) {
if ($item) {
$result[] = $this->convertItemToStub($item);
}
}
}

/**
* @param \CRM_Queue_DAO_QueueItem|\stdClass $item
* @return array
*/
protected function convertItemToStub(object $item): array {
$array = [];
foreach ($this->select as $field) {
switch ($field) {
case 'id':
$array['id'] = $item->id;
break;

case 'data':
$array['data'] = (array) $item->data;
break;

case 'queue':
$array['queue'] = $this->queue;
break;

}
}
return $array;
}

protected function queue(): \CRM_Queue_Queue {
if (empty($this->queue)) {
throw new \API_Exception('Missing required parameter: $queue');
}
return \Civi::queue($this->queue);
}

}
119 changes: 119 additions & 0 deletions Civi/Api4/Action/Queue/RunItems.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
<?php

namespace Civi\Api4\Action\Queue;

/**
* Run an enqueued item (task).
*
* You must either:
*
* - (a) Give the target queue-item specifically (`setItem()`). Useful if you called `claimItem()` separately.
* - (b) Give the name of the queue from which to find an item (`setQueue()`).
*
* Note: If you use `setItem()`, the inputted will be validated (refetched) to ensure authenticity of all details.
*
* Returns 0 or 1 records which indicate the outcome of running the chosen task.
*
* ```php
* $todo = Civi\Api4\Queue::claimItem()->setQueue($item)->setLeaseTime(600)->execute()->single();
* $result = Civi\Api4\Queue::runItem()->setItem($todo)->execute()->single();
* assert(in_array($result['outcome'], ['ok', 'retry', 'fail']))
*
* $result = Civi\Api4\Queue::runItem()->setQueue('foo')->execute()->first();
* assert(in_array($result['outcome'], ['ok', 'retry', 'fail']))
* ```
*
* Valid outcomes are:
* - 'ok': Task executed normally. Removed from queue.
* - 'retry': Task encountered an error. Will try again later.
* - 'fail': Task encountered an error. Will not try again later. Removed from queue.
*
* @method $this setItem(?array $item)
* @method ?array getItem()
* @method ?string setQueue
* @method $this setQueue(?string $queue)
*/
class RunItems extends \Civi\Api4\Generic\AbstractAction {

/**
* Previously claimed item - which should now be released.
*
* @var array|null
* Fields: {id: scalar, queue: string}
*/
protected $items;

/**
* Name of the target queue.
*
* @var string|null
*/
protected $queue;

public function _run(\Civi\Api4\Generic\Result $result) {
if (!empty($this->items)) {
$this->validateItemStubs();
$queue = \Civi::queue($this->items[0]['queue']);
$ids = \CRM_Utils_Array::collect('id', $this->items);
if (count($ids) > 1 && !($queue instanceof \CRM_Queue_Queue_BatchQueueInterface)) {
throw new \API_Exception("runItems: Error: Running multiple items requires BatchQueueInterface");
}
if (count($ids) > 1) {
$items = $queue->fetchItems($ids);
}
else {
$items = [$queue->fetchItem($ids[0])];
}
}
elseif (!empty($this->queue)) {
$queue = \Civi::queue($this->queue);
if (!$queue->isActive()) {
return;
}
$items = $queue instanceof \CRM_Queue_Queue_BatchQueueInterface
? $queue->claimItems($queue->getSpec('batch_limit') ?: 1)
: [$queue->claimItem()];
}
else {
throw new \API_Exception("runItems: Requires either 'queue' or 'item'.");
}

if (empty($items)) {
return;
}

$outcomes = [];
\CRM_Utils_Hook::queueRun($queue, $items, $outcomes);
if (empty($outcomes)) {
throw new \API_Exception(sprintf('Failed to run queue items (name=%s, runner=%s, itemCount=%d, outcomeCount=%d)',
$queue->getName(), $queue->getSpec('runner'), count($items), count($outcomes)));
}
foreach ($items as $itemPos => $item) {
$result[] = ['outcome' => $outcomes[$itemPos], 'item' => $this->createItemStub($item)];
}
}

private function validateItemStubs(): void {
$queueNames = [];
if (!isset($this->items[0])) {
throw new \API_Exception("Queue items must be given as numeric array.");
}
foreach ($this->items as $item) {
if (empty($item['queue'])) {
throw new \API_Exception("Queue item requires property 'queue'.");
}
if (empty($item['id'])) {
throw new \API_Exception("Queue item requires property 'id'.");
}
$queueNames[$item['queue']] = 1;
}
if (count($queueNames) > 1) {
throw new \API_Exception("Queue items cannot be mixed. Found queues: " . implode(', ', array_keys($queueNames)));
}
}

private function createItemStub($item): array {
return ['id' => $item->id, 'queue' => $item->queue_name];
}

}
30 changes: 30 additions & 0 deletions Civi/Api4/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
*/
namespace Civi\Api4;

use Civi\Api4\Action\Queue\ClaimItems;
use Civi\Api4\Action\Queue\RunItems;

/**
* Track a list of durable/scannable queues.
*
Expand All @@ -31,7 +34,34 @@ public static function permissions() {
return [
'meta' => ['access CiviCRM'],
'default' => ['administer queues'],
'runItem' => [\CRM_Core_Permission::ALWAYS_DENY_PERMISSION],
];
}

/**
* Claim an item from the queue. Returns zero or one items.
*
* Note: This is appropriate for persistent, auto-run queues.
*
* @param bool $checkPermissions
* @return \Civi\Api4\Action\Queue\ClaimItems
*/
public static function claimItems($checkPermissions = TRUE) {
return (new ClaimItems(static::getEntityName(), __FUNCTION__))
->setCheckPermissions($checkPermissions);
}

/**
* Run an item from the queue.
*
* Note: This is appropriate for persistent, auto-run queues.
*
* @param bool $checkPermissions
* @return \Civi\Api4\Action\Queue\RunItems
*/
public static function runItems($checkPermissions = TRUE) {
return (new RunItems(static::getEntityName(), __FUNCTION__))
->setCheckPermissions($checkPermissions);
}

}
Loading

0 comments on commit 8369e17

Please sign in to comment.