diff --git a/docs/client/rpc_call.md b/docs/client/rpc_call.md index b8fc13698..4744fc074 100644 --- a/docs/client/rpc_call.md +++ b/docs/client/rpc_call.md @@ -43,7 +43,7 @@ $promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5); $replyMessages = []; foreach ($promises as $promise) { - $replyMessages[] = $promise->getMessage(); + $replyMessages[] = $promise->receive(); } ``` diff --git a/docs/quick_tour.md b/docs/quick_tour.md index 3d334d3a2..7159af26a 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -124,7 +124,7 @@ $message = $psrContext->createMessage('Hi there!'); $rpcClient = new RpcClient($psrContext); $promise = $rpcClient->callAsync($queue, $message, 1); -$replyMessage = $promise->getMessage(); +$replyMessage = $promise->receive(); ``` There is also extensions for the consumption component. diff --git a/pkg/enqueue/Client/RpcClient.php b/pkg/enqueue/Client/RpcClient.php index 9407d8867..440c8f59b 100644 --- a/pkg/enqueue/Client/RpcClient.php +++ b/pkg/enqueue/Client/RpcClient.php @@ -5,6 +5,7 @@ use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Rpc\Promise; +use Enqueue\Rpc\TimeoutException; use Enqueue\Util\UUID; class RpcClient @@ -38,7 +39,7 @@ public function __construct(ProducerInterface $producer, PsrContext $context) */ public function call($topic, $message, $timeout) { - return $this->callAsync($topic, $message, $timeout)->getMessage(); + return $this->callAsync($topic, $message, $timeout)->receive(); } /** @@ -62,9 +63,11 @@ public function callAsync($topic, $message, $timeout) if ($message->getReplyTo()) { $replyQueue = $this->context->createQueue($message->getReplyTo()); + $deleteReplyQueue = false; } else { $replyQueue = $this->context->createTemporaryQueue(); $message->setReplyTo($replyQueue->getQueueName()); + $deleteReplyQueue = true; } if (false == $message->getCorrelationId()) { @@ -73,10 +76,54 @@ public function callAsync($topic, $message, $timeout) $this->producer->send($topic, $message); - return new Promise( - $this->context->createConsumer($replyQueue), - $message->getCorrelationId(), - $timeout - ); + $correlationId = $message->getCorrelationId(); + + $receive = function () use ($replyQueue, $timeout, $correlationId) { + $endTime = time() + ((int) ($timeout / 1000)); + $consumer = $this->context->createConsumer($replyQueue); + + do { + if ($message = $consumer->receive($timeout)) { + if ($message->getCorrelationId() === $correlationId) { + $consumer->acknowledge($message); + + return $message; + } + + $consumer->reject($message, true); + } + } while (time() < $endTime); + + throw TimeoutException::create($timeout, $correlationId); + }; + + $receiveNoWait = function () use ($replyQueue, $correlationId) { + static $consumer; + + if (null === $consumer) { + $consumer = $this->context->createConsumer($replyQueue); + } + + if ($message = $consumer->receiveNoWait()) { + if ($message->getCorrelationId() === $correlationId) { + $consumer->acknowledge($message); + + return $message; + } + + $consumer->reject($message, true); + } + }; + + $finally = function (Promise $promise) use ($replyQueue) { + if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) { + $this->context->deleteQueue($replyQueue); + } + }; + + $promise = new Promise($receive, $receiveNoWait, $finally); + $promise->setDeleteReplyQueue($deleteReplyQueue); + + return $promise; } } diff --git a/pkg/enqueue/Rpc/Promise.php b/pkg/enqueue/Rpc/Promise.php index e3f8820a4..53c84b498 100644 --- a/pkg/enqueue/Rpc/Promise.php +++ b/pkg/enqueue/Rpc/Promise.php @@ -2,75 +2,135 @@ namespace Enqueue\Rpc; -use Enqueue\Psr\PsrConsumer; use Enqueue\Psr\PsrMessage; class Promise { /** - * @var PsrConsumer + * @var \Closure */ - private $consumer; + private $receiveCallback; /** - * @var int + * @var \Closure */ - private $timeout; + private $receiveNoWaitCallback; /** - * @var string + * @var \Closure */ - private $correlationId; + private $finallyCallback; /** - * @param PsrConsumer $consumer - * @param string $correlationId - * @param int $timeout + * @var bool */ - public function __construct(PsrConsumer $consumer, $correlationId, $timeout) + private $deleteReplyQueue; + + /** + * @var PsrMessage + */ + private $message; + + /** + * @param \Closure $receiveCallback + * @param \Closure $receiveNoWaitCallback + * @param \Closure $finallyCallback + */ + public function __construct(\Closure $receiveCallback, \Closure $receiveNoWaitCallback, \Closure $finallyCallback) { - $this->consumer = $consumer; - $this->timeout = $timeout; - $this->correlationId = $correlationId; + $this->receiveCallback = $receiveCallback; + $this->receiveNoWaitCallback = $receiveNoWaitCallback; + $this->finallyCallback = $finallyCallback; + + $this->deleteReplyQueue = true; } /** + * Blocks until message received or timeout expired. + * + * @deprecated use "receive" instead + * * @throws TimeoutException if the wait timeout is reached * * @return PsrMessage */ public function getMessage() { - $endTime = time() + $this->timeout; - - while (time() < $endTime) { - if ($message = $this->consumer->receive($this->timeout)) { - if ($message->getCorrelationId() === $this->correlationId) { - $this->consumer->acknowledge($message); + return $this->receive(); + } - return $message; + /** + * Blocks until message received or timeout expired. + * + * @throws TimeoutException if the wait timeout is reached + * + * @return PsrMessage + */ + public function receive() + { + if (null == $this->message) { + try { + if ($message = $this->doReceive($this->receiveCallback)) { + $this->message = $message; } + } finally { + call_user_func($this->finallyCallback, $this); + } + } + + return $this->message; + } + + /** + * Non blocking function. Returns message or null. + * + * @return PsrMessage|null + */ + public function receiveNoWait() + { + if (null == $this->message) { + if ($message = $this->doReceive($this->receiveNoWaitCallback)) { + $this->message = $message; - $this->consumer->reject($message, true); + call_user_func($this->finallyCallback, $this); } } - throw TimeoutException::create($this->timeout, $this->correlationId); + return $this->message; } /** - * @param int $timeout + * On TRUE deletes reply queue after getMessage call. + * + * @param bool $delete */ - public function setTimeout($timeout) + public function setDeleteReplyQueue($delete) { - $this->timeout = $timeout; + $this->deleteReplyQueue = (bool) $delete; } /** - * @return int + * @return bool */ - public function getTimeout() + public function isDeleteReplyQueue() { - return $this->timeout; + return $this->deleteReplyQueue; + } + + /** + * @param \Closure $cb + * + * @return PsrMessage + */ + private function doReceive(\Closure $cb) + { + $message = call_user_func($cb, $this); + + if (null !== $message && false == $message instanceof PsrMessage) { + throw new \RuntimeException(sprintf( + 'Expected "%s" but got: "%s"', PsrMessage::class, is_object($message) ? get_class($message) : gettype($message))); + } + + return $message; } } diff --git a/pkg/enqueue/Rpc/RpcClient.php b/pkg/enqueue/Rpc/RpcClient.php index 89b0cedb9..87d65bf15 100644 --- a/pkg/enqueue/Rpc/RpcClient.php +++ b/pkg/enqueue/Rpc/RpcClient.php @@ -33,7 +33,7 @@ public function __construct(PsrContext $context) */ public function call(PsrDestination $destination, PsrMessage $message, $timeout) { - return $this->callAsync($destination, $message, $timeout)->getMessage(); + return $this->callAsync($destination, $message, $timeout)->receive(); } /** @@ -51,9 +51,11 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim if ($message->getReplyTo()) { $replyQueue = $this->context->createQueue($message->getReplyTo()); + $deleteReplyQueue = false; } else { $replyQueue = $this->context->createTemporaryQueue(); $message->setReplyTo($replyQueue->getQueueName()); + $deleteReplyQueue = true; } if (false == $message->getCorrelationId()) { @@ -62,10 +64,54 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim $this->context->createProducer()->send($destination, $message); - return new Promise( - $this->context->createConsumer($replyQueue), - $message->getCorrelationId(), - $timeout - ); + $correlationId = $message->getCorrelationId(); + + $receive = function () use ($replyQueue, $timeout, $correlationId) { + $endTime = time() + ((int) ($timeout / 1000)); + $consumer = $this->context->createConsumer($replyQueue); + + do { + if ($message = $consumer->receive($timeout)) { + if ($message->getCorrelationId() === $correlationId) { + $consumer->acknowledge($message); + + return $message; + } + + $consumer->reject($message, true); + } + } while (time() < $endTime); + + throw TimeoutException::create($timeout, $correlationId); + }; + + $receiveNoWait = function () use ($replyQueue, $correlationId) { + static $consumer; + + if (null === $consumer) { + $consumer = $this->context->createConsumer($replyQueue); + } + + if ($message = $consumer->receiveNoWait()) { + if ($message->getCorrelationId() === $correlationId) { + $consumer->acknowledge($message); + + return $message; + } + + $consumer->reject($message, true); + } + }; + + $finally = function (Promise $promise) use ($replyQueue) { + if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) { + $this->context->deleteQueue($replyQueue); + } + }; + + $promise = new Promise($receive, $receiveNoWait, $finally); + $promise->setDeleteReplyQueue($deleteReplyQueue); + + return $promise; } } diff --git a/pkg/enqueue/Tests/Client/RpcClientTest.php b/pkg/enqueue/Tests/Client/RpcClientTest.php index d35f1fe51..37311ba1d 100644 --- a/pkg/enqueue/Tests/Client/RpcClientTest.php +++ b/pkg/enqueue/Tests/Client/RpcClientTest.php @@ -7,6 +7,7 @@ use Enqueue\Client\RpcClient; use Enqueue\Null\NullContext; use Enqueue\Null\NullMessage; +use Enqueue\Null\NullQueue; use Enqueue\Psr\PsrConsumer; use Enqueue\Psr\PsrContext; use Enqueue\Rpc\Promise; @@ -133,28 +134,6 @@ public function testShouldNotSetCorrelationIdIfSet() $rpc->callAsync('aTopic', $message, 2); } - public function testShouldPopulatePromiseWithExpectedArguments() - { - $context = new NullContext(); - - $message = new Message(); - $message->setCorrelationId('theCorrelationId'); - - $timeout = 123; - - $rpc = new RpcClient( - $this->createProducerMock(), - $context - ); - - $promise = $rpc->callAsync('aTopic', $message, $timeout); - - $this->assertInstanceOf(Promise::class, $promise); - $this->assertAttributeEquals('theCorrelationId', 'correlationId', $promise); - $this->assertAttributeEquals(123, 'timeout', $promise); - $this->assertAttributeInstanceOf(PsrConsumer::class, 'consumer', $promise); - } - public function testShouldDoSyncCall() { $timeout = 123; @@ -163,7 +142,7 @@ public function testShouldDoSyncCall() $promiseMock = $this->createMock(Promise::class); $promiseMock ->expects($this->once()) - ->method('getMessage') + ->method('receive') ->willReturn($replyMessage) ; @@ -181,6 +160,182 @@ public function testShouldDoSyncCall() $this->assertSame($replyMessage, $actualReplyMessage); } + public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals() + { + $replyQueue = new NullQueue('theReplyTo'); + $message = new Message(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($receivedMessage) + ; + $consumer + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($receivedMessage)) + ; + $consumer + ->expects($this->never()) + ->method('reject') + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + + $rpc = new RpcClient($this->createProducerMock(), $context); + + $rpc->callAsync('topic', $message, 2)->receive(); + } + + public function testShouldReceiveNoWaitMessageAndAckMessageIfCorrelationEquals() + { + $replyQueue = new NullQueue('theReplyTo'); + $message = new Message(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receiveNoWait') + ->willReturn($receivedMessage) + ; + $consumer + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($receivedMessage)) + ; + $consumer + ->expects($this->never()) + ->method('reject') + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + + $rpc = new RpcClient($this->createProducerMock(), $context); + + $rpc->callAsync('topic', $message, 2)->receiveNoWait(); + } + + public function testShouldDeleteQueueAfterReceiveIfDeleteReplyQueueIsTrue() + { + $replyQueue = new NullQueue('theReplyTo'); + $message = new Message(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($receivedMessage) + ; + + $context = $this->getMockBuilder(PsrContext::class) + ->disableOriginalConstructor() + ->setMethods(['deleteQueue']) + ->getMockForAbstractClass() + ; + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + $context + ->expects($this->once()) + ->method('deleteQueue') + ->with($this->identicalTo($replyQueue)) + ; + + $rpc = new RpcClient($this->createProducerMock(), $context); + + $promise = $rpc->callAsync('topic', $message, 2); + $promise->setDeleteReplyQueue(true); + $promise->receive(); + } + + public function testShouldNotCallDeleteQueueIfDeleteReplyQueueIsTrueButContextHasNoDeleteQueueMethod() + { + $replyQueue = new NullQueue('theReplyTo'); + $message = new Message(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($receivedMessage) + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + + $rpc = new RpcClient($this->createProducerMock(), $context); + + $promise = $rpc->callAsync('topic', $message, 2); + $promise->setDeleteReplyQueue(true); + + $promise->receive(); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|PsrContext */ @@ -196,4 +351,12 @@ private function createProducerMock() { return $this->createMock(ProducerInterface::class); } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrConsumer + */ + private function createPsrConsumerMock() + { + return $this->createMock(PsrConsumer::class); + } } diff --git a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php index 082a519e6..9f18af46a 100644 --- a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php +++ b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php @@ -79,7 +79,7 @@ public function testProduceAndConsumeOneMessage() $this->assertInstanceOf(PsrMessage::class, $requestMessage); $this->assertEquals('Hi Thomas!', $requestMessage->getBody()); - $replyMessage = $promise->getMessage(); + $replyMessage = $promise->receive(); $this->assertEquals('Hi John!', $replyMessage->getBody()); } } diff --git a/pkg/enqueue/Tests/Rpc/PromiseTest.php b/pkg/enqueue/Tests/Rpc/PromiseTest.php index f8d07e592..ffc729500 100644 --- a/pkg/enqueue/Tests/Rpc/PromiseTest.php +++ b/pkg/enqueue/Tests/Rpc/PromiseTest.php @@ -3,140 +3,226 @@ namespace Enqueue\Tests\Rpc; use Enqueue\Null\NullMessage; -use Enqueue\Psr\PsrConsumer; use Enqueue\Rpc\Promise; -use Enqueue\Rpc\TimeoutException; use PHPUnit\Framework\TestCase; class PromiseTest extends TestCase { - public function testCouldBeConstructedWithExpectedSetOfArguments() + public function testIsDeleteReplyQueueShouldReturnTrueByDefault() { - new Promise($this->createPsrConsumerMock(), 'aCorrelationId', 2); + $promise = new Promise(function () {}, function () {}, function () {}); + + $this->assertTrue($promise->isDeleteReplyQueue()); } - public function testShouldTimeoutIfNoResponseMessage() + public function testCouldSetGetDeleteReplyQueue() { - $psrConsumerMock = $this->createPsrConsumerMock(); - $psrConsumerMock - ->expects($this->atLeastOnce()) - ->method('receive') - ->willReturn(null) - ; + $promise = new Promise(function () {}, function () {}, function () {}); - $promise = new Promise($psrConsumerMock, 'aCorrelationId', 2); + $promise->setDeleteReplyQueue(false); + $this->assertFalse($promise->isDeleteReplyQueue()); - $this->expectException(TimeoutException::class); - $this->expectExceptionMessage('Rpc call timeout is reached without receiving a reply message. Timeout: 2, CorrelationId: aCorrelationId'); - $promise->getMessage(); + $promise->setDeleteReplyQueue(true); + $this->assertTrue($promise->isDeleteReplyQueue()); } - public function testShouldReturnReplyMessageIfCorrelationIdSame() + public function testOnReceiveShouldCallReceiveCallBack() { - $correlationId = 'theCorrelationId'; + $receiveInvoked = false; + $receivecb = function () use (&$receiveInvoked) { + $receiveInvoked = true; + }; - $replyMessage = new NullMessage(); - $replyMessage->setCorrelationId($correlationId); + $promise = new Promise($receivecb, function () {}, function () {}); + $promise->receive(); - $psrConsumerMock = $this->createPsrConsumerMock(); - $psrConsumerMock - ->expects($this->once()) - ->method('receive') - ->willReturn($replyMessage) - ; - $psrConsumerMock - ->expects($this->once()) - ->method('acknowledge') - ->with($this->identicalTo($replyMessage)) - ; + $this->assertTrue($receiveInvoked); + } + + public function testOnReceiveNoWaitShouldCallReceiveNoWaitCallBack() + { + $receiveInvoked = false; + $receivecb = function () use (&$receiveInvoked) { + $receiveInvoked = true; + }; - $promise = new Promise($psrConsumerMock, $correlationId, 2); + $promise = new Promise(function () {}, $receivecb, function () {}); + $promise->receiveNoWait(); - $actualReplyMessage = $promise->getMessage(); - $this->assertSame($replyMessage, $actualReplyMessage); + $this->assertTrue($receiveInvoked); } - public function testShouldReQueueIfCorrelationIdNotSame() + public function testOnReceiveShouldCallFinallyCallback() { - $correlationId = 'theCorrelationId'; + $invoked = false; + $cb = function () use (&$invoked) { + $invoked = true; + }; - $anotherReplyMessage = new NullMessage(); - $anotherReplyMessage->setCorrelationId('theOtherCorrelationId'); + $promise = new Promise(function () {}, function () {}, $cb); + $promise->receive(); - $replyMessage = new NullMessage(); - $replyMessage->setCorrelationId($correlationId); + $this->assertTrue($invoked); + } - $psrConsumerMock = $this->createPsrConsumerMock(); - $psrConsumerMock - ->expects($this->at(0)) - ->method('receive') - ->willReturn($anotherReplyMessage) - ; - $psrConsumerMock - ->expects($this->at(1)) - ->method('reject') - ->with($this->identicalTo($anotherReplyMessage), true) - ; - $psrConsumerMock - ->expects($this->at(2)) - ->method('receive') - ->willReturn($replyMessage) - ; - $psrConsumerMock - ->expects($this->at(3)) - ->method('acknowledge') - ->with($this->identicalTo($replyMessage)) - ; + public function testOnReceiveShouldCallFinallyCallbackEvenIfExceptionThrown() + { + $invokedFinally = false; + $finallycb = function () use (&$invokedFinally) { + $invokedFinally = true; + }; + + $invokedReceive = false; + $receivecb = function () use (&$invokedReceive) { + $invokedReceive = true; + throw new \Exception(); + }; + + try { + $promise = new Promise($receivecb, function () {}, $finallycb); + $promise->receive(); + } catch (\Exception $e) { + } + + $this->assertTrue($invokedReceive); + $this->assertTrue($invokedFinally); + } - $promise = new Promise($psrConsumerMock, $correlationId, 2); + public function testOnReceiveShouldThrowExceptionIfCallbackReturnNotMessageInstance() + { + $receivecb = function () { + return new \stdClass(); + }; - $actualReplyMessage = $promise->getMessage(); - $this->assertSame($replyMessage, $actualReplyMessage); + $promise = new Promise($receivecb, function () {}, function () {}); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Expected "Enqueue\Psr\PsrMessage" but got: "stdClass"'); + + $promise->receive(); } - public function testShouldTrySeveralTimesToReceiveReplyMessage() + public function testOnReceiveNoWaitShouldThrowExceptionIfCallbackReturnNotMessageInstance() { - $correlationId = 'theCorrelationId'; + $receivecb = function () { + return new \stdClass(); + }; - $anotherReplyMessage = new NullMessage(); - $anotherReplyMessage->setCorrelationId('theOtherCorrelationId'); + $promise = new Promise(function () {}, $receivecb, function () {}); - $replyMessage = new NullMessage(); - $replyMessage->setCorrelationId($correlationId); + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Expected "Enqueue\Psr\PsrMessage" but got: "stdClass"'); - $psrConsumerMock = $this->createPsrConsumerMock(); - $psrConsumerMock - ->expects($this->at(0)) - ->method('receive') - ->willReturn(null) - ; - $psrConsumerMock - ->expects($this->at(1)) - ->method('receive') - ->willReturn(null) - ; - $psrConsumerMock - ->expects($this->at(2)) - ->method('receive') - ->willReturn($replyMessage) - ; - $psrConsumerMock - ->expects($this->at(3)) - ->method('acknowledge') - ->with($this->identicalTo($replyMessage)) - ; + $promise->receiveNoWait(); + } + + public function testOnReceiveNoWaitShouldCallFinallyCallbackOnlyIfMessageReceived() + { + $invokedReceive = false; + $receivecb = function () use (&$invokedReceive) { + $invokedReceive = true; + }; - $promise = new Promise($psrConsumerMock, $correlationId, 2); + $invokedFinally = false; + $finallycb = function () use (&$invokedFinally) { + $invokedFinally = true; + }; - $actualReplyMessage = $promise->getMessage(); - $this->assertSame($replyMessage, $actualReplyMessage); + $promise = new Promise(function () {}, $receivecb, $finallycb); + $promise->receiveNoWait(); + + $this->assertTrue($invokedReceive); + $this->assertFalse($invokedFinally); + + // now should call finally too + + $invokedReceive = false; + $receivecb = function () use (&$invokedReceive) { + $invokedReceive = true; + + return new NullMessage(); + }; + + $promise = new Promise(function () {}, $receivecb, $finallycb); + $promise->receiveNoWait(); + + $this->assertTrue($invokedReceive); + $this->assertTrue($invokedFinally); } - /** - * @return \PHPUnit_Framework_MockObject_MockObject|PsrConsumer - */ - private function createPsrConsumerMock() + public function testOnReceiveShouldNotCallCallbackIfMessageReceivedByReceiveNoWaitBefore() { - return $this->createMock(PsrConsumer::class); + $message = new NullMessage(); + + $invokedReceive = false; + $receivecb = function () use (&$invokedReceive) { + $invokedReceive = true; + }; + + $invokedReceiveNoWait = false; + $receiveNoWaitCb = function () use (&$invokedReceiveNoWait, $message) { + $invokedReceiveNoWait = true; + + return $message; + }; + + $promise = new Promise($receivecb, $receiveNoWaitCb, function () {}); + + $this->assertSame($message, $promise->receiveNoWait()); + $this->assertTrue($invokedReceiveNoWait); + $this->assertFalse($invokedReceive); + + // receive should return message but not call callback + $invokedReceiveNoWait = false; + + $this->assertSame($message, $promise->receive()); + $this->assertFalse($invokedReceiveNoWait); + $this->assertFalse($invokedReceive); + } + + public function testOnReceiveNoWaitShouldNotCallCallbackIfMessageReceivedByReceiveBefore() + { + $message = new NullMessage(); + + $invokedReceive = false; + $receivecb = function () use (&$invokedReceive, $message) { + $invokedReceive = true; + + return $message; + }; + + $invokedReceiveNoWait = false; + $receiveNoWaitCb = function () use (&$invokedReceiveNoWait) { + $invokedReceiveNoWait = true; + }; + + $promise = new Promise($receivecb, $receiveNoWaitCb, function () {}); + + $this->assertSame($message, $promise->receive()); + $this->assertTrue($invokedReceive); + $this->assertFalse($invokedReceiveNoWait); + + // receiveNoWait should return message but not call callback + $invokedReceive = false; + + $this->assertSame($message, $promise->receiveNoWait()); + $this->assertFalse($invokedReceiveNoWait); + $this->assertFalse($invokedReceive); + } + + public function testDeprecatedGetMessageShouldCallReceiveMethod() + { + $promise = $this->getMockBuilder(Promise::class) + ->disableOriginalConstructor() + ->setMethods(['receive']) + ->getMock() + ; + + $promise + ->expects($this->once()) + ->method('receive') + ; + + $promise->getMessage(); } } diff --git a/pkg/enqueue/Tests/Rpc/RpcClientTest.php b/pkg/enqueue/Tests/Rpc/RpcClientTest.php index c9b00df99..068956879 100644 --- a/pkg/enqueue/Tests/Rpc/RpcClientTest.php +++ b/pkg/enqueue/Tests/Rpc/RpcClientTest.php @@ -73,28 +73,135 @@ public function testShouldNotSetCorrelationIdIfSet() $this->assertEquals('theCorrelationId', $message->getCorrelationId()); } - public function testShouldPopulatePromiseWithExpectedArguments() + public function testShouldProduceMessageToQueue() { - $context = new NullContext(); + $queue = new NullQueue('aQueue'); + $message = new NullMessage(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); - $queue = $context->createQueue('rpc.call'); - $message = $context->createMessage(); + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($message)) + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + + $rpc = new RpcClient($context); + + $rpc->callAsync($queue, $message, 2); + } + + public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals() + { + $queue = new NullQueue('aQueue'); + $replyQueue = new NullQueue('theReplyTo'); + $message = new NullMessage(); $message->setCorrelationId('theCorrelationId'); $message->setReplyTo('theReplyTo'); - $timeout = 123; + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($receivedMessage) + ; + $consumer + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($receivedMessage)) + ; + $consumer + ->expects($this->never()) + ->method('reject') + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($this->createPsrProducerMock()) + ; + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; $rpc = new RpcClient($context); - $promise = $rpc->callAsync($queue, $message, $timeout); + $rpc->callAsync($queue, $message, 2)->receive(); + } + + public function testShouldReceiveNoWaitMessageAndAckMessageIfCorrelationEquals() + { + $queue = new NullQueue('aQueue'); + $replyQueue = new NullQueue('theReplyTo'); + $message = new NullMessage(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); - $this->assertInstanceOf(Promise::class, $promise); - $this->assertAttributeEquals('theCorrelationId', 'correlationId', $promise); - $this->assertAttributeEquals(123, 'timeout', $promise); - $this->assertAttributeInstanceOf(PsrConsumer::class, 'consumer', $promise); + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receiveNoWait') + ->willReturn($receivedMessage) + ; + $consumer + ->expects($this->once()) + ->method('acknowledge') + ->with($this->identicalTo($receivedMessage)) + ; + $consumer + ->expects($this->never()) + ->method('reject') + ; + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($this->createPsrProducerMock()) + ; + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + + $rpc = new RpcClient($context); + + $rpc->callAsync($queue, $message, 2)->receiveNoWait(); } - public function testShouldProduceMessageToQueueAndCreateConsumerForReplyQueue() + public function testShouldDeleteQueueAfterReceiveIfDeleteReplyQueueIsTrue() { $queue = new NullQueue('aQueue'); $replyQueue = new NullQueue('theReplyTo'); @@ -102,18 +209,75 @@ public function testShouldProduceMessageToQueueAndCreateConsumerForReplyQueue() $message->setCorrelationId('theCorrelationId'); $message->setReplyTo('theReplyTo'); - $producer = $this->createPsrProducerMock(); - $producer + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer ->expects($this->once()) - ->method('send') - ->with($this->identicalTo($queue), $this->identicalTo($message)) + ->method('receive') + ->willReturn($receivedMessage) + ; + + $context = $this->getMockBuilder(PsrContext::class) + ->disableOriginalConstructor() + ->setMethods(['deleteQueue']) + ->getMockForAbstractClass() + ; + + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($this->createPsrProducerMock()) + ; + $context + ->expects($this->once()) + ->method('createQueue') + ->with('theReplyTo') + ->willReturn($replyQueue) + ; + $context + ->expects($this->once()) + ->method('createConsumer') + ->with($this->identicalTo($replyQueue)) + ->willReturn($consumer) + ; + $context + ->expects($this->once()) + ->method('deleteQueue') + ->with($this->identicalTo($replyQueue)) + ; + + $rpc = new RpcClient($context); + + $promise = $rpc->callAsync($queue, $message, 2); + $promise->setDeleteReplyQueue(true); + $promise->receive(); + } + + public function testShouldNotCallDeleteQueueIfDeleteReplyQueueIsTrueButContextHasNoDeleteQueueMethod() + { + $queue = new NullQueue('aQueue'); + $replyQueue = new NullQueue('theReplyTo'); + $message = new NullMessage(); + $message->setCorrelationId('theCorrelationId'); + $message->setReplyTo('theReplyTo'); + + $receivedMessage = new NullMessage(); + $receivedMessage->setCorrelationId('theCorrelationId'); + + $consumer = $this->createPsrConsumerMock(); + $consumer + ->expects($this->once()) + ->method('receive') + ->willReturn($receivedMessage) ; $context = $this->createPsrContextMock(); $context ->expects($this->once()) ->method('createProducer') - ->willReturn($producer) + ->willReturn($this->createPsrProducerMock()) ; $context ->expects($this->once()) @@ -125,12 +289,15 @@ public function testShouldProduceMessageToQueueAndCreateConsumerForReplyQueue() ->expects($this->once()) ->method('createConsumer') ->with($this->identicalTo($replyQueue)) - ->willReturn($this->createPsrConsumerMock()) + ->willReturn($consumer) ; $rpc = new RpcClient($context); - $rpc->callAsync($queue, $message, 2); + $promise = $rpc->callAsync($queue, $message, 2); + $promise->setDeleteReplyQueue(true); + + $promise->receive(); } public function testShouldDoSyncCall() @@ -143,7 +310,7 @@ public function testShouldDoSyncCall() $promiseMock = $this->createMock(Promise::class); $promiseMock ->expects($this->once()) - ->method('getMessage') + ->method('receive') ->willReturn($replyMessage) ; diff --git a/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php b/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php index caeaae542..eb0a11b78 100644 --- a/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php +++ b/pkg/stomp/Tests/Functional/StompCommonUseCasesTest.php @@ -66,7 +66,7 @@ public function testReturnNullImmediatelyOnReceiveNoWait() $this->assertNull($message); - $this->assertLessThan(0.5, $endAt - $startAt); + $this->assertLessThan(1, $endAt - $startAt); } public function testProduceAndReceiveOneMessage()