-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathHandleAsyncTransitionProcessor.php
92 lines (79 loc) · 2.37 KB
/
HandleAsyncTransitionProcessor.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
<?php
namespace Formapro\Pvm\Enqueue;
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Consumption\QueueSubscriberInterface;
use Enqueue\Consumption\Result;
use Enqueue\Util\JSON;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;
use Formapro\Pvm\Process;
use Formapro\Pvm\ProcessEngine;
use Formapro\Pvm\ProcessStorage;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
/**
 * Queue processor that resumes a process from the token named in the message body.
 *
 * The message body must be a JSON document that decodes to an array containing
 * a "token" key. The process is looked up in the storage by that token, the
 * engine proceeds from the token, and the process is persisted afterwards.
 */
class HandleAsyncTransitionProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
{
    /**
     * Value used as both the processor name and the queue name in the subscriptions below.
     */
    const COMMAND = 'pvm_handle_async_transition';

    /**
     * @var ProcessEngine
     */
    private $processEngine;

    /**
     * @var ProcessStorage
     */
    private $processExecutionStorage;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param ProcessEngine        $processEngine           engine used to proceed from the token
     * @param ProcessStorage       $processExecutionStorage storage used to look up and persist the process
     * @param LoggerInterface|null $logger                  optional logger; a NullLogger is used when omitted
     */
    public function __construct(ProcessEngine $processEngine, ProcessStorage $processExecutionStorage, LoggerInterface $logger = null)
    {
        $this->processEngine = $processEngine;
        $this->processExecutionStorage = $processExecutionStorage;
        $this->logger = $logger ?: new NullLogger();
    }

    /**
     * {@inheritdoc}
     *
     * Rejects the message when the body cannot be decoded, when the decoded
     * payload is not an array with a "token" key, or when no process is found
     * for that token. Otherwise proceeds the engine from the token and
     * acknowledges the message.
     *
     * An exception thrown by the engine is not caught here; the process is
     * still persisted before the exception propagates.
     *
     * @return mixed the value built by Result::reject() on rejection, self::ACK otherwise
     */
    public function process(PsrMessage $psrMessage, PsrContext $psrContext)
    {
        try {
            $data = JSON::decode($psrMessage->getBody());
        } catch (\Exception $e) {
            return Result::reject($e->getMessage());
        }

        // Valid JSON may still decode to a scalar or null (e.g. `"abc"`, `42`, `null`).
        // array_key_exists() must not be called on such values: it raises a TypeError
        // on PHP 8 and a warning on PHP 7, so check the type first.
        if (false == is_array($data) || false == array_key_exists('token', $data)) {
            return Result::reject('Message miss required token field.');
        }

        /** @var Process $process */
        $process = $this->processExecutionStorage->getByToken($data['token']);
        if (false == $process) {
            return Result::reject('Process was not found');
        }

        try {
            $this->processEngine->proceed($process->getToken($data['token']), $this->logger);
        } finally {
            // Persist whatever state was reached, even when proceed() throws.
            $this->processExecutionStorage->persist($process);
        }

        return self::ACK;
    }

    /**
     * {@inheritdoc}
     *
     * Uses the COMMAND constant for both the processor name and the queue name.
     */
    public static function getSubscribedCommand()
    {
        return [
            'processorName' => static::COMMAND,
            'queueName' => static::COMMAND,
            'queueNameHardcoded' => true,
            'exclusive' => true,
        ];
    }

    /**
     * {@inheritdoc}
     *
     * Subscribes to the single queue named by the COMMAND constant.
     */
    public static function getSubscribedQueues()
    {
        return [static::COMMAND];
    }
}