diff --git a/CHANGELOG-12x.md b/CHANGELOG-12x.md index 750c82a0..3887c2d1 100644 --- a/CHANGELOG-12x.md +++ b/CHANGELOG-12x.md @@ -2,7 +2,11 @@ All notable changes to this project will be documented in this file. -## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.0...master) +## [Unreleased](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.1...master) + +## [12.0.1 (2022-04-06)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v12.0.0...v12.0.1) + +- Allow laravel to end workers with lost connection [#457](https://github.com/vyuldashev/laravel-queue-rabbitmq/pull/457) ## [12.0.0 (2022-02-23)](https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/v11.4.0...v12.0.0) diff --git a/src/Console/ExchangeDeclareCommand.php b/src/Console/ExchangeDeclareCommand.php index 955da0a9..99f0b753 100644 --- a/src/Console/ExchangeDeclareCommand.php +++ b/src/Console/ExchangeDeclareCommand.php @@ -18,7 +18,8 @@ class ExchangeDeclareCommand extends Command protected $description = 'Declare exchange'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/ExchangeDeleteCommand.php b/src/Console/ExchangeDeleteCommand.php index af4473a2..904f38ba 100644 --- a/src/Console/ExchangeDeleteCommand.php +++ b/src/Console/ExchangeDeleteCommand.php @@ -16,7 +16,8 @@ class ExchangeDeleteCommand extends Command protected $description = 'Delete exchange'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueBindCommand.php b/src/Console/QueueBindCommand.php index 686a3430..40799460 100644 --- a/src/Console/QueueBindCommand.php +++ b/src/Console/QueueBindCommand.php @@ -17,7 +17,8 @@ class QueueBindCommand extends Command protected $description = 'Bind queue to exchange'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueDeclareCommand.php b/src/Console/QueueDeclareCommand.php index 97742c28..d43f49ce 100644 --- a/src/Console/QueueDeclareCommand.php +++ b/src/Console/QueueDeclareCommand.php @@ -19,7 +19,8 @@ class QueueDeclareCommand extends Command protected $description = 'Declare queue'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueueDeleteCommand.php b/src/Console/QueueDeleteCommand.php index a1e331e2..8bca7c20 100644 --- a/src/Console/QueueDeleteCommand.php +++ b/src/Console/QueueDeleteCommand.php @@ -17,7 +17,8 @@ class QueueDeleteCommand extends Command protected $description = 'Delete queue'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Console/QueuePurgeCommand.php b/src/Console/QueuePurgeCommand.php index 8b03604a..95b765ea 100644 --- a/src/Console/QueuePurgeCommand.php +++ b/src/Console/QueuePurgeCommand.php @@ -19,7 +19,8 @@ class QueuePurgeCommand extends Command protected $description = 'Purge all messages in queue'; /** - * @param RabbitMQConnector $connector + * @param RabbitMQConnector $connector + * * @throws Exception */ public function handle(RabbitMQConnector $connector): void diff --git a/src/Consumer.php b/src/Consumer.php index e32b3efc..54438097 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -63,10 +63,11 @@ public function setPrefetchCount(int $value): void /** * Listen to the given queue in a loop. * - * @param string $connectionName - * @param string $queue - * @param WorkerOptions $options + * @param string $connectionName + * @param string $queue + * @param WorkerOptions $options * @return int + * * @throws Throwable */ public function daemon($connectionName, $queue, WorkerOptions $options) @@ -147,7 +148,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu $this->exceptions->report($exception); $this->kill(1); - } catch (Exception | Throwable $exception) { + } catch (Exception|Throwable $exception) { $this->exceptions->report($exception); $this->stopWorkerIfLostConnection($exception); @@ -180,9 +181,9 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu /** * Determine if the daemon should process on this iteration. * - * @param WorkerOptions $options - * @param string $connectionName - * @param string $queue + * @param WorkerOptions $options + * @param string $connectionName + * @param string $queue * @return bool */ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue): bool diff --git a/src/Horizon/Listeners/RabbitMQFailedEvent.php b/src/Horizon/Listeners/RabbitMQFailedEvent.php index c034c341..1edfa551 100644 --- a/src/Horizon/Listeners/RabbitMQFailedEvent.php +++ b/src/Horizon/Listeners/RabbitMQFailedEvent.php @@ -19,7 +19,7 @@ class RabbitMQFailedEvent /** * Create a new listener instance. * - * @param Dispatcher $events + * @param Dispatcher $events * @return void */ public function __construct(Dispatcher $events) @@ -30,7 +30,7 @@ public function __construct(Dispatcher $events) /** * Handle the event. * - * @param LaravelJobFailed $event + * @param LaravelJobFailed $event * @return void */ public function handle(LaravelJobFailed $event): void diff --git a/src/Horizon/RabbitMQQueue.php b/src/Horizon/RabbitMQQueue.php index de577897..a4ffdf24 100644 --- a/src/Horizon/RabbitMQQueue.php +++ b/src/Horizon/RabbitMQQueue.php @@ -25,8 +25,9 @@ class RabbitMQQueue extends BaseRabbitMQQueue /** * Get the number of queue jobs that are ready to process. * - * @param string|null $queue + * @param string|null $queue * @return int + * * @throws AMQPProtocolChannelException */ public function readyNow($queue = null): int @@ -93,9 +94,10 @@ public function release($delay, $job, $data, $queue, $attempts = 0) /** * Fire the job deleted event. * - * @param string $queue - * @param RabbitMQJob $job + * @param string $queue + * @param RabbitMQJob $job * @return void + * * @throws BindingResolutionException */ public function deleteReserved($queue, $job): void @@ -106,9 +108,10 @@ public function deleteReserved($queue, $job): void /** * Fire the given event if a dispatcher is bound. * - * @param string $queue - * @param mixed $event + * @param string $queue + * @param mixed $event * @return void + * * @throws BindingResolutionException */ protected function event($queue, $event): void diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index 0f50164a..cded1f70 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -31,9 +31,9 @@ public function __construct(Dispatcher $dispatcher) /** * Establish a queue connection. * - * @param array $config - * + * @param array $config * @return RabbitMQQueue + * * @throws Exception */ public function connect(array $config): Queue @@ -64,8 +64,9 @@ public function connect(array $config): Queue } /** - * @param array $config + * @param array $config * @return AbstractConnection + * * @throws Exception */ protected function createConnection(array $config): AbstractConnection @@ -112,7 +113,7 @@ protected function createQueue( /** * Recursively filter only null values. * - * @param array $array + * @param array $array * @return array */ private function filter(array $array): array diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 70e9e7f5..4176d80d 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -118,7 +118,8 @@ public function delete(): void /** * Release the job back into the queue. * - * @param int $delay + * @param int $delay + * * @throws AMQPProtocolChannelException */ public function release($delay = 0): void diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index ed32bdf7..a6a0a89e 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -13,7 +13,10 @@ use JsonException; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Exception\AMQPChannelClosedException; +use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -175,9 +178,10 @@ function ($payload, $queue, $delay) { /** * @param $delay * @param $payload - * @param null $queue - * @param int $attempts + * @param null $queue + * @param int $attempts * @return mixed + * * @throws AMQPProtocolChannelException */ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) @@ -216,10 +220,11 @@ public function bulk($jobs, $data = '', $queue = null): void } /** - * @param string $payload - * @param null $queue - * @param array $options + * @param string $payload + * @param null $queue + * @param array $options * @return mixed + * * @throws AMQPProtocolChannelException */ public function bulkRaw(string $payload, $queue = null, array $options = []) @@ -269,6 +274,16 @@ public function pop($queue = null) } throw $exception; + } catch (AMQPChannelClosedException|AMQPConnectionClosedException $exception) { + // Queue::pop used by worker to receive new job + // Thrown exception is checked by Illuminate\Database\DetectsLostConnections::causedByLostConnection + // Is has to contain one of the several phrases in exception message in order to restart worker + // Otherwise worker continues to work with broken connection + throw new AMQPRuntimeException( + 'Lost connection: '.$exception->getMessage(), + $exception->getCode(), + $exception + ); } return null; @@ -294,6 +309,7 @@ public function getChannel(): AMQPChannel * Job class to use. * * @return string + * * @throws Throwable */ public function getJobClass(): string @@ -312,7 +328,7 @@ public function getJobClass(): string /** * Gets a queue/destination, by default the queue option set on the connection. * - * @param null $queue + * @param null $queue * @return string */ public function getQueue($queue = null): string @@ -324,8 +340,9 @@ public function getQueue($queue = null): string * Checks if the given exchange already present/defined in RabbitMQ. * Returns false when when the exchange is missing. * - * @param string $exchange + * @param string $exchange * @return bool + * * @throws AMQPProtocolChannelException */ public function isExchangeExists(string $exchange): bool @@ -355,11 +372,11 @@ public function isExchangeExists(string $exchange): bool /** * Declare a exchange in rabbitMQ, when not already declared. * - * @param string $name - * @param string $type - * @param bool $durable - * @param bool $autoDelete - * @param array $arguments + * @param string $name + * @param string $type + * @param bool $durable + * @param bool $autoDelete + * @param array $arguments * @return void */ public function declareExchange( @@ -388,9 +405,10 @@ public function declareExchange( /** * Delete a exchange from rabbitMQ, only when present in RabbitMQ. * - * @param string $name - * @param bool $unused + * @param string $name + * @param bool $unused * @return void + * * @throws AMQPProtocolChannelException */ public function deleteExchange(string $name, bool $unused = false): void @@ -412,8 +430,9 @@ public function deleteExchange(string $name, bool $unused = false): void * Checks if the given queue already present/defined in RabbitMQ. * Returns false when when the queue is missing. * - * @param string|null $name + * @param string|null $name * @return bool + * * @throws AMQPProtocolChannelException */ public function isQueueExists(string $name = null): bool @@ -437,10 +456,10 @@ public function isQueueExists(string $name = null): bool /** * Declare a queue in rabbitMQ, when not already declared. * - * @param string $name - * @param bool $durable - * @param bool $autoDelete - * @param array $arguments + * @param string $name + * @param bool $durable + * @param bool $autoDelete + * @param array $arguments * @return void */ public function declareQueue( @@ -467,10 +486,11 @@ public function declareQueue( /** * Delete a queue from rabbitMQ, only when present in RabbitMQ. * - * @param string $name - * @param bool $if_unused - * @param bool $if_empty + * @param string $name + * @param bool $if_unused + * @param bool $if_empty * @return void + * * @throws AMQPProtocolChannelException */ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empty = false): void @@ -485,9 +505,9 @@ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empt /** * Bind a queue to an exchange. * - * @param string $queue - * @param string $exchange - * @param string $routingKey + * @param string $queue + * @param string $exchange + * @param string $routingKey * @return void */ public function bindQueue(string $queue, string $exchange, string $routingKey = ''): void @@ -506,7 +526,7 @@ public function bindQueue(string $queue, string $exchange, string $routingKey = /** * Purge the queue of messages. * - * @param string|null $queue + * @param string|null $queue * @return void */ public function purge(string $queue = null): void @@ -520,7 +540,7 @@ public function purge(string $queue = null): void /** * Acknowledge the message. * - * @param RabbitMQJob $job + * @param RabbitMQJob $job * @return void */ public function ack(RabbitMQJob $job): void @@ -531,9 +551,8 @@ public function ack(RabbitMQJob $job): void /** * Reject the message. * - * @param RabbitMQJob $job - * @param bool $requeue - * + * @param RabbitMQJob $job + * @param bool $requeue * @return void */ public function reject(RabbitMQJob $job, bool $requeue = false): void @@ -545,8 +564,9 @@ public function reject(RabbitMQJob $job, bool $requeue = false): void * Create a AMQP message. * * @param $payload - * @param int $attempts + * @param int $attempts * @return array + * * @throws JsonException */ protected function createMessage($payload, int $attempts = 0): array @@ -589,9 +609,9 @@ protected function createMessage($payload, int $attempts = 0): array /** * Create a payload array from the given job and data. * - * @param string|object $job - * @param string $queue - * @param mixed $data + * @param string|object $job + * @param string $queue + * @param mixed $data * @return array */ protected function createPayloadArray($job, $queue, $data = ''): array @@ -615,6 +635,7 @@ protected function getRandomId(): string * Close the connection to RabbitMQ. * * @return void + * * @throws Exception */ public function close(): void @@ -633,7 +654,7 @@ public function close(): void /** * Get the Queue arguments. * - * @param string $destination + * @param string $destination * @return array */ protected function getQueueArguments(string $destination): array @@ -663,8 +684,8 @@ protected function getQueueArguments(string $destination): array /** * Get the Delay queue arguments. * - * @param string $destination - * @param int $ttl + * @param string $destination + * @param int $ttl * @return array */ protected function getDelayQueueArguments(string $destination, int $ttl): array @@ -693,6 +714,7 @@ protected function isPrioritizeDelayed(): bool * Using more priority layers, will consume more CPU resources and would affect runtimes. * * @see https://www.rabbitmq.com/priority.html + * * @return int */ protected function getQueueMaxPriority(): int @@ -703,7 +725,7 @@ protected function getQueueMaxPriority(): int /** * Get the exchange name, or &null; as default value. * - * @param string|null $exchange + * @param string|null $exchange * @return string|null */ protected function getExchange(string $exchange = null): ?string @@ -715,7 +737,7 @@ protected function getExchange(string $exchange = null): ?string * Get the routing-key for when you use exchanges * The default routing-key is the given destination. * - * @param string $destination + * @param string $destination * @return string */ protected function getRoutingKey(string $destination): string @@ -726,7 +748,7 @@ protected function getRoutingKey(string $destination): string /** * Get the exchangeType, or AMQPExchangeType::DIRECT as default. * - * @param string|null $type + * @param string|null $type * @return string */ protected function getExchangeType(?string $type = null): string @@ -760,7 +782,7 @@ protected function isQuorum(): bool /** * Get the exchange for failed messages. * - * @param string|null $exchange + * @param string|null $exchange * @return string|null */ protected function getFailedExchange(string $exchange = null): ?string @@ -772,7 +794,7 @@ protected function getFailedExchange(string $exchange = null): ?string * Get the routing-key for failed messages * The default routing-key is the given destination substituted by '.failed'. * - * @param string $destination + * @param string $destination * @return string */ protected function getFailedRoutingKey(string $destination): string @@ -783,7 +805,7 @@ protected function getFailedRoutingKey(string $destination): string /** * Checks if the exchange was already declared. * - * @param string $name + * @param string $name * @return bool */ protected function isExchangeDeclared(string $name): bool @@ -794,7 +816,7 @@ protected function isExchangeDeclared(string $name): bool /** * Checks if the queue was already declared. * - * @param string $name + * @param string $name * @return bool */ protected function isQueueDeclared(string $name): bool @@ -805,10 +827,11 @@ protected function isQueueDeclared(string $name): bool /** * Declare the destination when necessary. * - * @param string $destination - * @param string|null $exchange - * @param string|null $exchangeType + * @param string $destination + * @param string|null $exchange + * @param string|null $exchangeType * @return void + * * @throws AMQPProtocolChannelException */ protected function declareDestination( @@ -839,7 +862,7 @@ protected function declareDestination( * Determine all publish properties. * * @param $queue - * @param array $options + * @param array $options * @return array */ protected function publishProperties($queue, array $options = []): array diff --git a/tests/Functional/TestCase.php b/tests/Functional/TestCase.php index ace2c497..40e3e116 100644 --- a/tests/Functional/TestCase.php +++ b/tests/Functional/TestCase.php @@ -162,9 +162,10 @@ protected function getEnvironmentSetUp($app): void /** * @param $object - * @param string $method - * @param array $parameters + * @param string $method + * @param array $parameters * @return mixed + * * @throws Exception */ protected function callMethod($object, string $method, array $parameters = [])