Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🏗️ YFlow rework #53

Merged
merged 19 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@

## v1.2.x

- Add Batch IP from
- https://speakerdeck.com/alli83/symfony-messenger-et-ses-messages-a-la-queleuleu-dot-dot-dot-et-sil-etait-temps-de-grouper
- https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1
- https://github.com/wazum/symfony-messenger-batch
- Add new Interface Flow\AsyncHandlerInterface to control the Event::SYNC step when processing an IP
- Add Flow\AsyncHandler\AsyncHandler the default handler when Event::SYNC is dispatched
- Add Batch IP with Flow\AsyncHandler\BatchAsyncHandler from
- https://speakerdeck.com/alli83/symfony-messenger-et-ses-messages-a-la-queleuleu-dot-dot-dot-et-sil-etait-temps-de-grouper
- https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1
- https://github.com/wazum/symfony-messenger-batch
- Add Flow\AsyncHandler\DeferAsyncHandler to gain granular control on the async Event::SYNC step event
- Flow\Flow\YFlow rework
- Add more exemples in `examples/yflow.php` to play with Y-Combinators
- Update DX for Flow\DriverInterface : add `defer` to gain much granular control on asynchronous callbacks

## v1.2.0

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ $flow($ip);
A working script is available in the bundled `examples` directory

- Run Flow : `php examples/flow.php`
- Run Y-Combinator Flow : `php examples/yflow.php`
- Start Server : `php examples/server.php`
Start Client(s) : `php examples/client.php`

Expand Down
6 changes: 6 additions & 0 deletions docs/src/content/en/docs/getting-started/async-handler.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ This is the default one. Ip is async processed immediately.

This async process Ip as batch capability : the handler will wait for a certain amount of async messages ($batchSize) to be processed before pushing them.

## DeferAsyncHandler

This async process Ip to offer defer capability : the handler will pass [$data, $defer] as entry for the job. In that case, the job can fine control the async process. $defer is a callable that embark two callbacks
- an complete callback to store result
- an async callback to go to the next async call.

## Make your Async Handler

You can make your custom Ip strategy by implementing `Flow\AsyncHandlerInterface`
1 change: 1 addition & 0 deletions docs/src/content/en/docs/getting-started/ressources.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Video of Y-Combinator : [https://www.youtube.com/watch?v=QSS_ZcO8Q1g](https://ww
- Lambda calculus language explanation : [https://tgdwyer.github.io/lambdacalculus](https://tgdwyer.github.io/lambdacalculus)
- Combinator : [https://github.com/loophp/combinator](https://github.com/loophp/combinator)
- Lambda-php : [https://github.com/igorw/lambda-php](https://github.com/igorw/lambda-php)
- Deriving the y combinator in 7 easy steps : [https://gist.github.com/igstan/388351](https://gist.github.com/igstan/388351)

## Messaging approach with East oriented code from [Frédéric Hardy](https://twitter.com/mageekguy)

Expand Down
10 changes: 10 additions & 0 deletions examples/Data/YFlowData.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Flow\Examples\Data;

class YFlowData
{
public function __construct(public int $id, public ?int $number, public ?int $result = null) {}
}
1 change: 0 additions & 1 deletion examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,3 @@
};
$asyncTask($job1, $job2, $job3, $errorJob1, $errorJob2, $driver);
echo "ended - synchronous\n";
echo 'maths - 4 + 4 = ' . (4 + 4) . "\n";
175 changes: 175 additions & 0 deletions examples/yflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
<?php

declare(strict_types=1);

require __DIR__ . '/../vendor/autoload.php';

use Flow\AsyncHandler\DeferAsyncHandler;
use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
use Flow\Examples\Data\YFlowData;
use Flow\Flow\Flow;
use Flow\Flow\YFlow;
use Flow\Ip;

$driver = match (random_int(1, 4)) {
1 => new AmpDriver(),
2 => new ReactDriver(),
3 => new FiberDriver(),
4 => new SwooleDriver(),
// 5 => new SpatieDriver(),
};
printf("Use %s\n", $driver::class);

function factorial(int $n): int
{
return ($n <= 1) ? 1 : $n * factorial($n - 1);
}

function Ywrap(callable $func, callable $wrapperFunc): Closure
{
$U = static fn ($f) => $f($f);
$Y = static fn (callable $f, callable $g) => $U(static fn (Closure $x) => $f($g(static fn ($y) => $U($x)($y))));

return $Y($func, $wrapperFunc);
}

function memoWrapperGenerator(callable $f): Closure
{
static $cache = [];

return static function ($y) use ($f, &$cache) {
if (!isset($cache[$y])) {
$cache[$y] = $f($y);
}

return $cache[$y];
};
}

function Ymemo(callable $f): Closure
{
return Ywrap($f, 'memoWrapperGenerator');
}

function factorialGen(callable $func): Closure
{
return static function (int $n) use ($func): int {
return ($n <= 1) ? 1 : $n * $func($n - 1);
};
}

function factorialYMemo(int $n): int
{
return Ymemo('factorialGen')($n);
}

$factorialJob = static function (YFlowData $data): YFlowData {
printf("*... #%d - Job 1 : Calculating factorial(%d)\n", $data->id, $data->number);

// raw factorial calculation
$result = factorial($data->number);

printf("*... #%d - Job 1 : Result for factorial(%d) = %d\n", $data->id, $data->number, $result);

return new YFlowData($data->id, $data->number);
};

$factorialYJobBefore = static function (YFlowData $data): YFlowData {
printf(".*.. #%d - Job 2 : Calculating factorialYJob(%d)\n", $data->id, $data->number);

return new YFlowData($data->id, $data->number, $data->number);
};

$factorialYJob = static function ($factorial) {
return static function (YFlowData $data) use ($factorial): YFlowData {
return new YFlowData(
$data->id,
$data->number,
($data->result <= 1) ? 1 : $data->result * $factorial(new YFlowData($data->id, $data->number, $data->result - 1))->result
);
};
};

$factorialYJobAfter = static function (YFlowData $data): YFlowData {
printf(".*.. #%d - Job 2 : Result for factorialYJob(%d) = %d\n", $data->id, $data->number, $data->result);

return new YFlowData($data->id, $data->number);
};

$factorialYMemoJob = static function (YFlowData $data): YFlowData {
printf("..*. #%d - Job 3 : Calculating factorialYMemo(%d)\n", $data->id, $data->number);

$result = factorialYMemo($data->number);

printf("..*. #%d - Job 3 : Result for factorialYMemo(%d) = %d\n", $data->id, $data->number, $result);

return new YFlowData($data->id, $data->number);
};

// Define the Y-Combinator
$U = static fn (Closure $f) => $f($f);
$Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y)));

$factorialYJobDeferBefore = static function (YFlowData $data) {
printf("...* #%d - Job 4 : Calculating factorialYJobDefer(%d)\n", $data->id, $data->number);

return new YFlowData($data->id, $data->number, $data->number);
};

$factorialYJobDefer = $Y(static function ($factorial) {
return static function ($args) use ($factorial) {
[$data, $defer] = $args;

return $defer(static function ($complete, $async) use ($data, $defer, $factorial) {
if ($data->result <= 1) {
$complete([new YFlowData($data->id, $data->number, 1), $defer]);
} else {
$async($factorial([new YFlowData($data->id, $data->number, $data->result - 1), $defer]), static function ($result) use ($data, $complete) {
[$resultData, $defer] = $result;
$complete([new YFlowData($data->id, $data->number, $data->result * $resultData->result), $defer]);
});
}
});
};
});

$factorialYJobDeferAfter = static function ($args) {
[$data, $defer] = $args;

return $defer(static function ($complete) use ($data, $defer) {
printf("...* #%d - Job 4 : Result for factorialYJobDefer(%d) = %d\n", $data->id, $data->number, $data->result);

$complete([new YFlowData($data->id, $data->number), $defer]);
});
};

$flow = Flow::do(static function () use (
$factorialJob,
$factorialYJobBefore,
$factorialYJob,
$factorialYJobAfter,
$factorialYMemoJob,
$factorialYJobDeferBefore,
$factorialYJobDefer,
$factorialYJobDeferAfter
) {
yield [$factorialJob];
yield [$factorialYJobBefore];
yield new YFlow($factorialYJob);
yield [$factorialYJobAfter];
yield [$factorialYMemoJob];
yield [$factorialYJobDeferBefore];
yield [$factorialYJobDefer, null, null, null, new DeferAsyncHandler()];
yield [$factorialYJobDeferAfter, null, null, null, new DeferAsyncHandler()];
}, ['driver' => $driver]);

for ($i = 1; $i <= 5; $i++) {
$ip = new Ip(new YFlowData($i, $i));
$flow($ip);
}

$flow->await();
13 changes: 10 additions & 3 deletions src/AsyncHandler/AsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
use Flow\Event;
use Flow\Event\AsyncEvent;

use function call_user_func_array;

/**
* @template T
*
* @implements AsyncHandlerInterface<T>
*/
final class AsyncHandler implements AsyncHandlerInterface
{
public static function getSubscribedEvents()
Expand All @@ -21,6 +24,10 @@ public static function getSubscribedEvents()

public function async(AsyncEvent $event): void
{
call_user_func_array($event->getAsync(), $event->getArgs());
$ip = $event->getIp();
$async = $event->getAsync();
$asyncJob = $async($event->getJob());
$next = $asyncJob($ip->data);
$next($event->getCallback());
}
}
27 changes: 21 additions & 6 deletions src/AsyncHandler/BatchAsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,30 @@
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Throwable;

use function call_user_func_array;

/**
* @template T1
* @template T2
*
* @implements AsyncHandlerInterface<T1>
*/
final class BatchAsyncHandler implements BatchHandlerInterface, AsyncHandlerInterface
{
use BatchHandlerTrait;

/**
* @var AsyncHandlerInterface<T2>
*/
private AsyncHandlerInterface $asyncHandler;

/**
* @param null|AsyncHandlerInterface<T2> $asyncHandler
*/
public function __construct(
private int $batchSize = 10,
) {}
?AsyncHandlerInterface $asyncHandler = null,
) {
$this->asyncHandler = $asyncHandler ?? new AsyncHandler();
}

public static function getSubscribedEvents()
{
Expand All @@ -31,8 +46,8 @@ public static function getSubscribedEvents()

public function async(AsyncEvent $event): void
{
$ack = new Acknowledger(get_debug_type($this), static function (?Throwable $e = null, $event = null) {
call_user_func_array($event->getAsync(), $event->getArgs());
$ack = new Acknowledger(get_debug_type($this), function (?Throwable $e = null, $event = null) {
$this->asyncHandler->async($event);
});

$this->handle($event, $ack);
Expand All @@ -43,7 +58,7 @@ public function async(AsyncEvent $event): void
* https://github.com/phpstan/phpstan/issues/6039
* https://phpstan.org/r/8f7de023-9888-4dcb-b12c-e2fcf9547b6c.
*
* @param array{0: AsyncEvent, 1: Acknowledger}[] $jobs
* @param array{0: AsyncEvent<T1>, 1: Acknowledger}[] $jobs
*
* @phpstan-ignore method.unused
*/
Expand Down
36 changes: 36 additions & 0 deletions src/AsyncHandler/DeferAsyncHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Flow\AsyncHandler;

use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;

/**
* @template T
*
* @implements AsyncHandlerInterface<T>
*/
final class DeferAsyncHandler implements AsyncHandlerInterface
{
public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
];
}

public function async(AsyncEvent $event): void
{
$ip = $event->getIp();
$job = $event->getJob();
$next = $job([$ip->data, $event->getDefer()]);
$next(static function ($result) use ($event) {
[$data] = $result;
$callback = $event->getCallback();
$callback($data);
});
}
}
12 changes: 11 additions & 1 deletion src/AsyncHandlerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@

namespace Flow;

use Flow\Event\AsyncEvent;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

interface AsyncHandlerInterface extends EventSubscriberInterface {}
/**
* @template T
*/
interface AsyncHandlerInterface extends EventSubscriberInterface
{
/**
* @param AsyncEvent<T> $event
*/
public function async(AsyncEvent $event): void;
}
Loading
Loading