diff --git a/docs/bundle/cli_commands.md b/docs/bundle/cli_commands.md index e6eb497a2..bf1166459 100644 --- a/docs/bundle/cli_commands.md +++ b/docs/bundle/cli_commands.md @@ -20,22 +20,24 @@ Usage: enq:c Arguments: - client-queue-names Queues to consume messages from + client-queue-names Queues to consume messages from Options: - --message-limit=MESSAGE-LIMIT Consume n messages and exit - --time-limit=TIME-LIMIT Consume messages during this time - --memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB - --setup-broker Creates queues, topics, exchanges, binding etc on broker side. - -h, --help Display this help message - -q, --quiet Do not output any message - -V, --version Display this application version - --ansi Force ANSI output - --no-ansi Disable ANSI output - -n, --no-interaction Do not ask any interactive question - -e, --env=ENV The environment name [default: "dev"] - --no-debug Switches off debug mode - -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug + --message-limit=MESSAGE-LIMIT Consume n messages and exit + --time-limit=TIME-LIMIT Consume messages during this time + --memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB + --setup-broker Creates queues, topics, exchanges, binding etc on broker side. + --idle-timeout=IDLE-TIMEOUT The time in milliseconds queue consumer idle if no message has been received. + --receive-timeout=RECEIVE-TIMEOUT The time in milliseconds queue consumer waits for a message. + -h, --help Display this help message + -q, --quiet Do not output any message + -V, --version Display this application version + --ansi Force ANSI output + --no-ansi Disable ANSI output + -n, --no-interaction Do not ask any interactive question + -e, --env=ENV The environment name [default: "test"] + --no-debug Switches off debug mode + -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug Help: A client's worker that processes messages. By default it connects to default queue. It select an appropriate message processor based on a message headers @@ -143,26 +145,28 @@ Help: ``` ./bin/console enqueue:transport:consume --help -Usage: +Usage:ng mqdev_gearmand_1 ... done enqueue:transport:consume [options] [--] Arguments: - processor-service A message processor service + processor-service A message processor service Options: - --message-limit=MESSAGE-LIMIT Consume n messages and exit - --time-limit=TIME-LIMIT Consume messages during this time - --memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB - --queue[=QUEUE] Queues to consume from (multiple values allowed) - -h, --help Display this help message - -q, --quiet Do not output any message - -V, --version Display this application version - --ansi Force ANSI output - --no-ansi Disable ANSI output - -n, --no-interaction Do not ask any interactive question - -e, --env=ENV The environment name [default: "dev"] - --no-debug Switches off debug mode - -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug + --message-limit=MESSAGE-LIMIT Consume n messages and exit + --time-limit=TIME-LIMIT Consume messages during this time + --memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB + --idle-timeout=IDLE-TIMEOUT The time in milliseconds queue consumer idle if no message has been received. + --receive-timeout=RECEIVE-TIMEOUT The time in milliseconds queue consumer waits for a message. + --queue[=QUEUE] Queues to consume from (multiple values allowed) + -h, --help Display this help message + -q, --quiet Do not output any message + -V, --version Display this application version + --ansi Force ANSI output + --no-ansi Disable ANSI output + -n, --no-interaction Do not ask any interactive question + -e, --env=ENV The environment name [default: "test"] + --no-debug Switches off debug mode + -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug Help: A worker that consumes message from a broker. To use this broker you have to explicitly set a queue to consume from and a message processor service diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md index 6f684ee42..2dd7a888e 100644 --- a/docs/bundle/config_reference.md +++ b/docs/bundle/config_reference.md @@ -298,6 +298,13 @@ enqueue: router_processor: enqueue.client.router_processor default_processor_queue: default redelivered_delay_time: 0 + consumption: + + # the time in milliseconds queue consumer waits if no message received + idle_timeout: 0 + + # the time in milliseconds queue consumer waits for a message (100 ms by default) + receive_timeout: 100 job: false async_events: enabled: false @@ -306,6 +313,8 @@ enqueue: doctrine_clear_identity_map_extension: false signal_extension: true reply_extension: true + + ``` [back to index](../index.md) diff --git a/pkg/enqueue-bundle/DependencyInjection/Configuration.php b/pkg/enqueue-bundle/DependencyInjection/Configuration.php index c5c0877f5..92686d12e 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Configuration.php +++ b/pkg/enqueue-bundle/DependencyInjection/Configuration.php @@ -50,6 +50,18 @@ public function getConfigTreeBuilder() ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() ->end()->end() + ->arrayNode('consumption')->addDefaultsIfNotSet()->children() + ->integerNode('idle_timeout') + ->min(0) + ->defaultValue(0) + ->info('the time in milliseconds queue consumer waits if no message received') + ->end() + ->integerNode('receive_timeout') + ->min(0) + ->defaultValue(100) + ->info('the time in milliseconds queue consumer waits for a message (100 ms by default)') + ->end() + ->end()->end() ->booleanNode('job')->defaultFalse()->end() ->arrayNode('async_events') ->addDefaultsIfNotSet() diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index f138d60fa..7d1221dcd 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -110,6 +110,19 @@ public function load(array $configs, ContainerBuilder $container) } } + // configure queue consumer + $container->getDefinition('enqueue.consumption.queue_consumer') + ->replaceArgument(2, $config['consumption']['idle_timeout']) + ->replaceArgument(3, $config['consumption']['receive_timeout']) + ; + + if ($container->hasDefinition('enqueue.client.queue_consumer')) { + $container->getDefinition('enqueue.client.queue_consumer') + ->replaceArgument(2, $config['consumption']['idle_timeout']) + ->replaceArgument(3, $config['consumption']['receive_timeout']) + ; + } + if ($config['job']) { if (false == class_exists(Job::class)) { throw new \LogicException('Seems "enqueue/job-queue" is not installed. Please fix this issue.'); diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml index f7bf69ae8..f8a2b0c83 100644 --- a/pkg/enqueue-bundle/Resources/config/client.yml +++ b/pkg/enqueue-bundle/Resources/config/client.yml @@ -85,6 +85,8 @@ services: arguments: - '@enqueue.transport.context' - '@enqueue.consumption.extensions' + - ~ + - ~ enqueue.client.consume_messages_command: class: 'Enqueue\Symfony\Client\ConsumeMessagesCommand' diff --git a/pkg/enqueue-bundle/Resources/config/services.yml b/pkg/enqueue-bundle/Resources/config/services.yml index 66dc0d1a8..1d13655d7 100644 --- a/pkg/enqueue-bundle/Resources/config/services.yml +++ b/pkg/enqueue-bundle/Resources/config/services.yml @@ -10,6 +10,8 @@ services: arguments: - '@enqueue.transport.context' - '@enqueue.consumption.extensions' + - ~ + - ~ enqueue.command.consume_messages: class: 'Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand' diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index 53e54819d..81be3a44c 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -423,4 +423,42 @@ public function testShouldAllowEnableAsyncEvents() ], ], $config); } + + public function testShouldSetDefaultConfigurationForConsumption() + { + $configuration = new Configuration([]); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, [[ + 'transport' => [], + ]]); + + $this->assertArraySubset([ + 'consumption' => [ + 'idle_timeout' => 0, + 'receive_timeout' => 100, + ], + ], $config); + } + + public function testShouldAllowConfigureConsumption() + { + $configuration = new Configuration([]); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, [[ + 'transport' => [], + 'consumption' => [ + 'idle_timeout' => 123, + 'receive_timeout' => 456, + ], + ]]); + + $this->assertArraySubset([ + 'consumption' => [ + 'idle_timeout' => 123, + 'receive_timeout' => 456, + ], + ], $config); + } } diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index b522ccfad..014e7c3b5 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -535,4 +535,28 @@ public function testShouldNotAddJobQueueEntityMappingIfDoctrineBundleIsNotRegist $this->assertSame([], $container->getExtensionConfig('doctrine')); } + + public function testShouldConfigureQueueConsumer() + { + $container = new ContainerBuilder(); + + $extension = new EnqueueExtension(); + $extension->load([[ + 'client' => [], + 'transport' => [ + ], + 'consumption' => [ + 'idle_timeout' => 123, + 'receive_timeout' => 456, + ], + ]], $container); + + $def = $container->getDefinition('enqueue.consumption.queue_consumer'); + $this->assertSame(123, $def->getArgument(2)); + $this->assertSame(456, $def->getArgument(3)); + + $def = $container->getDefinition('enqueue.client.queue_consumer'); + $this->assertSame(123, $def->getArgument(2)); + $this->assertSame(456, $def->getArgument(3)); + } } diff --git a/pkg/enqueue/Consumption/QueueConsumer.php b/pkg/enqueue/Consumption/QueueConsumer.php index 8d3f93a00..fe2aafcee 100644 --- a/pkg/enqueue/Consumption/QueueConsumer.php +++ b/pkg/enqueue/Consumption/QueueConsumer.php @@ -63,6 +63,38 @@ public function __construct( $this->boundProcessors = []; } + /** + * @param int $timeout + */ + public function setIdleTimeout($timeout) + { + $this->idleTimeout = (int) $timeout; + } + + /** + * @return int + */ + public function getIdleTimeout() + { + return $this->idleTimeout; + } + + /** + * @param int $timeout + */ + public function setReceiveTimeout($timeout) + { + $this->receiveTimeout = (int) $timeout; + } + + /** + * @return int + */ + public function getReceiveTimeout() + { + return $this->receiveTimeout; + } + /** * @return PsrContext */ diff --git a/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php index 692c0d674..2ec98e2b3 100644 --- a/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php +++ b/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php @@ -9,6 +9,7 @@ use Enqueue\Consumption\Extension\LoggerExtension; use Enqueue\Consumption\QueueConsumer; use Enqueue\Symfony\Consumption\LimitsExtensionsCommandTrait; +use Enqueue\Symfony\Consumption\QueueConsumerOptionsCommandTrait; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; @@ -19,6 +20,7 @@ class ConsumeMessagesCommand extends Command { use LimitsExtensionsCommandTrait; use SetupBrokerExtensionCommandTrait; + use QueueConsumerOptionsCommandTrait; /** * @var QueueConsumer @@ -67,6 +69,7 @@ protected function configure() { $this->configureLimitsExtensions(); $this->configureSetupBrokerExtension(); + $this->configureQueueConsumerOptions(); $this ->setName('enqueue:consume') @@ -83,6 +86,8 @@ protected function configure() */ protected function execute(InputInterface $input, OutputInterface $output) { + $this->setQueueConsumerOptions($this->consumer, $input); + $queueMetas = []; if ($clientQueueNames = $input->getArgument('client-queue-names')) { foreach ($clientQueueNames as $clientQueueName) { diff --git a/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php index 9de5805aa..c2c7695f1 100644 --- a/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php +++ b/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php @@ -16,6 +16,7 @@ class ConsumeMessagesCommand extends Command implements ContainerAwareInterface { use ContainerAwareTrait; use LimitsExtensionsCommandTrait; + use QueueConsumerOptionsCommandTrait; /** * @var QueueConsumer @@ -38,6 +39,7 @@ public function __construct(QueueConsumer $consumer) protected function configure() { $this->configureLimitsExtensions(); + $this->configureQueueConsumerOptions(); $this ->setName('enqueue:transport:consume') @@ -51,6 +53,8 @@ protected function configure() */ protected function execute(InputInterface $input, OutputInterface $output) { + $this->setQueueConsumerOptions($this->consumer, $input); + $extensions = $this->getLimitsExtensions($input, $output); array_unshift($extensions, new LoggerExtension(new ConsoleLogger($output))); diff --git a/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php index 16020ccd3..297dfa58d 100644 --- a/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php +++ b/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php @@ -20,6 +20,7 @@ class ContainerAwareConsumeMessagesCommand extends Command implements ContainerA { use ContainerAwareTrait; use LimitsExtensionsCommandTrait; + use QueueConsumerOptionsCommandTrait; /** * @var QueueConsumer @@ -44,6 +45,7 @@ public function __construct(QueueConsumer $consumer) protected function configure() { $this->configureLimitsExtensions(); + $this->configureQueueConsumerOptions(); $this ->setName('enqueue:transport:consume') @@ -60,6 +62,8 @@ protected function configure() */ protected function execute(InputInterface $input, OutputInterface $output) { + $this->setQueueConsumerOptions($this->consumer, $input); + /** @var PsrProcessor $processor */ $processor = $this->container->get($input->getArgument('processor-service')); if (false == $processor instanceof PsrProcessor) { diff --git a/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php b/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php new file mode 100644 index 000000000..c6ffd985f --- /dev/null +++ b/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php @@ -0,0 +1,36 @@ +addOption('idle-timeout', null, InputOption::VALUE_REQUIRED, 'The time in milliseconds queue consumer idle if no message has been received.') + ->addOption('receive-timeout', null, InputOption::VALUE_REQUIRED, 'The time in milliseconds queue consumer waits for a message.') + ; + } + + /** + * @param QueueConsumer $consumer + * @param InputInterface $input + */ + protected function setQueueConsumerOptions(QueueConsumer $consumer, InputInterface $input) + { + if (null !== $idleTimeout = $input->getOption('idle-timeout')) { + $consumer->setIdleTimeout((int) $idleTimeout); + } + + if (null !== $receiveTimeout = $input->getOption('receive-timeout')) { + $consumer->setReceiveTimeout((int) $receiveTimeout); + } + } +} diff --git a/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php b/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php index fa3b304e3..196f75598 100644 --- a/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php +++ b/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php @@ -108,6 +108,24 @@ public function testThrowIfProcessorNeitherInstanceOfProcessorNorCallable() $consumer->bind(new NullQueue(''), new \stdClass()); } + public function testCouldSetGetIdleTimeout() + { + $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); + + $consumer->setIdleTimeout(123456); + + $this->assertSame(123456, $consumer->getIdleTimeout()); + } + + public function testCouldSetGetReceiveTimeout() + { + $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); + + $consumer->setReceiveTimeout(123456); + + $this->assertSame(123456, $consumer->getReceiveTimeout()); + } + public function testShouldAllowBindCallbackToQueueName() { $callback = function () { diff --git a/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php index 868c32c8f..b6017abdc 100644 --- a/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php @@ -61,11 +61,13 @@ public function testShouldHaveExpectedOptions() $options = $command->getDefinition()->getOptions(); - $this->assertCount(4, $options); + $this->assertCount(6, $options); $this->assertArrayHasKey('memory-limit', $options); $this->assertArrayHasKey('message-limit', $options); $this->assertArrayHasKey('time-limit', $options); $this->assertArrayHasKey('setup-broker', $options); + $this->assertArrayHasKey('idle-timeout', $options); + $this->assertArrayHasKey('receive-timeout', $options); } public function testShouldHaveExpectedAttributes() diff --git a/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php index a08bb5572..d83505a0c 100644 --- a/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php @@ -29,10 +29,12 @@ public function testShouldHaveExpectedOptions() $options = $command->getDefinition()->getOptions(); - $this->assertCount(3, $options); + $this->assertCount(5, $options); $this->assertArrayHasKey('memory-limit', $options); $this->assertArrayHasKey('message-limit', $options); $this->assertArrayHasKey('time-limit', $options); + $this->assertArrayHasKey('idle-timeout', $options); + $this->assertArrayHasKey('receive-timeout', $options); } public function testShouldHaveExpectedAttributes() diff --git a/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php index 87fbe559d..b451dc712 100644 --- a/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php @@ -33,11 +33,13 @@ public function testShouldHaveExpectedOptions() $options = $command->getDefinition()->getOptions(); - $this->assertCount(4, $options); + $this->assertCount(6, $options); $this->assertArrayHasKey('memory-limit', $options); $this->assertArrayHasKey('message-limit', $options); $this->assertArrayHasKey('time-limit', $options); $this->assertArrayHasKey('queue', $options); + $this->assertArrayHasKey('idle-timeout', $options); + $this->assertArrayHasKey('receive-timeout', $options); } public function testShouldHaveExpectedAttributes() diff --git a/pkg/enqueue/Tests/Symfony/Consumption/Mock/QueueConsumerOptionsCommand.php b/pkg/enqueue/Tests/Symfony/Consumption/Mock/QueueConsumerOptionsCommand.php new file mode 100644 index 000000000..88a0d8cf2 --- /dev/null +++ b/pkg/enqueue/Tests/Symfony/Consumption/Mock/QueueConsumerOptionsCommand.php @@ -0,0 +1,38 @@ +consumer = $consumer; + } + + protected function configure() + { + parent::configure(); + + $this->configureQueueConsumerOptions(); + } + + protected function execute(InputInterface $input, OutputInterface $output) + { + $this->setQueueConsumerOptions($this->consumer, $input); + } +} diff --git a/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php b/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php new file mode 100644 index 000000000..57106aa24 --- /dev/null +++ b/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php @@ -0,0 +1,53 @@ +createQueueConsumer()); + + $options = $trait->getDefinition()->getOptions(); + + $this->assertCount(2, $options); + $this->assertArrayHasKey('idle-timeout', $options); + $this->assertArrayHasKey('receive-timeout', $options); + } + + public function testShouldSetQueueConsumerOptions() + { + $consumer = $this->createQueueConsumer(); + $consumer + ->expects($this->once()) + ->method('setIdleTimeout') + ->with($this->identicalTo(123)) + ; + $consumer + ->expects($this->once()) + ->method('setReceiveTimeout') + ->with($this->identicalTo(456)) + ; + + $trait = new QueueConsumerOptionsCommand($consumer); + + $tester = new CommandTester($trait); + $tester->execute([ + '--idle-timeout' => '123', + '--receive-timeout' => '456', + ]); + } + + /** + * @return QueueConsumer|\PHPUnit_Framework_MockObject_MockObject|QueueConsumer + */ + private function createQueueConsumer() + { + return $this->createMock(QueueConsumer::class); + } +}