From 920be8d8b9e77ddc0730bbda57eba48e91d05c91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Fri, 16 Nov 2018 11:16:43 +0100 Subject: [PATCH] Improve promise cancellation and clean up any garbage references --- composer.json | 2 +- src/Client.php | 83 +++++++++++++++++++++++++++----------------- tests/ClientTest.php | 77 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 129 insertions(+), 33 deletions(-) diff --git a/composer.json b/composer.json index 5272ac4..40e327a 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ "require": { "php": ">=5.3", "react/promise": "^2.1 || ^1.2", - "react/socket": "^1.0 || ^0.8.6" + "react/socket": "^1.1" }, "require-dev": { "phpunit/phpunit": "^6.0 || ^5.7 || ^4.8.35", diff --git a/src/Client.php b/src/Client.php index e37a7e0..10138b6 100644 --- a/src/Client.php +++ b/src/Client.php @@ -157,19 +157,53 @@ public function connect($uri) $socksUri .= '#' . $parts['fragment']; } - $that = $this; + // start TCP/IP connection to SOCKS server + $connecting = $this->connector->connect($socksUri); + + $deferred = new Deferred(function ($_, $reject) use ($connecting) { + $reject(new RuntimeException( + 'Connection cancelled while waiting for proxy (ECONNABORTED)', + defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103 + )); + + // either close active connection or cancel pending connection attempt + $connecting->then(function (ConnectionInterface $stream) { + $stream->close(); + }); + $connecting->cancel(); + }); - // start TCP/IP connection to SOCKS server and then // handle SOCKS protocol once connection is ready // resolve plain connection once SOCKS protocol is completed - return $this->connector->connect($socksUri)->then( - function (ConnectionInterface $stream) use ($that, $host, $port) { - return $that->handleConnectedSocks($stream, $host, $port); + $that = $this; + $connecting->then( + function (ConnectionInterface $stream) use ($that, $host, $port, $deferred) { + $that->handleConnectedSocks($stream, $host, $port, $deferred); }, - function (Exception $e) { - throw new RuntimeException('Unable to connect to proxy (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111, $e); + function (Exception $e) use ($deferred) { + $deferred->reject($e = new RuntimeException( + 'Connection failed because connection to proxy failed (ECONNREFUSED)', + defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111, + $e + )); + + // avoid garbage references by replacing all closures in call stack. + // what a lovely piece of code! + $r = new \ReflectionProperty('Exception', 'trace'); + $r->setAccessible(true); + $trace = $r->getValue($e); + foreach ($trace as &$one) { + foreach ($one['args'] as &$arg) { + if ($arg instanceof \Closure) { + $arg = 'Object(' . get_class($arg) . ')'; + } + } + } + $r->setValue($e, $trace); } ); + + return $deferred->promise(); } /** @@ -178,15 +212,12 @@ function (Exception $e) { * @param ConnectionInterface $stream * @param string $host * @param int $port - * @return Promise Promise + * @param Deferred $deferred + * @return void * @internal */ - public function handleConnectedSocks(ConnectionInterface $stream, $host, $port) + public function handleConnectedSocks(ConnectionInterface $stream, $host, $port, Deferred $deferred) { - $deferred = new Deferred(function ($_, $reject) { - $reject(new RuntimeException('Connection canceled while establishing SOCKS session (ECONNABORTED)', defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103)); - }); - $reader = new StreamReader(); $stream->on('data', array($reader, 'write')); @@ -203,32 +234,22 @@ public function handleConnectedSocks(ConnectionInterface $stream, $host, $port) } else { $promise = $this->handleSocks4($stream, $host, $port, $reader); } - $promise->then(function () use ($deferred, $stream) { + + $promise->then(function () use ($deferred, $stream, $reader, $onError, $onClose) { + $stream->removeListener('data', array($reader, 'write')); + $stream->removeListener('error', $onError); + $stream->removeListener('close', $onClose); + $deferred->resolve($stream); - }, function (Exception $error) use ($deferred) { + }, function (Exception $error) use ($deferred, $stream) { // pass custom RuntimeException through as-is, otherwise wrap in protocol error if (!$error instanceof RuntimeException) { $error = new RuntimeException('Invalid response received from proxy (EBADMSG)', defined('SOCKET_EBADMSG') ? SOCKET_EBADMSG: 71, $error); } $deferred->reject($error); + $stream->close(); }); - - return $deferred->promise()->then( - function (ConnectionInterface $stream) use ($reader, $onError, $onClose) { - $stream->removeListener('data', array($reader, 'write')); - $stream->removeListener('error', $onError); - $stream->removeListener('close', $onClose); - - return $stream; - }, - function ($error) use ($stream, $onClose) { - $stream->removeListener('close', $onClose); - $stream->close(); - - throw $error; - } - ); } private function handleSocks4(ConnectionInterface $stream, $host, $port, StreamReader $reader) diff --git a/tests/ClientTest.php b/tests/ClientTest.php index e304901..4f13f57 100644 --- a/tests/ClientTest.php +++ b/tests/ClientTest.php @@ -3,6 +3,7 @@ use Clue\React\Socks\Client; use React\Promise\Promise; use Clue\React\Socks\Server; +use React\Promise\Deferred; class ClientTest extends TestCase { @@ -199,6 +200,22 @@ public function testCancelConnectionDuringSessionWillCloseStream() $promise->then(null, $this->expectCallableOnceWithExceptionCode(SOCKET_ECONNABORTED)); } + public function testCancelConnectionDuringDeferredSessionWillCloseStream() + { + $stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->getMock(); + $stream->expects($this->once())->method('close'); + + $deferred = new Deferred(); + + $this->connector->expects($this->once())->method('connect')->with('127.0.0.1:1080?hostname=google.com')->willReturn($deferred->promise()); + + $promise = $this->client->connect('google.com:80'); + $deferred->resolve($stream); + $promise->cancel(); + + $promise->then(null, $this->expectCallableOnceWithExceptionCode(SOCKET_ECONNABORTED)); + } + public function testEmitConnectionCloseDuringSessionWillRejectConnection() { $stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('write', 'close'))->getMock(); @@ -217,7 +234,6 @@ public function testEmitConnectionCloseDuringSessionWillRejectConnection() public function testEmitConnectionErrorDuringSessionWillRejectConnection() { $stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('write', 'close'))->getMock(); - $stream->expects($this->once())->method('close'); $promise = \React\Promise\resolve($stream); @@ -400,4 +416,63 @@ public function testEmitSocks5DataErrorMapsToExceptionCode($error, $expectedCode $promise->then(null, $this->expectCallableOnceWithExceptionCode($expectedCode)); } + + public function testConnectionErrorShouldNotCreateGarbageCycles() + { + if (class_exists('React\Promise\When')) { + $this->markTestSkipped('Not supported on legacy Promise v1 API'); + } + + $deferred = new Deferred(); + $this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise()); + + gc_collect_cycles(); + + $promise = $this->client->connect('google.com:80'); + $deferred->reject(new RuntimeException()); + unset($deferred, $promise); + + $this->assertEquals(0, gc_collect_cycles()); + } + + public function testCancelConnectionDuringConnectionShouldNotCreateGarbageCycles() + { + if (class_exists('React\Promise\When')) { + $this->markTestSkipped('Not supported on legacy Promise v1 API'); + } + + $promise = new Promise(function () { }, function () { + throw new \RuntimeException(); + }); + $this->connector->expects($this->once())->method('connect')->willReturn($promise); + + gc_collect_cycles(); + + $promise = $this->client->connect('google.com:80'); + $promise->cancel(); + unset($promise); + + $this->assertEquals(0, gc_collect_cycles()); + } + + public function testCancelConnectionDuringSessionShouldNotCreateGarbageCycles() + { + if (class_exists('React\Promise\When')) { + $this->markTestSkipped('Not supported on legacy Promise v1 API'); + } + + $stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('write', 'close'))->getMock(); + $stream->expects($this->once())->method('close'); + + $promise = new Promise(function ($resolve) use ($stream) { $resolve($stream); }); + $this->connector->expects($this->once())->method('connect')->willReturn($promise); + + gc_collect_cycles(); + + $promise = $this->client->connect('google.com:80'); + $promise->cancel(); + unset($promise); + + $this->assertEquals(0, gc_collect_cycles()); + } }