From d3e56cd1e7a2953b427c6ddd5793294789a47a90 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko <pascalamsg@gmail.com> Date: Tue, 13 Jun 2017 13:27:00 +0300 Subject: [PATCH 1/7] rpc deletes reply queue after receive message --- pkg/enqueue/Rpc/Promise.php | 62 +++++++++++++++++------------------ pkg/enqueue/Rpc/RpcClient.php | 43 +++++++++++++++++++++--- 2 files changed, 68 insertions(+), 37 deletions(-) diff --git a/pkg/enqueue/Rpc/Promise.php b/pkg/enqueue/Rpc/Promise.php index e3f8820a4..c7d05795b 100644 --- a/pkg/enqueue/Rpc/Promise.php +++ b/pkg/enqueue/Rpc/Promise.php @@ -2,36 +2,35 @@ namespace Enqueue\Rpc; -use Enqueue\Psr\PsrConsumer; use Enqueue\Psr\PsrMessage; class Promise { /** - * @var PsrConsumer + * @var \Closure */ - private $consumer; + private $receiveCallback; /** - * @var int + * @var \Closure */ - private $timeout; + private $finallyCallback; /** - * @var string + * @var bool */ - private $correlationId; + private $deleteReplyQueue; /** - * @param PsrConsumer $consumer - * @param string $correlationId - * @param int $timeout + * @param \Closure $receiveCallback + * @param \Closure $finallyCallback */ - public function __construct(PsrConsumer $consumer, $correlationId, $timeout) + public function __construct(\Closure $receiveCallback, \Closure $finallyCallback) { - $this->consumer = $consumer; - $this->timeout = $timeout; - $this->correlationId = $correlationId; + $this->receiveCallback = $receiveCallback; + $this->finallyCallback = $finallyCallback; + + $this->deleteReplyQueue = true; } /** @@ -41,36 +40,35 @@ public function __construct(PsrConsumer $consumer, $correlationId, $timeout) */ public function getMessage() { - $endTime = time() + $this->timeout; - - while (time() < $endTime) { - if ($message = $this->consumer->receive($this->timeout)) { - if ($message->getCorrelationId() === $this->correlationId) { - $this->consumer->acknowledge($message); - - return $message; - } + try { + $result = call_user_func($this->receiveCallback, $this); - $this->consumer->reject($message, true); + if (false == $result instanceof PsrMessage) { + throw new \LogicException(sprintf( + 'Expected "%s" but got: "%s"', PsrMessage::class, is_object($result) ? get_class($result) : gettype($result))); } - } - throw TimeoutException::create($this->timeout, $this->correlationId); + return $result; + } finally { + call_user_func($this->finallyCallback, $this); + } } /** - * @param int $timeout + * On TRUE deletes reply queue after getMessage call + * + * @param bool $delete */ - public function setTimeout($timeout) + public function setDeleteReplyQueue($delete) { - $this->timeout = $timeout; + $this->deleteReplyQueue = (bool) $delete; } /** - * @return int + * @return bool */ - public function getTimeout() + public function isDeleteReplyQueue() { - return $this->timeout; + return $this->deleteReplyQueue; } } diff --git a/pkg/enqueue/Rpc/RpcClient.php b/pkg/enqueue/Rpc/RpcClient.php index 89b0cedb9..45ff7bc4f 100644 --- a/pkg/enqueue/Rpc/RpcClient.php +++ b/pkg/enqueue/Rpc/RpcClient.php @@ -51,9 +51,11 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim if ($message->getReplyTo()) { $replyQueue = $this->context->createQueue($message->getReplyTo()); + $deleteReplyQueue = false; } else { $replyQueue = $this->context->createTemporaryQueue(); $message->setReplyTo($replyQueue->getQueueName()); + $deleteReplyQueue = true; } if (false == $message->getCorrelationId()) { @@ -62,10 +64,41 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim $this->context->createProducer()->send($destination, $message); - return new Promise( - $this->context->createConsumer($replyQueue), - $message->getCorrelationId(), - $timeout - ); + $correlationId = $message->getCorrelationId(); + + $receive = function() use ($replyQueue, $timeout, $correlationId) { + + $endTime = time() + ((int) ($timeout / 1000)); + $consumer = $this->context->createConsumer($replyQueue); + + do { + if ($message = $consumer->receive($timeout)) { + if ($message->getCorrelationId() === $correlationId) { + $consumer->acknowledge($message); + + return $message; + } + + $consumer->reject($message, true); + } + } while (time() < $endTime); + + throw TimeoutException::create($timeout, $correlationId); + }; + + $finally = function(Promise $promise) use ($replyQueue) { + if ($promise->isDeleteReplyQueue()) { + if (false == method_exists($this->context, 'deleteQueue')) { + throw new \RuntimeException(sprintf('Context does not support delete queues: "%s"', get_class($this->context))); + } + + $this->context->deleteQueue($replyQueue); + } + }; + + $promise = new Promise($receive, $finally); + $promise->setDeleteReplyQueue($deleteReplyQueue); + + return $promise; } } From 1a60f4a30d8053ce6bab6ac210e7d47c09124b57 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko <pascalamsg@gmail.com> Date: Tue, 13 Jun 2017 16:54:07 +0300 Subject: [PATCH 2/7] receiveNoWait --- pkg/enqueue/Client/RpcClient.php | 65 ++++- pkg/enqueue/Rpc/Promise.php | 68 ++++- pkg/enqueue/Rpc/RpcClient.php | 25 +- pkg/enqueue/Tests/Client/RpcClientTest.php | 211 +++++++++++++-- pkg/enqueue/Tests/Rpc/PromiseTest.php | 295 +++++++++++++-------- pkg/enqueue/Tests/Rpc/RpcClientTest.php | 208 +++++++++++++-- 6 files changed, 698 insertions(+), 174 deletions(-) diff --git a/pkg/enqueue/Client/RpcClient.php b/pkg/enqueue/Client/RpcClient.php index 9407d8867..2ff27f2c5 100644 --- a/pkg/enqueue/Client/RpcClient.php +++ b/pkg/enqueue/Client/RpcClient.php @@ -5,6 +5,7 @@ use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Rpc\Promise; +use Enqueue\Rpc\TimeoutException; use Enqueue\Util\UUID; class RpcClient @@ -38,7 +39,7 @@ public function __construct(ProducerInterface $producer, PsrContext $context) */ public function call($topic, $message, $timeout) { - return $this->callAsync($topic, $message, $timeout)->getMessage(); + return $this->callAsync($topic, $message, $timeout)->receive(); } /** @@ -62,9 +63,11 @@ public function callAsync($topic, $message, $timeout) if ($message->getReplyTo()) { $replyQueue = $this->context->createQueue($message->getReplyTo()); + $deleteReplyQueue = false; } else { $replyQueue = $this->context->createTemporaryQueue(); $message->setReplyTo($replyQueue->getQueueName()); + $deleteReplyQueue = true; } if (false == $message->getCorrelationId()) { @@ -73,10 +76,60 @@ public function callAsync($topic, $message, $timeout) $this->producer->send($topic, $message); - return new Promise( - $this->context->createConsumer($replyQueue), - $message->getCorrelationId(), - $timeout - ); + $correlationId = $message->getCorrelationId(); + + $receive = function() use ($replyQueue, $timeout, $correlationId) { + + $endTime = time() + ((int) ($timeout / 1000)); + $consumer = $this->context->createConsumer($replyQueue); + + do { + if ($message = $consumer->receive($timeout)) { + if ($message->getCorrelationId() === $correlationId) { + $consumer->acknowledge($message); + + return $message; + } + + $consumer->reject($message, true); + } + } while (time() < $endTime); + + throw TimeoutException::create($timeout, $correlationId); + }; + + $receiveNoWait = function() use ($replyQueue, $correlationId) { + + static $consumer; + + if (null === $consumer) { + $consumer = $this->context->createConsumer($replyQueue); + } + + if ($message = $consumer->receiveNoWait()) { + if ($message->getCorrelationId() === $correlationId) { + $consumer->acknowledge($message); + + return $message; + } + + $consumer->reject($message, true); + } + }; + + $finally = function(Promise $promise) use ($replyQueue) { + if ($promise->isDeleteReplyQueue()) { + if (false == method_exists($this->context, 'deleteQueue')) { + throw new \RuntimeException(sprintf('Context does not support delete queues: "%s"', get_class($this->context))); + } + + $this->context->deleteQueue($replyQueue); + } + }; + + $promise = new Promise($receive, $receiveNoWait, $finally); + $promise->setDeleteReplyQueue($deleteReplyQueue); + + return $promise; } } diff --git a/pkg/enqueue/Rpc/Promise.php b/pkg/enqueue/Rpc/Promise.php index c7d05795b..9325bda28 100644 --- a/pkg/enqueue/Rpc/Promise.php +++ b/pkg/enqueue/Rpc/Promise.php @@ -11,6 +11,11 @@ class Promise */ private $receiveCallback; + /** + * @var \Closure + */ + private $receiveNoWaitCallback; + /** * @var \Closure */ @@ -21,37 +26,80 @@ class Promise */ private $deleteReplyQueue; + /** + * @var PsrMessage + */ + private $message; + /** * @param \Closure $receiveCallback + * @param \Closure $receiveNoWaitCallback * @param \Closure $finallyCallback */ - public function __construct(\Closure $receiveCallback, \Closure $finallyCallback) + public function __construct(\Closure $receiveCallback, \Closure $receiveNoWaitCallback, \Closure $finallyCallback) { $this->receiveCallback = $receiveCallback; + $this->receiveNoWaitCallback = $receiveNoWaitCallback; $this->finallyCallback = $finallyCallback; $this->deleteReplyQueue = true; } /** + * Blocks until message received or timeout expired. + * * @throws TimeoutException if the wait timeout is reached * * @return PsrMessage */ - public function getMessage() + public function receive() { - try { - $result = call_user_func($this->receiveCallback, $this); + if (null == $this->message) { + try { + if ($message = $this->doReceive($this->receiveCallback)) { + $this->message = $message; + } + } finally { + call_user_func($this->finallyCallback, $this); + } + } - if (false == $result instanceof PsrMessage) { - throw new \LogicException(sprintf( - 'Expected "%s" but got: "%s"', PsrMessage::class, is_object($result) ? get_class($result) : gettype($result))); + return $this->message; + } + + /** + * Non blocking function. Returns message or null. + * + * @return PsrMessage|null + */ + public function receiveNoWait() + { + if (null == $this->message) { + if ($message = $this->doReceive($this->receiveNoWaitCallback)) { + $this->message = $message; + + call_user_func($this->finallyCallback, $this); } + } - return $result; - } finally { - call_user_func($this->finallyCallback, $this); + return $this->message; + } + + /** + * @param \Closure $cb + * + * @return PsrMessage + */ + private function doReceive(\Closure $cb) + { + $message = call_user_func($cb, $this); + + if (null !== $message && false == $message instanceof PsrMessage) { + throw new \RuntimeException(sprintf( + 'Expected "%s" but got: "%s"', PsrMessage::class, is_object($message) ? get_class($message) : gettype($message))); } + + return $message; } /** diff --git a/pkg/enqueue/Rpc/RpcClient.php b/pkg/enqueue/Rpc/RpcClient.php index 45ff7bc4f..41f6f20d7 100644 --- a/pkg/enqueue/Rpc/RpcClient.php +++ b/pkg/enqueue/Rpc/RpcClient.php @@ -33,7 +33,7 @@ public function __construct(PsrContext $context) */ public function call(PsrDestination $destination, PsrMessage $message, $timeout) { - return $this->callAsync($destination, $message, $timeout)->getMessage(); + return $this->callAsync($destination, $message, $timeout)->receive(); } /** @@ -86,17 +86,36 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim throw TimeoutException::create($timeout, $correlationId); }; + $receiveNoWait = function() use ($replyQueue, $correlationId) { + + static $consumer; + + if (null === $consumer) { + $consumer = $this->context->createConsumer($replyQueue); + } + + if ($message = $consumer->receiveNoWait()) { + if ($message->getCorrelationId() === $correlationId) { + $consumer->acknowledge($message); + + return $message; + } + + $consumer->reject($message, true); + } + }; + $finally = function(Promise $promise) use ($replyQueue) { if ($promise->isDeleteReplyQueue()) { if (false == method_exists($this->context, 'deleteQueue')) { - throw new \RuntimeException(sprintf('Context does not support delete queues: "%s"', get_class($this->context))); + throw new \RuntimeException(sprintf('Context does not support delete queue: "%s"', get_class($this->context))); } $this->context->deleteQueue($replyQueue); } }; - $promise = new Promise($receive, $finally); + $promise = new Promise($receive, $receiveNoWait, $finally); $promise->setDeleteReplyQueue($deleteReplyQueue); return $promise; diff --git a/pkg/enqueue/Tests/Client/RpcClientTest.php b/pkg/enqueue/Tests/Client/RpcClientTest.php index d35f1fe51..ba03b111b 100644 --- a/pkg/enqueue/Tests/Client/RpcClientTest.php +++ b/pkg/enqueue/Tests/Client/RpcClientTest.php @@ -7,6 +7,7 @@ use Enqueue\Client\RpcClient; use Enqueue\Null\NullContext; use Enqueue\Null\NullMessage; +use Enqueue\Null\NullQueue; use Enqueue\Psr\PsrConsumer; use Enqueue\Psr\PsrContext; use Enqueue\Rpc\Promise; @@ -133,27 +134,6 @@ public function testShouldNotSetCorrelationIdIfSet() $rpc->callAsync('aTopic', $message, 2); } - public function testShouldPopulatePromiseWithExpectedArguments() - { - $context = new NullContext(); - - $message = new Message(); - $message->setCorrelationId('theCorrelationId'); - - $timeout = 123; - - $rpc = new RpcClient( - $this->createProducerMock(), - $context - ); - - $promise = $rpc->callAsync('aTopic', $message, $timeout); - - $this->assertInstanceOf(Promise::class, $promise); - $this->assertAttributeEquals('theCorrelationId', 'correlationId', $promise); - $this->assertAttributeEquals(123, 'timeout', $promise); - $this->assertAttributeInstanceOf(PsrConsumer::class, 'consumer', $promise); - } public function testShouldDoSyncCall() { @@ -163,7 +143,7 @@ public function testShouldDoSyncCall() $promiseMock = $this->createMock(Promise::class); $promiseMock ->expects($this->once()) - ->method('getMessage') + ->method('receive') ->willReturn($replyMessage) ; @@ -181,6 +161,185 @@ public function testShouldDoSyncCall() $this->assertSame($replyMessage, $actualReplyMessage); } + public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals() + { + $replyQueue = new NullQueue('theReplyTo'); + $message = new Message(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($receivedMessage) + ; + $consumer + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($receivedMessage)) + ; + $consumer + ->expects($this->never()) + ->method('reject') + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + + $rpc = new RpcClient($this->createProducerMock(), $context); + + $rpc->callAsync('topic', $message, 2)->receive(); + } + + public function testShouldReceiveNoWaitMessageAndAckMessageIfCorrelationEquals() + { + $replyQueue = new NullQueue('theReplyTo'); + $message = new Message(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receiveNoWait') + ->willReturn($receivedMessage) + ; + $consumer + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($receivedMessage)) + ; + $consumer + ->expects($this->never()) + ->method('reject') + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + + $rpc = new RpcClient($this->createProducerMock(), $context); + + $rpc->callAsync('topic', $message, 2)->receiveNoWait(); + } + + public function testShouldDeleteQueueAfterReceiveIfDeleteReplyQueueIsTrue() + { + $replyQueue = new NullQueue('theReplyTo'); + $message = new Message(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($receivedMessage) + ; + + $context = $this->getMockBuilder(PsrContext::class) + ->disableOriginalConstructor() + ->setMethods(['deleteQueue']) + ->getMockForAbstractClass() + ; + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + $context + ->expects($this->once()) + ->method('deleteQueue') + ->with($this->identicalTo($replyQueue)) + ; + + $rpc = new RpcClient($this->createProducerMock(), $context); + + $promise = $rpc->callAsync('topic', $message, 2); + $promise->setDeleteReplyQueue(true); + $promise->receive(); + } + + public function testShouldThrowExceptionIfDeleteReplyQueueIsTrueButContextHasNoDeleteQueueMethod() + { + $replyQueue = new NullQueue('theReplyTo'); + $message = new Message(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($receivedMessage) + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + + $rpc = new RpcClient($this->createProducerMock(), $context); + + $promise = $rpc->callAsync('topic', $message, 2); + $promise->setDeleteReplyQueue(true); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Context does not support delete queue'); + + $promise->receive(); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|PsrContext */ @@ -196,4 +355,12 @@ private function createProducerMock() { return $this->createMock(ProducerInterface::class); } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrConsumer + */ + private function createPsrConsumerMock() + { + return $this->createMock(PsrConsumer::class); + } } diff --git a/pkg/enqueue/Tests/Rpc/PromiseTest.php b/pkg/enqueue/Tests/Rpc/PromiseTest.php index f8d07e592..700f7d311 100644 --- a/pkg/enqueue/Tests/Rpc/PromiseTest.php +++ b/pkg/enqueue/Tests/Rpc/PromiseTest.php @@ -3,140 +3,207 @@ namespace Enqueue\Tests\Rpc; use Enqueue\Null\NullMessage; -use Enqueue\Psr\PsrConsumer; use Enqueue\Rpc\Promise; -use Enqueue\Rpc\TimeoutException; use PHPUnit\Framework\TestCase; class PromiseTest extends TestCase { - public function testCouldBeConstructedWithExpectedSetOfArguments() + public function testIsDeleteReplyQueueShouldReturnTrueByDefault() { - new Promise($this->createPsrConsumerMock(), 'aCorrelationId', 2); + $promise = new Promise(function(){}, function(){}, function(){}); + + $this->assertTrue($promise->isDeleteReplyQueue()); } - public function testShouldTimeoutIfNoResponseMessage() + public function testCouldSetGetDeleteReplyQueue() { - $psrConsumerMock = $this->createPsrConsumerMock(); - $psrConsumerMock - ->expects($this->atLeastOnce()) - ->method('receive') - ->willReturn(null) - ; - - $promise = new Promise($psrConsumerMock, 'aCorrelationId', 2); - - $this->expectException(TimeoutException::class); - $this->expectExceptionMessage('Rpc call timeout is reached without receiving a reply message. Timeout: 2, CorrelationId: aCorrelationId'); - $promise->getMessage(); + $promise = new Promise(function(){}, function(){}, function(){}); + + $promise->setDeleteReplyQueue(false); + $this->assertFalse($promise->isDeleteReplyQueue()); + + $promise->setDeleteReplyQueue(true); + $this->assertTrue($promise->isDeleteReplyQueue()); + } + + public function testOnReceiveShouldCallReceiveCallBack() + { + $receiveInvoked = false; + $receivecb = function() use (&$receiveInvoked) { + $receiveInvoked = true; + }; + + $promise = new Promise($receivecb, function(){}, function(){}); + $promise->receive(); + + $this->assertTrue($receiveInvoked); + } + + public function testOnReceiveNoWaitShouldCallReceiveNoWaitCallBack() + { + $receiveInvoked = false; + $receivecb = function() use (&$receiveInvoked) { + $receiveInvoked = true; + }; + + $promise = new Promise(function(){}, $receivecb, function(){}); + $promise->receiveNoWait(); + + $this->assertTrue($receiveInvoked); + } + + public function testOnReceiveShouldCallFinallyCallback() + { + $invoked = false; + $cb = function() use (&$invoked) { + $invoked = true; + }; + + $promise = new Promise(function(){}, function(){}, $cb); + $promise->receive(); + + $this->assertTrue($invoked); + } + + public function testOnReceiveShouldCallFinallyCallbackEvenIfExceptionThrown() + { + $invokedFinally = false; + $finallycb = function() use (&$invokedFinally) { + $invokedFinally = true; + }; + + $invokedReceive = false; + $receivecb = function() use (&$invokedReceive) { + $invokedReceive = true; + throw new \Exception(); + }; + + try { + $promise = new Promise($receivecb, function(){}, $finallycb); + $promise->receive(); + } catch (\Exception $e) {} + + $this->assertTrue($invokedReceive); + $this->assertTrue($invokedFinally); + } + + public function testOnReceiveShouldThrowExceptionIfCallbackReturnNotMessageInstance() + { + $receivecb = function() use (&$invokedReceive) { + return new \stdClass(); + }; + + $promise = new Promise($receivecb, function(){}, function(){}); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Expected "Enqueue\Psr\PsrMessage" but got: "stdClass"'); + + $promise->receive(); } - public function testShouldReturnReplyMessageIfCorrelationIdSame() + public function testOnReceiveNoWaitShouldThrowExceptionIfCallbackReturnNotMessageInstance() { - $correlationId = 'theCorrelationId'; - - $replyMessage = new NullMessage(); - $replyMessage->setCorrelationId($correlationId); - - $psrConsumerMock = $this->createPsrConsumerMock(); - $psrConsumerMock - ->expects($this->once()) - ->method('receive') - ->willReturn($replyMessage) - ; - $psrConsumerMock - ->expects($this->once()) - ->method('acknowledge') - ->with($this->identicalTo($replyMessage)) - ; - - $promise = new Promise($psrConsumerMock, $correlationId, 2); - - $actualReplyMessage = $promise->getMessage(); - $this->assertSame($replyMessage, $actualReplyMessage); + $receivecb = function() { + return new \stdClass(); + }; + + $promise = new Promise(function(){}, $receivecb, function(){}); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Expected "Enqueue\Psr\PsrMessage" but got: "stdClass"'); + + $promise->receiveNoWait(); } - public function testShouldReQueueIfCorrelationIdNotSame() + public function testOnReceiveNoWaitShouldCallFinallyCallbackOnlyIfMessageReceived() { - $correlationId = 'theCorrelationId'; - - $anotherReplyMessage = new NullMessage(); - $anotherReplyMessage->setCorrelationId('theOtherCorrelationId'); - - $replyMessage = new NullMessage(); - $replyMessage->setCorrelationId($correlationId); - - $psrConsumerMock = $this->createPsrConsumerMock(); - $psrConsumerMock - ->expects($this->at(0)) - ->method('receive') - ->willReturn($anotherReplyMessage) - ; - $psrConsumerMock - ->expects($this->at(1)) - ->method('reject') - ->with($this->identicalTo($anotherReplyMessage), true) - ; - $psrConsumerMock - ->expects($this->at(2)) - ->method('receive') - ->willReturn($replyMessage) - ; - $psrConsumerMock - ->expects($this->at(3)) - ->method('acknowledge') - ->with($this->identicalTo($replyMessage)) - ; - - $promise = new Promise($psrConsumerMock, $correlationId, 2); - - $actualReplyMessage = $promise->getMessage(); - $this->assertSame($replyMessage, $actualReplyMessage); + $invokedReceive = false; + $receivecb = function() use (&$invokedReceive) { + $invokedReceive = true; + }; + + $invokedFinally = false; + $finallycb = function() use (&$invokedFinally) { + $invokedFinally = true; + }; + + $promise = new Promise(function(){}, $receivecb, $finallycb); + $promise->receiveNoWait(); + + $this->assertTrue($invokedReceive); + $this->assertFalse($invokedFinally); + + // now should call finally too + + $invokedReceive = false; + $receivecb = function() use (&$invokedReceive) { + $invokedReceive = true; + return new NullMessage(); + }; + + $promise = new Promise(function(){}, $receivecb, $finallycb); + $promise->receiveNoWait(); + + $this->assertTrue($invokedReceive); + $this->assertTrue($invokedFinally); } - public function testShouldTrySeveralTimesToReceiveReplyMessage() + public function testOnReceiveShouldNotCallCallbackIfMessageReceivedByReceiveNoWaitBefore() { - $correlationId = 'theCorrelationId'; - - $anotherReplyMessage = new NullMessage(); - $anotherReplyMessage->setCorrelationId('theOtherCorrelationId'); - - $replyMessage = new NullMessage(); - $replyMessage->setCorrelationId($correlationId); - - $psrConsumerMock = $this->createPsrConsumerMock(); - $psrConsumerMock - ->expects($this->at(0)) - ->method('receive') - ->willReturn(null) - ; - $psrConsumerMock - ->expects($this->at(1)) - ->method('receive') - ->willReturn(null) - ; - $psrConsumerMock - ->expects($this->at(2)) - ->method('receive') - ->willReturn($replyMessage) - ; - $psrConsumerMock - ->expects($this->at(3)) - ->method('acknowledge') - ->with($this->identicalTo($replyMessage)) - ; - - $promise = new Promise($psrConsumerMock, $correlationId, 2); - - $actualReplyMessage = $promise->getMessage(); - $this->assertSame($replyMessage, $actualReplyMessage); + $message = new NullMessage(); + + $invokedReceive = false; + $receivecb = function() use (&$invokedReceive) { + $invokedReceive = true; + }; + + $invokedReceiveNoWait = false; + $receiveNoWaitCb = function() use (&$invokedReceiveNoWait, $message) { + $invokedReceiveNoWait = true; + + return $message; + }; + + $promise = new Promise($receivecb, $receiveNoWaitCb, function(){}); + + $this->assertSame($message, $promise->receiveNoWait()); + $this->assertTrue($invokedReceiveNoWait); + $this->assertFalse($invokedReceive); + + // receive should return message but not call callback + $invokedReceiveNoWait = false; + + $this->assertSame($message, $promise->receive()); + $this->assertFalse($invokedReceiveNoWait); + $this->assertFalse($invokedReceive); } - /** - * @return \PHPUnit_Framework_MockObject_MockObject|PsrConsumer - */ - private function createPsrConsumerMock() + public function testOnReceiveNoWaitShouldNotCallCallbackIfMessageReceivedByReceiveBefore() { - return $this->createMock(PsrConsumer::class); + $message = new NullMessage(); + + $invokedReceive = false; + $receivecb = function() use (&$invokedReceive, $message) { + $invokedReceive = true; + return $message; + }; + + $invokedReceiveNoWait = false; + $receiveNoWaitCb = function() use (&$invokedReceiveNoWait) { + $invokedReceiveNoWait = true; + }; + + $promise = new Promise($receivecb, $receiveNoWaitCb, function(){}); + + $this->assertSame($message, $promise->receive()); + $this->assertTrue($invokedReceive); + $this->assertFalse($invokedReceiveNoWait); + + // receiveNoWait should return message but not call callback + $invokedReceive = false; + + $this->assertSame($message, $promise->receiveNoWait()); + $this->assertFalse($invokedReceiveNoWait); + $this->assertFalse($invokedReceive); } } diff --git a/pkg/enqueue/Tests/Rpc/RpcClientTest.php b/pkg/enqueue/Tests/Rpc/RpcClientTest.php index c9b00df99..3436cb84b 100644 --- a/pkg/enqueue/Tests/Rpc/RpcClientTest.php +++ b/pkg/enqueue/Tests/Rpc/RpcClientTest.php @@ -73,28 +73,189 @@ public function testShouldNotSetCorrelationIdIfSet() $this->assertEquals('theCorrelationId', $message->getCorrelationId()); } - public function testShouldPopulatePromiseWithExpectedArguments() + public function testShouldProduceMessageToQueue() { - $context = new NullContext(); + $queue = new NullQueue('aQueue'); + $message = new NullMessage(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); - $queue = $context->createQueue('rpc.call'); - $message = $context->createMessage(); + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($message)) + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + + $rpc = new RpcClient($context); + + $rpc->callAsync($queue, $message, 2); + } + + public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals() + { + $queue = new NullQueue('aQueue'); + $replyQueue = new NullQueue('theReplyTo'); + $message = new NullMessage(); $message->setCorrelationId('theCorrelationId'); $message->setReplyTo('theReplyTo'); - $timeout = 123; + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($receivedMessage) + ; + $consumer + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($receivedMessage)) + ; + $consumer + ->expects($this->never()) + ->method('reject') + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($this->createPsrProducerMock()) + ; + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + + $rpc = new RpcClient($context); + + $rpc->callAsync($queue, $message, 2)->receive(); + } + + public function testShouldReceiveNoWaitMessageAndAckMessageIfCorrelationEquals() + { + $queue = new NullQueue('aQueue'); + $replyQueue = new NullQueue('theReplyTo'); + $message = new NullMessage(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receiveNoWait') + ->willReturn($receivedMessage) + ; + $consumer + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($receivedMessage)) + ; + $consumer + ->expects($this->never()) + ->method('reject') + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($this->createPsrProducerMock()) + ; + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; $rpc = new RpcClient($context); - $promise = $rpc->callAsync($queue, $message, $timeout); + $rpc->callAsync($queue, $message, 2)->receiveNoWait(); + } + + public function testShouldDeleteQueueAfterReceiveIfDeleteReplyQueueIsTrue() + { + $queue = new NullQueue('aQueue'); + $replyQueue = new NullQueue('theReplyTo'); + $message = new NullMessage(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($receivedMessage) + ; - $this->assertInstanceOf(Promise::class, $promise); - $this->assertAttributeEquals('theCorrelationId', 'correlationId', $promise); - $this->assertAttributeEquals(123, 'timeout', $promise); - $this->assertAttributeInstanceOf(PsrConsumer::class, 'consumer', $promise); + $context = $this->getMockBuilder(PsrContext::class) + ->disableOriginalConstructor() + ->setMethods(['deleteQueue']) + ->getMockForAbstractClass() + ; + + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($this->createPsrProducerMock()) + ; + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + $context + ->expects($this->once()) + ->method('deleteQueue') + ->with($this->identicalTo($replyQueue)) + ; + + $rpc = new RpcClient($context); + + $promise = $rpc->callAsync($queue, $message, 2); + $promise->setDeleteReplyQueue(true); + $promise->receive(); } - public function testShouldProduceMessageToQueueAndCreateConsumerForReplyQueue() + public function testShouldThrowExceptionIfDeleteReplyQueueIsTrueButContextHasNoDeleteQueueMethod() { $queue = new NullQueue('aQueue'); $replyQueue = new NullQueue('theReplyTo'); @@ -102,18 +263,21 @@ public function testShouldProduceMessageToQueueAndCreateConsumerForReplyQueue() $message->setCorrelationId('theCorrelationId'); $message->setReplyTo('theReplyTo'); - $producer = $this->createPsrProducerMock(); - $producer + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer ->expects($this->once()) - ->method('send') - ->with($this->identicalTo($queue), $this->identicalTo($message)) + ->method('receive') + ->willReturn($receivedMessage) ; $context = $this->createPsrContextMock(); $context ->expects($this->once()) ->method('createProducer') - ->willReturn($producer) + ->willReturn($this->createPsrProducerMock()) ; $context ->expects($this->once()) @@ -125,12 +289,18 @@ public function testShouldProduceMessageToQueueAndCreateConsumerForReplyQueue() ->expects($this->once()) ->method('createConsumer') ->with($this->identicalTo($replyQueue)) - ->willReturn($this->createPsrConsumerMock()) + ->willReturn($consumer) ; $rpc = new RpcClient($context); - $rpc->callAsync($queue, $message, 2); + $promise = $rpc->callAsync($queue, $message, 2); + $promise->setDeleteReplyQueue(true); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Context does not support delete queue'); + + $promise->receive(); } public function testShouldDoSyncCall() @@ -143,7 +313,7 @@ public function testShouldDoSyncCall() $promiseMock = $this->createMock(Promise::class); $promiseMock ->expects($this->once()) - ->method('getMessage') + ->method('receive') ->willReturn($replyMessage) ; From 93cdf5cbe1477b4bf6a807cdd75611a2092eb295 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko <pascalamsg@gmail.com> Date: Tue, 13 Jun 2017 17:24:18 +0300 Subject: [PATCH 3/7] fix tests --- pkg/enqueue/Tests/Functional/Client/RpcClientTest.php | 2 +- pkg/enqueue/Tests/Rpc/PromiseTest.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php index 082a519e6..9f18af46a 100644 --- a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php +++ b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php @@ -79,7 +79,7 @@ public function testProduceAndConsumeOneMessage() $this->assertInstanceOf(PsrMessage::class, $requestMessage); $this->assertEquals('Hi Thomas!', $requestMessage->getBody()); - $replyMessage = $promise->getMessage(); + $replyMessage = $promise->receive(); $this->assertEquals('Hi John!', $replyMessage->getBody()); } } diff --git a/pkg/enqueue/Tests/Rpc/PromiseTest.php b/pkg/enqueue/Tests/Rpc/PromiseTest.php index 700f7d311..89414dc1f 100644 --- a/pkg/enqueue/Tests/Rpc/PromiseTest.php +++ b/pkg/enqueue/Tests/Rpc/PromiseTest.php @@ -89,7 +89,7 @@ public function testOnReceiveShouldCallFinallyCallbackEvenIfExceptionThrown() public function testOnReceiveShouldThrowExceptionIfCallbackReturnNotMessageInstance() { - $receivecb = function() use (&$invokedReceive) { + $receivecb = function() { return new \stdClass(); }; From fbcf22df8007bd37a9f8c43517cb000bba64063e Mon Sep 17 00:00:00 2001 From: Alexander Kozienko <pascalamsg@gmail.com> Date: Wed, 14 Jun 2017 09:18:50 +0300 Subject: [PATCH 4/7] add deprecated getMessage method --- pkg/enqueue/Client/RpcClient.php | 8 +-- pkg/enqueue/Rpc/Promise.php | 50 +++++++++------ pkg/enqueue/Rpc/RpcClient.php | 8 +-- pkg/enqueue/Tests/Client/RpcClientTest.php | 1 - pkg/enqueue/Tests/Rpc/PromiseTest.php | 73 ++++++++++++++-------- 5 files changed, 84 insertions(+), 56 deletions(-) diff --git a/pkg/enqueue/Client/RpcClient.php b/pkg/enqueue/Client/RpcClient.php index 2ff27f2c5..577335133 100644 --- a/pkg/enqueue/Client/RpcClient.php +++ b/pkg/enqueue/Client/RpcClient.php @@ -78,8 +78,7 @@ public function callAsync($topic, $message, $timeout) $correlationId = $message->getCorrelationId(); - $receive = function() use ($replyQueue, $timeout, $correlationId) { - + $receive = function () use ($replyQueue, $timeout, $correlationId) { $endTime = time() + ((int) ($timeout / 1000)); $consumer = $this->context->createConsumer($replyQueue); @@ -98,8 +97,7 @@ public function callAsync($topic, $message, $timeout) throw TimeoutException::create($timeout, $correlationId); }; - $receiveNoWait = function() use ($replyQueue, $correlationId) { - + $receiveNoWait = function () use ($replyQueue, $correlationId) { static $consumer; if (null === $consumer) { @@ -117,7 +115,7 @@ public function callAsync($topic, $message, $timeout) } }; - $finally = function(Promise $promise) use ($replyQueue) { + $finally = function (Promise $promise) use ($replyQueue) { if ($promise->isDeleteReplyQueue()) { if (false == method_exists($this->context, 'deleteQueue')) { throw new \RuntimeException(sprintf('Context does not support delete queues: "%s"', get_class($this->context))); diff --git a/pkg/enqueue/Rpc/Promise.php b/pkg/enqueue/Rpc/Promise.php index 9325bda28..53c84b498 100644 --- a/pkg/enqueue/Rpc/Promise.php +++ b/pkg/enqueue/Rpc/Promise.php @@ -45,6 +45,20 @@ public function __construct(\Closure $receiveCallback, \Closure $receiveNoWaitCa $this->deleteReplyQueue = true; } + /** + * Blocks until message received or timeout expired. + * + * @deprecated use "receive" instead + * + * @throws TimeoutException if the wait timeout is reached + * + * @return PsrMessage + */ + public function getMessage() + { + return $this->receive(); + } + /** * Blocks until message received or timeout expired. * @@ -86,24 +100,7 @@ public function receiveNoWait() } /** - * @param \Closure $cb - * - * @return PsrMessage - */ - private function doReceive(\Closure $cb) - { - $message = call_user_func($cb, $this); - - if (null !== $message && false == $message instanceof PsrMessage) { - throw new \RuntimeException(sprintf( - 'Expected "%s" but got: "%s"', PsrMessage::class, is_object($message) ? get_class($message) : gettype($message))); - } - - return $message; - } - - /** - * On TRUE deletes reply queue after getMessage call + * On TRUE deletes reply queue after getMessage call. * * @param bool $delete */ @@ -119,4 +116,21 @@ public function isDeleteReplyQueue() { return $this->deleteReplyQueue; } + + /** + * @param \Closure $cb + * + * @return PsrMessage + */ + private function doReceive(\Closure $cb) + { + $message = call_user_func($cb, $this); + + if (null !== $message && false == $message instanceof PsrMessage) { + throw new \RuntimeException(sprintf( + 'Expected "%s" but got: "%s"', PsrMessage::class, is_object($message) ? get_class($message) : gettype($message))); + } + + return $message; + } } diff --git a/pkg/enqueue/Rpc/RpcClient.php b/pkg/enqueue/Rpc/RpcClient.php index 41f6f20d7..b86044854 100644 --- a/pkg/enqueue/Rpc/RpcClient.php +++ b/pkg/enqueue/Rpc/RpcClient.php @@ -66,8 +66,7 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim $correlationId = $message->getCorrelationId(); - $receive = function() use ($replyQueue, $timeout, $correlationId) { - + $receive = function () use ($replyQueue, $timeout, $correlationId) { $endTime = time() + ((int) ($timeout / 1000)); $consumer = $this->context->createConsumer($replyQueue); @@ -86,8 +85,7 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim throw TimeoutException::create($timeout, $correlationId); }; - $receiveNoWait = function() use ($replyQueue, $correlationId) { - + $receiveNoWait = function () use ($replyQueue, $correlationId) { static $consumer; if (null === $consumer) { @@ -105,7 +103,7 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim } }; - $finally = function(Promise $promise) use ($replyQueue) { + $finally = function (Promise $promise) use ($replyQueue) { if ($promise->isDeleteReplyQueue()) { if (false == method_exists($this->context, 'deleteQueue')) { throw new \RuntimeException(sprintf('Context does not support delete queue: "%s"', get_class($this->context))); diff --git a/pkg/enqueue/Tests/Client/RpcClientTest.php b/pkg/enqueue/Tests/Client/RpcClientTest.php index ba03b111b..3d144c47c 100644 --- a/pkg/enqueue/Tests/Client/RpcClientTest.php +++ b/pkg/enqueue/Tests/Client/RpcClientTest.php @@ -134,7 +134,6 @@ public function testShouldNotSetCorrelationIdIfSet() $rpc->callAsync('aTopic', $message, 2); } - public function testShouldDoSyncCall() { $timeout = 123; diff --git a/pkg/enqueue/Tests/Rpc/PromiseTest.php b/pkg/enqueue/Tests/Rpc/PromiseTest.php index 89414dc1f..ffc729500 100644 --- a/pkg/enqueue/Tests/Rpc/PromiseTest.php +++ b/pkg/enqueue/Tests/Rpc/PromiseTest.php @@ -10,14 +10,14 @@ class PromiseTest extends TestCase { public function testIsDeleteReplyQueueShouldReturnTrueByDefault() { - $promise = new Promise(function(){}, function(){}, function(){}); + $promise = new Promise(function () {}, function () {}, function () {}); $this->assertTrue($promise->isDeleteReplyQueue()); } public function testCouldSetGetDeleteReplyQueue() { - $promise = new Promise(function(){}, function(){}, function(){}); + $promise = new Promise(function () {}, function () {}, function () {}); $promise->setDeleteReplyQueue(false); $this->assertFalse($promise->isDeleteReplyQueue()); @@ -29,11 +29,11 @@ public function testCouldSetGetDeleteReplyQueue() public function testOnReceiveShouldCallReceiveCallBack() { $receiveInvoked = false; - $receivecb = function() use (&$receiveInvoked) { + $receivecb = function () use (&$receiveInvoked) { $receiveInvoked = true; }; - $promise = new Promise($receivecb, function(){}, function(){}); + $promise = new Promise($receivecb, function () {}, function () {}); $promise->receive(); $this->assertTrue($receiveInvoked); @@ -42,11 +42,11 @@ public function testOnReceiveShouldCallReceiveCallBack() public function testOnReceiveNoWaitShouldCallReceiveNoWaitCallBack() { $receiveInvoked = false; - $receivecb = function() use (&$receiveInvoked) { + $receivecb = function () use (&$receiveInvoked) { $receiveInvoked = true; }; - $promise = new Promise(function(){}, $receivecb, function(){}); + $promise = new Promise(function () {}, $receivecb, function () {}); $promise->receiveNoWait(); $this->assertTrue($receiveInvoked); @@ -55,11 +55,11 @@ public function testOnReceiveNoWaitShouldCallReceiveNoWaitCallBack() public function testOnReceiveShouldCallFinallyCallback() { $invoked = false; - $cb = function() use (&$invoked) { + $cb = function () use (&$invoked) { $invoked = true; }; - $promise = new Promise(function(){}, function(){}, $cb); + $promise = new Promise(function () {}, function () {}, $cb); $promise->receive(); $this->assertTrue($invoked); @@ -68,20 +68,21 @@ public function testOnReceiveShouldCallFinallyCallback() public function testOnReceiveShouldCallFinallyCallbackEvenIfExceptionThrown() { $invokedFinally = false; - $finallycb = function() use (&$invokedFinally) { + $finallycb = function () use (&$invokedFinally) { $invokedFinally = true; }; $invokedReceive = false; - $receivecb = function() use (&$invokedReceive) { + $receivecb = function () use (&$invokedReceive) { $invokedReceive = true; throw new \Exception(); }; try { - $promise = new Promise($receivecb, function(){}, $finallycb); + $promise = new Promise($receivecb, function () {}, $finallycb); $promise->receive(); - } catch (\Exception $e) {} + } catch (\Exception $e) { + } $this->assertTrue($invokedReceive); $this->assertTrue($invokedFinally); @@ -89,11 +90,11 @@ public function testOnReceiveShouldCallFinallyCallbackEvenIfExceptionThrown() public function testOnReceiveShouldThrowExceptionIfCallbackReturnNotMessageInstance() { - $receivecb = function() { + $receivecb = function () { return new \stdClass(); }; - $promise = new Promise($receivecb, function(){}, function(){}); + $promise = new Promise($receivecb, function () {}, function () {}); $this->expectException(\RuntimeException::class); $this->expectExceptionMessage('Expected "Enqueue\Psr\PsrMessage" but got: "stdClass"'); @@ -103,11 +104,11 @@ public function testOnReceiveShouldThrowExceptionIfCallbackReturnNotMessageInsta public function testOnReceiveNoWaitShouldThrowExceptionIfCallbackReturnNotMessageInstance() { - $receivecb = function() { + $receivecb = function () { return new \stdClass(); }; - $promise = new Promise(function(){}, $receivecb, function(){}); + $promise = new Promise(function () {}, $receivecb, function () {}); $this->expectException(\RuntimeException::class); $this->expectExceptionMessage('Expected "Enqueue\Psr\PsrMessage" but got: "stdClass"'); @@ -118,16 +119,16 @@ public function testOnReceiveNoWaitShouldThrowExceptionIfCallbackReturnNotMessag public function testOnReceiveNoWaitShouldCallFinallyCallbackOnlyIfMessageReceived() { $invokedReceive = false; - $receivecb = function() use (&$invokedReceive) { + $receivecb = function () use (&$invokedReceive) { $invokedReceive = true; }; $invokedFinally = false; - $finallycb = function() use (&$invokedFinally) { + $finallycb = function () use (&$invokedFinally) { $invokedFinally = true; }; - $promise = new Promise(function(){}, $receivecb, $finallycb); + $promise = new Promise(function () {}, $receivecb, $finallycb); $promise->receiveNoWait(); $this->assertTrue($invokedReceive); @@ -136,12 +137,13 @@ public function testOnReceiveNoWaitShouldCallFinallyCallbackOnlyIfMessageReceive // now should call finally too $invokedReceive = false; - $receivecb = function() use (&$invokedReceive) { + $receivecb = function () use (&$invokedReceive) { $invokedReceive = true; + return new NullMessage(); }; - $promise = new Promise(function(){}, $receivecb, $finallycb); + $promise = new Promise(function () {}, $receivecb, $finallycb); $promise->receiveNoWait(); $this->assertTrue($invokedReceive); @@ -153,18 +155,18 @@ public function testOnReceiveShouldNotCallCallbackIfMessageReceivedByReceiveNoWa $message = new NullMessage(); $invokedReceive = false; - $receivecb = function() use (&$invokedReceive) { + $receivecb = function () use (&$invokedReceive) { $invokedReceive = true; }; $invokedReceiveNoWait = false; - $receiveNoWaitCb = function() use (&$invokedReceiveNoWait, $message) { + $receiveNoWaitCb = function () use (&$invokedReceiveNoWait, $message) { $invokedReceiveNoWait = true; return $message; }; - $promise = new Promise($receivecb, $receiveNoWaitCb, function(){}); + $promise = new Promise($receivecb, $receiveNoWaitCb, function () {}); $this->assertSame($message, $promise->receiveNoWait()); $this->assertTrue($invokedReceiveNoWait); @@ -183,17 +185,18 @@ public function testOnReceiveNoWaitShouldNotCallCallbackIfMessageReceivedByRecei $message = new NullMessage(); $invokedReceive = false; - $receivecb = function() use (&$invokedReceive, $message) { + $receivecb = function () use (&$invokedReceive, $message) { $invokedReceive = true; + return $message; }; $invokedReceiveNoWait = false; - $receiveNoWaitCb = function() use (&$invokedReceiveNoWait) { + $receiveNoWaitCb = function () use (&$invokedReceiveNoWait) { $invokedReceiveNoWait = true; }; - $promise = new Promise($receivecb, $receiveNoWaitCb, function(){}); + $promise = new Promise($receivecb, $receiveNoWaitCb, function () {}); $this->assertSame($message, $promise->receive()); $this->assertTrue($invokedReceive); @@ -206,4 +209,20 @@ public function testOnReceiveNoWaitShouldNotCallCallbackIfMessageReceivedByRecei $this->assertFalse($invokedReceiveNoWait); $this->assertFalse($invokedReceive); } + + public function testDeprecatedGetMessageShouldCallReceiveMethod() + { + $promise = $this->getMockBuilder(Promise::class) + ->disableOriginalConstructor() + ->setMethods(['receive']) + ->getMock() + ; + + $promise + ->expects($this->once()) + ->method('receive') + ; + + $promise->getMessage(); + } } From 258146ad1994421885b00df21d77550043a80889 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko <pascalamsg@gmail.com> Date: Wed, 14 Jun 2017 09:36:39 +0300 Subject: [PATCH 5/7] ignore delete queue if context does not support that --- pkg/enqueue/Client/RpcClient.php | 6 +----- pkg/enqueue/Rpc/RpcClient.php | 6 +----- pkg/enqueue/Tests/Client/RpcClientTest.php | 5 +---- pkg/enqueue/Tests/Rpc/RpcClientTest.php | 5 +---- 4 files changed, 4 insertions(+), 18 deletions(-) diff --git a/pkg/enqueue/Client/RpcClient.php b/pkg/enqueue/Client/RpcClient.php index 577335133..440c8f59b 100644 --- a/pkg/enqueue/Client/RpcClient.php +++ b/pkg/enqueue/Client/RpcClient.php @@ -116,11 +116,7 @@ public function callAsync($topic, $message, $timeout) }; $finally = function (Promise $promise) use ($replyQueue) { - if ($promise->isDeleteReplyQueue()) { - if (false == method_exists($this->context, 'deleteQueue')) { - throw new \RuntimeException(sprintf('Context does not support delete queues: "%s"', get_class($this->context))); - } - + if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) { $this->context->deleteQueue($replyQueue); } }; diff --git a/pkg/enqueue/Rpc/RpcClient.php b/pkg/enqueue/Rpc/RpcClient.php index b86044854..87d65bf15 100644 --- a/pkg/enqueue/Rpc/RpcClient.php +++ b/pkg/enqueue/Rpc/RpcClient.php @@ -104,11 +104,7 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim }; $finally = function (Promise $promise) use ($replyQueue) { - if ($promise->isDeleteReplyQueue()) { - if (false == method_exists($this->context, 'deleteQueue')) { - throw new \RuntimeException(sprintf('Context does not support delete queue: "%s"', get_class($this->context))); - } - + if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) { $this->context->deleteQueue($replyQueue); } }; diff --git a/pkg/enqueue/Tests/Client/RpcClientTest.php b/pkg/enqueue/Tests/Client/RpcClientTest.php index 3d144c47c..37311ba1d 100644 --- a/pkg/enqueue/Tests/Client/RpcClientTest.php +++ b/pkg/enqueue/Tests/Client/RpcClientTest.php @@ -297,7 +297,7 @@ public function testShouldDeleteQueueAfterReceiveIfDeleteReplyQueueIsTrue() $promise->receive(); } - public function testShouldThrowExceptionIfDeleteReplyQueueIsTrueButContextHasNoDeleteQueueMethod() + public function testShouldNotCallDeleteQueueIfDeleteReplyQueueIsTrueButContextHasNoDeleteQueueMethod() { $replyQueue = new NullQueue('theReplyTo'); $message = new Message(); @@ -333,9 +333,6 @@ public function testShouldThrowExceptionIfDeleteReplyQueueIsTrueButContextHasNoD $promise = $rpc->callAsync('topic', $message, 2); $promise->setDeleteReplyQueue(true); - $this->expectException(\RuntimeException::class); - $this->expectExceptionMessage('Context does not support delete queue'); - $promise->receive(); } diff --git a/pkg/enqueue/Tests/Rpc/RpcClientTest.php b/pkg/enqueue/Tests/Rpc/RpcClientTest.php index 3436cb84b..068956879 100644 --- a/pkg/enqueue/Tests/Rpc/RpcClientTest.php +++ b/pkg/enqueue/Tests/Rpc/RpcClientTest.php @@ -255,7 +255,7 @@ public function testShouldDeleteQueueAfterReceiveIfDeleteReplyQueueIsTrue() $promise->receive(); } - public function testShouldThrowExceptionIfDeleteReplyQueueIsTrueButContextHasNoDeleteQueueMethod() + public function testShouldNotCallDeleteQueueIfDeleteReplyQueueIsTrueButContextHasNoDeleteQueueMethod() { $queue = new NullQueue('aQueue'); $replyQueue = new NullQueue('theReplyTo'); @@ -297,9 +297,6 @@ public function testShouldThrowExceptionIfDeleteReplyQueueIsTrueButContextHasNoD $promise = $rpc->callAsync($queue, $message, 2); $promise->setDeleteReplyQueue(true); - $this->expectException(\RuntimeException::class); - $this->expectExceptionMessage('Context does not support delete queue'); - $promise->receive(); } From 93e8f5da698fe258d0234356e9eda8a2b1fb7ee2 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko <pascalamsg@gmail.com> Date: Wed, 14 Jun 2017 09:47:28 +0300 Subject: [PATCH 6/7] increace test expected runtime --- pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php b/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php index caeaae542..eb0a11b78 100644 --- a/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php +++ b/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php @@ -66,7 +66,7 @@ public function testReturnNullImmediatelyOnReceiveNoWait() $this->assertNull($message); - $this->assertLessThan(0.5, $endAt - $startAt); + $this->assertLessThan(1, $endAt - $startAt); } public function testProduceAndReceiveOneMessage() From 863166b3f0df66c77ad5ff5a6141d6bcccb8d7c0 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko <pascalamsg@gmail.com> Date: Wed, 14 Jun 2017 09:53:49 +0300 Subject: [PATCH 7/7] update docs --- docs/client/rpc_call.md | 2 +- docs/quick_tour.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/client/rpc_call.md b/docs/client/rpc_call.md index b8fc13698..4744fc074 100644 --- a/docs/client/rpc_call.md +++ b/docs/client/rpc_call.md @@ -43,7 +43,7 @@ $promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5); $replyMessages = []; foreach ($promises as $promise) { - $replyMessages[] = $promise->getMessage(); + $replyMessages[] = $promise->receive(); } ``` diff --git a/docs/quick_tour.md b/docs/quick_tour.md index 3d334d3a2..7159af26a 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -124,7 +124,7 @@ $message = $psrContext->createMessage('Hi there!'); $rpcClient = new RpcClient($psrContext); $promise = $rpcClient->callAsync($queue, $message, 1); -$replyMessage = $promise->getMessage(); +$replyMessage = $promise->receive(); ``` There is also extensions for the consumption component.