Skip to content

Commit

Permalink
Merge pull request #24395 from totten/master-release-time
Browse files Browse the repository at this point in the history
Queue API - Expose option for delayed tasks (`$options['release_time']`)
  • Loading branch information
eileenmcnaughton authored Aug 27, 2022
2 parents 8fdcb4d + 5891a2a commit 27d5300
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 0 deletions.
1 change: 1 addition & 0 deletions CRM/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ abstract public function existsQueue();
* @param array $options
* Queue-dependent options; for example, if this is a
* priority-queue, then $options might specify the item's priority.
* Ex: ['release_time' => strtotime('+3 hours')]
*/
abstract public function createItem($data, $options = []);

Expand Down
4 changes: 4 additions & 0 deletions CRM/Queue/Queue/Memory.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,16 @@ public function existsQueue() {
* @param array $options
* Queue-dependent options; for example, if this is a
* priority-queue, then $options might specify the item's priority.
* Ex: ['release_time' => strtotime('+3 hours')]
*/
public function createItem($data, $options = []) {
$id = $this->nextQueueItemId++;
// force copy, no unintendedsharing effects from pointers
$this->items[$id] = serialize($data);
$this->runCounts[$id] = 0;
if (isset($options['release_time'])) {
$this->releaseTimes[$id] = $options['release_time'];
}
}

/**
Expand Down
4 changes: 4 additions & 0 deletions CRM/Queue/Queue/SqlTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,17 @@ public function numberOfItems() {
* @param array $options
* Queue-dependent options; for example, if this is a
* priority-queue, then $options might specify the item's priority.
* Ex: ['release_time' => strtotime('+3 hours')]
*/
public function createItem($data, $options = []) {
$dao = new CRM_Queue_DAO_QueueItem();
$dao->queue_name = $this->getName();
$dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
$dao->data = serialize($data);
$dao->weight = CRM_Utils_Array::value('weight', $options, 0);
if (isset($options['release_time'])) {
$dao->release_time = date('Y-m-d H:i:s', $options['release_time']);
}
$dao->save();
}

Expand Down
33 changes: 33 additions & 0 deletions tests/phpunit/api/v4/Entity/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,39 @@ public function testEmptyPoll() {
$this->assertEquals(0, $startResult->count());
}

public function getDelayableDrivers(): array {
return [
'Sql' => [['type' => 'Sql', 'runner' => 'task', 'error' => 'delete']],
'SqlParallel' => [['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']],
'Memory' => [['type' => 'Memory', 'runner' => 'task', 'error' => 'delete']],
];
}

/**
* @dataProvider getDelayableDrivers
*/
public function testDelayedStart(array $queueSpec) {
$queueName = 'QueueTest_' . md5(random_bytes(32)) . '_delayed';
$queue = \Civi::queue($queueName, $queueSpec);
$this->assertEquals(0, $queue->numberOfItems());

$releaseTime = \CRM_Utils_Time::strtotime('+3 seconds');
\Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
[QueueTest::class, 'doSomething'],
['itwillstartanymomentnow']
), ['release_time' => $releaseTime]);
$this->assertEquals(1, $queue->numberOfItems());

// Not available... yet...
$claim1 = $queue->claimItem();
$this->assertEquals(NULL, $claim1);

// OK, it'll come in a few seconds...
$claim2 = $this->waitForClaim(0.5, 6, $queueName);
$this->assertEquals('itwillstartanymomentnow', $claim2['data']['arguments'][0]);
$this->assertTrue(\CRM_Utils_Time::time() >= $releaseTime);
}

public function getErrorModes(): array {
return [
'delete' => ['delete'],
Expand Down

0 comments on commit 27d5300

Please sign in to comment.