From 6c16fceb2a1d6c153e2432d42f21553dcf500766 Mon Sep 17 00:00:00 2001 From: Daniel Brauner <44034965+LeFrosch@users.noreply.github.com> Date: Mon, 22 Mar 2021 17:22:32 +0100 Subject: [PATCH] Be more resilient to broken deployments (#460) Require 200 HTTP status and a supported Content-Type header to be present in a response. When handling malformed responses make effort to translate HTTP statuses into gRPC statuses as gRPC protocol specification recommends. Fixes #421 Fixes #458 Co-authored-by: Vyacheslav Egorov --- CHANGELOG.md | 6 + lib/src/client/call.dart | 48 +----- lib/src/client/transport/http2_transport.dart | 2 +- lib/src/client/transport/xhr_transport.dart | 49 ++---- lib/src/shared/status.dart | 160 +++++++++++++++++- lib/src/shared/streams.dart | 29 +++- test/client_tests/client_test.dart | 78 +++++++++ .../client_transport_connector_test.dart | 2 + .../client_xhr_transport_test.dart | 50 +++--- test/grpc_web_server.dart | 80 +++++++-- test/grpc_web_test.dart | 50 +++++- test/src/client_utils.dart | 25 ++- test/stream_test.dart | 2 +- 13 files changed, 448 insertions(+), 133 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4896e876..85ae54ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,12 @@ ## 3.0.1-dev * Require `package:googleapis_auth` `^1.1.0` +* Fix issues [#421](https://github.com/grpc/grpc-dart/issues/421) and + [#458](https://github.com/grpc/grpc-dart/issues/458). Validate + responses according to gRPC/gRPC-Web protocol specifications: require + 200 HTTP status and a supported `Content-Type` header to be present, as well + as `grpc-status: 0` header. When handling malformed responses make effort + to translate HTTP statuses into gRPC statuses. ## 3.0.0 diff --git a/lib/src/client/call.dart b/lib/src/client/call.dart index 156462bd..aaae6046 100644 --- a/lib/src/client/call.dart +++ b/lib/src/client/call.dart @@ -14,13 +14,8 @@ // limitations under the License. import 'dart:async'; -import 'dart:convert'; import 'dart:developer'; -import 'package:grpc/src/generated/google/rpc/status.pb.dart'; -import 'package:meta/meta.dart'; -import 'package:protobuf/protobuf.dart'; - import '../shared/codec.dart'; import '../shared/message.dart'; import '../shared/profiler.dart'; @@ -38,7 +33,6 @@ const _reservedHeaders = [ 'grpc-encoding', 'user-agent', ]; -const _statusDetailsHeader = 'grpc-status-details-bin'; /// Provides per-RPC metadata. /// @@ -343,23 +337,11 @@ class ClientCall implements Response { _stream!.terminate(); } - /// If there's an error status then process it as a response error - void _checkForErrorStatus(Map metadata) { - final status = metadata['grpc-status']; - final statusCode = int.parse(status ?? '0'); - - if (statusCode != 0) { - final messageMetadata = metadata['grpc-message']; - final message = - messageMetadata == null ? null : Uri.decodeFull(messageMetadata); - - final statusDetails = metadata[_statusDetailsHeader]; - _responseError(GrpcError.custom( - statusCode, - message, - statusDetails == null - ? const [] - : decodeStatusDetails(statusDetails))); + /// If there's an error status then process it as a response error. + void _checkForErrorStatus(Map trailers) { + final error = grpcErrorFromTrailers(trailers); + if (error != null) { + _responseError(error); } } @@ -512,23 +494,3 @@ class ClientCall implements Response { } catch (_) {} } } - -/// Given a string of base64url data, attempt to parse a Status object from it. -/// Once parsed, it will then map each detail item and attempt to parse it into -/// its respective GeneratedMessage type, returning the list of parsed detail items -/// as a `List`. -/// -/// Prior to creating the Status object we pad the data to ensure its length is -/// an even multiple of 4, which is a requirement in Dart when decoding base64url data. -/// -/// If any errors are thrown during decoding/parsing, it will return an empty list. -@visibleForTesting -List decodeStatusDetails(String data) { - try { - final parsedStatus = Status.fromBuffer( - base64Url.decode(data.padRight((data.length + 3) & ~3, '='))); - return parsedStatus.details.map(parseErrorDetailsFromAny).toList(); - } catch (e) { - return []; - } -} diff --git a/lib/src/client/transport/http2_transport.dart b/lib/src/client/transport/http2_transport.dart index 83fae12d..a55c5396 100644 --- a/lib/src/client/transport/http2_transport.dart +++ b/lib/src/client/transport/http2_transport.dart @@ -39,7 +39,7 @@ class Http2TransportStream extends GrpcTransportStream { CodecRegistry? codecRegistry, Codec? compression, ) : incomingMessages = _transportStream.incomingMessages - .transform(GrpcHttpDecoder()) + .transform(GrpcHttpDecoder(forResponse: true)) .transform(grpcDecompressor(codecRegistry: codecRegistry)) { _outgoingMessages.stream .map((payload) => frame(payload, compression)) diff --git a/lib/src/client/transport/xhr_transport.dart b/lib/src/client/transport/xhr_transport.dart index c92fe82c..41a5845d 100644 --- a/lib/src/client/transport/xhr_transport.dart +++ b/lib/src/client/transport/xhr_transport.dart @@ -29,17 +29,11 @@ import 'web_streams.dart'; const _contentTypeKey = 'Content-Type'; -/// All accepted content-type header's prefix. -const _validContentTypePrefix = [ - 'application/grpc', - 'application/json+protobuf', - 'application/x-protobuf' -]; - class XhrTransportStream implements GrpcTransportStream { final HttpRequest _request; final ErrorHandler _onError; final Function(XhrTransportStream stream) _onDone; + bool _headersReceived = false; int _requestBytesRead = 0; final StreamController _incomingProcessor = StreamController(); final StreamController _incomingMessages = StreamController(); @@ -104,37 +98,28 @@ class XhrTransportStream implements GrpcTransportStream { onError: _onError, onDone: _incomingMessages.close); } - bool _checkContentType(String contentType) { - return _validContentTypePrefix.any(contentType.startsWith); + bool _validateResponseState() { + try { + validateHttpStatusAndContentType( + _request.status, _request.responseHeaders, + rawResponse: _request.responseText); + return true; + } catch (e, st) { + _onError(e, st); + return false; + } } void _onHeadersReceived() { - // Force a metadata message with headers. - final headers = GrpcMetadata(_request.responseHeaders); - _incomingMessages.add(headers); + _headersReceived = true; + if (!_validateResponseState()) { + return; + } + _incomingMessages.add(GrpcMetadata(_request.responseHeaders)); } void _onRequestDone() { - final contentType = _request.getResponseHeader(_contentTypeKey); - if (_request.status != 200) { - _onError( - GrpcError.unavailable('XhrConnection status ${_request.status}', null, - _request.responseText), - StackTrace.current); - return; - } - if (contentType == null) { - _onError( - GrpcError.unavailable('XhrConnection missing Content-Type', null, - _request.responseText), - StackTrace.current); - return; - } - if (!_checkContentType(contentType)) { - _onError( - GrpcError.unavailable('XhrConnection bad Content-Type $contentType', - null, _request.responseText), - StackTrace.current); + if (!_headersReceived && !_validateResponseState()) { return; } if (_request.response == null) { diff --git a/lib/src/shared/status.dart b/lib/src/shared/status.dart index f4fc8bc9..537b41f9 100644 --- a/lib/src/shared/status.dart +++ b/lib/src/shared/status.dart @@ -13,10 +13,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +import 'dart:convert'; +import 'dart:io' show HttpStatus; + +import 'package:meta/meta.dart'; +import 'package:protobuf/protobuf.dart'; + import 'package:grpc/src/generated/google/protobuf/any.pb.dart'; import 'package:grpc/src/generated/google/rpc/code.pbenum.dart'; import 'package:grpc/src/generated/google/rpc/error_details.pb.dart'; -import 'package:protobuf/protobuf.dart'; +import 'package:grpc/src/generated/google/rpc/status.pb.dart'; class StatusCode { /// The operation completed successfully. @@ -120,6 +126,29 @@ class StatusCode { /// The request does not have valid authentication credentials for the /// operation. static const unauthenticated = 16; + + /// Mapping taken from gRPC-Web JS implementation: + /// https://github.com/grpc/grpc-web/blob/master/javascript/net/grpc/web/statuscode.js + static const _httpStatusToGrpcStatus = { + HttpStatus.ok: StatusCode.ok, + HttpStatus.badRequest: StatusCode.invalidArgument, + HttpStatus.unauthorized: StatusCode.unauthenticated, + HttpStatus.forbidden: StatusCode.permissionDenied, + HttpStatus.notFound: StatusCode.notFound, + HttpStatus.conflict: StatusCode.aborted, + HttpStatus.preconditionFailed: StatusCode.failedPrecondition, + HttpStatus.tooManyRequests: StatusCode.resourceExhausted, + HttpStatus.clientClosedRequest: StatusCode.cancelled, + HttpStatus.internalServerError: StatusCode.unknown, + HttpStatus.notImplemented: StatusCode.unimplemented, + HttpStatus.serviceUnavailable: StatusCode.unavailable, + HttpStatus.gatewayTimeout: StatusCode.deadlineExceeded, + }; + + /// Creates a gRPC Status code from a HTTP Status code + static int fromHttpStatus(int status) { + return _httpStatusToGrpcStatus[status] ?? StatusCode.unknown; + } } class GrpcError implements Exception { @@ -309,3 +338,132 @@ GeneratedMessage parseErrorDetailsFromAny(Any any) { return any; } } + +/// Validate HTTP status and Content-Type which arrived with the response: +/// reject reponses with non-ok (200) status or unsupported Content-Type. +/// +/// Note that grpc-status arrives in trailers and will be handled by +/// [ClientCall._onResponseData]. +/// +/// gRPC over HTTP2 protocol specification mandates the following: +/// +/// Implementations should expect broken deployments to send non-200 HTTP +/// status codes in responses as well as a variety of non-GRPC content-types +/// and to omit Status & Status-Message. Implementations must synthesize a +/// Status & Status-Message to propagate to the application layer when this +/// occurs. +/// +void validateHttpStatusAndContentType( + int? httpStatus, Map headers, + {Object? rawResponse}) { + if (httpStatus == null) { + throw GrpcError.unknown( + 'HTTP response status is unknown', null, rawResponse); + } + + if (httpStatus == 0) { + throw GrpcError.unknown( + 'HTTP request completed without a status (potential CORS issue)', + null, + rawResponse); + } + + final status = StatusCode.fromHttpStatus(httpStatus); + if (status != StatusCode.ok) { + // [httpStatus] itself already indicates an error. Check if we also + // received grpc-status/message (i.e. this is a Trailers-Only response) + // and use this information to report a better error to the application + // layer. However prefer to use status code derived from HTTP status + // if grpc-status itself does not provide an informative error. + final error = grpcErrorFromTrailers(headers); + if (error == null || error.code == StatusCode.unknown) { + throw GrpcError.custom( + status, + error?.message ?? + 'HTTP connection completed with ${httpStatus} instead of 200', + error?.details, + rawResponse); + } + throw error; + } + + final contentType = headers['content-type']; + if (contentType == null) { + throw GrpcError.unknown('missing content-type header', null, rawResponse); + } + + // Check if content-type header indicates a supported format. + if (!_validContentTypePrefix.any(contentType.startsWith)) { + throw GrpcError.unknown( + 'unsupported content-type (${contentType})', null, rawResponse); + } +} + +GrpcError? grpcErrorFromTrailers(Map trailers) { + final status = trailers['grpc-status']; + final statusCode = status != null ? int.parse(status) : StatusCode.unknown; + + if (statusCode != StatusCode.ok) { + final message = _tryDecodeStatusMessage(trailers['grpc-message']); + final statusDetails = trailers[_statusDetailsHeader]; + return GrpcError.custom( + statusCode, + message, + statusDetails == null + ? const [] + : decodeStatusDetails(statusDetails)); + } + + return null; +} + +const _statusDetailsHeader = 'grpc-status-details-bin'; + +/// All accepted content-type header's prefix. We are being more permissive +/// then gRPC and gRPC-Web specifications because some of the services +/// return slightly different content-types. +const _validContentTypePrefix = [ + 'application/grpc', + 'application/json+protobuf', + 'application/x-protobuf' +]; + +/// Given a string of base64url data, attempt to parse a Status object from it. +/// Once parsed, it will then map each detail item and attempt to parse it into +/// its respective GeneratedMessage type, returning the list of parsed detail items +/// as a `List`. +/// +/// Prior to creating the Status object we pad the data to ensure its length is +/// an even multiple of 4, which is a requirement in Dart when decoding base64url data. +/// +/// If any errors are thrown during decoding/parsing, it will return an empty list. +@visibleForTesting +List decodeStatusDetails(String data) { + try { + final parsedStatus = Status.fromBuffer( + base64Url.decode(data.padRight((data.length + 3) & ~3, '='))); + return parsedStatus.details.map(parseErrorDetailsFromAny).toList(); + } catch (e) { + return []; + } +} + +/// Decode percent encoded status message contained in 'grpc-message' trailer. +String? _tryDecodeStatusMessage(String? statusMessage) { + if (statusMessage == null) { + return statusMessage; + } + + try { + return Uri.decodeFull(statusMessage); + } catch (_) { + // gRPC over HTTP2 protocol specification mandates: + // + // When decoding invalid values, implementations MUST NOT error or throw + // away the message. At worst, the implementation can abort decoding the + // status message altogether such that the user would received the raw + // percent-encoded form. + // + return statusMessage; + } +} diff --git a/lib/src/shared/streams.dart b/lib/src/shared/streams.dart index cfd58695..6a8e3b79 100644 --- a/lib/src/shared/streams.dart +++ b/lib/src/shared/streams.dart @@ -39,6 +39,11 @@ class GrpcHttpEncoder extends Converter { } class GrpcHttpDecoder extends Converter { + /// [true] if this decoder is used for decoding responses. + final bool forResponse; + + GrpcHttpDecoder({this.forResponse = false}); + @override GrpcMessage convert(StreamMessage input) { final sink = GrpcMessageSink(); @@ -50,18 +55,21 @@ class GrpcHttpDecoder extends Converter { @override Sink startChunkedConversion(Sink sink) { - return _GrpcMessageConversionSink(sink); + return _GrpcMessageConversionSink(sink, forResponse); } } class _GrpcMessageConversionSink extends ChunkedConversionSink { final Sink _out; + final bool _forResponse; final _dataHeader = Uint8List(5); Uint8List? _data; int _dataOffset = 0; - _GrpcMessageConversionSink(this._out); + bool _headersReceived = false; + + _GrpcMessageConversionSink(this._out, this._forResponse); void _addData(DataStreamMessage chunk) { final chunkData = chunk.bytes; @@ -117,7 +125,22 @@ class _GrpcMessageConversionSink extends ChunkedConversionSink { // TODO(jakobr): Handle duplicate header names correctly. headers[ascii.decode(header.name)] = ascii.decode(header.value); } - // TODO(jakobr): Check :status, go to error mode if not 2xx. + if (!_headersReceived) { + if (_forResponse) { + // Validate :status and content-type header here synchronously before + // attempting to parse subsequent DataStreamMessage. + final httpStatus = headers.containsKey(':status') + ? int.tryParse(headers[':status']!) + : null; + + // Validation might throw an exception. When [GrpcHttpDecoder] is + // used as a [StreamTransformer] the underlying implementation of + // [StreamTransformer.bind] will take care of forwarding this + // exception into the stream as an error. + validateHttpStatusAndContentType(httpStatus, headers); + } + _headersReceived = true; + } _out.add(GrpcMetadata(headers)); } diff --git a/test/client_tests/client_test.dart b/test/client_tests/client_test.dart index 8e9d879c..adacd69e 100644 --- a/test/client_tests/client_test.dart +++ b/test/client_tests/client_test.dart @@ -15,6 +15,7 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:io' show HttpStatus; import 'package:grpc/grpc.dart'; import 'package:grpc/src/client/call.dart'; @@ -272,6 +273,8 @@ void main() { void handleRequest(_) { harness.toClient.add(HeadersStreamMessage([ + Header.ascii(':status', '200'), + Header.ascii('content-type', 'application/grpc'), Header.ascii('grpc-status', '$customStatusCode'), Header.ascii('grpc-message', customStatusMessage) ], endStream: true)); @@ -286,6 +289,77 @@ void main() { ); }); + test('Call throws if HTTP status indicates an error', () async { + void handleRequest(_) { + harness.toClient.add(HeadersStreamMessage([ + Header.ascii(':status', HttpStatus.serviceUnavailable.toString()), + Header.ascii('content-type', 'application/grpc'), + ])); + // Send a frame that might be misinterpreted as a length-prefixed proto + // message and cause OOM. + harness.toClient + .add(DataStreamMessage([0, 0xFF, 0xFF, 0xFF, 0xFF], endStream: true)); + harness.toClient.close(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: GrpcError.unavailable( + 'HTTP connection completed with 503 instead of 200'), + serverHandlers: [handleRequest], + ); + }); + + test('Call throws if content-type indicates an error', () async { + void handleRequest(_) { + harness.toClient.add(HeadersStreamMessage([ + Header.ascii(':status', '200'), + Header.ascii('content-type', 'text/html'), + ])); + // Send a frame that might be misinterpreted as a length-prefixed proto + // message and cause OOM. + harness.toClient.add(DataStreamMessage([0, 0xFF, 0xFF, 0xFF, 0xFF])); + harness.sendResponseTrailer(); + } + + await harness.runFailureTest( + clientCall: harness.client.unary(dummyValue), + expectedException: + GrpcError.unknown('unsupported content-type (text/html)'), + serverHandlers: [handleRequest], + ); + }); + + for (var contentType in [ + 'application/json+protobuf', + 'application/x-protobuf' + ]) { + test('$contentType content type is accepted', () async { + const requestValue = 17; + const responseValue = 19; + + void handleRequest(StreamMessage message) { + final data = validateDataMessage(message); + expect(mockDecode(data.data), requestValue); + + harness + ..toClient.add(HeadersStreamMessage([ + Header.ascii(':status', '200'), + Header.ascii('content-type', contentType), + ])) + ..sendResponseValue(responseValue) + ..sendResponseTrailer(); + } + + await harness.runTest( + clientCall: harness.client.unary(requestValue), + expectedResult: responseValue, + expectedPath: '/Test/Unary', + serverHandlers: [handleRequest], + ); + }); + } + test('Call throws decoded message', () async { const customStatusCode = 17; const customStatusMessage = 'エラー'; @@ -293,6 +367,8 @@ void main() { void handleRequest(_) { harness.toClient.add(HeadersStreamMessage([ + Header.ascii(':status', '200'), + Header.ascii('content-type', 'application/grpc'), Header.ascii('grpc-status', '$customStatusCode'), Header.ascii('grpc-message', encodedCustomStatusMessage) ], endStream: true)); @@ -509,6 +585,8 @@ void main() { void handleRequest(_) { harness.toClient.add(HeadersStreamMessage([ + Header.ascii(':status', '200'), + Header.ascii('content-type', 'application/grpc'), Header.ascii('grpc-status', code.toString()), Header.ascii('grpc-message', message), Header.ascii('grpc-status-details-bin', details), diff --git a/test/client_tests/client_transport_connector_test.dart b/test/client_tests/client_transport_connector_test.dart index 16f6ab5b..9189f438 100644 --- a/test/client_tests/client_transport_connector_test.dart +++ b/test/client_tests/client_transport_connector_test.dart @@ -243,6 +243,8 @@ void main() { void handleRequest(_) { harness.toClient.add(HeadersStreamMessage([ + Header.ascii(':status', '200'), + Header.ascii('content-type', 'application/grpc'), Header.ascii('grpc-status', '$customStatusCode'), Header.ascii('grpc-message', customStatusMessage) ], endStream: true)); diff --git a/test/client_tests/client_xhr_transport_test.dart b/test/client_tests/client_xhr_transport_test.dart index afd31668..c5ec4234 100644 --- a/test/client_tests/client_xhr_transport_test.dart +++ b/test/client_tests/client_xhr_transport_test.dart @@ -216,7 +216,8 @@ void main() { test('Stream handles headers properly', () async { final responseHeaders = { 'parameter_1': 'value_1', - 'parameter_2': 'value_2' + 'parameter_2': 'value_2', + 'content-type': 'application/grpc+proto', }; final transport = MockXhrClientConnection(); @@ -225,8 +226,6 @@ void main() { (error, _) => fail(error.toString())); when(transport.latestRequest.responseHeaders).thenReturn(responseHeaders); - when(transport.latestRequest.getResponseHeader('Content-Type')) - .thenReturn('application/grpc+proto'); when(transport.latestRequest.response) .thenReturn(String.fromCharCodes(frame([]))); @@ -240,16 +239,20 @@ void main() { transport.latestRequest.readyStateChangeController .add(readyStateChangeEvent); - // Should be only one metadata message with headers. + // Should be only one metadata message with headers augmented with :status + // field. final message = await stream.incomingMessages.single as GrpcMetadata; expect(message.metadata, responseHeaders); }); test('Stream handles trailers properly', () async { - final requestHeaders = {'parameter_1': 'value_1'}; + final requestHeaders = { + 'parameter_1': 'value_1', + 'content-type': 'application/grpc+proto', + }; final responseTrailers = { 'trailer_1': 'value_1', - 'trailer_2': 'value_2' + 'trailer_2': 'value_2', }; final connection = MockXhrClientConnection(); @@ -264,9 +267,7 @@ void main() { encodedTrailers[0] = 0x80; // Mark this frame as trailers. final encodedString = String.fromCharCodes(encodedTrailers); - when(connection.latestRequest.getResponseHeader('Content-Type')) - .thenReturn('application/grpc+proto'); - when(connection.latestRequest.responseHeaders).thenReturn({}); + when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); when(connection.latestRequest.response).thenReturn(encodedString); // Set expectation for request readyState and generate events so that @@ -284,11 +285,15 @@ void main() { final messages = await stream.incomingMessages.whereType().toList(); expect(messages.length, 2); - expect(messages.first.metadata, isEmpty); + expect(messages.first.metadata, requestHeaders); expect(messages.last.metadata, responseTrailers); }); test('Stream handles empty trailers properly', () async { + final requestHeaders = { + 'content-type': 'application/grpc+proto', + }; + final connection = MockXhrClientConnection(); final stream = connection.makeRequest('test_path', Duration(seconds: 10), @@ -298,9 +303,7 @@ void main() { encoded[0] = 0x80; // Mark this frame as trailers. final encodedString = String.fromCharCodes(encoded); - when(connection.latestRequest.getResponseHeader('Content-Type')) - .thenReturn('application/grpc+proto'); - when(connection.latestRequest.responseHeaders).thenReturn({}); + when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); when(connection.latestRequest.response).thenReturn(encodedString); // Set expectation for request readyState and generate events so that @@ -318,14 +321,15 @@ void main() { final messages = await stream.incomingMessages.whereType().toList(); expect(messages.length, 2); - expect(messages.first.metadata, isEmpty); + expect(messages.first.metadata, requestHeaders); expect(messages.last.metadata, isEmpty); }); test('Stream deserializes data properly', () async { final requestHeaders = { 'parameter_1': 'value_1', - 'parameter_2': 'value_2' + 'parameter_2': 'value_2', + 'content-type': 'application/grpc+proto', }; final connection = MockXhrClientConnection(); @@ -333,9 +337,7 @@ void main() { final stream = connection.makeRequest('test_path', Duration(seconds: 10), requestHeaders, (error, _) => fail(error.toString())); final data = List.filled(10, 224); - when(connection.latestRequest.getResponseHeader('Content-Type')) - .thenReturn('application/grpc+proto'); - when(connection.latestRequest.responseHeaders).thenReturn({}); + when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); when(connection.latestRequest.response) .thenReturn(String.fromCharCodes(frame(data))); @@ -366,9 +368,8 @@ void main() { errors.add(e as GrpcError); }); const errorDetails = 'error details'; - when(connection.latestRequest.getResponseHeader('Content-Type')) - .thenReturn('application/grpc+proto'); - when(connection.latestRequest.responseHeaders).thenReturn({}); + when(connection.latestRequest.responseHeaders) + .thenReturn({'content-type': 'application/grpc+proto'}); when(connection.latestRequest.readyState).thenReturn(HttpRequest.DONE); when(connection.latestRequest.responseText).thenReturn(errorDetails); connection.latestRequest.readyStateChangeController @@ -377,10 +378,11 @@ void main() { expect(errors.single.rawResponse, errorDetails); }); - test('Stream recieves multiple messages', () async { + test('Stream receives multiple messages', () async { final metadata = { 'parameter_1': 'value_1', - 'parameter_2': 'value_2' + 'parameter_2': 'value_2', + 'content-type': 'application/grpc+proto', }; final connection = MockXhrClientConnection(); @@ -395,8 +397,6 @@ void main() { final encodedStrings = data.map((d) => String.fromCharCodes(frame(d))).toList(); - when(connection.latestRequest.getResponseHeader('Content-Type')) - .thenReturn('application/grpc+proto'); when(connection.latestRequest.responseHeaders).thenReturn(metadata); when(connection.latestRequest.readyState) .thenReturn(HttpRequest.HEADERS_RECEIVED); diff --git a/test/grpc_web_server.dart b/test/grpc_web_server.dart index e039622f..15e0656d 100644 --- a/test/grpc_web_server.dart +++ b/test/grpc_web_server.dart @@ -1,9 +1,9 @@ -// @dart = 2.3 import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'package:grpc/grpc.dart'; +import 'package:http2/transport.dart'; import 'package:path/path.dart' as p; import 'package:stream_channel/stream_channel.dart'; @@ -85,24 +85,27 @@ static_resources: port_value: %TARGET_PORT% '''; -Future hybridMain(StreamChannel channel) async { - // Envoy output will be collected and dumped to stdout if envoy exits - // with an error. Otherwise if verbose is specified it will be dumped - // to stdout unconditionally. - final output = []; - void _info(String line) { - if (!verbose) { - output.add(line); - } else { - print(line); - } +// Envoy output will be collected and dumped to stdout if envoy exits +// with an error. Otherwise if verbose is specified it will be dumped +// to stdout unconditionally. +final output = []; +void _info(String line) { + if (!verbose) { + output.add(line); + } else { + print(line); } +} +Future hybridMain(StreamChannel channel) async { // Spawn a gRPC server. final server = Server([EchoService()]); await server.serve(port: 0); _info('grpc server listening on ${server.port}'); + final httpServer = await startHttpServer(); + _info('HTTP server listening on ${httpServer.port}'); + // Create Envoy configuration. final tempDir = await Directory.systemTemp.createTemp(); final config = p.join(tempDir.path, 'config.yaml'); @@ -136,7 +139,8 @@ if you are running tests locally. _info('envoy|stderr] $line'); final m = portRe.firstMatch(line); if (m != null) { - channel.sink.add(int.parse(m[1])); + channel.sink + .add({'grpcPort': int.parse(m[1]!), 'httpPort': httpServer.port}); } }); @@ -164,4 +168,54 @@ if you are running tests locally. tempDir.deleteSync(recursive: true); } channel.sink.add('EXITED'); + + await server.shutdown(); + await httpServer.close(); +} + +final testCases = { + 'test:cors': (rs) { + rs.headers.removeAll('Access-Control-Allow-Origin'); + rs.headers.add(HttpHeaders.contentTypeHeader, 'text/html'); + rs.write('some body'); + rs.close(); + }, + 'test:status-503': (rs) { + rs.headers.add(HttpHeaders.contentTypeHeader, 'text/html'); + rs.statusCode = HttpStatus.serviceUnavailable; + rs.write('some body'); + rs.close(); + }, + 'test:bad-content-type': (rs) { + rs.headers.add(HttpHeaders.contentTypeHeader, 'text/html'); + rs.statusCode = HttpStatus.ok; + rs.write('some body'); + rs.close(); + }, +}; + +void defaultHandler(HttpResponse rs) { + rs.close(); +} + +Future startHttpServer() async { + final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0); + server.defaultResponseHeaders.removeAll('x-frame-options'); + server.defaultResponseHeaders.removeAll('x-xss-protection'); + server.defaultResponseHeaders.removeAll('x-content-type-options'); + server.defaultResponseHeaders.add('Access-Control-Allow-Origin', '*'); + server.listen((request) async { + _info('${request.method} ${request.requestedUri} ${request.headers}'); + final message = await GrpcHttpDecoder() + .bind(request.map((list) => DataStreamMessage(list))) + .first as GrpcData; + final echoRequest = EchoRequest.fromBuffer(message.data); + (testCases[echoRequest.message] ?? defaultHandler)(request.response); + }); + return server; +} + +Future main() async { + final controller = StreamChannelController(); + await hybridMain(controller.local); } diff --git a/test/grpc_web_test.dart b/test/grpc_web_test.dart index 34c6e216..a89922ca 100644 --- a/test/grpc_web_test.dart +++ b/test/grpc_web_test.dart @@ -25,7 +25,7 @@ void main() { // server (written in Dart) via gRPC-web protocol through a third party // gRPC-web proxy. test('gRPC-web echo test', () async { - final channel = GrpcWebClientChannel.xhr(server.uri); + final channel = GrpcWebClientChannel.xhr(server.grpcUri); final service = EchoServiceClient(channel); const testMessage = 'hello from gRPC-web'; @@ -57,7 +57,7 @@ void main() { // Verify that terminate does not cause an exception when terminating // channel with multiple active requests. test('terminate works', () async { - final channel = GrpcWebClientChannel.xhr(server.uri); + final channel = GrpcWebClientChannel.xhr(server.grpcUri); final service = EchoServiceClient(channel); const testMessage = 'hello from gRPC-web'; @@ -97,7 +97,7 @@ void main() { // Verify that stream cancellation does not cause an exception test('stream cancellation works', () async { - final channel = GrpcWebClientChannel.xhr(server.uri); + final channel = GrpcWebClientChannel.xhr(server.grpcUri); final service = EchoServiceClient(channel); const testMessage = 'hello from gRPC-web'; @@ -116,14 +116,40 @@ void main() { await channel.terminate(); }); + + final invalidResponseTests = { + 'cors': GrpcError.unknown( + 'HTTP request completed without a status (potential CORS issue)'), + 'status-503': GrpcError.unavailable( + 'HTTP connection completed with 503 instead of 200'), + 'bad-content-type': + GrpcError.unknown('unsupported content-type (text/html)'), + }; + + for (var entry in invalidResponseTests.entries) { + // We test a bunch of boundary conditions by starting a simple HTTP server + // we sends various erroneous responses back. The kind of response is + // selected based on the payload of the request (i.e. the server expects + // to get valid gRPC request with an [EchoRequest] payload and responds + // with different errors based on the [EchoRequest.message] value. + // See [startHttpServer] in [grpc_web_server.dart] for the server part. + test('invalid response: ${entry.key}', () async { + final channel = GrpcWebClientChannel.xhr(server.httpUri); + final service = EchoServiceClient(channel, + options: WebCallOptions(bypassCorsPreflight: true)); + expect(() => service.echo(EchoRequest()..message = 'test:${entry.key}'), + throwsA(entry.value)); + }); + } } class GrpcWebServer { final StreamChannel channel; final Future whenExited; - final Uri uri; + final Uri grpcUri; + final Uri httpUri; - GrpcWebServer(this.channel, this.whenExited, this.uri); + GrpcWebServer(this.channel, this.whenExited, this.grpcUri, this.httpUri); Future shutdown() async { channel.sink.add('shutdown'); @@ -142,7 +168,7 @@ class GrpcWebServer { // number we should be talking to. final serverChannel = spawnHybridUri('grpc_web_server.dart', stayAlive: true); - final portCompleter = Completer(); + final portCompleter = Completer(); final exitCompleter = Completer(); serverChannel.stream.listen((event) { if (!portCompleter.isCompleted) { @@ -158,12 +184,18 @@ class GrpcWebServer { } }); - final port = await portCompleter.future; + final ports = await portCompleter.future; + + final grpcPort = ports['grpcPort']; + final httpPort = ports['httpPort']; // Note: we would like to test https as well, but we can't easily do it // because browsers like chrome don't trust self-signed certificates by // default. - return GrpcWebServer(serverChannel, exitCompleter.future, - Uri.parse('http://localhost:$port')); + return GrpcWebServer( + serverChannel, + exitCompleter.future, + Uri.parse('http://localhost:${grpcPort}'), + Uri.parse('http://localhost:${httpPort}')); } } diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart index 86ac164f..04e81640 100644 --- a/test/src/client_utils.dart +++ b/test/src/client_utils.dart @@ -204,6 +204,8 @@ abstract class _Harness { Iterable? interceptors; + bool headersWereSent = false; + late TestClient client; base.ClientChannel createChannel(); @@ -233,17 +235,30 @@ abstract class _Harness { toClient.close(); } - void sendResponseHeader({List
headers = const []}) { - toClient.add(HeadersStreamMessage(headers)); + static final _defaultHeaders = [ + Header.ascii(':status', '200'), + Header.ascii('content-type', 'application/grpc'), + ]; + + static final _defaultTrailers = [ + Header.ascii('grpc-status', '0'), + ]; + + void sendResponseHeader() { + assert(!headersWereSent); + headersWereSent = true; + toClient.add(HeadersStreamMessage(_defaultHeaders)); } void sendResponseValue(int value) { toClient.add(DataStreamMessage(frame(mockEncode(value)))); } - void sendResponseTrailer( - {List
headers = const [], bool closeStream = true}) { - toClient.add(HeadersStreamMessage(headers, endStream: true)); + void sendResponseTrailer({bool closeStream = true}) { + toClient.add(HeadersStreamMessage([ + if (!headersWereSent) ..._defaultHeaders, + ..._defaultTrailers, + ], endStream: true)); if (closeStream) toClient.close(); } diff --git a/test/stream_test.dart b/test/stream_test.dart index 00ba1d72..6e33bb6e 100644 --- a/test/stream_test.dart +++ b/test/stream_test.dart @@ -27,7 +27,7 @@ void main() { setUp(() { input = StreamController(); - output = input.stream.transform(GrpcHttpDecoder()); + output = input.stream.transform(GrpcHttpDecoder(forResponse: false)); }); test('converts chunked data correctly', () async {