diff --git a/CRM/Core/BAO/UserJob.php b/CRM/Core/BAO/UserJob.php index 0f1c6835002b..f0a650b9fb3c 100644 --- a/CRM/Core/BAO/UserJob.php +++ b/CRM/Core/BAO/UserJob.php @@ -38,7 +38,7 @@ public static function on_civi_queue_check(\Civi\Core\Event\GenericHookEvent $e) /** @var \CRM_Queue_Queue $queue */ $queue = $e->queue; $userJobId = static::findUserJobId($queue->getName()); - if ($userJobId && $queue->numberOfItems() < 1) { + if ($userJobId && $queue->getStatistic('total') < 1) { $queue->setStatus('completed'); } } diff --git a/CRM/Queue/Queue.php b/CRM/Queue/Queue.php index 9ff7d9b10c72..7dfc1cdd6a21 100644 --- a/CRM/Queue/Queue.php +++ b/CRM/Queue/Queue.php @@ -138,8 +138,28 @@ abstract public function createItem($data, $options = []); * Determine number of items remaining in the queue. * * @return int - */ - abstract public function numberOfItems(); + * @deprecated + * Use `getStatistic(string $name)` instead. + * The definition of `numberOfItems()` has become conflicted among different subclasses. + */ + public function numberOfItems() { + // This is the statistic traditionally reported by core queue implementations. + // However, it may not be as useful, and subclasses may have different interpretations. + return $this->getStatistic('total'); + } + + /** + * Get summary information about items in the queue. + * + * @param string $name + * The desired statistic. Ex: + * - 'ready': The number of items ready for execution (not currently claimed, not scheduled for future). + * - 'blocked': The number of items that may be runnable in the future, but cannot be run right now. + * - 'total': The total number of items known to the queue, regardless of whether their current status. + * @return int|float|null + * The value of the statistic, or NULL if the queue backend does not unsupport this statistic. + */ + abstract public function getStatistic(string $name); /** * Get the next item. diff --git a/CRM/Queue/Queue/Memory.php b/CRM/Queue/Queue/Memory.php index 7fd1ab6ae211..e66791a5b8a7 100644 --- a/CRM/Queue/Queue/Memory.php +++ b/CRM/Queue/Queue/Memory.php @@ -110,12 +110,35 @@ public function createItem($data, $options = []) { } /** - * Determine number of items remaining in the queue. - * - * @return int + * @param string $name + * @return int|float|null + * @see \CRM_Queue_Queue::getStatistic() */ - public function numberOfItems() { - return count($this->items); + public function getStatistic(string $name) { + $ready = function(): int { + $now = CRM_Utils_Time::time(); + $ready = 0; + foreach ($this->items as $id => $item) { + if (empty($this->releaseTimes[$id]) || $this->releaseTimes[$id] <= $now) { + $ready++; + } + } + return $ready; + }; + + switch ($name) { + case 'ready': + return $ready(); + + case 'blocked': + return count($this->items) - $ready(); + + case 'total': + return count($this->items); + + default: + return NULL; + } } /** diff --git a/CRM/Queue/Queue/SqlTrait.php b/CRM/Queue/Queue/SqlTrait.php index 886eafa0c323..69f5cf220689 100644 --- a/CRM/Queue/Queue/SqlTrait.php +++ b/CRM/Queue/Queue/SqlTrait.php @@ -47,22 +47,34 @@ public function deleteQueue() { * @return bool */ public function existsQueue() { - return ($this->numberOfItems() > 0); + return ($this->getStatistic('total') > 0); } /** - * Determine number of items remaining in the queue. - * - * @return int + * @param string $name + * @return int|float|null + * @see \CRM_Queue_Queue::getStatistic() */ - public function numberOfItems() { - return CRM_Core_DAO::singleValueQuery(" - SELECT count(*) - FROM civicrm_queue_item - WHERE queue_name = %1 - ", [ - 1 => [$this->getName(), 'String'], - ]); + public function getStatistic(string $name) { + switch ($name) { + case 'ready': + return (int) CRM_Core_DAO::singleValueQuery( + 'SELECT count(*) FROM civicrm_queue_item WHERE queue_name = %1 AND (release_time is null OR release_time <= FROM_UNIXTIME(%2))', + [1 => [$this->getName(), 'String'], 2 => [CRM_Utils_Time::time(), 'Int']]); + + case 'blocked': + return (int) CRM_Core_DAO::singleValueQuery( + 'SELECT count(*) FROM civicrm_queue_item WHERE queue_name = %1 AND release_time > FROM_UNIXTIME(%2)', + [1 => [$this->getName(), 'String'], 2 => [CRM_Utils_Time::time(), 'Int']]); + + case 'total': + return (int) CRM_Core_DAO::singleValueQuery( + 'SELECT count(*) FROM civicrm_queue_item WHERE queue_name = %1', + [1 => [$this->getName(), 'String']]); + + default: + return NULL; + } } /** diff --git a/Civi/Test/QueueTestTrait.php b/Civi/Test/QueueTestTrait.php new file mode 100644 index 000000000000..6b0290e4f7d1 --- /dev/null +++ b/Civi/Test/QueueTestTrait.php @@ -0,0 +1,20 @@ +getStatistic('total'), $queue->getStatistic('ready'), $queue->getStatistic('blocked')]; + $this->assertEquals(sprintf($format, ...$expect), sprintf($format, ...$actual)); + + // Deprecated - but checking for continuity. + $this->assertEquals($total, $queue->numberOfItems()); + } + +} diff --git a/templates/CRM/Queue/Page/Runner.tpl b/templates/CRM/Queue/Page/Runner.tpl index e7ade413415e..620b834c0e47 100644 --- a/templates/CRM/Queue/Page/Runner.tpl +++ b/templates/CRM/Queue/Page/Runner.tpl @@ -63,7 +63,7 @@ CRM.$(function($) { if (!data.is_error) { queueRunnerData.completed++; } - if (data.numberOfItems) { + if ('numberOfItems' in data && data.numberOfItems !== null) { queueRunnerData.numberOfItems = parseInt(data.numberOfItems); } diff --git a/tests/phpunit/CRM/Queue/Queue/SqlTest.php b/tests/phpunit/CRM/Queue/Queue/SqlTest.php index 3fb7e3965232..6de10ed62213 100644 --- a/tests/phpunit/CRM/Queue/Queue/SqlTest.php +++ b/tests/phpunit/CRM/Queue/Queue/SqlTest.php @@ -18,6 +18,8 @@ */ class CRM_Queue_Queue_SqlTest extends CiviUnitTestCase { + use \Civi\Test\QueueTestTrait; + /* ----------------------- Queue providers ----------------------- */ /* Define a list of queue providers which should be tested */ @@ -73,12 +75,12 @@ public function testPriorities($queueSpec) { 'test-key' => 'c', ]); - $this->assertEquals(3, $this->queue->numberOfItems()); + $this->assertQueueStats(3, 3, 0, $this->queue); $item = $this->queue->claimItem(); $this->assertEquals('a', $item->data['test-key']); $this->queue->deleteItem($item); - $this->assertEquals(2, $this->queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $this->queue); $item = $this->queue->claimItem(); $this->assertEquals('b', $item->data['test-key']); $this->queue->deleteItem($item); @@ -103,27 +105,27 @@ public function testPriorities($queueSpec) { 'test-key' => 'd', ]); - $this->assertEquals(4, $this->queue->numberOfItems()); + $this->assertQueueStats(4, 4, 0, $this->queue); $item = $this->queue->claimItem(); $this->assertEquals('start', $item->data['test-key']); $this->queue->deleteItem($item); - $this->assertEquals(3, $this->queue->numberOfItems()); + $this->assertQueueStats(3, 3, 0, $this->queue); $item = $this->queue->claimItem(); $this->assertEquals('c', $item->data['test-key']); $this->queue->deleteItem($item); - $this->assertEquals(2, $this->queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $this->queue); $item = $this->queue->claimItem(); $this->assertEquals('d', $item->data['test-key']); $this->queue->deleteItem($item); - $this->assertEquals(1, $this->queue->numberOfItems()); + $this->assertQueueStats(1, 1, 0, $this->queue); $item = $this->queue->claimItem(); $this->assertEquals('end', $item->data['test-key']); $this->queue->deleteItem($item); - $this->assertEquals(0, $this->queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $this->queue); } } diff --git a/tests/phpunit/CRM/Queue/QueueTest.php b/tests/phpunit/CRM/Queue/QueueTest.php index e3fe0010aa7f..3f1c6ac5b41c 100644 --- a/tests/phpunit/CRM/Queue/QueueTest.php +++ b/tests/phpunit/CRM/Queue/QueueTest.php @@ -16,6 +16,8 @@ */ class CRM_Queue_QueueTest extends CiviUnitTestCase { + use \Civi\Test\QueueTestTrait; + /* ----------------------- Queue providers ----------------------- */ /* Define a list of queue providers which should be tested */ @@ -25,19 +27,19 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { */ public function getQueueSpecs() { $queueSpecs = []; - $queueSpecs[] = [ + $queueSpecs['Sql'] = [ [ 'type' => 'Sql', 'name' => 'test-queue-sql', ], ]; - $queueSpecs[] = [ + $queueSpecs['Memory'] = [ [ 'type' => 'Memory', 'name' => 'test-queue-mem', ], ]; - $queueSpecs[] = [ + $queueSpecs['SqlParallel'] = [ [ 'type' => 'SqlParallel', 'name' => 'test-queue-sqlparallel', @@ -163,35 +165,41 @@ public function testBasicUsage($queueSpec) { 'test-key' => 'c', ]); - $this->assertEquals(3, $this->queue->numberOfItems()); + $this->assertQueueStats(3, 3, 0, $this->queue); + $item = $this->queue->claimItem(); + $this->assertQueueStats(3, 2, 1, $this->queue); $this->assertEquals('a', $item->data['test-key']); $this->assertEquals(1, $item->run_count); $this->queue->deleteItem($item); - $this->assertEquals(2, $this->queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $this->queue); $item = $this->queue->claimItem(); + $this->assertQueueStats(2, 1, 1, $this->queue); $this->assertEquals('b', $item->data['test-key']); $this->assertEquals(1, $item->run_count); $this->queue->deleteItem($item); + $this->assertQueueStats(1, 1, 0, $this->queue); $this->queue->createItem([ 'test-key' => 'd', ]); - $this->assertEquals(2, $this->queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $this->queue); + $item = $this->queue->claimItem(); + $this->assertQueueStats(2, 1, 1, $this->queue); $this->assertEquals('c', $item->data['test-key']); $this->assertEquals(1, $item->run_count); $this->queue->deleteItem($item); - $this->assertEquals(1, $this->queue->numberOfItems()); + $this->assertQueueStats(1, 1, 0, $this->queue); $item = $this->queue->claimItem(); $this->assertEquals('d', $item->data['test-key']); $this->assertEquals(1, $item->run_count); $this->queue->deleteItem($item); - $this->assertEquals(0, $this->queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $this->queue); } /** @@ -211,16 +219,16 @@ public function testManualRelease($queueSpec) { $item = $this->queue->claimItem(); $this->assertEquals('a', $item->data['test-key']); $this->assertEquals(1, $item->run_count); - $this->assertEquals(1, $this->queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $this->queue); $this->queue->releaseItem($item); - $this->assertEquals(1, $this->queue->numberOfItems()); + $this->assertQueueStats(1, 1, 0, $this->queue); $item = $this->queue->claimItem(); $this->assertEquals('a', $item->data['test-key']); $this->assertEquals(2, $item->run_count); $this->queue->deleteItem($item); - $this->assertEquals(0, $this->queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $this->queue); } /** @@ -238,11 +246,12 @@ public function testTimeoutRelease($queueSpec) { $this->queue->createItem([ 'test-key' => 'a', ]); + $this->assertQueueStats(1, 1, 0, $this->queue); $item = $this->queue->claimItem(); $this->assertEquals('a', $item->data['test-key']); $this->assertEquals(1, $item->run_count); - $this->assertEquals(1, $this->queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $this->queue); // forget to release // haven't reach expiration yet @@ -255,10 +264,10 @@ public function testTimeoutRelease($queueSpec) { $item3 = $this->queue->claimItem(); $this->assertEquals('a', $item3->data['test-key']); $this->assertEquals(2, $item3->run_count); - $this->assertEquals(1, $this->queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $this->queue); $this->queue->deleteItem($item3); - $this->assertEquals(0, $this->queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $this->queue); } /** @@ -279,7 +288,7 @@ public function testStealItem($queueSpec) { $item = $this->queue->claimItem(); $this->assertEquals('a', $item->data['test-key']); $this->assertEquals(1, $item->run_count); - $this->assertEquals(1, $this->queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $this->queue); // forget to release // haven't reached expiration yet, so claimItem fails @@ -291,10 +300,10 @@ public function testStealItem($queueSpec) { $item3 = $this->queue->stealItem(); $this->assertEquals('a', $item3->data['test-key']); $this->assertEquals(2, $item3->run_count); - $this->assertEquals(1, $this->queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $this->queue); $this->queue->deleteItem($item3); - $this->assertEquals(0, $this->queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $this->queue); } /** @@ -385,13 +394,13 @@ public function testCreateResetTrue($queueSpec) { $this->queue->createItem([ 'test-key' => 'b', ]); - $this->assertEquals(2, $this->queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $this->queue); unset($this->queue); $queue2 = $this->queueService->create( $queueSpec + ['reset' => TRUE] ); - $this->assertEquals(0, $queue2->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue2); } /** @@ -408,11 +417,11 @@ public function testCreateResetFalse($queueSpec) { $this->queue->createItem([ 'test-key' => 'b', ]); - $this->assertEquals(2, $this->queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $this->queue); unset($this->queue); $queue2 = $this->queueService->create($queueSpec); - $this->assertEquals(2, $queue2->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $queue2); $item = $queue2->claimItem(); $this->assertEquals('a', $item->data['test-key']); @@ -433,11 +442,11 @@ public function testLoad($queueSpec) { $this->queue->createItem([ 'test-key' => 'b', ]); - $this->assertEquals(2, $this->queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $this->queue); unset($this->queue); $queue2 = $this->queueService->create($queueSpec); - $this->assertEquals(2, $queue2->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $queue2); $item = $queue2->claimItem(); $this->assertEquals('a', $item->data['test-key']); @@ -460,12 +469,12 @@ public function testBatchClaim($queueSpec) { for ($i = 0; $i < 9; $i++) { $this->queue->createItem('x' . $i); } - $this->assertEquals(9, $this->queue->numberOfItems()); + $this->assertQueueStats(9, 9, 0, $this->queue); // We expect this driver to be fully compliant with batching. $claimsA = $this->queue->claimItems(3); $claimsB = $this->queue->claimItems(3); - $this->assertEquals(9, $this->queue->numberOfItems()); + $this->assertQueueStats(9, 3, 6, $this->queue); $this->assertEquals(['x0', 'x1', 'x2'], CRM_Utils_Array::collect('data', $claimsA)); $this->assertEquals(['x3', 'x4', 'x5'], CRM_Utils_Array::collect('data', $claimsB)); @@ -474,24 +483,24 @@ public function testBatchClaim($queueSpec) { $this->queue->releaseItems([$claimsA[2]]); /* x2: will retry with next claimItems() */ $this->queue->deleteItems([$claimsB[0], $claimsB[1]]); /* x3, x4 */ /* claimsB[2]: x5: Oops, we're gonna take some time to finish this one. */ - $this->assertEquals(5, $this->queue->numberOfItems()); + $this->assertQueueStats(5, 4, 1, $this->queue); $claimsC = $this->queue->claimItems(3); $this->assertEquals(['x2', 'x6', 'x7'], CRM_Utils_Array::collect('data', $claimsC)); $this->queue->deleteItem($claimsC[0]); /* x2 */ $this->queue->releaseItem($claimsC[1]); /* x6: will retry with next claimItems() */ $this->queue->deleteItem($claimsC[2]); /* x7 */ - $this->assertEquals(3, $this->queue->numberOfItems()); + $this->assertQueueStats(3, 2, 1, $this->queue); $claimsD = $this->queue->claimItems(3); $this->assertEquals(['x6', 'x8'], CRM_Utils_Array::collect('data', $claimsD)); $this->queue->deleteItem($claimsD[0]); /* x6 */ $this->queue->deleteItem($claimsD[1]); /* x8 */ - $this->assertEquals(1, $this->queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $this->queue); // claimsB took a while to wrap-up. But it finally did! $this->queue->deleteItem($claimsB[2]); /* x5 */ - $this->assertEquals(0, $this->queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $this->queue); } public function testSetStatus() { diff --git a/tests/phpunit/CRM/Queue/RunnerTest.php b/tests/phpunit/CRM/Queue/RunnerTest.php index 8581edd14ef3..a179fa9d51b9 100644 --- a/tests/phpunit/CRM/Queue/RunnerTest.php +++ b/tests/phpunit/CRM/Queue/RunnerTest.php @@ -16,6 +16,8 @@ */ class CRM_Queue_RunnerTest extends CiviUnitTestCase { + use \Civi\Test\QueueTestTrait; + public function setUp(): void { parent::setUp(); $this->queueService = CRM_Queue_Service::singleton(TRUE); @@ -61,11 +63,11 @@ public function testRunAllNormal() { 'errorMode' => CRM_Queue_Runner::ERROR_ABORT, ]); $this->assertEquals(self::$_recordedValues, []); - $this->assertEquals(3, $this->queue->numberOfItems()); + $this->assertQueueStats(3, 3, 0, $this->queue); $result = $runner->runAll(); $this->assertEquals(TRUE, $result); $this->assertEquals(self::$_recordedValues, ['a', 'b', 'c']); - $this->assertEquals(0, $this->queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $this->queue); } /** @@ -97,11 +99,11 @@ public function testRunAll_AddMore() { 'errorMode' => CRM_Queue_Runner::ERROR_ABORT, ]); $this->assertEquals(self::$_recordedValues, []); - $this->assertEquals(3, $this->queue->numberOfItems()); + $this->assertQueueStats(3, 3, 0, $this->queue); $result = $runner->runAll(); $this->assertEquals(TRUE, $result); $this->assertEquals(self::$_recordedValues, ['a', 1, 2, 3, 'b']); - $this->assertEquals(0, $this->queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $this->queue); } /** @@ -132,12 +134,13 @@ public function testRunAll_Continue_Exception() { 'errorMode' => CRM_Queue_Runner::ERROR_CONTINUE, ]); $this->assertEquals(self::$_recordedValues, []); - $this->assertEquals(3, $this->queue->numberOfItems()); + $this->assertQueueStats(3, 3, 0, $this->queue); + $result = $runner->runAll(); // FIXME useless return $this->assertEquals(TRUE, $result); $this->assertEquals(self::$_recordedValues, ['a', 'c']); - $this->assertEquals(0, $this->queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $this->queue); } /** @@ -168,13 +171,14 @@ public function testRunAll_Abort_Exception() { 'errorMode' => CRM_Queue_Runner::ERROR_ABORT, ]); $this->assertEquals(self::$_recordedValues, []); - $this->assertEquals(3, $this->queue->numberOfItems()); + $this->assertQueueStats(3, 3, 0, $this->queue); + $result = $runner->runAll(); $this->assertEquals(1, $result['is_error']); // nothing from 'c' $this->assertEquals(self::$_recordedValues, ['a']); // 'b' and 'c' remain - $this->assertEquals(2, $this->queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $this->queue); } /** @@ -205,13 +209,13 @@ public function testRunAll_Abort_False() { 'errorMode' => CRM_Queue_Runner::ERROR_ABORT, ]); $this->assertEquals(self::$_recordedValues, []); - $this->assertEquals(3, $this->queue->numberOfItems()); + $this->assertQueueStats(3, 3, 0, $this->queue); $result = $runner->runAll(); $this->assertEquals(1, $result['is_error']); // nothing from 'c' $this->assertEquals(self::$_recordedValues, ['a']); // 'b' and 'c' remain - $this->assertEquals(2, $this->queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $this->queue); } /** diff --git a/tests/phpunit/api/v4/Entity/QueueTest.php b/tests/phpunit/api/v4/Entity/QueueTest.php index 6f74de2eccc8..3656eb1ff8f5 100644 --- a/tests/phpunit/api/v4/Entity/QueueTest.php +++ b/tests/phpunit/api/v4/Entity/QueueTest.php @@ -22,6 +22,7 @@ use Civi\Api4\Queue; use Civi\Api4\UserJob; use Civi\Core\Event\GenericHookEvent; +use Civi\Test\QueueTestTrait; /** * @group headless @@ -29,6 +30,8 @@ */ class QueueTest extends Api4TestBase { + use QueueTestTrait; + protected function setUp(): void { \Civi::$statics[__CLASS__] = [ 'doSomethingResult' => TRUE, @@ -56,7 +59,7 @@ public function testBasicLinearPolling() { 'retry_limit' => 2, 'retry_interval' => 4, ]); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( [QueueTest::class, 'doSomething'], @@ -98,13 +101,13 @@ public function testBasicLinearPolling() { $this->assertEquals(['first_ok', 'second_err', 'second_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']); // All done. - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); } public function testBasicParallelPolling() { $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel'; $queue = \Civi::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( [QueueTest::class, 'doSomething'], @@ -129,7 +132,7 @@ public function testBasicParallelPolling() { Queue::runItems(0)->setItems([$first])->execute(); $this->assertEquals(['second_ok', 'first_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); } /** @@ -149,7 +152,7 @@ public function testBatchParallelPolling() { 'error' => 'delete', 'batch_limit' => 3, ]); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); for ($i = 0; $i < 7; $i++) { \Civi::queue($queueName)->createItem(['thingy' => $i]); @@ -189,7 +192,7 @@ function($item) { public function testSelect() { $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel'; $queue = \Civi::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( [QueueTest::class, 'doSomething'], @@ -205,7 +208,7 @@ public function testSelect() { public function testEmptyPoll() { $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear'; $queue = \Civi::queue($queueName, ['type' => 'Sql', 'runner' => 'task', 'error' => 'delete']); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); $startResult = Queue::claimItems()->setQueue($queueName)->execute(); $this->assertEquals(0, $startResult->count()); @@ -225,14 +228,14 @@ public function getDelayableDrivers(): array { public function testDelayedStart(array $queueSpec) { $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_delayed'; $queue = \Civi::queue($queueName, $queueSpec); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); $releaseTime = \CRM_Utils_Time::strtotime('+3 seconds'); \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( [QueueTest::class, 'doSomething'], ['itwillstartanymomentnow'] ), ['release_time' => $releaseTime]); - $this->assertEquals(1, $queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $queue); // Not available... yet... $claim1 = $queue->claimItem(); @@ -268,7 +271,7 @@ public function testRetryWithPoliteExhaustion(string $errorMode) { 'retry_limit' => 2, 'retry_interval' => 1, ]); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( [QueueTest::class, 'doSomething'], @@ -311,28 +314,28 @@ public function testRetryWithDelinquencyAndSuccess() { 'retry_interval' => 0, 'lease_time' => 1, ]); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( [QueueTest::class, 'doSomething'], ['playinghooky'] )); - $this->assertEquals(1, $queue->numberOfItems()); + $this->assertQueueStats(1, 1, 0, $queue); $claim1 = $this->waitForClaim(0.5, 5, $queueName); // Oops, don't do anything with claim #1! - $this->assertEquals(1, $queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $queue); $this->assertEquals([], \Civi::$statics[__CLASS__]['doSomethingLog']); $claim2 = $this->waitForClaim(0.5, 5, $queueName); // Oops, don't do anything with claim #2! - $this->assertEquals(1, $queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $queue); $this->assertEquals([], \Civi::$statics[__CLASS__]['doSomethingLog']); $claim3 = $this->waitForClaim(0.5, 5, $queueName); - $this->assertEquals(1, $queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $queue); $result = Queue::runItems(0)->setItems([$claim3])->execute()->first(); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); $this->assertEquals(['playinghooky_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']); $this->assertEquals('ok', $result['outcome']); } @@ -357,19 +360,19 @@ public function testRetryWithEventualFailure(string $errorMode) { 'retry_interval' => 0, 'lease_time' => 1, ]); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( [QueueTest::class, 'doSomething'], ['playinghooky'] )); - $this->assertEquals(1, $queue->numberOfItems()); + $this->assertQueueStats(1, 1, 0, $queue); $claimAndRun = function($expectOutcome, $expectEndCount) use ($queue, $queueName) { $claim = $this->waitForClaim(0.5, 5, $queueName); - $this->assertEquals(1, $queue->numberOfItems()); + $this->assertQueueStats(1, 0, 1, $queue); $result = Queue::runItems(0)->setItems([$claim])->execute()->first(); - $this->assertEquals($expectEndCount, $queue->numberOfItems()); + $this->assertEquals($expectEndCount, $queue->getStatistic('total')); $this->assertEquals($expectOutcome, $result['outcome']); }; @@ -411,7 +414,7 @@ public function testUserJobQueue_Completion() { 'runner' => 'task', 'error' => 'delete', ]); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); $userJob = \Civi\Api4\UserJob::create(FALSE)->setValues([ 'job_type:name' => 'contact_import', @@ -429,20 +432,20 @@ public function testUserJobQueue_Completion() { )); // Verify initial status - $this->assertEquals(2, $queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $queue); $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName])); $this->assertEquals(TRUE, $queue->isActive()); $this->assertEquals(4, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']); // OK, let's run both items - and check status afterward. Queue::runItems(FALSE)->setQueue($queueName)->execute()->single(); - $this->assertEquals(1, $queue->numberOfItems()); + $this->assertQueueStats(1, 1, 0, $queue); $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName])); $this->assertEquals(TRUE, $queue->isActive()); $this->assertEquals(4, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']); Queue::runItems(FALSE)->setQueue($queueName)->execute()->single(); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); $this->assertEquals('completed', $firedQueueStatus[$queueName]); $this->assertEquals(FALSE, $queue->isActive()); $this->assertEquals(1, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']); @@ -468,7 +471,7 @@ public function testServiceQueue_NeverComplete() { 'runner' => 'task', 'error' => 'delete', ]); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); \Civi::queue($queueName)->createItem(new \CRM_Queue_Task( [QueueTest::class, 'doSomething'], @@ -480,18 +483,18 @@ public function testServiceQueue_NeverComplete() { )); // Verify initial status - $this->assertEquals(2, $queue->numberOfItems()); + $this->assertQueueStats(2, 2, 0, $queue); $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName])); $this->assertEquals(TRUE, $queue->isActive()); // OK, let's run both items - and check status afterward. Queue::runItems(FALSE)->setQueue($queueName)->execute()->single(); - $this->assertEquals(1, $queue->numberOfItems()); + $this->assertQueueStats(1, 1, 0, $queue); $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName])); $this->assertEquals(TRUE, $queue->isActive()); Queue::runItems(FALSE)->setQueue($queueName)->execute()->single(); - $this->assertEquals(0, $queue->numberOfItems()); + $this->assertQueueStats(0, 0, 0, $queue); $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName])); $this->assertEquals(TRUE, $queue->isActive()); }