Skip to content

Commit

Permalink
Improve promise cancellation and clean up any garbage references
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Nov 16, 2018
1 parent 469b8ee commit 920be8d
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 33 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"require": {
"php": ">=5.3",
"react/promise": "^2.1 || ^1.2",
"react/socket": "^1.0 || ^0.8.6"
"react/socket": "^1.1"
},
"require-dev": {
"phpunit/phpunit": "^6.0 || ^5.7 || ^4.8.35",
Expand Down
83 changes: 52 additions & 31 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,53 @@ public function connect($uri)
$socksUri .= '#' . $parts['fragment'];
}

$that = $this;
// start TCP/IP connection to SOCKS server
$connecting = $this->connector->connect($socksUri);

$deferred = new Deferred(function ($_, $reject) use ($connecting) {
$reject(new RuntimeException(
'Connection cancelled while waiting for proxy (ECONNABORTED)',
defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103
));

// either close active connection or cancel pending connection attempt
$connecting->then(function (ConnectionInterface $stream) {
$stream->close();
});
$connecting->cancel();
});

// start TCP/IP connection to SOCKS server and then
// handle SOCKS protocol once connection is ready
// resolve plain connection once SOCKS protocol is completed
return $this->connector->connect($socksUri)->then(
function (ConnectionInterface $stream) use ($that, $host, $port) {
return $that->handleConnectedSocks($stream, $host, $port);
$that = $this;
$connecting->then(
function (ConnectionInterface $stream) use ($that, $host, $port, $deferred) {
$that->handleConnectedSocks($stream, $host, $port, $deferred);
},
function (Exception $e) {
throw new RuntimeException('Unable to connect to proxy (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111, $e);
function (Exception $e) use ($deferred) {
$deferred->reject($e = new RuntimeException(
'Connection failed because connection to proxy failed (ECONNREFUSED)',
defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111,
$e
));

// avoid garbage references by replacing all closures in call stack.
// what a lovely piece of code!
$r = new \ReflectionProperty('Exception', 'trace');
$r->setAccessible(true);
$trace = $r->getValue($e);
foreach ($trace as &$one) {
foreach ($one['args'] as &$arg) {
if ($arg instanceof \Closure) {
$arg = 'Object(' . get_class($arg) . ')';
}
}
}
$r->setValue($e, $trace);
}
);

return $deferred->promise();
}

/**
Expand All @@ -178,15 +212,12 @@ function (Exception $e) {
* @param ConnectionInterface $stream
* @param string $host
* @param int $port
* @return Promise Promise<ConnectionInterface, Exception>
* @param Deferred $deferred
* @return void
* @internal
*/
public function handleConnectedSocks(ConnectionInterface $stream, $host, $port)
public function handleConnectedSocks(ConnectionInterface $stream, $host, $port, Deferred $deferred)
{
$deferred = new Deferred(function ($_, $reject) {
$reject(new RuntimeException('Connection canceled while establishing SOCKS session (ECONNABORTED)', defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103));
});

$reader = new StreamReader();
$stream->on('data', array($reader, 'write'));

Expand All @@ -203,32 +234,22 @@ public function handleConnectedSocks(ConnectionInterface $stream, $host, $port)
} else {
$promise = $this->handleSocks4($stream, $host, $port, $reader);
}
$promise->then(function () use ($deferred, $stream) {

$promise->then(function () use ($deferred, $stream, $reader, $onError, $onClose) {
$stream->removeListener('data', array($reader, 'write'));
$stream->removeListener('error', $onError);
$stream->removeListener('close', $onClose);

$deferred->resolve($stream);
}, function (Exception $error) use ($deferred) {
}, function (Exception $error) use ($deferred, $stream) {
// pass custom RuntimeException through as-is, otherwise wrap in protocol error
if (!$error instanceof RuntimeException) {
$error = new RuntimeException('Invalid response received from proxy (EBADMSG)', defined('SOCKET_EBADMSG') ? SOCKET_EBADMSG: 71, $error);
}

$deferred->reject($error);
$stream->close();
});

return $deferred->promise()->then(
function (ConnectionInterface $stream) use ($reader, $onError, $onClose) {
$stream->removeListener('data', array($reader, 'write'));
$stream->removeListener('error', $onError);
$stream->removeListener('close', $onClose);

return $stream;
},
function ($error) use ($stream, $onClose) {
$stream->removeListener('close', $onClose);
$stream->close();

throw $error;
}
);
}

private function handleSocks4(ConnectionInterface $stream, $host, $port, StreamReader $reader)
Expand Down
77 changes: 76 additions & 1 deletion tests/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use Clue\React\Socks\Client;
use React\Promise\Promise;
use Clue\React\Socks\Server;
use React\Promise\Deferred;

class ClientTest extends TestCase
{
Expand Down Expand Up @@ -199,6 +200,22 @@ public function testCancelConnectionDuringSessionWillCloseStream()
$promise->then(null, $this->expectCallableOnceWithExceptionCode(SOCKET_ECONNABORTED));
}

public function testCancelConnectionDuringDeferredSessionWillCloseStream()
{
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->getMock();
$stream->expects($this->once())->method('close');

$deferred = new Deferred();

$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:1080?hostname=google.com')->willReturn($deferred->promise());

$promise = $this->client->connect('google.com:80');
$deferred->resolve($stream);
$promise->cancel();

$promise->then(null, $this->expectCallableOnceWithExceptionCode(SOCKET_ECONNABORTED));
}

public function testEmitConnectionCloseDuringSessionWillRejectConnection()
{
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('write', 'close'))->getMock();
Expand All @@ -217,7 +234,6 @@ public function testEmitConnectionCloseDuringSessionWillRejectConnection()
public function testEmitConnectionErrorDuringSessionWillRejectConnection()
{
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('write', 'close'))->getMock();
$stream->expects($this->once())->method('close');

$promise = \React\Promise\resolve($stream);

Expand Down Expand Up @@ -400,4 +416,63 @@ public function testEmitSocks5DataErrorMapsToExceptionCode($error, $expectedCode

$promise->then(null, $this->expectCallableOnceWithExceptionCode($expectedCode));
}

public function testConnectionErrorShouldNotCreateGarbageCycles()
{
if (class_exists('React\Promise\When')) {
$this->markTestSkipped('Not supported on legacy Promise v1 API');
}

$deferred = new Deferred();
$this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise());

gc_collect_cycles();

$promise = $this->client->connect('google.com:80');
$deferred->reject(new RuntimeException());
unset($deferred, $promise);

$this->assertEquals(0, gc_collect_cycles());
}

public function testCancelConnectionDuringConnectionShouldNotCreateGarbageCycles()
{
if (class_exists('React\Promise\When')) {
$this->markTestSkipped('Not supported on legacy Promise v1 API');
}

$promise = new Promise(function () { }, function () {
throw new \RuntimeException();
});
$this->connector->expects($this->once())->method('connect')->willReturn($promise);

gc_collect_cycles();

$promise = $this->client->connect('google.com:80');
$promise->cancel();
unset($promise);

$this->assertEquals(0, gc_collect_cycles());
}

public function testCancelConnectionDuringSessionShouldNotCreateGarbageCycles()
{
if (class_exists('React\Promise\When')) {
$this->markTestSkipped('Not supported on legacy Promise v1 API');
}

$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('write', 'close'))->getMock();
$stream->expects($this->once())->method('close');

$promise = new Promise(function ($resolve) use ($stream) { $resolve($stream); });
$this->connector->expects($this->once())->method('connect')->willReturn($promise);

gc_collect_cycles();

$promise = $this->client->connect('google.com:80');
$promise->cancel();
unset($promise);

$this->assertEquals(0, gc_collect_cycles());
}
}

0 comments on commit 920be8d

Please sign in to comment.