Skip to content

Commit

Permalink
Improve promise cancellation and clean up any garbage references
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Oct 23, 2018
1 parent 5675206 commit fe3d3f0
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 24 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"require": {
"php": ">=5.3",
"react/promise": " ^2.1 || ^1.2.1",
"react/socket": "^1.0 || ^0.8.4",
"react/socket": "^1.1",
"ringcentral/psr7": "^1.2"
},
"require-dev": {
Expand Down
62 changes: 45 additions & 17 deletions src/ProxyConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,34 @@ public function connect($uri)
$proxyUri .= '#' . $parts['fragment'];
}

$auth = $this->proxyAuth;
$connecting = $this->connector->connect($proxyUri);

$deferred = new Deferred(function ($_, $reject) use ($connecting) {
$reject(new RuntimeException(
'Connection cancelled while waiting for proxy (ECONNABORTED)',
defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103
));

return $this->connector->connect($proxyUri)->then(function (ConnectionInterface $stream) use ($target, $auth) {
$deferred = new Deferred(function ($_, $reject) use ($stream) {
$reject(new RuntimeException('Connection canceled while waiting for response from proxy (ECONNABORTED)', defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103));
// either close active connection or cancel pending connection attempt
$connecting->then(function (ConnectionInterface $stream) {
$stream->close();
});
$connecting->cancel();
});

$auth = $this->proxyAuth;
$connecting->then(function (ConnectionInterface $stream) use ($target, $auth, $deferred) {
// keep buffering data until headers are complete
$buffer = '';
$fn = function ($chunk) use (&$buffer, $deferred, $stream) {
$stream->on('data', $fn = function ($chunk) use (&$buffer, $deferred, $stream, &$fn) {
$buffer .= $chunk;

$pos = strpos($buffer, "\r\n\r\n");
if ($pos !== false) {
// end of headers received => stop buffering
$stream->removeListener('data', $fn);
$fn = null;

// try to parse headers as response message
try {
$response = Psr7\parse_response(substr($buffer, 0, $pos));
Expand All @@ -163,11 +176,13 @@ public function connect($uri)
if ($response->getStatusCode() === 407) {
// map status code 407 (Proxy Authentication Required) to EACCES
$deferred->reject(new RuntimeException('Proxy denied connection due to invalid authentication ' . $response->getStatusCode() . ' (' . $response->getReasonPhrase() . ') (EACCES)', defined('SOCKET_EACCES') ? SOCKET_EACCES : 13));
return $stream->close();
$stream->close();
return;
} elseif ($response->getStatusCode() < 200 || $response->getStatusCode() >= 300) {
// map non-2xx status code to ECONNREFUSED
$deferred->reject(new RuntimeException('Proxy refused connection with HTTP error code ' . $response->getStatusCode() . ' (' . $response->getReasonPhrase() . ') (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111));
return $stream->close();
$stream->close();
return;
}

// all okay, resolve with stream instance
Expand All @@ -187,8 +202,7 @@ public function connect($uri)
$deferred->reject(new RuntimeException('Proxy must not send more than 8 KiB of headers (EMSGSIZE)', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90));
$stream->close();
}
};
$stream->on('data', $fn);
});

$stream->on('error', function (Exception $e) use ($deferred) {
$deferred->reject(new RuntimeException('Stream error while waiting for response from proxy (EIO)', defined('SOCKET_EIO') ? SOCKET_EIO : 5, $e));
Expand All @@ -199,14 +213,28 @@ public function connect($uri)
});

$stream->write("CONNECT " . $target . " HTTP/1.1\r\nHost: " . $target . "\r\n" . $auth . "\r\n");

return $deferred->promise()->then(function (ConnectionInterface $stream) use ($fn) {
// Stop buffering when connection has been established.
$stream->removeListener('data', $fn);
return new Promise\FulfilledPromise($stream);
});
}, function (Exception $e) use ($proxyUri) {
throw new RuntimeException('Unable to connect to proxy (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111, $e);
}, function (Exception $e) use ($deferred) {
$deferred->reject($e = new RuntimeException(
'Unable to connect to proxy (ECONNREFUSED)',
defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111,
$e
));

// avoid garbage references by replacing all closures in call stack.
// what a lovely piece of code!
$r = new \ReflectionProperty('Exception', 'trace');
$r->setAccessible(true);
$trace = $r->getValue($e);
foreach ($trace as &$one) {
foreach ($one['args'] as &$arg) {
if ($arg instanceof \Closure) {
$arg = 'Object(' . get_class($arg) . ')';
}
}
}
$r->setValue($e, $trace);
});

return $deferred->promise();
}
}
9 changes: 6 additions & 3 deletions tests/AbstractTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ protected function expectCallableOnceWithExceptionCode($code)
$mock
->expects($this->once())
->method('__invoke')
->with($this->callback(function ($e) use ($code) {
return $e->getCode() === $code;
}));
->with($this->logicalAnd(
$this->isInstanceOf('Exception'),
$this->callback(function ($e) use ($code) {
return $e->getCode() === $code;
})
));

return $mock;
}
Expand Down
16 changes: 16 additions & 0 deletions tests/FunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,20 @@ public function testSecureGoogleDoesNotAcceptPlainStream()
$this->setExpectedException('RuntimeException', 'Connection to proxy lost', SOCKET_ECONNRESET);
Block\await($promise, $this->loop, 3.0);
}

/**
* @requires PHP 7
*/
public function testCancelWhileConnectingShouldNotCreateGarbageCycles()
{
$proxy = new ProxyConnector('google.com', $this->dnsConnector);

gc_collect_cycles();

$promise = $proxy->connect('google.com:80');
$promise->cancel();
unset($promise);

$this->assertEquals(0, gc_collect_cycles());
}
}
69 changes: 66 additions & 3 deletions tests/ProxyConnectorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Clue\React\HttpProxy\ProxyConnector;
use React\Promise\Promise;
use React\Socket\ConnectionInterface;
use React\Promise\Deferred;

class ProxyConnectorTest extends AbstractTestCase
{
Expand Down Expand Up @@ -355,22 +356,84 @@ public function testResolvesIfStreamReturnsSuccessAndEmitsExcessiveData()
$stream->emit('data', array("HTTP/1.1 200 OK\r\n\r\nhello!"));
}

public function testCancelPromiseWillCloseOpenConnectionAndReject()
public function testCancelPromiseWhileConnectionIsReadyWillCloseOpenConnectionAndReject()
{
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close', 'write'))->getMock();
$stream->expects($this->once())->method('close');

$promise = \React\Promise\resolve($stream);
$this->connector->expects($this->once())->method('connect')->willReturn($promise);
$deferred = new Deferred();

$this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise());

$proxy = new ProxyConnector('proxy.example.com', $this->connector);

$promise = $proxy->connect('google.com:80');

$deferred->resolve($stream);

$this->assertInstanceOf('React\Promise\CancellablePromiseInterface', $promise);

$promise->cancel();

$promise->then(null, $this->expectCallableOnceWithExceptionCode(SOCKET_ECONNABORTED));
}

public function testCancelPromiseDuringConnectionShouldNotCreateGarbageCycles()
{
$pending = new Promise(function () { });
$this->connector->expects($this->once())->method('connect')->willReturn($pending);

gc_collect_cycles();

$proxy = new ProxyConnector('proxy.example.com', $this->connector);

$promise = $proxy->connect('google.com:80');
$promise->cancel();
unset($promise);

$this->assertEquals(0, gc_collect_cycles());
}

public function testCancelPromiseWhileConnectionIsReadyShouldNotCreateGarbageCycles()
{
if (class_exists('React\Promise\When')) {
$this->markTestSkipped('Not supported on legacy Promise v1 API');
}

$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close', 'write'))->getMock();

$deferred = new Deferred();

$this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise());

gc_collect_cycles();

$proxy = new ProxyConnector('proxy.example.com', $this->connector);

$promise = $proxy->connect('google.com:80');
$deferred->resolve($stream);
$promise->cancel();
unset($promise);

$this->assertEquals(0, gc_collect_cycles());
}

public function testRejectedConnectionShouldNotCreateGarbageCycles()
{
if (class_exists('React\Promise\When')) {
$this->markTestSkipped('Not supported on legacy Promise v1 API');
}

$rejected = \React\Promise\reject(new \RuntimeException());
$this->connector->expects($this->once())->method('connect')->willReturn($rejected);

gc_collect_cycles();

$proxy = new ProxyConnector('proxy.example.com', $this->connector);

$promise = $proxy->connect('google.com:80');
unset($promise);

$this->assertEquals(0, gc_collect_cycles());
}
}

0 comments on commit fe3d3f0

Please sign in to comment.