diff --git a/CHANGELOG.md b/CHANGELOG.md index f723d8d..8b5f140 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,15 @@ ## v1.2.x -- Add Batch IP from - - https://speakerdeck.com/alli83/symfony-messenger-et-ses-messages-a-la-queleuleu-dot-dot-dot-et-sil-etait-temps-de-grouper - - https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1 - - https://github.com/wazum/symfony-messenger-batch -- YFlow rework +- Add new Interface Flow\AsyncHandlerInterface to control the Event::SYNC step when processing an IP + - Add Flow\AsyncHandler\AsyncHandler the default handler when Event::SYNC is dispatched + - Add Batch IP with Flow\AsyncHandler\BatchAsyncHandler from + - https://speakerdeck.com/alli83/symfony-messenger-et-ses-messages-a-la-queleuleu-dot-dot-dot-et-sil-etait-temps-de-grouper + - https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1 + - https://github.com/wazum/symfony-messenger-batch + - Add Flow\AsyncHandler\DeferAsyncHandler to gain granular control on the async Event::SYNC step event +- Flow\Flow\YFlow rework +- Add more exemples in `examples/yflow.php` to play with Y-Combinators - Update DX for Flow\DriverInterface : add `defer` to gain much granular control on asynchronous callbacks ## v1.2.0 diff --git a/docs/src/content/en/docs/getting-started/async-handler.md b/docs/src/content/en/docs/getting-started/async-handler.md index 3db793b..56ffd18 100644 --- a/docs/src/content/en/docs/getting-started/async-handler.md +++ b/docs/src/content/en/docs/getting-started/async-handler.md @@ -25,6 +25,12 @@ This is the default one. Ip is async processed immediately. This async process Ip as batch capability : the handler will wait for a certain amount of async messages ($batchSize) to be processed before pushing them. +## DeferAsyncHandler + +This async process Ip to offer defer capability : the handler will pass [$data, $defer] as entry for the job. In that case, the job can fine control the async process. $defer is a callable that embark two callbacks +- an complete callback to store result +- an async callback to go to the next async call. + ## Make your Async Handler You can make your custom Ip strategy by implementing `Flow\AsyncHandlerInterface` diff --git a/examples/flow.php b/examples/flow.php index e697db3..26edddd 100644 --- a/examples/flow.php +++ b/examples/flow.php @@ -18,7 +18,7 @@ use Flow\Ip; use Flow\IpStrategy\MaxIpStrategy; -$driver = match (random_int(1, 4)) { +$driver = match (random_int(4, 4)) { 1 => new AmpDriver(), 2 => new FiberDriver(), 3 => new ReactDriver(), diff --git a/src/Driver/SpatieDriver.php b/src/Driver/SpatieDriver.php index 88612a4..fafcef3 100644 --- a/src/Driver/SpatieDriver.php +++ b/src/Driver/SpatieDriver.php @@ -60,16 +60,32 @@ public function async(Closure $callback): Closure }; } + public function defer(Closure $callback): mixed + { + return null; + } + public function await(array &$stream): void { - $async = function ($ip, $fnFlows, $index, $onResolve) { - $async = $this->async($fnFlows[$index]['job']); + $async = function (Closure $job) { + return function (mixed $data) use ($job) { + $async = $this->async($job); + + $next = fn($value) => $value; + if ($data === null) { + $async($next)(); + } else { + $async($next)($data); + } - if ($ip->data === null) { - return $async($onResolve)(); - } + return static function($onResolve) use ($next) { + $next($onResolve); + }; + }; + }; - return $async($onResolve)($ip->data); + $defer = function (Closure $job) use ($async) { + return $async($job); }; $nextIp = null; @@ -78,7 +94,7 @@ public function await(array &$stream): void foreach ($stream['dispatchers'] as $index => $dispatcher) { $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); if ($nextIp !== null) { - $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) { + $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { diff --git a/src/Driver/SwooleDriver.php b/src/Driver/SwooleDriver.php index ee2e846..30846bd 100644 --- a/src/Driver/SwooleDriver.php +++ b/src/Driver/SwooleDriver.php @@ -54,6 +54,11 @@ public function async(Closure $callback): Closure }; } + public function defer(Closure $callback): mixed + { + return null; + } + public function await(array &$stream): void { $async = function ($ip, $fnFlows, $index, $onResolve) {