From 7c7ac39d04e897a21797db21e2b1568bd14bac70 Mon Sep 17 00:00:00 2001 From: Makarov Pavel Date: Thu, 7 Apr 2022 00:49:41 +0300 Subject: [PATCH 1/3] Exception thrown by ::pop is checked by Illuminate\Database\DetectsLostConnections::causedByLostConnection (#457) It should throw exception with special message in order to end queue on error correctly. Otherwise, it continues to work with broken connection. --- src/Queue/RabbitMQQueue.php | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 2b855fed..4a684a5d 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -13,7 +13,10 @@ use JsonException; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Exception\AMQPChannelClosedException; +use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -254,6 +257,16 @@ public function pop($queue = null) } throw $exception; + } catch (AMQPChannelClosedException|AMQPConnectionClosedException $exception) { + // Queue::pop used by worker to receive new job + // Thrown exception is checked by Illuminate\Database\DetectsLostConnections::causedByLostConnection + // Is has to contain one of the several phrases in exception message in order to restart worker + // Otherwise worker continues to work with broken connection + throw new AMQPRuntimeException( + 'Lost connection: '.$exception->getMessage(), + $exception->getCode(), + $exception + ); } return null; From 7ff9188c3f04a1358ae54d85a81085f5e96dfc0e Mon Sep 17 00:00:00 2001 From: Vladimir Yuldashev Date: Wed, 6 Apr 2022 23:51:59 +0200 Subject: [PATCH 2/3] Apply fixes from StyleCI (#473) Co-authored-by: StyleCI Bot --- src/Console/ExchangeDeclareCommand.php | 3 +- src/Console/ExchangeDeleteCommand.php | 3 +- src/Console/QueueBindCommand.php | 3 +- src/Console/QueueDeclareCommand.php | 3 +- src/Console/QueueDeleteCommand.php | 3 +- src/Console/QueuePurgeCommand.php | 3 +- src/Consumer.php | 15 +-- src/Horizon/Listeners/RabbitMQFailedEvent.php | 4 +- src/Horizon/RabbitMQQueue.php | 13 +- src/Queue/Connectors/RabbitMQConnector.php | 17 +-- src/Queue/Jobs/RabbitMQJob.php | 3 +- src/Queue/RabbitMQQueue.php | 112 ++++++++++-------- tests/Functional/TestCase.php | 5 +- 13 files changed, 105 insertions(+), 82 deletions(-) diff --git a/src/Console/ExchangeDeclareCommand.php b/src/Console/ExchangeDeclareCommand.php index 955da0a9..99f0b753 100644 --- a/src/Console/ExchangeDeclareCommand.php +++ b/src/Console/ExchangeDeclareCommand.php @@ -18,7 +18,8 @@ class ExchangeDeclareCommand extends Command protected $description = 'Declare exchange'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/ExchangeDeleteCommand.php b/src/Console/ExchangeDeleteCommand.php index af4473a2..904f38ba 100644 --- a/src/Console/ExchangeDeleteCommand.php +++ b/src/Console/ExchangeDeleteCommand.php @@ -16,7 +16,8 @@ class ExchangeDeleteCommand extends Command protected $description = 'Delete exchange'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueBindCommand.php b/src/Console/QueueBindCommand.php index 686a3430..40799460 100644 --- a/src/Console/QueueBindCommand.php +++ b/src/Console/QueueBindCommand.php @@ -17,7 +17,8 @@ class QueueBindCommand extends Command protected $description = 'Bind queue to exchange'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueDeclareCommand.php b/src/Console/QueueDeclareCommand.php index 97742c28..d43f49ce 100644 --- a/src/Console/QueueDeclareCommand.php +++ b/src/Console/QueueDeclareCommand.php @@ -19,7 +19,8 @@ class QueueDeclareCommand extends Command protected $description = 'Declare queue'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueDeleteCommand.php b/src/Console/QueueDeleteCommand.php index a1e331e2..8bca7c20 100644 --- a/src/Console/QueueDeleteCommand.php +++ b/src/Console/QueueDeleteCommand.php @@ -17,7 +17,8 @@ class QueueDeleteCommand extends Command protected $description = 'Delete queue'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueuePurgeCommand.php b/src/Console/QueuePurgeCommand.php index 8b03604a..95b765ea 100644 --- a/src/Console/QueuePurgeCommand.php +++ b/src/Console/QueuePurgeCommand.php @@ -19,7 +19,8 @@ class QueuePurgeCommand extends Command protected $description = 'Purge all messages in queue'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Consumer.php b/src/Consumer.php index e32b3efc..54438097 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -63,10 +63,11 @@ public function setPrefetchCount(int $value): void /** * Listen to the given queue in a loop. * - * @param string $connectionName - * @param string $queue - * @param WorkerOptions $options + * @param string $connectionName + * @param string $queue + * @param WorkerOptions $options * @return int + * * @throws Throwable */ public function daemon($connectionName, $queue, WorkerOptions $options) @@ -147,7 +148,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu $this->exceptions->report($exception); $this->kill(1); - } catch (Exception | Throwable $exception) { + } catch (Exception|Throwable $exception) { $this->exceptions->report($exception); $this->stopWorkerIfLostConnection($exception); @@ -180,9 +181,9 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu /** * Determine if the daemon should process on this iteration. * - * @param WorkerOptions $options - * @param string $connectionName - * @param string $queue + * @param WorkerOptions $options + * @param string $connectionName + * @param string $queue * @return bool */ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue): bool diff --git a/src/Horizon/Listeners/RabbitMQFailedEvent.php b/src/Horizon/Listeners/RabbitMQFailedEvent.php index c034c341..1edfa551 100644 --- a/src/Horizon/Listeners/RabbitMQFailedEvent.php +++ b/src/Horizon/Listeners/RabbitMQFailedEvent.php @@ -19,7 +19,7 @@ class RabbitMQFailedEvent /** * Create a new listener instance. * - * @param Dispatcher $events + * @param Dispatcher $events * @return void */ public function __construct(Dispatcher $events) @@ -30,7 +30,7 @@ public function __construct(Dispatcher $events) /** * Handle the event. * - * @param LaravelJobFailed $event + * @param LaravelJobFailed $event * @return void */ public function handle(LaravelJobFailed $event): void diff --git a/src/Horizon/RabbitMQQueue.php b/src/Horizon/RabbitMQQueue.php index de577897..a4ffdf24 100644 --- a/src/Horizon/RabbitMQQueue.php +++ b/src/Horizon/RabbitMQQueue.php @@ -25,8 +25,9 @@ class RabbitMQQueue extends BaseRabbitMQQueue /** * Get the number of queue jobs that are ready to process. * - * @param string|null $queue + * @param string|null $queue * @return int + * * @throws AMQPProtocolChannelException */ public function readyNow($queue = null): int @@ -93,9 +94,10 @@ public function release($delay, $job, $data, $queue, $attempts = 0) /** * Fire the job deleted event. * - * @param string $queue - * @param RabbitMQJob $job + * @param string $queue + * @param RabbitMQJob $job * @return void + * * @throws BindingResolutionException */ public function deleteReserved($queue, $job): void @@ -106,9 +108,10 @@ public function deleteReserved($queue, $job): void /** * Fire the given event if a dispatcher is bound. * - * @param string $queue - * @param mixed $event + * @param string $queue + * @param mixed $event * @return void + * * @throws BindingResolutionException */ protected function event($queue, $event): void diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index a6958082..746a410f 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -31,9 +31,9 @@ public function __construct(Dispatcher $dispatcher) /** * Establish a queue connection. * - * @param array $config - * + * @param array $config * @return RabbitMQQueue + * * @throws Exception */ public function connect(array $config): Queue @@ -63,8 +63,9 @@ public function connect(array $config): Queue } /** - * @param array $config + * @param array $config * @return AbstractConnection + * * @throws Exception */ protected function createConnection(array $config): AbstractConnection @@ -84,10 +85,10 @@ protected function createConnection(array $config): AbstractConnection /** * Create a queue for the worker. * - * @param string $worker - * @param AbstractConnection $connection - * @param string $queue - * @param array $options + * @param string $worker + * @param AbstractConnection $connection + * @param string $queue + * @param array $options * @return HorizonRabbitMQQueue|RabbitMQQueue|Queue */ protected function createQueue(string $worker, AbstractConnection $connection, string $queue, array $options = []) @@ -105,7 +106,7 @@ protected function createQueue(string $worker, AbstractConnection $connection, s /** * Recursively filter only null values. * - * @param array $array + * @param array $array * @return array */ private function filter(array $array): array diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 70e9e7f5..4176d80d 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -118,7 +118,8 @@ public function delete(): void /** * Release the job back into the queue. * - * @param int $delay + * @param int $delay + * * @throws AMQPProtocolChannelException */ public function release($delay = 0): void diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 4a684a5d..928979d8 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -82,9 +82,9 @@ class RabbitMQQueue extends Queue implements QueueContract /** * RabbitMQQueue constructor. * - * @param AbstractConnection $connection - * @param string $default - * @param array $options + * @param AbstractConnection $connection + * @param string $default + * @param array $options */ public function __construct( AbstractConnection $connection, @@ -163,9 +163,10 @@ public function later($delay, $job, $data = '', $queue = null) /** * @param $delay * @param $payload - * @param null $queue - * @param int $attempts + * @param null $queue + * @param int $attempts * @return mixed + * * @throws AMQPProtocolChannelException */ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) @@ -204,10 +205,11 @@ public function bulk($jobs, $data = '', $queue = null): void } /** - * @param string $payload - * @param null $queue - * @param array $options + * @param string $payload + * @param null $queue + * @param array $options * @return mixed + * * @throws AMQPProtocolChannelException */ public function bulkRaw(string $payload, $queue = null, array $options = []) @@ -292,6 +294,7 @@ public function getChannel(): AMQPChannel * Job class to use. * * @return string + * * @throws Throwable */ public function getJobClass(): string @@ -310,7 +313,7 @@ public function getJobClass(): string /** * Gets a queue/destination, by default the queue option set on the connection. * - * @param null $queue + * @param null $queue * @return string */ public function getQueue($queue = null): string @@ -322,8 +325,9 @@ public function getQueue($queue = null): string * Checks if the given exchange already present/defined in RabbitMQ. * Returns false when when the exchange is missing. * - * @param string $exchange + * @param string $exchange * @return bool + * * @throws AMQPProtocolChannelException */ public function isExchangeExists(string $exchange): bool @@ -353,11 +357,11 @@ public function isExchangeExists(string $exchange): bool /** * Declare a exchange in rabbitMQ, when not already declared. * - * @param string $name - * @param string $type - * @param bool $durable - * @param bool $autoDelete - * @param array $arguments + * @param string $name + * @param string $type + * @param bool $durable + * @param bool $autoDelete + * @param array $arguments * @return void */ public function declareExchange( @@ -386,9 +390,10 @@ public function declareExchange( /** * Delete a exchange from rabbitMQ, only when present in RabbitMQ. * - * @param string $name - * @param bool $unused + * @param string $name + * @param bool $unused * @return void + * * @throws AMQPProtocolChannelException */ public function deleteExchange(string $name, bool $unused = false): void @@ -410,8 +415,9 @@ public function deleteExchange(string $name, bool $unused = false): void * Checks if the given queue already present/defined in RabbitMQ. * Returns false when when the queue is missing. * - * @param string|null $name + * @param string|null $name * @return bool + * * @throws AMQPProtocolChannelException */ public function isQueueExists(string $name = null): bool @@ -435,10 +441,10 @@ public function isQueueExists(string $name = null): bool /** * Declare a queue in rabbitMQ, when not already declared. * - * @param string $name - * @param bool $durable - * @param bool $autoDelete - * @param array $arguments + * @param string $name + * @param bool $durable + * @param bool $autoDelete + * @param array $arguments * @return void */ public function declareQueue( @@ -465,10 +471,11 @@ public function declareQueue( /** * Delete a queue from rabbitMQ, only when present in RabbitMQ. * - * @param string $name - * @param bool $if_unused - * @param bool $if_empty + * @param string $name + * @param bool $if_unused + * @param bool $if_empty * @return void + * * @throws AMQPProtocolChannelException */ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empty = false): void @@ -483,9 +490,9 @@ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empt /** * Bind a queue to an exchange. * - * @param string $queue - * @param string $exchange - * @param string $routingKey + * @param string $queue + * @param string $exchange + * @param string $routingKey * @return void */ public function bindQueue(string $queue, string $exchange, string $routingKey = ''): void @@ -504,7 +511,7 @@ public function bindQueue(string $queue, string $exchange, string $routingKey = /** * Purge the queue of messages. * - * @param string|null $queue + * @param string|null $queue * @return void */ public function purge(string $queue = null): void @@ -518,7 +525,7 @@ public function purge(string $queue = null): void /** * Acknowledge the message. * - * @param RabbitMQJob $job + * @param RabbitMQJob $job * @return void */ public function ack(RabbitMQJob $job): void @@ -529,9 +536,8 @@ public function ack(RabbitMQJob $job): void /** * Reject the message. * - * @param RabbitMQJob $job - * @param bool $requeue - * + * @param RabbitMQJob $job + * @param bool $requeue * @return void */ public function reject(RabbitMQJob $job, bool $requeue = false): void @@ -543,8 +549,9 @@ public function reject(RabbitMQJob $job, bool $requeue = false): void * Create a AMQP message. * * @param $payload - * @param int $attempts + * @param int $attempts * @return array + * * @throws JsonException */ protected function createMessage($payload, int $attempts = 0): array @@ -587,9 +594,9 @@ protected function createMessage($payload, int $attempts = 0): array /** * Create a payload array from the given job and data. * - * @param string|object $job - * @param string $queue - * @param mixed $data + * @param string|object $job + * @param string $queue + * @param mixed $data * @return array */ protected function createPayloadArray($job, $queue, $data = ''): array @@ -613,6 +620,7 @@ protected function getRandomId(): string * Close the connection to RabbitMQ. * * @return void + * * @throws Exception */ public function close(): void @@ -631,7 +639,7 @@ public function close(): void /** * Get the Queue arguments. * - * @param string $destination + * @param string $destination * @return array */ protected function getQueueArguments(string $destination): array @@ -661,8 +669,8 @@ protected function getQueueArguments(string $destination): array /** * Get the Delay queue arguments. * - * @param string $destination - * @param int $ttl + * @param string $destination + * @param int $ttl * @return array */ protected function getDelayQueueArguments(string $destination, int $ttl): array @@ -691,6 +699,7 @@ protected function isPrioritizeDelayed(): bool * Using more priority layers, will consume more CPU resources and would affect runtimes. * * @see https://www.rabbitmq.com/priority.html + * * @return int */ protected function getQueueMaxPriority(): int @@ -701,7 +710,7 @@ protected function getQueueMaxPriority(): int /** * Get the exchange name, or &null; as default value. * - * @param string|null $exchange + * @param string|null $exchange * @return string|null */ protected function getExchange(string $exchange = null): ?string @@ -713,7 +722,7 @@ protected function getExchange(string $exchange = null): ?string * Get the routing-key for when you use exchanges * The default routing-key is the given destination. * - * @param string $destination + * @param string $destination * @return string */ protected function getRoutingKey(string $destination): string @@ -724,7 +733,7 @@ protected function getRoutingKey(string $destination): string /** * Get the exchangeType, or AMQPExchangeType::DIRECT as default. * - * @param string|null $type + * @param string|null $type * @return string */ protected function getExchangeType(?string $type = null): string @@ -758,7 +767,7 @@ protected function isQuorum(): bool /** * Get the exchange for failed messages. * - * @param string|null $exchange + * @param string|null $exchange * @return string|null */ protected function getFailedExchange(string $exchange = null): ?string @@ -770,7 +779,7 @@ protected function getFailedExchange(string $exchange = null): ?string * Get the routing-key for failed messages * The default routing-key is the given destination substituted by '.failed'. * - * @param string $destination + * @param string $destination * @return string */ protected function getFailedRoutingKey(string $destination): string @@ -781,7 +790,7 @@ protected function getFailedRoutingKey(string $destination): string /** * Checks if the exchange was already declared. * - * @param string $name + * @param string $name * @return bool */ protected function isExchangeDeclared(string $name): bool @@ -792,7 +801,7 @@ protected function isExchangeDeclared(string $name): bool /** * Checks if the queue was already declared. * - * @param string $name + * @param string $name * @return bool */ protected function isQueueDeclared(string $name): bool @@ -803,10 +812,11 @@ protected function isQueueDeclared(string $name): bool /** * Declare the destination when necessary. * - * @param string $destination - * @param string|null $exchange - * @param string|null $exchangeType + * @param string $destination + * @param string|null $exchange + * @param string|null $exchangeType * @return void + * * @throws AMQPProtocolChannelException */ protected function declareDestination( @@ -837,7 +847,7 @@ protected function declareDestination( * Determine all publish properties. * * @param $queue - * @param array $options + * @param array $options * @return array */ protected function publishProperties($queue, array $options = []): array diff --git a/tests/Functional/TestCase.php b/tests/Functional/TestCase.php index ace2c497..40e3e116 100644 --- a/tests/Functional/TestCase.php +++ b/tests/Functional/TestCase.php @@ -162,9 +162,10 @@ protected function getEnvironmentSetUp($app): void /** * @param $object - * @param string $method - * @param array $parameters + * @param string $method + * @param array $parameters * @return mixed + * * @throws Exception */ protected function callMethod($object, string $method, array $parameters = []) From 2401f7c1bcc988b3b694bba160feb824d3306e05 Mon Sep 17 00:00:00 2001 From: vyuldashev Date: Wed, 6 Apr 2022 23:52:10 +0200 Subject: [PATCH 3/3] update changelog --- CHANGELOG-12x.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG-12x.md b/CHANGELOG-12x.md index 750c82a0..3887c2d1 100644 --- a/CHANGELOG-12x.md +++ b/CHANGELOG-12x.md @@ -2,7 +2,11 @@ All notable changes to this project will be documented in this file. -## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.0...master) +## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.1...master) + +## [12.0.1 (2022-04-06)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.0...v12.0.1) + +- Allow laravel to end workers with lost connection [#457](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/457) ## [12.0.0 (2022-02-23)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.4.0...v12.0.0)