diff --git a/phpstan.neon b/phpstan.neon index f45cc00..f2fbd03 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,6 +1,6 @@ parameters: ignoreErrors: - - '#n method "ReactParallel\\Psr15Adapter\\ReactMiddleware::__invoke", caught "Throwable" must be rethrown.#' + - '#n method "ReactParallel\\Psr15Adapter\\ReactMiddleware::__construct", caught "Throwable" must be rethrown.#' includes: - vendor/wyrihaximus/async-test-utilities/rules.neon \ No newline at end of file diff --git a/src/ReactMiddleware.php b/src/ReactMiddleware.php index 73a098c..fd2b0a2 100644 --- a/src/ReactMiddleware.php +++ b/src/ReactMiddleware.php @@ -26,20 +26,25 @@ final class ReactMiddleware private PoolInterface $runtime; - private MiddlewareInterface $middleware; + private string $fun; public function __construct(Factory $streamFactory, PoolInterface $pool, MiddlewareInterface $middleware) { $this->streamFactory = $streamFactory; $this->runtime = $pool; - $this->middleware = $middleware; + $this->fun = serialize(new SerializableClosure(static function (ServerRequestInterface $request, string $input, string $output) use ($middleware): PromiseInterface { + try { + return resolve($middleware->process($request, new Psr15RequestHandlerAdapter(Channel::open($input), Channel::open($output)))); + } catch (Throwable $throwable) { + return reject($throwable); + } + })); } public function __invoke(ServerRequestInterface $request, callable $next): PromiseInterface { return new Promise(function (callable $resolve, callable $reject) use ($request, $next): void { - $name = spl_object_hash($this) . '_' . bin2hex(random_bytes(32)); - $middleware = $this->middleware; + $name = spl_object_hash($this) . '_' . bin2hex(random_bytes(32)); $input = Channel::make($name . '_to_thread', Channel::Infinite); $output = Channel::make($name . '_from_thread', Channel::Infinite); @@ -52,19 +57,12 @@ public function __invoke(ServerRequestInterface $request, callable $next): Promi $input->send($response); }, $reject); - $fun = serialize(new SerializableClosure(static function (ServerRequestInterface $request, Channel $input, Channel $output) use ($middleware): PromiseInterface { - try { - return resolve($middleware->process($request, new Psr15RequestHandlerAdapter($input, $output))); - } catch (Throwable $throwable) { - return reject($throwable); - } - })); $request = serialize($request); - $this->runtime->run(static function (string $fun, string $request, Channel $input, Channel $output): PromiseInterface { + $this->runtime->run(static function (string $fun, string $request, string $input, string $output): PromiseInterface { $request = unserialize($request); return resolve((unserialize($fun))($request, $input, $output)); - }, [$fun, $request, $input, $output])->then($resolve, $reject); + }, [$this->fun, $request, (string) $input, (string) $output])->then($resolve, $reject); }); } } diff --git a/tests/Psr15MiddlewareStub.php b/tests/Psr15MiddlewareStub.php index 8065e7c..d3dfe62 100644 --- a/tests/Psr15MiddlewareStub.php +++ b/tests/Psr15MiddlewareStub.php @@ -15,7 +15,7 @@ final class Psr15MiddlewareStub implements MiddlewareInterface { public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface { - sleep(1); + sleep((int)$request->getAttribute('sleep')); return $handler->handle($request)->withAddedHeader('__CLASS__', __CLASS__); } } diff --git a/tests/ReactMiddlewareTest.php b/tests/ReactMiddlewareTest.php index 4e7a31b..860e7c8 100644 --- a/tests/ReactMiddlewareTest.php +++ b/tests/ReactMiddlewareTest.php @@ -14,6 +14,7 @@ use RingCentral\Psr7\Response; use RingCentral\Psr7\ServerRequest; use WyriHaximus\AsyncTestUtilities\AsyncTestCase; +use function React\Promise\all; use function React\Promise\resolve; /** @@ -24,7 +25,7 @@ final class ReactMiddlewareTest extends AsyncTestCase /** * @test */ - public function handle(): void + public function one(): void { $rnd = bin2hex(random_bytes(1024)); $loop = Factory::create(); @@ -32,6 +33,7 @@ public function handle(): void $stub = new Psr15MiddlewareStub(); $middleware = new ReactMiddleware(new StreamFactory(new EventLoopBridge($loop)), $pool, $stub); $request = new ServerRequest('GET', 'https://example.com/'); + $request = $request->withAttribute('sleep', 1); $request = $request->withAttribute('body', $rnd); $promise = $middleware( @@ -48,4 +50,39 @@ static function (ServerRequestInterface $request): PromiseInterface { self::assertSame($rnd, $response->getBody()->getContents()); self::assertSame(Psr15MiddlewareStub::class, $response->getHeaderLine('__CLASS__')); } + + /** + * @test + */ + public function five(): void + { + $loop = Factory::create(); + $pool = new Infinite($loop, 10); + $stub = new Psr15MiddlewareStub(); + $middleware = new ReactMiddleware(new StreamFactory(new EventLoopBridge($loop)), $pool, $stub); + $baseRequest = new ServerRequest('GET', 'https://example.com/'); + $requests = [ + 'a' => $baseRequest->withAttribute('body', '1')->withAttribute('sleep', 5), + 'b' => $baseRequest->withAttribute('body', '2')->withAttribute('sleep', 1), + 'c' => $baseRequest->withAttribute('body', '3')->withAttribute('sleep', 3), + ]; + + $promises = []; + foreach ($requests as $request) { + $promises[] = $middleware( + $request, + static function (ServerRequestInterface $request): PromiseInterface { + return resolve(new Response(666, [ + 'X-Time' => time(), + ], new ReadOnlyStringStream($request->getAttribute('body')))); + } + ); + } + + /** @var array $responses */ + $responses = $this->await(all($promises), $loop, 13.666); + \Safe\usort($responses, fn (ResponseInterface $a, ResponseInterface $b) => (int)$a->getHeaderLine('X-Time') > (int)$b->getHeaderLine('X-Time')); + + self::assertSame(['2', '3', '1'], array_map(fn (ResponseInterface $response) => $response->getBody()->getContents(), $responses)); + } }