Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add any() helper to await first successful fulfillment of operations #15

Merged
merged 1 commit into from
Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 117 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ much any API that already uses Promises.

* [Quickstart example](#quickstart-example)
* [Usage](#usage)
* [Transformer](#transformer)
* [Promises](#promises)
* [Timeout](#timeout)
* [Streaming](#streaming)
* [all()](#all)
* [Transformer](#transformer)
* [Promises](#promises)
* [Timeout](#timeout)
* [Streaming](#streaming)
* [all()](#all)
* [any()](#any)
* [Install](#install)
* [Tests](#tests)
* [License](#license)
Expand Down Expand Up @@ -510,7 +511,7 @@ operation and returns a Promise as a placeholder for its future result.
The fulfillment value for each job will be ignored, so for best
performance it's recommended to not return any excessive data structures.
If the given argument is not a valid callable, this method will reject
with an `InvalidArgumentExceptionn` without processing any jobs.
with an `InvalidArgumentException` without processing any jobs.

```php
// using a Closure as handler is usually recommended
Expand All @@ -532,6 +533,116 @@ under the hood. If your input data is small enough to fit into memory
[clue/reactphp-mq](https://github.com/clue/reactphp-mq) instead and keep
all operations in memory without using a streaming approach.

#### any()

The static `any(ReadableStreamInterface $input, int $concurrency, callable $handler): PromiseInterface<mixed,Exception>` method can be used to
concurrently process some jobs from the input stream through the given `$handler`.

This is a convenience method which uses the `Transformer` internally to
schedule the jobs from the input stream while limiting concurrency to
ensure no more than `$concurrency` jobs ever run at once. It will return
a promise which resolves with the first successful resolution value on
success.

```php
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);

$promise = Transformer::any($input, 3, function ($data) use ($browser, $url) {
return $browser->post($url, [], json_encode($data));
});

$promise->then(function (ResponseInterface $response) {
echo 'First successful job: ' . $response->getBody() . PHP_EOL;
});
```

If the first job succeeds, it will resolve the resulting promise with its
resolution value, `close()` the input stream and will try to cancel all
other outstanding jobs.

If either of the jobs fails, it will stay in a pending state and will
wait for one of the other jobs to succeed. If all jobs fail, it will
reject the resulting promise. Calling `cancel()` on the pending promise
will `close()` the input stream and will try to cancel all outstanding
jobs. Similarly, if the `$input` stream emits an `error` event, it will
reject the resulting promise and will try to cancel all outstanding jobs.

The `$input` parameter must be a `ReadableStreamInterface` which emits
one `data` event for each job to process. Each element will be passed to
the `$handler` to start one job. The fulfillment value for the first
successful job will be used to fulfill the resulting promise. When the
stream emits an `end` or `close` event, this method will wait for all
outstanding jobs to complete and then resolve or reject accordingly. If
this stream is already closed or does not emit any `data` events, this
method will reject with an `UnderflowException` without processing any
jobs.

```php
$input = new ThroughStream();

$promise = Transformer::any($input, 2, $handler);

$input->write('a');
$input->write('b');
$input->write('c');
$input->end();
```

Because streams are one of the core abstractions of ReactPHP, a large number
of stream implementations are available for many different use cases. For
example, this allows you to use [clue/reactphp-ndjson](https://github.com/clue/reactphp-ndjson)
or [clue/reactphp-csv](https://github.com/clue/reactphp-csv) to process
large lists of structured input data. See also [streaming](#streaming) for
more details.

The `$concurrency` parameter sets a new soft limit for the maximum number
of jobs to handle concurrently. Finding a good concurrency limit depends
on your particular use case. It's common to limit concurrency to a rather
small value, as doing more than a dozen of things at once may easily
overwhelm the receiving side. Using a `1` value will ensure that all jobs
are processed one after another, effectively creating a "waterfall" of
jobs. Using a value less than 1 will reject with an
`InvalidArgumentException` without processing any jobs.

```php
// handle up to 10 jobs concurrently
$promise = Transformer::any($stream, 10, $handler);
```

```php
// handle each job after another without concurrency (waterfall)
$promise = Transformer::any($stream, 1, $handler);
```

The `$handler` parameter must be a valid callable that accepts your job
parameter (the data from the `$input` stream), invokes the appropriate
operation and returns a Promise as a placeholder for its future result.
The fulfillment value for the first successful job will be used to
fulfill the resulting promise. If the given argument is not a valid
callable, this method will reject with an `InvalidArgumentException`
without processing any jobs.

```php
// using a Closure as handler is usually recommended
$promise = Transformer::any($stream, 10, function ($url) use ($browser) {
return $browser->get($url);
});
```

```php
// accepts any callable, so PHP's array notation is also supported
$promise = Transformer::any($stream, 10, array($browser, 'get'));
```

Note that this method returns a promise that resolves with the first
successful resolution value only if any operation succeeds. This is
mostly a convenience method that uses the [`Transformer`](#transformer)
under the hood. If your input data is small enough to fit into memory
(a few dozens or hundreds of operations), you may want to use
[clue/reactphp-mq](https://github.com/clue/reactphp-mq) instead and keep
all operations in memory without using a streaming approach.

## Install

The recommended way to install this library is [through Composer](https://getcomposer.org).
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"react/stream": "^1.0 || ^0.7.7"
},
"require-dev": {
"clue/buzz-react": "^2.3",
"clue/buzz-react": "^2.4",
"clue/ndjson-react": "^1.0",
"phpunit/phpunit": "^7.0 || ^6.4 || ^5.7 || ^4.8.35"
}
Expand Down
56 changes: 56 additions & 0 deletions examples/03-transform-any.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);

$concurrency = isset($argv[1]) ? $argv[1] : 3;
$url = isset($argv[2]) ? $argv[2] : 'http://httpbin.org/post';

// load a huge number of users to process from NDJSON file
$input = new Clue\React\NDJson\Decoder(
new React\Stream\ReadableResourceStream(
fopen(__DIR__ . '/users.ndjson', 'r'),
$loop
),
true
);

// each job should use the browser to POST each user object to a certain URL
// process all users by processing all users through transformer
// limit number of concurrent jobs here
$promise = Transformer::any($input, $concurrency, function ($user) use ($browser, $url) {
return $browser->post(
$url,
array('Content-Type' => 'application/json'),
json_encode($user)
)->then(function (ResponseInterface $response) use ($user) {
// demo HTTP response validation
$body = json_decode($response->getBody());
if (!isset($body->json)) {
throw new RuntimeException('Unexpected response');
}

// demo result includes full user from NDJSON with additional properties
$user['result'] = $body;
return $user;
});
});

$promise->then(
function ($user) {
echo 'Successfully processed user record:' . print_r($user, true) . PHP_EOL;
},
function (Exception $e) {
echo 'An error occured: ' . $e->getMessage() . PHP_EOL;
if ($e->getPrevious()) {
echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL;
}
}
);

$loop->run();
171 changes: 170 additions & 1 deletion src/Transformer.php
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ final class Transformer extends EventEmitter implements DuplexStreamInterface
* The fulfillment value for each job will be ignored, so for best
* performance it's recommended to not return any excessive data structures.
* If the given argument is not a valid callable, this method will reject
* with an `InvalidArgumentExceptionn` without processing any jobs.
* with an `InvalidArgumentException` without processing any jobs.
*
* ```php
* // using a Closure as handler is usually recommended
Expand Down Expand Up @@ -453,6 +453,175 @@ public static function all(ReadableStreamInterface $input, $concurrency, $callba
return $deferred->promise();
}

/**
* Concurrently process some jobs from the input stream through the given `$handler`.
*
* This is a convenience method which uses the `Transformer` internally to
* schedule the jobs from the input stream while limiting concurrency to
* ensure no more than `$concurrency` jobs ever run at once. It will return
* a promise which resolves with the first successful resolution value on
* success.
*
* ```php
* $loop = React\EventLoop\Factory::create();
* $browser = new Clue\React\Buzz\Browser($loop);
*
* $promise = Transformer::any($input, 3, function ($data) use ($browser, $url) {
* return $browser->post($url, [], json_encode($data));
* });
*
* $promise->then(function (ResponseInterface $response) {
* echo 'First successful job: ' . $response->getBody() . PHP_EOL;
* });
* ```
*
* If the first job succeeds, it will resolve the resulting promise with its
* resolution value, `close()` the input stream and will try to cancel all
* other outstanding jobs.
*
* If either of the jobs fails, it will stay in a pending state and will
* wait for one of the other jobs to succeed. If all jobs fail, it will
* reject the resulting promise. Calling `cancel()` on the pending promise
* will `close()` the input stream and will try to cancel all outstanding
* jobs. Similarly, if the `$input` stream emits an `error` event, it will
* reject the resulting promise and will try to cancel all outstanding jobs.
*
* The `$input` parameter must be a `ReadableStreamInterface` which emits
* one `data` event for each job to process. Each element will be passed to
* the `$handler` to start one job. The fulfillment value for the first
* successful job will be used to fulfill the resulting promise. When the
* stream emits an `end` or `close` event, this method will wait for all
* outstanding jobs to complete and then resolve or reject accordingly. If
* this stream is already closed or does not emit any `data` events, this
* method will reject with an `UnderflowException` without processing any
* jobs.
*
* ```php
* $input = new ThroughStream();
*
* $promise = Transformer::any($input, 2, $handler);
*
* $input->write('a');
* $input->write('b');
* $input->write('c');
* $input->end();
* ```
*
* Because streams are one of the core abstractions of ReactPHP, a large number
* of stream implementations are available for many different use cases. For
* example, this allows you to use [clue/reactphp-ndjson](https://github.com/clue/reactphp-ndjson)
* or [clue/reactphp-csv](https://github.com/clue/reactphp-csv) to process
* large lists of structured input data. See also [streaming](#streaming) for
* more details.
*
* The `$concurrency` parameter sets a new soft limit for the maximum number
* of jobs to handle concurrently. Finding a good concurrency limit depends
* on your particular use case. It's common to limit concurrency to a rather
* small value, as doing more than a dozen of things at once may easily
* overwhelm the receiving side. Using a `1` value will ensure that all jobs
* are processed one after another, effectively creating a "waterfall" of
* jobs. Using a value less than 1 will reject with an
* `InvalidArgumentException` without processing any jobs.
*
* ```php
* // handle up to 10 jobs concurrently
* $promise = Transformer::any($stream, 10, $handler);
* ```
*
* ```php
* // handle each job after another without concurrency (waterfall)
* $promise = Transformer::any($stream, 1, $handler);
* ```
*
* The `$handler` parameter must be a valid callable that accepts your job
* parameter (the data from the `$input` stream), invokes the appropriate
* operation and returns a Promise as a placeholder for its future result.
* The fulfillment value for the first successful job will be used to
* fulfill the resulting promise. If the given argument is not a valid
* callable, this method will reject with an `InvalidArgumentException`
* without processing any jobs.
*
* ```php
* // using a Closure as handler is usually recommended
* $promise = Transformer::any($stream, 10, function ($url) use ($browser) {
* return $browser->get($url);
* });
* ```
*
* ```php
* // accepts any callable, so PHP's array notation is also supported
* $promise = Transformer::any($stream, 10, array($browser, 'get'));
* ```
*
* Note that this method returns a promise that resolves with the first
* successful resolution value only if any operation succeeds. This is
* mostly a convenience method that uses the [`Transformer`](#transformer)
* under the hood. If your input data is small enough to fit into memory
* (a few dozens or hundreds of operations), you may want to use
* [clue/reactphp-mq](https://github.com/clue/reactphp-mq) instead and keep
* all operations in memory without using a streaming approach.
*
* @param ReadableStreamInterface $input
* @param int $concurrency
* @param callable $callback
* @return PromiseInterface Returns a Promise<mixed,Exception>
*/
public static function any(ReadableStreamInterface $input, $concurrency, $callback)
{
if (!$input->isReadable()) {
return Promise\reject(new \UnderflowException('Input stream already closed'));
}

$ignore = new \stdClass();
if (is_callable($callback)) {
$callback = function ($data) use ($callback, $ignore) {
return $callback($data)->then(null, function ($e) use ($ignore) {
// operation failed => ignore by returning ignore marker
return $ignore;
});
};
}

try {
$stream = new self($concurrency, $callback);
} catch (\InvalidArgumentException $e) {
return Promise\reject($e);
}

$deferred = new Deferred(function ($_, $reject) use ($input, $stream) {
$reject(new \RuntimeException('Transformer cancelled'));
$input->close();
$stream->close();
});

// forward input data through transformer until input stream ends/closes
$input->pipe($stream);
$input->on('close', array($stream, 'end'));

// resolve promise when first successful transformation completes
$stream->on('data', function ($result) use ($ignore, $deferred, $input, $stream) {
if ($result !== $ignore) {
$deferred->resolve($result);
$input->close();
$stream->close();
}
});

// reject promise when all transformations are done without any successful transformation above
$stream->on('end', function () use ($deferred) {
$deferred->reject(new \UnderflowException('Stream ended without any successful transformation'));
});

// input error should reject result
$input->on('error', function ($error) use ($deferred, $stream) {
$deferred->reject($error);
$stream->close();
});

return $deferred->promise();
}


/**
* Instantiates a new Transformer instance.
*
Expand Down
Loading