-
Notifications
You must be signed in to change notification settings - Fork 439
/
Copy pathLimitConsumptionTimeExtension.php
62 lines (52 loc) · 1.77 KB
/
LimitConsumptionTimeExtension.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<?php
namespace Enqueue\Consumption\Extension;
use Enqueue\Consumption\Context\PostConsume;
use Enqueue\Consumption\Context\PostMessageReceived;
use Enqueue\Consumption\Context\PreConsume;
use Enqueue\Consumption\PostConsumeExtensionInterface;
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
use Enqueue\Consumption\PreConsumeExtensionInterface;
use Psr\Log\LoggerInterface;
/**
 * Consumption extension that interrupts execution once a configured point in time is reached.
 *
 * The same deadline check runs in each of the three hooks implemented here
 * (onPreConsume, onPostConsume, onPostMessageReceived); when the current time
 * is at or past the limit, the hook calls interruptExecution() on its context.
 */
class LimitConsumptionTimeExtension implements PreConsumeExtensionInterface, PostConsumeExtensionInterface, PostMessageReceivedExtensionInterface
{
    /**
     * Moment at or after which consumption is interrupted.
     *
     * @var \DateTimeInterface
     */
    protected $timeLimit;

    /**
     * @param \DateTimeInterface $timeLimit deadline; both \DateTime and \DateTimeImmutable are accepted.
     *                                      The instance is stored as given (not copied), so mutating
     *                                      a \DateTime after passing it here also moves the limit.
     */
    public function __construct(\DateTimeInterface $timeLimit)
    {
        $this->timeLimit = $timeLimit;
    }

    /**
     * Interrupts execution if the time limit has already been reached.
     */
    public function onPreConsume(PreConsume $context): void
    {
        if ($this->shouldBeStopped($context->getLogger())) {
            $context->interruptExecution();
        }
    }

    /**
     * Interrupts execution if the time limit has already been reached.
     */
    public function onPostConsume(PostConsume $context): void
    {
        if ($this->shouldBeStopped($context->getLogger())) {
            $context->interruptExecution();
        }
    }

    /**
     * Interrupts execution if the time limit has already been reached.
     */
    public function onPostMessageReceived(PostMessageReceived $context): void
    {
        if ($this->shouldBeStopped($context->getLogger())) {
            $context->interruptExecution();
        }
    }

    /**
     * Tells whether the current time is at or past the configured limit.
     *
     * When the limit has been reached, a debug record with both timestamps is
     * written to the given logger before returning.
     *
     * @param LoggerInterface $logger receives the debug record when the limit has passed
     *
     * @return bool true when the limit has been reached, false otherwise
     */
    protected function shouldBeStopped(LoggerInterface $logger): bool
    {
        // A single immutable snapshot is used for both the comparison and the log line.
        $now = new \DateTimeImmutable();

        // Inclusive comparison: reaching the limit exactly also stops consumption.
        if ($now >= $this->timeLimit) {
            $logger->debug(sprintf(
                '[LimitConsumptionTimeExtension] Execution interrupted as limit time has passed.'.
                ' now: "%s", time-limit: "%s"',
                $now->format(\DATE_ISO8601),
                $this->timeLimit->format(\DATE_ISO8601)
            ));

            return true;
        }

        return false;
    }
}