Skip to content

Commit

Permalink
Dart 2 fixes (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakobr-google authored Apr 5, 2018
1 parent 0393703 commit 686ecb3
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 60 deletions.
2 changes: 1 addition & 1 deletion example/googleapis/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ environment:
sdk: '>=1.24.3 <2.0.0'

dependencies:
async: ^1.13.3
async: '>=1.13.3 <3.0.0'
grpc:
path: ../../
protobuf: ^0.7.0
Expand Down
2 changes: 1 addition & 1 deletion example/helloworld/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ environment:
sdk: '>=1.24.3 <2.0.0'

dependencies:
async: ^1.13.3
async: '>=1.13.3 <3.0.0'
grpc:
path: ../../
protobuf: ^0.7.0
Expand Down
2 changes: 1 addition & 1 deletion example/metadata/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ environment:
sdk: '>=1.24.3 <2.0.0'

dependencies:
async: ^1.13.3
async: '>=1.13.3 <3.0.0'
grpc:
path: ../../
protobuf: ^0.7.0
Expand Down
2 changes: 1 addition & 1 deletion example/route_guide/lib/src/common.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ final List<Feature> featuresDb = _readDatabase();

List<Feature> _readDatabase() {
final dbData = new File('data/route_guide_db.json').readAsStringSync();
final List<Map<String, dynamic>> db = JSON.decode(dbData);
final List db = JSON.decode(dbData);
return db.map((entry) {
final location = new Point()
..latitude = entry['location']['latitude']
Expand Down
2 changes: 1 addition & 1 deletion example/route_guide/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ environment:
sdk: '>=1.24.3 <2.0.0'

dependencies:
async: ^1.13.3
async: '>=1.13.3 <3.0.0'
grpc:
path: ../../
protobuf: ^0.7.0
Expand Down
3 changes: 2 additions & 1 deletion interop/bin/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ Future<Null> main(List<String> args) async {
tlsCredentials = new ServerTlsCredentials(
certificate: await certificate, privateKey: await privateKey);
}
await server.serve(port: port, security: tlsCredentials);
await server.serve(
address: 'localhost', port: port, security: tlsCredentials);
print('Server listening on port ${server.port}...');
}
2 changes: 1 addition & 1 deletion interop/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ environment:

dependencies:
args: ^0.13.0
async: ^1.13.3
async: '>=1.13.3 <3.0.0'
collection: ^1.14.2
grpc:
path: ../
Expand Down
19 changes: 11 additions & 8 deletions lib/src/auth/auth.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ abstract class BaseAuthenticator {
auth.AccessToken _accessToken;
String _lastUri;

Future authenticate(Map<String, String> metadata, String uri) async {
Future<Null> authenticate(Map<String, String> metadata, String uri) async {
if (uri == null) {
throw new GrpcError.unauthenticated(
'Credentials require secure transport.');
Expand All @@ -54,13 +54,13 @@ abstract class BaseAuthenticator {

CallOptions get toCallOptions => new CallOptions(providers: [authenticate]);

Future obtainAccessCredentials(String uri);
Future<Null> obtainAccessCredentials(String uri);
}

abstract class HttpBasedAuthenticator extends BaseAuthenticator {
Future _call;
Future<Null> _call;

Future obtainAccessCredentials(String uri) {
Future<Null> obtainAccessCredentials(String uri) {
if (_call == null) {
final authClient = new http.Client();
_call = obtainCredentialsWithClient(authClient, uri).then((credentials) {
Expand All @@ -72,11 +72,13 @@ abstract class HttpBasedAuthenticator extends BaseAuthenticator {
return _call;
}

Future obtainCredentialsWithClient(http.Client client, String uri);
Future<auth.AccessCredentials> obtainCredentialsWithClient(
http.Client client, String uri);
}

class ComputeEngineAuthenticator extends HttpBasedAuthenticator {
Future obtainCredentialsWithClient(http.Client client, String uri) =>
Future<auth.AccessCredentials> obtainCredentialsWithClient(
http.Client client, String uri) =>
auth.obtainAccessCredentialsViaMetadataServer(client);
}

Expand All @@ -94,7 +96,8 @@ class ServiceAccountAuthenticator extends HttpBasedAuthenticator {

String get projectId => _projectId;

Future obtainCredentialsWithClient(http.Client client, String uri) =>
Future<auth.AccessCredentials> obtainCredentialsWithClient(
http.Client client, String uri) =>
auth.obtainAccessCredentialsViaServiceAccount(
_serviceAccountCredentials, _scopes, client);
}
Expand All @@ -114,7 +117,7 @@ class JwtServiceAccountAuthenticator extends BaseAuthenticator {

String get projectId => _projectId;

Future obtainAccessCredentials(String uri) async {
Future<Null> obtainAccessCredentials(String uri) async {
_accessToken = _jwtTokenFor(_serviceAccountCredentials, _keyId, uri);
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/src/client/call.dart
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ClientCall<Q, R> implements Response {
if (options.metadataProviders.isEmpty) {
_sendRequest(connection, _sanitizeMetadata(options.metadata));
} else {
final metadata = new Map.from(options.metadata);
final metadata = new Map<String, String>.from(options.metadata);
String audience;
if (connection.options.credentials.isSecure) {
final port = connection.port != 443 ? ':${connection.port}' : '';
Expand Down
48 changes: 4 additions & 44 deletions lib/src/server/handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -113,53 +113,13 @@ class ServerHandler extends ServiceCall {
_startStreamingRequest();
}

Future<T> _toSingleFuture<T>(Stream<T> stream) {
T _ensureOnlyOneRequest(T previous, T element) {
if (previous != null) {
throw new GrpcError.unimplemented('More than one request received');
}
return element;
}

T _ensureOneRequest(T value) {
if (value == null)
throw new GrpcError.unimplemented('No requests received');
return value;
}

final future =
stream.fold(null, _ensureOnlyOneRequest).then(_ensureOneRequest);
// Make sure errors on the future aren't unhandled, but return the original
// future so the request handler can also get the error.
future.catchError((_) {});
return future;
}

void _startStreamingRequest() {
_incomingSubscription.pause();
_requests = new StreamController(
onListen: _incomingSubscription.resume,
onPause: _incomingSubscription.pause,
onResume: _incomingSubscription.resume);
_requests = _descriptor.createRequestStream(_incomingSubscription);
_incomingSubscription.onData(_onDataActive);

_service.$onMetadata(this);
if (_descriptor.streamingResponse) {
if (_descriptor.streamingRequest) {
_responses = _descriptor.handler(this, _requests.stream);
} else {
_responses =
_descriptor.handler(this, _toSingleFuture(_requests.stream));
}
} else {
Future response;
if (_descriptor.streamingRequest) {
response = _descriptor.handler(this, _requests.stream);
} else {
response = _descriptor.handler(this, _toSingleFuture(_requests.stream));
}
_responses = response.asStream();
}
_responses = _descriptor.handle(this, _requests.stream);

_responseSubscription = _responses.listen(_onResponse,
onError: _onResponseError,
Expand Down Expand Up @@ -213,7 +173,7 @@ class ServerHandler extends ServiceCall {
final data = message as GrpcData;
var request;
try {
request = _descriptor.requestDeserializer(data.data);
request = _descriptor.deserialize(data.data);
} catch (error) {
final grpcError =
new GrpcError.internal('Error deserializing request: $error');
Expand All @@ -231,7 +191,7 @@ class ServerHandler extends ServiceCall {

void _onResponse(response) {
try {
final bytes = _descriptor.responseSerializer(response);
final bytes = _descriptor.serialize(response);
if (!_headersSent) {
sendHeaders();
}
Expand Down
53 changes: 53 additions & 0 deletions lib/src/server/service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import 'dart:async';

import '../shared/status.dart';
import 'call.dart';

/// Definition of a gRPC service method.
Expand All @@ -34,6 +37,56 @@ class ServiceMethod<Q, R> {
this.streamingResponse,
this.requestDeserializer,
this.responseSerializer);

StreamController<Q> createRequestStream(StreamSubscription incoming) =>
new StreamController<Q>(
onListen: incoming.resume,
onPause: incoming.pause,
onResume: incoming.resume);

Q deserialize(List<int> data) => requestDeserializer(data);

List<int> serialize(dynamic response) => responseSerializer(response as R);

Stream<R> handle(ServiceCall call, Stream<Q> requests) {
if (streamingResponse) {
if (streamingRequest) {
return handler(call, requests);
} else {
return handler(call, _toSingleFuture(requests));
}
} else {
Future<R> response;
if (streamingRequest) {
response = handler(call, requests);
} else {
response = handler(call, _toSingleFuture(requests));
}
return response.asStream();
}
}

Future<Q> _toSingleFuture(Stream<Q> stream) {
Q _ensureOnlyOneRequest(Q previous, Q element) {
if (previous != null) {
throw new GrpcError.unimplemented('More than one request received');
}
return element;
}

Q _ensureOneRequest(Q value) {
if (value == null)
throw new GrpcError.unimplemented('No requests received');
return value;
}

final future =
stream.fold(null, _ensureOnlyOneRequest).then(_ensureOneRequest);
// Make sure errors on the future aren't unhandled, but return the original
// future so the request handler can also get the error.
future.catchError((_) {});
return future;
}
}

/// Definition of a gRPC service.
Expand Down

0 comments on commit 686ecb3

Please sign in to comment.