diff --git a/example/route_guide/lib/src/client.dart b/example/route_guide/lib/src/client.dart index c1545107..3297e3d0 100644 --- a/example/route_guide/lib/src/client.dart +++ b/example/route_guide/lib/src/client.dart @@ -18,12 +18,17 @@ class Client { Future main(List args) async { channel = new ClientChannel('127.0.0.1', port: 8080, options: const ChannelOptions.insecure()); - stub = new RouteGuideClient(channel); + stub = new RouteGuideClient(channel, + options: new CallOptions(timeout: new Duration(seconds: 30))); // Run all of the demos in order. - await runGetFeature(); - await runListFeatures(); - await runRecordRoute(); - await runRouteChat(); + try { + await runGetFeature(); + await runListFeatures(); + await runRecordRoute(); + await runRouteChat(); + } catch (e) { + print('Caught error: $e'); + } await channel.shutdown(); } diff --git a/lib/src/client.dart b/lib/src/client.dart index b87f04c4..3293f5e7 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -7,7 +7,9 @@ import 'dart:async'; import 'dart:io'; +import 'dart:math'; import 'package:http2/transport.dart'; +import 'package:meta/meta.dart'; import 'shared.dart'; import 'status.dart'; @@ -21,38 +23,95 @@ const _reservedHeaders = const [ 'user-agent', ]; +const defaultIdleTimeout = const Duration(minutes: 5); + +typedef Duration BackoffStrategy(Duration lastBackoff); + +// Backoff algorithm from https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md +const _minConnectTimeout = const Duration(seconds: 20); +const _initialBackoff = const Duration(seconds: 1); +const _maxBackoff = const Duration(seconds: 120); +const _multiplier = 1.6; +const _jitter = 0.2; +final _random = new Random(); + +Duration defaultBackoffStrategy(Duration lastBackoff) { + if (lastBackoff == null) return _initialBackoff; + final jitter = _random.nextDouble() * 2 * _jitter - _jitter; + final nextBackoff = lastBackoff * (_multiplier + jitter); + return nextBackoff < _maxBackoff ? nextBackoff : _maxBackoff; +} + /// Options controlling how connections are made on a [ClientChannel]. class ChannelOptions { - final bool _useTls; + final bool isSecure; final List _certificateBytes; final String _certificatePassword; final String authority; - - const ChannelOptions._(this._useTls, - [this._certificateBytes, this._certificatePassword, this.authority]); - - /// Enable TLS using the default trust store. - const ChannelOptions() : this._(true); + final Duration idleTimeout; + final BackoffStrategy backoffStrategy; + + const ChannelOptions._( + this.isSecure, + this._certificateBytes, + this._certificatePassword, + this.authority, + Duration idleTimeout, + BackoffStrategy backoffStrategy) + : this.idleTimeout = idleTimeout ?? defaultIdleTimeout, + this.backoffStrategy = backoffStrategy ?? defaultBackoffStrategy; /// Disable TLS. RPCs are sent in clear text. - const ChannelOptions.insecure() : this._(false); - - /// Enable TLS and specify the [certificate]s to trust. - ChannelOptions.secure( - {List certificate, String password, String authority}) - : this._(true, certificate, password, authority); + const ChannelOptions.insecure( + {Duration idleTimeout, + BackoffStrategy backoffStrategy = + defaultBackoffStrategy}) // Remove when dart-lang/sdk#31066 is fixed. + : this._(false, null, null, null, idleTimeout, backoffStrategy); + + /// Enable TLS and optionally specify the [certificate]s to trust. If + /// [certificates] is not provided, the default trust store is used. + const ChannelOptions.secure( + {List certificate, + String password, + String authority, + Duration idleTimeout, + BackoffStrategy backoffStrategy = + defaultBackoffStrategy}) // Remove when dart-lang/sdk#31066 is fixed. + : this._(true, certificate, password, authority, idleTimeout, + backoffStrategy); SecurityContext get securityContext { - if (!_useTls) return null; - final context = createSecurityContext(false); + if (!isSecure) return null; if (_certificateBytes != null) { - context.setTrustedCertificatesBytes(_certificateBytes, - password: _certificatePassword); + return createSecurityContext(false) + ..setTrustedCertificatesBytes(_certificateBytes, + password: _certificatePassword); + } + final context = SecurityContext.defaultContext; + if (SecurityContext.alpnSupported) { + context.setAlpnProtocols(supportedAlpnProtocols, false); } return context; } } +enum ConnectionState { + /// Actively trying to connect. + connecting, + + /// Connection successfully established. + ready, + + /// Some transient failure occurred, waiting to re-connect. + transientFailure, + + /// Not currently connected, and no pending RPCs. + idle, + + /// Shutting down, no further RPCs allowed. + shutdown +} + /// A connection to a single RPC endpoint. /// /// RPCs made on a connection are always sent to the same endpoint. @@ -67,9 +126,23 @@ class ClientConnection { new Header.ascii('grpc-accept-encoding', 'identity'); static final _userAgent = new Header.ascii('user-agent', 'dart-grpc/0.2.0'); - final ClientTransportConnection _transport; + final String host; + final int port; + final ChannelOptions options; - ClientConnection(this._transport); + ConnectionState _state = ConnectionState.idle; + void Function(ClientConnection connection) onStateChanged; + final _pendingCalls = []; + + ClientTransportConnection _transport; + + /// Used for idle and reconnect timeout, depending on [_state]. + Timer _timer; + Duration _currentReconnectDelay; + + ClientConnection(this.host, this.port, this.options); + + ConnectionState get state => _state; static List
createCallHeaders( bool useTls, String authority, String path, CallOptions options) { @@ -95,13 +168,83 @@ class ClientConnection { return headers; } + String get authority => options.authority ?? host; + + @visibleForTesting + Future connectTransport() async { + final securityContext = options.securityContext; + + var socket = await Socket.connect(host, port); + if (_state == ConnectionState.shutdown) { + socket.destroy(); + throw 'Shutting down'; + } + if (securityContext != null) { + socket = await SecureSocket.secure(socket, + host: authority, context: securityContext); + if (_state == ConnectionState.shutdown) { + socket.destroy(); + throw 'Shutting down'; + } + } + socket.done.then(_handleSocketClosed); + return new ClientTransportConnection.viaSocket(socket); + } + + void _connect() { + if (_state != ConnectionState.idle && + _state != ConnectionState.transientFailure) { + return; + } + _setState(ConnectionState.connecting); + connectTransport().then((transport) { + _currentReconnectDelay = null; + _transport = transport; + _transport.onActiveStateChanged = _handleActiveStateChanged; + _setState(ConnectionState.ready); + _pendingCalls.forEach(_startCall); + _pendingCalls.clear(); + }).catchError(_handleTransientFailure); + } + + void dispatchCall(ClientCall call) { + switch (_state) { + case ConnectionState.ready: + _startCall(call); + break; + case ConnectionState.shutdown: + _shutdownCall(call); + break; + default: + _pendingCalls.add(call); + if (_state == ConnectionState.idle) { + _connect(); + } + } + } + + void _startCall(ClientCall call) { + if (call._isCancelled) return; + final headers = + createCallHeaders(options.isSecure, authority, call.path, call.options); + final stream = _transport.makeRequest(headers); + call._onConnectedStream(stream); + } + + void _shutdownCall(ClientCall call) { + if (call._isCancelled) return; + call._onConnectError( + new GrpcError.unavailable('Connection shutting down.')); + } + /// Shuts down this connection. /// /// No further calls may be made on this connection, but existing calls /// are allowed to finish. Future shutdown() { - // TODO(jakobr): Manage streams, close [_transport] when all are done. - return _transport.finish(); + if (_state == ConnectionState.shutdown) return new Future.value(); + _setShutdownState(); + return _transport?.finish() ?? new Future.value(); } /// Terminates this connection. @@ -109,20 +252,81 @@ class ClientConnection { /// All open calls are terminated immediately, and no further calls may be /// made on this connection. Future terminate() { - // TODO(jakobr): Manage streams, close them immediately. - return _transport.terminate(); + _setShutdownState(); + return _transport?.terminate() ?? new Future.value(); } - /// Starts a new RPC on this connection. - /// - /// Creates a new transport stream on this connection, and sends initial call - /// metadata. - ClientTransportStream sendRequest( - bool useTls, String authority, String path, CallOptions options) { - final headers = createCallHeaders(useTls, authority, path, options); - final stream = _transport.makeRequest(headers); - // TODO(jakobr): Manage streams. Subscribe to stream state changes. - return stream; + void _setShutdownState() { + _setState(ConnectionState.shutdown); + _cancelTimer(); + _pendingCalls.forEach(_shutdownCall); + _pendingCalls.clear(); + } + + void _setState(ConnectionState state) { + _state = state; + if (onStateChanged != null) { + onStateChanged(this); + } + } + + void _handleIdleTimeout() { + if (_timer == null || _state != ConnectionState.ready) return; + _cancelTimer(); + _transport?.finish()?.catchError((_) => {}); // TODO(jakobr): Log error. + _transport = null; + _setState(ConnectionState.idle); + } + + void _cancelTimer() { + _timer?.cancel(); + _timer = null; + } + + void _handleActiveStateChanged(bool isActive) { + if (isActive) { + _cancelTimer(); + } else { + if (options.idleTimeout != null) { + _timer ??= new Timer(options.idleTimeout, _handleIdleTimeout); + } + } + } + + bool _hasPendingCalls() { + // Get rid of pending calls that have timed out. + _pendingCalls.removeWhere((call) => call._isCancelled); + return _pendingCalls.isNotEmpty; + } + + void _handleTransientFailure(error) { + _transport = null; + if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) { + return; + } + // TODO(jakobr): Log error. + _cancelTimer(); + if (!_hasPendingCalls()) { + _setState(ConnectionState.idle); + return; + } + _setState(ConnectionState.transientFailure); + _currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay); + _timer = new Timer(_currentReconnectDelay, _handleReconnect); + } + + void _handleReconnect() { + if (_timer == null || _state != ConnectionState.transientFailure) return; + _cancelTimer(); + _connect(); + } + + void _handleSocketClosed(Socket _) { + _cancelTimer(); + if (_state != ConnectionState.idle && _state != ConnectionState.shutdown) { + // We were not planning to close the socket. + _handleTransientFailure('Socket closed'); + } } } @@ -136,20 +340,13 @@ class ClientChannel { final int port; final ChannelOptions options; - final _connections = []; + // TODO(jakobr): Multiple connections, load balancing. + ClientConnection _connection; bool _isShutdown = false; ClientChannel(this.host, - {this.port = 443, this.options = const ChannelOptions()}); - - String get authority => options.authority ?? host; - - void _shutdownCheck([Function() cleanup]) { - if (!_isShutdown) return; - if (cleanup != null) cleanup(); - throw new GrpcError.unavailable('Channel shutting down.'); - } + {this.port = 443, this.options = const ChannelOptions.secure()}); /// Shuts down this channel. /// @@ -158,7 +355,7 @@ class ClientChannel { Future shutdown() { if (_isShutdown) return new Future.value(); _isShutdown = true; - return Future.wait(_connections.map((c) => c.shutdown())); + return _connection.shutdown(); } /// Terminates this channel. @@ -167,42 +364,25 @@ class ClientChannel { /// on this channel. Future terminate() { _isShutdown = true; - return Future.wait(_connections.map((c) => c.terminate())); + return _connection.terminate(); } /// Returns a connection to this [Channel]'s RPC endpoint. /// /// The connection may be shared between multiple RPCs. - Future connect() async { - _shutdownCheck(); - final securityContext = options.securityContext; - - var socket = await Socket.connect(host, port); - _shutdownCheck(socket.destroy); - if (securityContext != null) { - socket = await SecureSocket.secure(socket, - host: authority, context: securityContext); - _shutdownCheck(socket.destroy); - } - final connection = - new ClientConnection(new ClientTransportConnection.viaSocket(socket)); - _connections.add(connection); - return connection; + Future getConnection() async { + if (_isShutdown) throw new GrpcError.unavailable('Channel shutting down.'); + return _connection ??= new ClientConnection(host, port, options); } /// Initiates a new RPC on this connection. ClientCall createCall( ClientMethod method, Stream requests, CallOptions options) { - final call = new ClientCall(method, requests, options.timeout); - connect().then((connection) { - // TODO(jakobr): Check if deadline is exceeded. + final call = new ClientCall(method, requests, options); + getConnection().then((connection) { if (call._isCancelled) return; - final stream = connection.sendRequest( - this.options._useTls, authority, method.path, options); - call._onConnectedStream(stream); - }, onError: (error) { - call._onConnectError(error); - }); + connection.dispatchCall(call); + }, onError: call._onConnectError); return call; } } @@ -263,6 +443,7 @@ class Client { class ClientCall implements Response { final ClientMethod _method; final Stream _requests; + final CallOptions options; final _headers = new Completer>(); final _trailers = new Completer>(); @@ -278,13 +459,15 @@ class ClientCall implements Response { bool _isCancelled = false; Timer _timeoutTimer; - ClientCall(this._method, this._requests, Duration timeout) { + ClientCall(this._method, this._requests, this.options) { _responses = new StreamController(onListen: _onResponseListen); - if (timeout != null) { - _timeoutTimer = new Timer(timeout, _onTimedOut); + if (options.timeout != null) { + _timeoutTimer = new Timer(options.timeout, _onTimedOut); } } + String get path => _method.path; + void _onConnectError(error) { if (!_responses.isClosed) { _responses diff --git a/lib/src/server.dart b/lib/src/server.dart index cf51a2f2..f80da365 100644 --- a/lib/src/server.dart +++ b/lib/src/server.dart @@ -112,6 +112,8 @@ class Server { server.listen((socket) { final connection = new ServerTransportConnection.viaSocket(socket); _connections.add(connection); + // TODO(jakobr): Set active state handlers, close connection after idle + // timeout. connection.incomingStreams.listen(serveStream, onError: (error) { print('Connection error: $error'); }, onDone: () { diff --git a/lib/src/shared.dart b/lib/src/shared.dart index 0959fa49..c8b6ee0c 100644 --- a/lib/src/shared.dart +++ b/lib/src/shared.dart @@ -124,10 +124,12 @@ abstract class _ResponseMixin implements Response { Future cancel() => _call.cancel(); } +const supportedAlpnProtocols = const ['grpc-exp', 'h2']; + // TODO: Simplify once we have a stable Dart 1.25 release (update pubspec to // require SDK >=1.25.0, and remove check for alpnSupported). SecurityContext createSecurityContext(bool isServer) => SecurityContext.alpnSupported ? (new SecurityContext() - ..setAlpnProtocols(['grpc-exp', 'h2'], isServer)) + ..setAlpnProtocols(supportedAlpnProtocols, isServer)) : new SecurityContext(); diff --git a/lib/src/streams.dart b/lib/src/streams.dart index 526b9cc2..fb68ce0d 100644 --- a/lib/src/streams.dart +++ b/lib/src/streams.dart @@ -148,6 +148,7 @@ class _GrpcMessageConversionSink extends ChunkedConversionSink { // TODO(jakobr): Handle duplicate header names correctly. headers[ASCII.decode(header.name)] = ASCII.decode(header.value); } + // TODO(jakobr): Check :status, go to error mode if not 2xx. _out.add(new GrpcMetadata(headers)); } diff --git a/test/client_test.dart b/test/client_test.dart index f1b2e437..fa4c4660 100644 --- a/test/client_test.dart +++ b/test/client_test.dart @@ -4,10 +4,9 @@ import 'dart:async'; -import 'package:grpc/src/status.dart'; +import 'package:grpc/grpc.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; -import 'package:mockito/mockito.dart'; import 'src/client_utils.dart'; import 'src/utils.dart'; @@ -275,15 +274,6 @@ void main() { ); }); - test('Connection errors are reported', () async { - harness.channel.connectionError = 'Connection error'; - final expectedError = - new GrpcError.unavailable('Error connecting: Connection error'); - harness.expectThrows(harness.client.unary(dummyValue), expectedError); - harness.expectThrows( - harness.client.serverStreaming(dummyValue).toList(), expectedError); - }); - test('Known request errors are reported', () async { final expectedException = new GrpcError.deadlineExceeded('Too late!'); @@ -310,4 +300,85 @@ void main() { expectDone: false, ); }); + + Future makeUnaryCall() async { + void handleRequest(StreamMessage message) { + harness + ..sendResponseHeader() + ..sendResponseValue(1) + ..sendResponseTrailer(); + } + + await harness.runTest( + clientCall: harness.client.unary(1), + expectedResult: 1, + expectedPath: '/Test/Unary', + serverHandlers: [handleRequest], + ); + } + + test('Reconnect on connection error', () async { + final connectionStates = []; + harness.connection.connectionError = 'Connection error'; + int failureCount = 0; + harness.connection.onStateChanged = (connection) { + final state = connection.state; + connectionStates.add(state); + if (state == ConnectionState.transientFailure) failureCount++; + if (failureCount == 2) { + harness.connection.connectionError = null; + } + }; + + await makeUnaryCall(); + + expect(failureCount, 2); + expect(connectionStates, [ + ConnectionState.connecting, + ConnectionState.transientFailure, + ConnectionState.connecting, + ConnectionState.transientFailure, + ConnectionState.connecting, + ConnectionState.ready + ]); + }); + + test('Connections time out if idle', () async { + final done = new Completer(); + final connectionStates = []; + harness.connection.onStateChanged = (connection) { + final state = connection.state; + connectionStates.add(state); + if (state == ConnectionState.idle) done.complete(); + }; + + harness.channelOptions.idleTimeout = const Duration(microseconds: 10); + + await makeUnaryCall(); + harness.signalIdle(); + expect( + connectionStates, [ConnectionState.connecting, ConnectionState.ready]); + await done.future; + expect(connectionStates, [ + ConnectionState.connecting, + ConnectionState.ready, + ConnectionState.idle + ]); + }); + + test('Default reconnect backoff backs off', () { + Duration lastBackoff = defaultBackoffStrategy(null); + expect(lastBackoff, const Duration(seconds: 1)); + for (int i = 0; i < 12; i++) { + final minNext = lastBackoff * (1.6 - 0.2); + final maxNext = lastBackoff * (1.6 + 0.2); + lastBackoff = defaultBackoffStrategy(lastBackoff); + if (lastBackoff != const Duration(minutes: 2)) { + expect(lastBackoff, greaterThanOrEqualTo(minNext)); + expect(lastBackoff, lessThanOrEqualTo(maxNext)); + } + } + expect(lastBackoff, const Duration(minutes: 2)); + expect(defaultBackoffStrategy(lastBackoff), const Duration(minutes: 2)); + }); } diff --git a/test/options_test.dart b/test/options_test.dart index 6f32d019..f103008f 100644 --- a/test/options_test.dart +++ b/test/options_test.dart @@ -26,9 +26,6 @@ void main() { final correctPassword = new ChannelOptions.secure( certificate: certificate, password: 'correct'); expect(correctPassword.securityContext, isNotNull); - - final channel = new ClientChannel('localhost', options: missingPassword); - expect(channel.connect(), throwsA(isTlsException)); }); }); } diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart index d5d8fbc3..cb00f344 100644 --- a/test/src/client_utils.dart +++ b/test/src/client_utils.dart @@ -4,6 +4,7 @@ import 'dart:async'; +import 'dart:io'; import 'package:grpc/src/streams.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; @@ -17,20 +18,42 @@ class MockTransport extends Mock implements ClientTransportConnection {} class MockStream extends Mock implements ClientTransportStream {} -class MockChannel extends ClientChannel { - final ClientConnection connection; +class FakeConnection extends ClientConnection { + final ClientTransportConnection transport; var connectionError; - MockChannel(String host, this.connection) : super(host); + FakeConnection(String host, this.transport, ChannelOptions options) + : super(host, 443, options); @override - Future connect() async { + Future connectTransport() async { if (connectionError != null) throw connectionError; - return connection; + return transport; } } +Duration testBackoff(Duration lastBackoff) => const Duration(milliseconds: 1); + +class FakeChannelOptions implements ChannelOptions { + String authority; + Duration idleTimeout = const Duration(seconds: 1); + BackoffStrategy backoffStrategy = testBackoff; + SecurityContext securityContext = new SecurityContext(); + bool isSecure = true; +} + +class FakeChannel extends ClientChannel { + final ClientConnection connection; + final FakeChannelOptions options; + + FakeChannel(String host, this.connection, this.options) + : super(host, options: options); + + @override + Future getConnection() async => connection; +} + typedef ServerMessageHandler = void Function(StreamMessage message); class TestClient extends Client { @@ -74,7 +97,9 @@ class TestClient extends Client { class ClientHarness { MockTransport transport; - MockChannel channel; + FakeConnection connection; + FakeChannel channel; + FakeChannelOptions channelOptions; MockStream stream; StreamController fromClient; @@ -84,11 +109,14 @@ class ClientHarness { void setUp() { transport = new MockTransport(); - channel = new MockChannel('test', new ClientConnection(transport)); + channelOptions = new FakeChannelOptions(); + connection = new FakeConnection('test', transport, channelOptions); + channel = new FakeChannel('test', connection, channelOptions); stream = new MockStream(); fromClient = new StreamController(); toClient = new StreamController(); when(transport.makeRequest(any)).thenReturn(stream); + when(transport.onActiveStateChanged = captureAny).thenReturn(null); when(stream.outgoingMessages).thenReturn(fromClient.sink); when(stream.incomingMessages).thenReturn(toClient.stream); client = new TestClient(channel); @@ -114,6 +142,13 @@ class ClientHarness { if (closeStream) toClient.close(); } + void signalIdle() { + final ActiveStateHandler handler = + verify(transport.onActiveStateChanged = captureAny).captured.single; + expect(handler, isNotNull); + handler(false); + } + Future runTest( {Future clientCall, dynamic expectedResult,