Skip to content

Commit

Permalink
Merge branch 'master' of github.com:reactphp-parallel/psr-15-adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
WyriHaximus committed Jun 20, 2020
2 parents 913d07d + 88bf1ef commit 19d1d45
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 16 deletions.
2 changes: 1 addition & 1 deletion phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
parameters:
ignoreErrors:
- '#n method "ReactParallel\\Psr15Adapter\\ReactMiddleware::__invoke", caught "Throwable" must be rethrown.#'
- '#n method "ReactParallel\\Psr15Adapter\\ReactMiddleware::__construct", caught "Throwable" must be rethrown.#'

includes:
- vendor/wyrihaximus/async-test-utilities/rules.neon
24 changes: 11 additions & 13 deletions src/ReactMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,25 @@ final class ReactMiddleware

private PoolInterface $runtime;

private MiddlewareInterface $middleware;
private string $fun;

public function __construct(Factory $streamFactory, PoolInterface $pool, MiddlewareInterface $middleware)
{
$this->streamFactory = $streamFactory;
$this->runtime = $pool;
$this->middleware = $middleware;
$this->fun = serialize(new SerializableClosure(static function (ServerRequestInterface $request, string $input, string $output) use ($middleware): PromiseInterface {
try {
return resolve($middleware->process($request, new Psr15RequestHandlerAdapter(Channel::open($input), Channel::open($output))));
} catch (Throwable $throwable) {
return reject($throwable);
}
}));
}

public function __invoke(ServerRequestInterface $request, callable $next): PromiseInterface
{
return new Promise(function (callable $resolve, callable $reject) use ($request, $next): void {
$name = spl_object_hash($this) . '_' . bin2hex(random_bytes(32));
$middleware = $this->middleware;
$name = spl_object_hash($this) . '_' . bin2hex(random_bytes(32));

$input = Channel::make($name . '_to_thread', Channel::Infinite);
$output = Channel::make($name . '_from_thread', Channel::Infinite);
Expand All @@ -52,19 +57,12 @@ public function __invoke(ServerRequestInterface $request, callable $next): Promi
$input->send($response);
}, $reject);

$fun = serialize(new SerializableClosure(static function (ServerRequestInterface $request, Channel $input, Channel $output) use ($middleware): PromiseInterface {
try {
return resolve($middleware->process($request, new Psr15RequestHandlerAdapter($input, $output)));
} catch (Throwable $throwable) {
return reject($throwable);
}
}));
$request = serialize($request);
$this->runtime->run(static function (string $fun, string $request, Channel $input, Channel $output): PromiseInterface {
$this->runtime->run(static function (string $fun, string $request, string $input, string $output): PromiseInterface {
$request = unserialize($request);

return resolve((unserialize($fun))($request, $input, $output));
}, [$fun, $request, $input, $output])->then($resolve, $reject);
}, [$this->fun, $request, (string) $input, (string) $output])->then($resolve, $reject);
});
}
}
2 changes: 1 addition & 1 deletion tests/Psr15MiddlewareStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ final class Psr15MiddlewareStub implements MiddlewareInterface
{
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
sleep(1);
sleep((int)$request->getAttribute('sleep'));
return $handler->handle($request)->withAddedHeader('__CLASS__', __CLASS__);
}
}
39 changes: 38 additions & 1 deletion tests/ReactMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use RingCentral\Psr7\Response;
use RingCentral\Psr7\ServerRequest;
use WyriHaximus\AsyncTestUtilities\AsyncTestCase;
use function React\Promise\all;
use function React\Promise\resolve;

/**
Expand All @@ -24,14 +25,15 @@ final class ReactMiddlewareTest extends AsyncTestCase
/**
* @test
*/
public function handle(): void
public function one(): void
{
$rnd = bin2hex(random_bytes(1024));
$loop = Factory::create();
$pool = new Infinite($loop, 10);
$stub = new Psr15MiddlewareStub();
$middleware = new ReactMiddleware(new StreamFactory(new EventLoopBridge($loop)), $pool, $stub);
$request = new ServerRequest('GET', 'https://example.com/');
$request = $request->withAttribute('sleep', 1);
$request = $request->withAttribute('body', $rnd);

$promise = $middleware(
Expand All @@ -48,4 +50,39 @@ static function (ServerRequestInterface $request): PromiseInterface {
self::assertSame($rnd, $response->getBody()->getContents());
self::assertSame(Psr15MiddlewareStub::class, $response->getHeaderLine('__CLASS__'));
}

/**
* @test
*/
public function five(): void
{
$loop = Factory::create();
$pool = new Infinite($loop, 10);
$stub = new Psr15MiddlewareStub();
$middleware = new ReactMiddleware(new StreamFactory(new EventLoopBridge($loop)), $pool, $stub);
$baseRequest = new ServerRequest('GET', 'https://example.com/');
$requests = [
'a' => $baseRequest->withAttribute('body', '1')->withAttribute('sleep', 5),
'b' => $baseRequest->withAttribute('body', '2')->withAttribute('sleep', 1),
'c' => $baseRequest->withAttribute('body', '3')->withAttribute('sleep', 3),
];

$promises = [];
foreach ($requests as $request) {
$promises[] = $middleware(
$request,
static function (ServerRequestInterface $request): PromiseInterface {
return resolve(new Response(666, [
'X-Time' => time(),
], new ReadOnlyStringStream($request->getAttribute('body'))));
}
);
}

/** @var array<ResponseInterface> $responses */
$responses = $this->await(all($promises), $loop, 13.666);
\Safe\usort($responses, fn (ResponseInterface $a, ResponseInterface $b) => (int)$a->getHeaderLine('X-Time') > (int)$b->getHeaderLine('X-Time'));

self::assertSame(['2', '3', '1'], array_map(fn (ResponseInterface $response) => $response->getBody()->getContents(), $responses));
}
}

0 comments on commit 19d1d45

Please sign in to comment.