Skip to content

Commit

Permalink
(REF) CRM_Queue_Queue_Sql* - Extract trait for common functions
Browse files Browse the repository at this point in the history
Most of the methods in CRM/Queue/Queue/Sql.php and CRM/Queue/Queue/SqlParallel.php are
identical. Move them to a common trait so that they're easier to patch.
  • Loading branch information
totten committed Feb 2, 2022
1 parent 39384a9 commit b28ab68
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 127 deletions.
62 changes: 2 additions & 60 deletions CRM/Queue/Queue/Sql.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
class CRM_Queue_Queue_Sql extends CRM_Queue_Queue {

use CRM_Queue_Queue_SqlTrait;

/**
* Create a reference to queue. After constructing the queue, one should
* usually call createQueue (if it's a new queue) or loadQueue (if it's
Expand All @@ -32,41 +34,6 @@ public function __construct($queueSpec) {
parent::__construct($queueSpec);
}

/**
* Perform any registation or resource-allocation for a new queue
*/
public function createQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Perform any loading or pre-fetch for an existing queue.
*/
public function loadQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Release any resources claimed by the queue (memory, DB rows, etc)
*/
public function deleteQueue() {
return CRM_Core_DAO::singleValueQuery("
DELETE FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Check if the queue exists.
*
* @return bool
*/
public function existsQueue() {
return ($this->numberOfItems() > 0);
}

/**
* Add a new item to the queue.
*
Expand All @@ -85,21 +52,6 @@ public function createItem($data, $options = []) {
$dao->save();
}

/**
* Determine number of items remaining in the queue.
*
* @return int
*/
public function numberOfItems() {
return CRM_Core_DAO::singleValueQuery("
SELECT count(*)
FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Get the next item.
*
Expand Down Expand Up @@ -185,16 +137,6 @@ public function stealItem($lease_time = 3600) {
}
}

/**
* Remove an item from the queue.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function deleteItem($dao) {
$dao->delete();
}

/**
* Return an item that could not be processed.
*
Expand Down
69 changes: 2 additions & 67 deletions CRM/Queue/Queue/SqlParallel.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {

use CRM_Queue_Queue_SqlTrait;

/**
* Create a reference to queue. After constructing the queue, one should
* usually call createQueue (if it's a new queue) or loadQueue (if it's
Expand All @@ -32,32 +34,6 @@ public function __construct($queueSpec) {
parent::__construct($queueSpec);
}

/**
* Perform any registation or resource-allocation for a new queue
*/
public function createQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Perform any loading or pre-fetch for an existing queue.
*/
public function loadQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Release any resources claimed by the queue (memory, DB rows, etc)
*/
public function deleteQueue() {
return CRM_Core_DAO::singleValueQuery("
DELETE FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Check if the queue exists.
*
Expand Down Expand Up @@ -85,21 +61,6 @@ public function createItem($data, $options = []) {
$dao->save();
}

/**
* Determine number of items remaining in the queue.
*
* @return int
*/
public function numberOfItems() {
return CRM_Core_DAO::singleValueQuery("
SELECT count(*)
FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Get the next item.
*
Expand Down Expand Up @@ -182,30 +143,4 @@ public function stealItem($lease_time = 3600) {
}
}

/**
* Remove an item from the queue.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function deleteItem($dao) {
$dao->delete();
$dao->free();
}

/**
* Return an item that could not be processed.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function releaseItem($dao) {
$sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
$params = [
1 => [$dao->id, 'Integer'],
];
CRM_Core_DAO::executeQuery($sql, $params);
$dao->free();
}

}
114 changes: 114 additions & 0 deletions CRM/Queue/Queue/SqlTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?php

/*
+--------------------------------------------------------------------+
| Copyright CiviCRM LLC. All rights reserved. |
| |
| This work is published under the GNU AGPLv3 license with some |
| permitted exceptions and without any warranty. For full license |
| and copyright information, see https://civicrm.org/licensing |
+--------------------------------------------------------------------+
*/

/**
* Trait defines methods that are commonly used to implement a SQL-backed queue.
*/
trait CRM_Queue_Queue_SqlTrait {

/**
* Perform any registation or resource-allocation for a new queue
*/
public function createQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Perform any loading or pre-fetch for an existing queue.
*/
public function loadQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}

/**
* Release any resources claimed by the queue (memory, DB rows, etc)
*/
public function deleteQueue() {
return CRM_Core_DAO::singleValueQuery("
DELETE FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Check if the queue exists.
*
* @return bool
*/
public function existsQueue() {
return ($this->numberOfItems() > 0);
}

/**
* Determine number of items remaining in the queue.
*
* @return int
*/
public function numberOfItems() {
return CRM_Core_DAO::singleValueQuery("
SELECT count(*)
FROM civicrm_queue_item
WHERE queue_name = %1
", [
1 => [$this->getName(), 'String'],
]);
}

/**
* Remove an item from the queue.
*
* @param CRM_Core_DAO|stdClass $dao
* The item returned by claimItem.
*/
public function deleteItem($dao) {
$dao->delete();
$dao->free();
}

/**
* Get the full data for an item.
*
* This is a passive peek - it does not claim/steal/release anything.
*
* @param int|string $id
* The unique ID of the task within the queue.
* @return CRM_Queue_DAO_QueueItem|object|null $dao
*/
public function fetchItem($id) {
$dao = new CRM_Queue_DAO_QueueItem();
$dao->id = $id;
$dao->queue_name = $this->getName();
if (!$dao->find(TRUE)) {
return NULL;
}
$dao->data = unserialize($dao->data);
return $dao;
}

/**
* Return an item that could not be processed.
*
* @param CRM_Core_DAO $dao
* The item returned by claimItem.
*/
public function releaseItem($dao) {
$sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
$params = [
1 => [$dao->id, 'Integer'],
];
CRM_Core_DAO::executeQuery($sql, $params);
$dao->free();
}

}

0 comments on commit b28ab68

Please sign in to comment.