Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support grpc-web in pure dart #287

Merged
merged 17 commits into from May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 50 additions & 70 deletions lib/src/client/transport/xhr_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
// limitations under the License.

import 'dart:async';
import 'dart:html';
import 'dart:typed_data';

import 'package:http/http.dart';
import 'package:meta/meta.dart';

import '../../client/call.dart';
Expand All @@ -27,10 +27,10 @@ import 'transport.dart';
import 'web_streams.dart';

class XhrTransportStream implements GrpcTransportStream {
final HttpRequest _request;
final Client _client;
final Request _request;
final ErrorHandler _onError;
final Function(XhrTransportStream stream) _onDone;
int _requestBytesRead = 0;
final StreamController<ByteBuffer> _incomingProcessor = StreamController();
final StreamController<GrpcMessage> _incomingMessages = StreamController();
final StreamController<List<int>> _outgoingMessages = StreamController();
Expand All @@ -41,53 +41,34 @@ class XhrTransportStream implements GrpcTransportStream {
@override
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;

XhrTransportStream(this._request, {onError, onDone})
XhrTransportStream(this._client, this._request, {onError, onDone})
: _onError = onError,
_onDone = onDone {
_outgoingMessages.stream
.map(frame)
.listen((data) => _request.send(data), cancelOnError: true);

_request.onReadyStateChange.listen((data) {
if (_incomingMessages.isClosed) {
return;
}
switch (_request.readyState) {
case HttpRequest.HEADERS_RECEIVED:
_onHeadersReceived();
break;
case HttpRequest.DONE:
if (_request.status != 200) {
_onError(GrpcError.unavailable(
'XhrConnection status ${_request.status}'));
} else {
_close();
}
break;
}
});

_request.onError.listen((ProgressEvent event) {
final asyncOnError = (e, st) {
if (_incomingMessages.isClosed) {
return;
}
_onError(GrpcError.unavailable('XhrConnection connection-error'));
terminate();
});

_request.onProgress.listen((_) {
if (_incomingMessages.isClosed) {
return;
}
// Use response over responseText as most browsers don't support
// using responseText during an onProgress event.
final responseString = _request.response as String;
final bytes = Uint8List.fromList(
responseString.substring(_requestBytesRead).codeUnits)
.buffer;
_requestBytesRead = responseString.length;
_incomingProcessor.add(bytes);
});
};
_outgoingMessages.stream.map(frame).listen((data) {
_request.bodyBytes = data;
var firstMessage = true;
_client.send(_request).then((response) {
if (_incomingMessages.isClosed) {
return;
}
if (firstMessage) {
if (!_onHeadersReceived(response)) {
return;
}
}
firstMessage = false;
response.stream.listen((data) {
_incomingProcessor.add(Uint8List.fromList(data).buffer);
}, onDone: _close);
}).catchError(asyncOnError);
}, cancelOnError: true, onError: asyncOnError);

_incomingProcessor.stream
.transform(GrpcWebDecoder())
Expand All @@ -96,30 +77,27 @@ class XhrTransportStream implements GrpcTransportStream {
onError: _onError, onDone: _incomingMessages.close);
}

_onHeadersReceived() {
final contentType = _request.getResponseHeader('Content-Type');
if (_request.status != 200) {
bool _onHeadersReceived(StreamedResponse response) {
final contentType = response.headers['content-type'];
if (response.statusCode != 200) {
_onError(
GrpcError.unavailable('XhrConnection status ${_request.status}'));
return;
GrpcError.unavailable('XhrConnection status ${response.statusCode}'));
return false;
}
if (contentType == null) {
_onError(GrpcError.unavailable('XhrConnection missing Content-Type'));
return;
return false;
}
if (!contentType.startsWith('application/grpc')) {
_onError(
GrpcError.unavailable('XhrConnection bad Content-Type $contentType'));
return;
}
if (_request.response == null) {
_onError(GrpcError.unavailable('XhrConnection request null response'));
return;
return false;
}

// Force a metadata message with headers.
final headers = GrpcMetadata(_request.responseHeaders);
final headers = GrpcMetadata(response.headers);
_incomingMessages.add(headers);
return true;
}

_close() {
Expand All @@ -131,45 +109,47 @@ class XhrTransportStream implements GrpcTransportStream {
@override
Future<void> terminate() async {
_close();
_request.abort();
}
}

class XhrClientConnection extends ClientConnection {
final Uri uri;
Client _client;

final Set<XhrTransportStream> _requests = Set<XhrTransportStream>();

XhrClientConnection(this.uri);
XhrClientConnection(this.uri) {
_client = createClient();
}

String get authority => uri.authority;
String get scheme => uri.scheme;

void _initializeRequest(HttpRequest request, Map<String, String> metadata) {
void _initializeRequest(Request request, Map<String, String> metadata) {
for (final header in metadata.keys) {
request.setRequestHeader(header, metadata[header]);
request.headers[header] = metadata[header];
}
request.setRequestHeader('Content-Type', 'application/grpc-web+proto');
request.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1');
request.setRequestHeader('X-Grpc-Web', '1');
// Overriding the mimetype allows us to stream and parse the data
request.overrideMimeType('text/plain; charset=x-user-defined');
request.responseType = 'text';
request.headers['Content-Type'] = 'application/grpc-web+proto';
request.headers['X-User-Agent'] = 'grpc-web-dart/0.1';
request.headers['X-Grpc-Web'] = '1';
}

@visibleForTesting
HttpRequest createHttpRequest() => HttpRequest();
Request createHttpRequest(String path) => Request('POST', uri.resolve(path));

@visibleForTesting
Client createClient() => Client();

@override
GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onError) {
final HttpRequest request = createHttpRequest();
request.open('POST', uri.resolve(path).toString());
final Request request = createHttpRequest(path);

_initializeRequest(request, metadata);

final XhrTransportStream transportStream =
XhrTransportStream(request, onError: onError, onDone: _removeStream);
final XhrTransportStream transportStream = XhrTransportStream(
_client, request,
onError: onError, onDone: _removeStream);
_requests.add(transportStream);
return transportStream;
}
Expand Down
Loading