Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fs] add DSN support #82

Merged
merged 2 commits into from
May 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ enqueue:
fs:

# The store directory where all queue\topics files will be created and messages are stored
store_dir: ~ # Required
path: ~ # Required

# The option tells how many messages should be read from file at once. The feature save resources but could lead to bigger messages lose.
pre_fetch_count: 1
Expand Down
19 changes: 18 additions & 1 deletion docs/transport/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,25 @@ $ composer require enqueue/fs
<?php
use Enqueue\Fs\FsConnectionFactory;

// stores messages in /tmp/enqueue folder
$connectionFactory = new FsConnectionFactory();

// same as above
$connectionFactory = new FsConnectionFactory('file://');

// stores in custom folder
$connectionFactory = new FsConnectionFactory('/path/to/queue/dir');

// same as above
$connectionFactory = new FsConnectionFactory('file://path/to/queue/dir');

// with options
$connectionFactory = new FsConnectionFactory('file://path/to/queue/dir?pre_fetch_count=1');

// as an array
$connectionFactory = new FsConnectionFactory([
'store_dir' => '/tmp'
'path' => '/path/to/queue/dir',
'pre_fetch_count' => 1,
]);

$psrContext = $connectionFactory->createContext();
Expand Down
28 changes: 12 additions & 16 deletions pkg/amqp-ext/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ class AmqpConnectionFactory implements PsrConnectionFactory
* The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default credentials.
*
* [
* 'host' => amqp.host The host to connect too. Note: Max 1024 characters.
* 'port' => amqp.port Port on the host.
* 'vhost' => amqp.vhost The virtual host on the host. Note: Max 128 characters.
* 'user' => amqp.user The user name to use. Note: Max 128 characters.
* 'pass' => amqp.password Password. Note: Max 128 characters.
* 'read_timeout' => Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
* 'write_timeout' => Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
* 'connect_timeout' => Connection timeout. Note: 0 or greater seconds. May be fractional.
* 'persisted' => bool, Whether it use single persisted connection or open a new one for every context
* 'lazy' => the connection will be performed as later as possible, if the option set to true
* 'host' => 'amqp.host The host to connect too. Note: Max 1024 characters.',
* 'port' => 'amqp.port Port on the host.',
* 'vhost' => 'amqp.vhost The virtual host on the host. Note: Max 128 characters.',
* 'user' => 'amqp.user The user name to use. Note: Max 128 characters.',
* 'pass' => 'amqp.password Password. Note: Max 128 characters.',
* 'read_timeout' => 'Timeout in for income activity. Note: 0 or greater seconds. May be fractional.',
* 'write_timeout' => 'Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.',
* 'connect_timeout' => 'Connection timeout. Note: 0 or greater seconds. May be fractional.',
* 'persisted' => 'bool, Whether it use single persisted connection or open a new one for every context',
* 'lazy' => 'the connection will be performed as later as possible, if the option set to true',
* ]
*
* or
Expand All @@ -40,13 +40,13 @@ class AmqpConnectionFactory implements PsrConnectionFactory
*/
public function __construct($config = 'amqp://')
{
if (empty($config)) {
if (empty($config) || 'amqp://' === $config) {
$config = [];
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
} elseif (is_array($config)) {
} else {
throw new \LogicException('The config must be eaither an array of options, a DSN string or null');
throw new \LogicException('The config must be either an array of options, a DSN string or null');
}

$this->config = array_replace($this->defaultConfig(), $config);
Expand Down Expand Up @@ -94,10 +94,6 @@ private function establishConnection()
*/
private function parseDsn($dsn)
{
if ('amqp://' == $dsn) {
return [];
}

$dsnConfig = parse_url($dsn);
if (false === $dsnConfig) {
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
Expand Down
6 changes: 3 additions & 3 deletions pkg/amqp-ext/Symfony/AmqpTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public function addConfiguration(ArrayNodeDefinition $builder)
$builder
->beforeNormalization()
->ifString()
->then(function ($v) {
return ['dsn' => $v];
})
->then(function ($v) {
return ['dsn' => $v];
})
->end()
->children()
->scalarNode('dsn')
Expand Down
30 changes: 16 additions & 14 deletions pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
namespace Enqueue\AmqpExt\Tests;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

/**
* The class contains the factory tests dedicated to configuration
* The class contains the factory tests dedicated to configuration.
*/
class AmqpConnectionFactoryConfigTest extends TestCase
{
Expand All @@ -17,7 +16,7 @@ class AmqpConnectionFactoryConfigTest extends TestCase
public function testThrowNeitherArrayStringNorNullGivenAsConfig()
{
$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The config must be eaither an array of options, a DSN string or null');
$this->expectExceptionMessage('The config must be either an array of options, a DSN string or null');

new AmqpConnectionFactory(new \stdClass());
}
Expand All @@ -40,6 +39,9 @@ public function testThrowIfDsnCouldNotBeParsed()

/**
* @dataProvider provideConfigs
*
* @param mixed $config
* @param mixed $expectedConfig
*/
public function testShouldParseConfigurationAsExpected($config, $expectedConfig)
{
Expand All @@ -63,13 +65,13 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
]
],
];

// some examples from Appendix A: Examples (https://www.rabbitmq.com/uri-spec.html)

yield [
"amqp://user:pass@host:10000/vhost",
'amqp://user:pass@host:10000/vhost',
[
'host' => 'host',
'port' => 10000,
Expand All @@ -81,11 +83,11 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
]
],
];

yield [
"amqp://user%61:%61pass@ho%61st:10000/v%2fhost",
'amqp://user%61:%61pass@ho%61st:10000/v%2fhost',
[
'host' => 'hoast',
'port' => 10000,
Expand All @@ -97,11 +99,11 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
]
],
];

yield [
"amqp://",
'amqp://',
[
'host' => 'localhost',
'port' => 5672,
Expand All @@ -113,11 +115,11 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
]
],
];

yield [
"amqp://user:pass@host:10000/vhost?connect_timeout=2&lazy=",
'amqp://user:pass@host:10000/vhost?connect_timeout=2&lazy=',
[
'host' => 'host',
'port' => 10000,
Expand All @@ -129,7 +131,7 @@ public static function provideConfigs()
'connect_timeout' => '2',
'persisted' => false,
'lazy' => '',
]
],
];

yield [
Expand All @@ -145,7 +147,7 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
]
],
];

yield [
Expand All @@ -161,7 +163,7 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => false,
]
],
];
}
}
9 changes: 8 additions & 1 deletion pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,18 @@ public function provideEnqueueConfigs()
'transport' => [
'default' => 'fs',
'fs' => [
'store_dir' => sys_get_temp_dir(),
'path' => sys_get_temp_dir(),
],
],
]];

yield 'fs_dsn' => [[
'transport' => [
'default' => 'fs',
'fs' => 'file:/'.sys_get_temp_dir(),
],
]];

yield 'dbal' => [[
'transport' => [
'default' => 'dbal',
Expand Down
84 changes: 76 additions & 8 deletions pkg/fs/FsConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,33 @@ class FsConnectionFactory implements PsrConnectionFactory
private $config;

/**
* @param array $config
* The config could be an array, string DSN or null. In case of null it will attempt to store files in /tmp/enqueue folder.
*
* [
* 'path' => 'the directory where all queue\topic files remain. For example /home/foo/enqueue',
* 'pre_fetch_count' => 'Integer. Defines how many messages to fetch from the file.',
* 'chmod' => 'Defines a mode the files are created with',
* ]
*
* or
*
* file://home/foo/enqueue
* file://home/foo/enqueue?pre_fetch_count=20&chmod=0777
*
* @param array|string|null $config
*/
public function __construct(array $config)
public function __construct($config = 'file://')
{
$this->config = array_replace([
'store_dir' => null,
'pre_fetch_count' => 1,
'chmod' => 0600,
], $config);
if (empty($config) || 'file://' === $config) {
$config = ['path' => sys_get_temp_dir().'/enqueue'];
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
} elseif (is_array($config)) {
} else {
throw new \LogicException('The config must be either an array of options, a DSN string or null');
}

$this->config = array_replace($this->defaultConfig(), $config);
}

/**
Expand All @@ -30,6 +48,56 @@ public function __construct(array $config)
*/
public function createContext()
{
return new FsContext($this->config['store_dir'], $this->config['pre_fetch_count'], $this->config['chmod']);
return new FsContext($this->config['path'], $this->config['pre_fetch_count'], $this->config['chmod']);
}

/**
* @param string $dsn
*
* @return array
*/
private function parseDsn($dsn)
{
if ($dsn && '/' === $dsn[0]) {
return ['path' => $dsn];
}

$scheme = parse_url($dsn, PHP_URL_SCHEME);
$path = parse_url($dsn, PHP_URL_PATH);
$host = parse_url($dsn, PHP_URL_HOST);
$query = parse_url($dsn, PHP_URL_QUERY);
if (false === $scheme) {
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
}

if ('file' !== $scheme) {
throw new \LogicException('The given DSN scheme "%s" is not supported. Could be "file" only.');
}

if ($query) {
$config = [];
parse_str($query, $config);
}

if (isset($config['pre_fetch_count'])) {
$config['pre_fetch_count'] = (int) $config['pre_fetch_count'];
}

if (isset($config['chmod'])) {
$config['chmod'] = intval($config['chmod'], 8);
}

$config['path'] = sprintf('/%s%s', $host, $path);

return $config;
}

private function defaultConfig()
{
return [
'path' => null,
'pre_fetch_count' => 1,
'chmod' => 0600,
];
}
}
14 changes: 11 additions & 3 deletions pkg/fs/Symfony/FsTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,17 @@ public function __construct($name = 'fs')
public function addConfiguration(ArrayNodeDefinition $builder)
{
$builder
->beforeNormalization()
->ifString()
->then(function ($v) {
return ['dsn' => $v];
})
->end()
->children()
->scalarNode('store_dir')
->isRequired()
->scalarNode('dsn')
->info('The path to a directory where to store messages given as DSN. For example file://tmp/foo')
->end()
->scalarNode('path')
->cannotBeEmpty()
->info('The store directory where all queue\topics files will be created and messages are stored')
->end()
Expand All @@ -56,7 +64,7 @@ public function addConfiguration(ArrayNodeDefinition $builder)
public function createConnectionFactory(ContainerBuilder $container, array $config)
{
$factory = new Definition(FsConnectionFactory::class);
$factory->setArguments([$config]);
$factory->setArguments(isset($config['dsn']) ? [$config['dsn']] : [$config]);

$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
$container->setDefinition($factoryId, $factory);
Expand Down
Loading