From bdecb9ea3b61bad1fb6f8526eb6dfd8d616b6d0b Mon Sep 17 00:00:00 2001 From: Robson Araujo Date: Tue, 24 Mar 2020 16:46:36 -0700 Subject: [PATCH] Support grpc-web in pure dart --- lib/src/client/transport/xhr_transport.dart | 120 ++++++++------------ 1 file changed, 47 insertions(+), 73 deletions(-) diff --git a/lib/src/client/transport/xhr_transport.dart b/lib/src/client/transport/xhr_transport.dart index ad89caf4..c0c3c274 100644 --- a/lib/src/client/transport/xhr_transport.dart +++ b/lib/src/client/transport/xhr_transport.dart @@ -14,10 +14,10 @@ // limitations under the License. import 'dart:async'; -import 'dart:html'; import 'dart:typed_data'; import 'package:meta/meta.dart'; +import 'package:http/http.dart'; import '../../client/call.dart'; import '../../shared/message.dart'; @@ -27,10 +27,10 @@ import 'transport.dart'; import 'web_streams.dart'; class XhrTransportStream implements GrpcTransportStream { - final HttpRequest _request; + final Client _client; + final Request _request; final ErrorHandler _onError; final Function(XhrTransportStream stream) _onDone; - int _requestBytesRead = 0; final StreamController _incomingProcessor = StreamController(); final StreamController _incomingMessages = StreamController(); final StreamController> _outgoingMessages = StreamController(); @@ -41,53 +41,33 @@ class XhrTransportStream implements GrpcTransportStream { @override StreamSink> get outgoingMessages => _outgoingMessages.sink; - XhrTransportStream(this._request, {onError, onDone}) + XhrTransportStream(this._client, this._request, {onError, onDone}) : _onError = onError, _onDone = onDone { - _outgoingMessages.stream - .map(frame) - .listen((data) => _request.send(data), cancelOnError: true); - - _request.onReadyStateChange.listen((data) { - if (_incomingMessages.isClosed) { - return; - } - switch (_request.readyState) { - case HttpRequest.HEADERS_RECEIVED: - _onHeadersReceived(); - break; - case HttpRequest.DONE: - if (_request.status != 200) { - _onError(GrpcError.unavailable( - 'XhrConnection status ${_request.status}')); - } else { - _close(); + _outgoingMessages.stream.map(frame).listen( + (data) { + _request.bodyBytes = data; + _client.send(_request).then((response) { + if (_incomingMessages.isClosed) { + return; + } + if (!_onHeadersReceived(response)) { + return; + } + print(response.headers); + response.stream.listen((data) { + _incomingProcessor.add(Uint8List.fromList(data).buffer); + }, onDone: _close); + }); + }, + cancelOnError: true, + onError: (e, st) { + if (_incomingMessages.isClosed) { + return; } - break; - } - }); - - _request.onError.listen((ProgressEvent event) { - if (_incomingMessages.isClosed) { - return; - } - _onError(GrpcError.unavailable('XhrConnection connection-error')); - terminate(); - }); - - _request.onProgress.listen((_) { - if (_incomingMessages.isClosed) { - return; - } - // Use response over responseText as most browsers don't support - // using responseText during an onProgress event. - final responseString = _request.response as String; - final bytes = Uint8List.fromList( - responseString.substring(_requestBytesRead).codeUnits) - .buffer; - _requestBytesRead = responseString.length; - _incomingProcessor.add(bytes); - }); + _onError(GrpcError.unavailable('XhrConnection connection-error')); + terminate(); + }); _incomingProcessor.stream .transform(GrpcWebDecoder()) @@ -96,30 +76,27 @@ class XhrTransportStream implements GrpcTransportStream { onError: _onError, onDone: _incomingMessages.close); } - _onHeadersReceived() { - final contentType = _request.getResponseHeader('Content-Type'); - if (_request.status != 200) { + bool _onHeadersReceived(StreamedResponse response) { + final contentType = response.headers['content-type']; + if (response.statusCode != 200) { _onError( - GrpcError.unavailable('XhrConnection status ${_request.status}')); - return; + GrpcError.unavailable('XhrConnection status ${response.statusCode}')); + return false; } if (contentType == null) { _onError(GrpcError.unavailable('XhrConnection missing Content-Type')); - return; + return false; } if (!contentType.startsWith('application/grpc')) { _onError( GrpcError.unavailable('XhrConnection bad Content-Type $contentType')); - return; - } - if (_request.response == null) { - _onError(GrpcError.unavailable('XhrConnection request null response')); - return; + return false; } // Force a metadata message with headers. - final headers = GrpcMetadata(_request.responseHeaders); + final headers = GrpcMetadata(response.headers); _incomingMessages.add(headers); + return true; } _close() { @@ -131,12 +108,12 @@ class XhrTransportStream implements GrpcTransportStream { @override Future terminate() async { _close(); - _request.abort(); } } class XhrClientConnection extends ClientConnection { final Uri uri; + final _client = Client(); final Set _requests = Set(); @@ -145,31 +122,28 @@ class XhrClientConnection extends ClientConnection { String get authority => uri.authority; String get scheme => uri.scheme; - void _initializeRequest(HttpRequest request, Map metadata) { + void _initializeRequest(Request request, Map metadata) { for (final header in metadata.keys) { - request.setRequestHeader(header, metadata[header]); + request.headers[header] = metadata[header]; } - request.setRequestHeader('Content-Type', 'application/grpc-web+proto'); - request.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1'); - request.setRequestHeader('X-Grpc-Web', '1'); - // Overriding the mimetype allows us to stream and parse the data - request.overrideMimeType('text/plain; charset=x-user-defined'); - request.responseType = 'text'; + request.headers['Content-Type'] = 'application/grpc-web+proto'; + request.headers['X-User-Agent'] = 'grpc-web-dart/0.1'; + request.headers['X-Grpc-Web'] = '1'; } @visibleForTesting - HttpRequest createHttpRequest() => HttpRequest(); + Request createHttpRequest(String path) => Request('POST', uri.resolve(path)); @override GrpcTransportStream makeRequest(String path, Duration timeout, Map metadata, ErrorHandler onError) { - final HttpRequest request = createHttpRequest(); - request.open('POST', uri.resolve(path).toString()); + final Request request = createHttpRequest(path); _initializeRequest(request, metadata); - final XhrTransportStream transportStream = - XhrTransportStream(request, onError: onError, onDone: _removeStream); + final XhrTransportStream transportStream = XhrTransportStream( + _client, request, + onError: onError, onDone: _removeStream); _requests.add(transportStream); return transportStream; }