-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathRabbitMqDlxDelayStrategy.php
51 lines (41 loc) · 2.28 KB
/
RabbitMqDlxDelayStrategy.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<?php
declare(strict_types=1);
namespace Enqueue\AmqpTools;
use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpDestination;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;
use Interop\Queue\Exception\InvalidDestinationException;
/**
 * Delays message delivery using RabbitMQ's dead-letter exchange (DLX) mechanism.
 *
 * The message is published to a dedicated, consumer-less "delay" queue whose
 * x-message-ttl equals the requested delay. When the TTL expires RabbitMQ
 * dead-letters the message to the exchange / routing key configured on that
 * queue, which routes it to the original destination.
 */
class RabbitMqDlxDelayStrategy implements DelayStrategy
{
    /**
     * Publishes a copy of $message to a delay queue that dead-letters to $dest.
     *
     * A separate delay queue is declared per destination, routing key (topics
     * only) and delay value, because the TTL and dead-letter target are
     * arguments of the queue itself.
     *
     * @param AmqpContext     $context used to create and declare the delay queue and to send the message
     * @param AmqpDestination $dest    final destination; must be an AmqpTopic or an AmqpQueue
     * @param AmqpMessage     $message message to delay; it is copied, the original is not modified
     * @param int             $delay   passed unchanged as x-message-ttl (RabbitMQ interprets it as milliseconds)
     *
     * @throws InvalidDestinationException when $dest is neither an AmqpTopic nor an AmqpQueue
     */
    public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMessage $message, int $delay): void
    {
        $properties = $message->getProperties();

        // The x-death header must be removed because of the bug in RabbitMQ.
        // It was reported that the bug is fixed since 3.5.4 but it was still reproducible with 3.6.1.
        // https://github.com/rabbitmq/rabbitmq-server/issues/216
        unset($properties['x-death']);

        $delayMessage = $context->createMessage($message->getBody(), $properties, $message->getHeaders());
        $delayMessage->setRoutingKey($message->getRoutingKey());

        if ($dest instanceof AmqpTopic) {
            // Compare strictly against the empty string: a truthiness check would
            // treat the valid routing key "0" as absent, making its delay queue
            // name collide with the "no routing key" queue while the
            // x-dead-letter-routing-key argument differs, so the declare would
            // be rejected by the broker.
            $routingKey = (string) $message->getRoutingKey();
            $routingKeySuffix = '' !== $routingKey ? '.'.$routingKey : '';
            $name = sprintf('enqueue.%s%s.%s.x.delay', $dest->getTopicName(), $routingKeySuffix, $delay);

            $delayQueue = $context->createQueue($name);
            $delayQueue->addFlag(AmqpQueue::FLAG_DURABLE);
            $delayQueue->setArgument('x-message-ttl', $delay);
            // Expired messages are republished to the original exchange with the original routing key.
            $delayQueue->setArgument('x-dead-letter-exchange', $dest->getTopicName());
            $delayQueue->setArgument('x-dead-letter-routing-key', (string) $delayMessage->getRoutingKey());
        } elseif ($dest instanceof AmqpQueue) {
            $delayQueue = $context->createQueue('enqueue.'.$dest->getQueueName().'.'.$delay.'.delayed');
            $delayQueue->addFlag(AmqpQueue::FLAG_DURABLE);
            $delayQueue->setArgument('x-message-ttl', $delay);
            // The empty exchange name is the default exchange, which routes by
            // queue name, so the expired message lands in the target queue.
            $delayQueue->setArgument('x-dead-letter-exchange', '');
            $delayQueue->setArgument('x-dead-letter-routing-key', $dest->getQueueName());
        } else {
            throw new InvalidDestinationException(sprintf('The destination must be an instance of %s but got %s.', AmqpTopic::class.'|'.AmqpQueue::class, $dest::class));
        }

        $context->declareQueue($delayQueue);
        $context->createProducer()->send($delayQueue, $delayMessage);
    }
}