Skip to content

Commit

Permalink
Merge pull request #166 from clue-labs/callasync
Browse files Browse the repository at this point in the history
Add new public `callAsync()` method
  • Loading branch information
clue authored Dec 28, 2024
2 parents 41e54b7 + e89ae33 commit 652d56e
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 63 deletions.
43 changes: 42 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ It enables you to set and query its data or use its PubSub topics to react to in
* [RedisClient](#redisclient)
* [__construct()](#__construct)
* [__call()](#__call)
* [callAsync()](#callasync)
* [end()](#end)
* [close()](#close)
* [error event](#error-event)
Expand Down Expand Up @@ -124,7 +125,8 @@ Each method call matches the respective [Redis command](https://redis.io/command
For example, the `$redis->get()` method will invoke the [`GET` command](https://redis.io/commands/get).

All [Redis commands](https://redis.io/commands) are automatically available as
public methods via the magic [`__call()` method](#__call).
public methods via the magic [`__call()` method](#__call) or through the more
explicit [`callAsync()` method].
Listing all available commands is out of scope here, please refer to the
[Redis command reference](https://redis.io/commands).

Expand Down Expand Up @@ -432,6 +434,8 @@ $redis->get($key)->then(function (?string $value) {

All [Redis commands](https://redis.io/commands) are automatically available as
public methods via this magic `__call()` method.
Note that some static analysis tools may not understand this magic method, so
you may also the [`callAsync()` method](#callasync) as a more explicit alternative.
Listing all available commands is out of scope here, please refer to the
[Redis command reference](https://redis.io/commands).

Expand All @@ -445,6 +449,43 @@ Each of these commands supports async operation and returns a [Promise](#promise
that eventually *fulfills* with its *results* on success or *rejects* with an
`Exception` on error. See also [promises](#promises) for more details.

#### callAsync()

The `callAsync(string $command, string ...$args): PromiseInterface<mixed>` method can be used to
invoke a Redis command.

```php
$redis->callAsync('GET', 'name')->then(function (?string $name): void {
echo 'Name: ' . ($name ?? 'Unknown') . PHP_EOL;
}, function (Throwable $e): void {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

The `string $command` parameter can be any valid Redis command. All
[Redis commands](https://redis.io/commands/) are available through this
method. As an alternative, you may also use the magic
[`__call()` method](#__call), but note that not all static analysis tools
may understand this magic method. Listing all available commands is out
of scope here, please refer to the
[Redis command reference](https://redis.io/commands).

The optional `string ...$args` parameter can be used to pass any
additional arguments to the Redis command. Some commands may require or
support additional arguments that this method will simply forward as is.
Internally, Redis requires all arguments to be coerced to `string` values,
but you may also rely on PHP's type-juggling semantics and pass `int` or
`float` values:

```php
$redis->callAsync('SET', 'name', 'Alice', 'EX', 600);
```

This method supports async operation and returns a [Promise](#promises)
that eventually *fulfills* with its *results* on success or *rejects*
with an `Exception` on error. See also [promises](#promises) for more
details.

#### end()

The `end():void` method can be used to
Expand Down
16 changes: 8 additions & 8 deletions examples/cli.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@
return;
}

$params = explode(' ', $line);
$method = array_shift($params);

assert(is_callable([$redis, $method]));
$promise = $redis->$method(...$params);
$args = explode(' ', $line);
$command = strtolower(array_shift($args));

// special method such as end() / close() called
if (!$promise instanceof React\Promise\PromiseInterface) {
if (in_array($command, ['end', 'close'])) {
$redis->$command();
return;
}

$promise->then(function ($data) {
$promise = $redis->callAsync($command, ...$args);

$promise->then(function ($data): void {
echo '# reply: ' . json_encode($data) . PHP_EOL;
}, function ($e) {
}, function (Throwable $e): void {
echo '# error reply: ' . $e->getMessage() . PHP_EOL;
});
});
Expand Down
1 change: 0 additions & 1 deletion phpstan.neon.dist
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,3 @@ parameters:
ignoreErrors:
# ignore undefined methods due to magic `__call()` method
- '/^Call to an undefined method Clue\\React\\Redis\\RedisClient::.+\(\)\.$/'
- '/^Call to an undefined method Clue\\React\\Redis\\Io\\StreamingClient::.+\(\)\.$/'
10 changes: 6 additions & 4 deletions src/Io/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ public function createClient(string $uri): PromiseInterface
// use `?password=secret` query or `user:secret@host` password form URL
if (isset($args['password']) || isset($parts['pass'])) {
$pass = $args['password'] ?? rawurldecode($parts['pass']); // @phpstan-ignore-line
\assert(\is_string($pass));
$promise = $promise->then(function (StreamingClient $redis) use ($pass, $uri) {
return $redis->auth($pass)->then(
return $redis->callAsync('auth', $pass)->then(
function () use ($redis) {
return $redis;
},
function (\Exception $e) use ($redis, $uri) {
function (\Throwable $e) use ($redis, $uri) {
$redis->close();

$const = '';
Expand All @@ -124,12 +125,13 @@ function (\Exception $e) use ($redis, $uri) {
// use `?db=1` query or `/1` path (skip first slash)
if (isset($args['db']) || (isset($parts['path']) && $parts['path'] !== '/')) {
$db = $args['db'] ?? substr($parts['path'], 1); // @phpstan-ignore-line
\assert(\is_string($db));
$promise = $promise->then(function (StreamingClient $redis) use ($db, $uri) {
return $redis->select($db)->then(
return $redis->callAsync('select', $db)->then(
function () use ($redis) {
return $redis;
},
function (\Exception $e) use ($redis, $uri) {
function (\Throwable $e) use ($redis, $uri) {
$redis->close();

$const = '';
Expand Down
13 changes: 6 additions & 7 deletions src/Io/StreamingClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,14 @@ public function __construct(DuplexStreamInterface $stream, ParserInterface $pars
}

/**
* @param string[] $args
* @return PromiseInterface<mixed>
*/
public function __call(string $name, array $args): PromiseInterface
public function callAsync(string $command, string ...$args): PromiseInterface
{
$request = new Deferred();
$promise = $request->promise();

$name = strtolower($name);
$command = strtolower($command);

// special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied
static $pubsubs = ['subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'];
Expand All @@ -90,22 +89,22 @@ public function __call(string $name, array $args): PromiseInterface
'Connection ' . ($this->closed ? 'closed' : 'closing'). ' (ENOTCONN)',
defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107
));
} elseif (count($args) !== 1 && in_array($name, $pubsubs)) {
} elseif (count($args) !== 1 && in_array($command, $pubsubs)) {
$request->reject(new \InvalidArgumentException(
'PubSub commands limited to single argument (EINVAL)',
defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22
));
} elseif ($name === 'monitor') {
} elseif ($command === 'monitor') {
$request->reject(new \BadMethodCallException(
'MONITOR command explicitly not supported (ENOTSUP)',
defined('SOCKET_ENOTSUP') ? SOCKET_ENOTSUP : (defined('SOCKET_EOPNOTSUPP') ? SOCKET_EOPNOTSUPP : 95)
));
} else {
$this->stream->write($this->serializer->getRequestMessage($name, $args));
$this->stream->write($this->serializer->getRequestMessage($command, $args));
$this->requests []= $request;
}

if (in_array($name, $pubsubs)) {
if (in_array($command, $pubsubs)) {
$promise->then(function (array $array) {
$first = array_shift($array);

Expand Down
63 changes: 56 additions & 7 deletions src/RedisClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,65 @@ private function client(): PromiseInterface
}

/**
* Invoke the given command and return a Promise that will be resolved when the request has been replied to
* Invoke the given command and return a Promise that will be resolved when the command has been replied to
*
* This is a magic method that will be invoked when calling any redis
* command on this instance.
* command on this instance. See also `RedisClient::callAsync()`.
*
* @param string $name
* @param string[] $args
* @return PromiseInterface<mixed>
* @see self::callAsync()
*/
public function __call(string $name, array $args): PromiseInterface
{
return $this->callAsync($name, ...$args);
}

/**
* Invoke a Redis command.
*
* For example, the [`GET` command](https://redis.io/commands/get) can be invoked
* like this:
*
* ```php
* $redis->callAsync('GET', 'name')->then(function (?string $name): void {
* echo 'Name: ' . ($name ?? 'Unknown') . PHP_EOL;
* }, function (Throwable $e): void {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
*
* The `string $command` parameter can be any valid Redis command. All
* [Redis commands](https://redis.io/commands/) are available through this
* method. As an alternative, you may also use the magic
* [`__call()` method](#__call), but note that not all static analysis tools
* may understand this magic method. Listing all available commands is out
* of scope here, please refer to the
* [Redis command reference](https://redis.io/commands).
*
* The optional `string ...$args` parameter can be used to pass any
* additional arguments to the Redis command. Some commands may require or
* support additional arguments that this method will simply forward as is.
* Internally, Redis requires all arguments to be coerced to `string` values,
* but you may also rely on PHP's type-juggling semantics and pass `int` or
* `float` values:
*
* ```php
* $redis->callAsync('SET', 'name', 'Alice', 'EX', 600);
* ```
*
* This method supports async operation and returns a [Promise](#promises)
* that eventually *fulfills* with its *results* on success or *rejects*
* with an `Exception` on error. See also [promises](#promises) for more
* details.
*
* @param string $command
* @param string ...$args
* @return PromiseInterface<mixed>
* @throws void
*/
public function callAsync(string $command, string ...$args): PromiseInterface
{
if ($this->closed) {
return reject(new \RuntimeException(
Expand All @@ -178,17 +227,17 @@ public function __call(string $name, array $args): PromiseInterface
));
}

return $this->client()->then(function (StreamingClient $redis) use ($name, $args) {
return $this->client()->then(function (StreamingClient $redis) use ($command, $args): PromiseInterface {
$this->awake();
assert(\is_callable([$redis, $name])); // @phpstan-ignore-next-line
return \call_user_func_array([$redis, $name], $args)->then(
return $redis->callAsync($command, ...$args)->then(
function ($result) {
$this->idle();
return $result;
},
function (\Exception $error) {
function (\Throwable $e) {
\assert($e instanceof \Exception);
$this->idle();
throw $error;
throw $e;
}
);
});
Expand Down
28 changes: 14 additions & 14 deletions tests/Io/StreamingClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public function testSending(): void
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message'));
$this->stream->expects($this->once())->method('write')->with($this->equalTo('message'));

$this->redis->ping();
$this->redis->callAsync('ping');
}

public function testClosingClientEmitsEvent(): void
Expand Down Expand Up @@ -121,7 +121,7 @@ public function testPingPong(): void
{
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'));

$promise = $this->redis->ping();
$promise = $this->redis->callAsync('ping');

$this->redis->handleMessage(new BulkReply('PONG'));

Expand All @@ -131,7 +131,7 @@ public function testPingPong(): void

public function testMonitorCommandIsNotSupported(): void
{
$promise = $this->redis->monitor();
$promise = $this->redis->callAsync('monitor');

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
Expand All @@ -148,7 +148,7 @@ public function testMonitorCommandIsNotSupported(): void

public function testErrorReply(): void
{
$promise = $this->redis->invalid();
$promise = $this->redis->callAsync('invalid');

$err = new ErrorReply("ERR unknown command 'invalid'");
$this->redis->handleMessage($err);
Expand All @@ -158,7 +158,7 @@ public function testErrorReply(): void

public function testClosingClientRejectsAllRemainingRequests(): void
{
$promise = $this->redis->ping();
$promise = $this->redis->callAsync('ping');
$this->redis->close();

$promise->then(null, $this->expectCallableOnceWith(
Expand All @@ -183,7 +183,7 @@ public function testClosingStreamRejectsAllRemainingRequests(): void
assert($this->serializer instanceof SerializerInterface);
$this->redis = new StreamingClient($stream, $this->parser, $this->serializer);

$promise = $this->redis->ping();
$promise = $this->redis->callAsync('ping');
$stream->close();

$promise->then(null, $this->expectCallableOnceWith(
Expand All @@ -201,9 +201,9 @@ public function testClosingStreamRejectsAllRemainingRequests(): void

public function testEndingClientRejectsAllNewRequests(): void
{
$this->redis->ping();
$this->redis->callAsync('ping');
$this->redis->end();
$promise = $this->redis->ping();
$promise = $this->redis->callAsync('ping');

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
Expand All @@ -221,7 +221,7 @@ public function testEndingClientRejectsAllNewRequests(): void
public function testClosedClientRejectsAllNewRequests(): void
{
$this->redis->close();
$promise = $this->redis->ping();
$promise = $this->redis->callAsync('ping');

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
Expand Down Expand Up @@ -250,7 +250,7 @@ public function testEndingBusyClosesClientWhenNotBusyAnymore(): void
++$closed;
});

$promise = $this->redis->ping();
$promise = $this->redis->callAsync('ping');
$this->assertEquals(0, $closed);

$this->redis->end();
Expand All @@ -277,7 +277,7 @@ public function testReceivingUnexpectedMessageThrowsException(): void

public function testPubsubSubscribe(): StreamingClient
{
$promise = $this->redis->subscribe('test');
$promise = $this->redis->callAsync('subscribe', 'test');
$this->expectPromiseResolve($promise);

$this->redis->on('subscribe', $this->expectCallableOnce());
Expand All @@ -291,7 +291,7 @@ public function testPubsubSubscribe(): StreamingClient
*/
public function testPubsubPatternSubscribe(StreamingClient $client): StreamingClient
{
$promise = $client->psubscribe('demo_*');
$promise = $client->callAsync('psubscribe', 'demo_*');
$this->expectPromiseResolve($promise);

$client->on('psubscribe', $this->expectCallableOnce());
Expand All @@ -311,7 +311,7 @@ public function testPubsubMessage(StreamingClient $client): void

public function testSubscribeWithMultipleArgumentsRejects(): void
{
$promise = $this->redis->subscribe('a', 'b');
$promise = $this->redis->callAsync('subscribe', 'a', 'b');

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
Expand All @@ -328,7 +328,7 @@ public function testSubscribeWithMultipleArgumentsRejects(): void

public function testUnsubscribeWithoutArgumentsRejects(): void
{
$promise = $this->redis->unsubscribe();
$promise = $this->redis->callAsync('unsubscribe');

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
Expand Down
Loading

0 comments on commit 652d56e

Please sign in to comment.