Skip to content

Commit

Permalink
Merge pull request #193 from php-enqueue/queue-consumer-options
Browse files Browse the repository at this point in the history
Queue Consumer Options
  • Loading branch information
makasim authored Sep 11, 2017
2 parents c5cb914 + 7b3af54 commit 1e1ee8e
Show file tree
Hide file tree
Showing 19 changed files with 332 additions and 32 deletions.
62 changes: 33 additions & 29 deletions docs/bundle/cli_commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@ Usage:
enq:c
Arguments:
client-queue-names Queues to consume messages from
client-queue-names Queues to consume messages from
Options:
--message-limit=MESSAGE-LIMIT Consume n messages and exit
--time-limit=TIME-LIMIT Consume messages during this time
--memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB
--setup-broker Creates queues, topics, exchanges, binding etc on broker side.
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi Force ANSI output
--no-ansi Disable ANSI output
-n, --no-interaction Do not ask any interactive question
-e, --env=ENV The environment name [default: "dev"]
--no-debug Switches off debug mode
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
--message-limit=MESSAGE-LIMIT Consume n messages and exit
--time-limit=TIME-LIMIT Consume messages during this time
--memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB
--setup-broker Creates queues, topics, exchanges, binding etc on broker side.
--idle-timeout=IDLE-TIMEOUT The time in milliseconds queue consumer idle if no message has been received.
--receive-timeout=RECEIVE-TIMEOUT The time in milliseconds queue consumer waits for a message.
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi Force ANSI output
--no-ansi Disable ANSI output
-n, --no-interaction Do not ask any interactive question
-e, --env=ENV The environment name [default: "test"]
--no-debug Switches off debug mode
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
Help:
A client's worker that processes messages. By default it connects to default queue. It select an appropriate message processor based on a message headers
Expand Down Expand Up @@ -143,26 +145,28 @@ Help:

```
./bin/console enqueue:transport:consume --help
Usage:
Usage:ng mqdev_gearmand_1 ... done
enqueue:transport:consume [options] [--] <processor-service>
Arguments:
processor-service A message processor service
processor-service A message processor service
Options:
--message-limit=MESSAGE-LIMIT Consume n messages and exit
--time-limit=TIME-LIMIT Consume messages during this time
--memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB
--queue[=QUEUE] Queues to consume from (multiple values allowed)
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi Force ANSI output
--no-ansi Disable ANSI output
-n, --no-interaction Do not ask any interactive question
-e, --env=ENV The environment name [default: "dev"]
--no-debug Switches off debug mode
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
--message-limit=MESSAGE-LIMIT Consume n messages and exit
--time-limit=TIME-LIMIT Consume messages during this time
--memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB
--idle-timeout=IDLE-TIMEOUT The time in milliseconds queue consumer idle if no message has been received.
--receive-timeout=RECEIVE-TIMEOUT The time in milliseconds queue consumer waits for a message.
--queue[=QUEUE] Queues to consume from (multiple values allowed)
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi Force ANSI output
--no-ansi Disable ANSI output
-n, --no-interaction Do not ask any interactive question
-e, --env=ENV The environment name [default: "test"]
--no-debug Switches off debug mode
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
Help:
A worker that consumes message from a broker. To use this broker you have to explicitly set a queue to consume from and a message processor service
Expand Down
9 changes: 9 additions & 0 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,13 @@ enqueue:
router_processor: enqueue.client.router_processor
default_processor_queue: default
redelivered_delay_time: 0
consumption:

# the time in milliseconds queue consumer waits if no message received
idle_timeout: 0

# the time in milliseconds queue consumer waits for a message (100 ms by default)
receive_timeout: 100
job: false
async_events:
enabled: false
Expand All @@ -306,6 +313,8 @@ enqueue:
doctrine_clear_identity_map_extension: false
signal_extension: true
reply_extension: true


```

[back to index](../index.md)
12 changes: 12 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ public function getConfigTreeBuilder()
->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end()
->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end()
->end()->end()
->arrayNode('consumption')->addDefaultsIfNotSet()->children()
->integerNode('idle_timeout')
->min(0)
->defaultValue(0)
->info('the time in milliseconds queue consumer waits if no message received')
->end()
->integerNode('receive_timeout')
->min(0)
->defaultValue(100)
->info('the time in milliseconds queue consumer waits for a message (100 ms by default)')
->end()
->end()->end()
->booleanNode('job')->defaultFalse()->end()
->arrayNode('async_events')
->addDefaultsIfNotSet()
Expand Down
13 changes: 13 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ public function load(array $configs, ContainerBuilder $container)
}
}

// configure queue consumer
$container->getDefinition('enqueue.consumption.queue_consumer')
->replaceArgument(2, $config['consumption']['idle_timeout'])
->replaceArgument(3, $config['consumption']['receive_timeout'])
;

if ($container->hasDefinition('enqueue.client.queue_consumer')) {
$container->getDefinition('enqueue.client.queue_consumer')
->replaceArgument(2, $config['consumption']['idle_timeout'])
->replaceArgument(3, $config['consumption']['receive_timeout'])
;
}

if ($config['job']) {
if (false == class_exists(Job::class)) {
throw new \LogicException('Seems "enqueue/job-queue" is not installed. Please fix this issue.');
Expand Down
2 changes: 2 additions & 0 deletions pkg/enqueue-bundle/Resources/config/client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ services:
arguments:
- '@enqueue.transport.context'
- '@enqueue.consumption.extensions'
- ~
- ~

enqueue.client.consume_messages_command:
class: 'Enqueue\Symfony\Client\ConsumeMessagesCommand'
Expand Down
2 changes: 2 additions & 0 deletions pkg/enqueue-bundle/Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ services:
arguments:
- '@enqueue.transport.context'
- '@enqueue.consumption.extensions'
- ~
- ~

enqueue.command.consume_messages:
class: 'Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,4 +423,42 @@ public function testShouldAllowEnableAsyncEvents()
],
], $config);
}

public function testShouldSetDefaultConfigurationForConsumption()
{
$configuration = new Configuration([]);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[
'transport' => [],
]]);

$this->assertArraySubset([
'consumption' => [
'idle_timeout' => 0,
'receive_timeout' => 100,
],
], $config);
}

public function testShouldAllowConfigureConsumption()
{
$configuration = new Configuration([]);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[
'transport' => [],
'consumption' => [
'idle_timeout' => 123,
'receive_timeout' => 456,
],
]]);

$this->assertArraySubset([
'consumption' => [
'idle_timeout' => 123,
'receive_timeout' => 456,
],
], $config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -535,4 +535,28 @@ public function testShouldNotAddJobQueueEntityMappingIfDoctrineBundleIsNotRegist

$this->assertSame([], $container->getExtensionConfig('doctrine'));
}

public function testShouldConfigureQueueConsumer()
{
$container = new ContainerBuilder();

$extension = new EnqueueExtension();
$extension->load([[
'client' => [],
'transport' => [
],
'consumption' => [
'idle_timeout' => 123,
'receive_timeout' => 456,
],
]], $container);

$def = $container->getDefinition('enqueue.consumption.queue_consumer');
$this->assertSame(123, $def->getArgument(2));
$this->assertSame(456, $def->getArgument(3));

$def = $container->getDefinition('enqueue.client.queue_consumer');
$this->assertSame(123, $def->getArgument(2));
$this->assertSame(456, $def->getArgument(3));
}
}
32 changes: 32 additions & 0 deletions pkg/enqueue/Consumption/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,38 @@ public function __construct(
$this->boundProcessors = [];
}

/**
* @param int $timeout
*/
public function setIdleTimeout($timeout)
{
$this->idleTimeout = (int) $timeout;
}

/**
* @return int
*/
public function getIdleTimeout()
{
return $this->idleTimeout;
}

/**
* @param int $timeout
*/
public function setReceiveTimeout($timeout)
{
$this->receiveTimeout = (int) $timeout;
}

/**
* @return int
*/
public function getReceiveTimeout()
{
return $this->receiveTimeout;
}

/**
* @return PsrContext
*/
Expand Down
5 changes: 5 additions & 0 deletions pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Enqueue\Consumption\Extension\LoggerExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Symfony\Consumption\LimitsExtensionsCommandTrait;
use Enqueue\Symfony\Consumption\QueueConsumerOptionsCommandTrait;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
Expand All @@ -19,6 +20,7 @@ class ConsumeMessagesCommand extends Command
{
use LimitsExtensionsCommandTrait;
use SetupBrokerExtensionCommandTrait;
use QueueConsumerOptionsCommandTrait;

/**
* @var QueueConsumer
Expand Down Expand Up @@ -67,6 +69,7 @@ protected function configure()
{
$this->configureLimitsExtensions();
$this->configureSetupBrokerExtension();
$this->configureQueueConsumerOptions();

$this
->setName('enqueue:consume')
Expand All @@ -83,6 +86,8 @@ protected function configure()
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->setQueueConsumerOptions($this->consumer, $input);

$queueMetas = [];
if ($clientQueueNames = $input->getArgument('client-queue-names')) {
foreach ($clientQueueNames as $clientQueueName) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class ConsumeMessagesCommand extends Command implements ContainerAwareInterface
{
use ContainerAwareTrait;
use LimitsExtensionsCommandTrait;
use QueueConsumerOptionsCommandTrait;

/**
* @var QueueConsumer
Expand All @@ -38,6 +39,7 @@ public function __construct(QueueConsumer $consumer)
protected function configure()
{
$this->configureLimitsExtensions();
$this->configureQueueConsumerOptions();

$this
->setName('enqueue:transport:consume')
Expand All @@ -51,6 +53,8 @@ protected function configure()
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->setQueueConsumerOptions($this->consumer, $input);

$extensions = $this->getLimitsExtensions($input, $output);
array_unshift($extensions, new LoggerExtension(new ConsoleLogger($output)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class ContainerAwareConsumeMessagesCommand extends Command implements ContainerA
{
use ContainerAwareTrait;
use LimitsExtensionsCommandTrait;
use QueueConsumerOptionsCommandTrait;

/**
* @var QueueConsumer
Expand All @@ -44,6 +45,7 @@ public function __construct(QueueConsumer $consumer)
protected function configure()
{
$this->configureLimitsExtensions();
$this->configureQueueConsumerOptions();

$this
->setName('enqueue:transport:consume')
Expand All @@ -60,6 +62,8 @@ protected function configure()
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->setQueueConsumerOptions($this->consumer, $input);

/** @var PsrProcessor $processor */
$processor = $this->container->get($input->getArgument('processor-service'));
if (false == $processor instanceof PsrProcessor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

namespace Enqueue\Symfony\Consumption;

use Enqueue\Consumption\QueueConsumer;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;

trait QueueConsumerOptionsCommandTrait
{
/**
* {@inheritdoc}
*/
protected function configureQueueConsumerOptions()
{
$this
->addOption('idle-timeout', null, InputOption::VALUE_REQUIRED, 'The time in milliseconds queue consumer idle if no message has been received.')
->addOption('receive-timeout', null, InputOption::VALUE_REQUIRED, 'The time in milliseconds queue consumer waits for a message.')
;
}

/**
* @param QueueConsumer $consumer
* @param InputInterface $input
*/
protected function setQueueConsumerOptions(QueueConsumer $consumer, InputInterface $input)
{
if (null !== $idleTimeout = $input->getOption('idle-timeout')) {
$consumer->setIdleTimeout((int) $idleTimeout);
}

if (null !== $receiveTimeout = $input->getOption('receive-timeout')) {
$consumer->setReceiveTimeout((int) $receiveTimeout);
}
}
}
Loading

0 comments on commit 1e1ee8e

Please sign in to comment.