From d6afedc5f1cb77531c80d7505a9d8e454a3c1bf8 Mon Sep 17 00:00:00 2001 From: Natalie Weizenbaum Date: Tue, 2 Feb 2016 12:21:41 -0800 Subject: [PATCH] Use StreamChannel. This converts the constructors to take StreamChannels, and changes some edge-case semantics to be more familiar to StreamChannel (and WebSocket) users. R=rnystrom@google.com Review URL: https://codereview.chromium.org//1652413002 . --- CHANGELOG.md | 18 ++++ README.md | 178 +++++++++++++++++------------------ lib/src/channel_manager.dart | 79 ++++++++++++++++ lib/src/client.dart | 63 ++++++------- lib/src/peer.dart | 69 +++++--------- lib/src/server.dart | 94 +++++++++--------- lib/src/two_way_stream.dart | 135 -------------------------- lib/src/utils.dart | 33 +++++++ pubspec.yaml | 6 +- test/client/stream_test.dart | 46 +++------ test/client/utils.dart | 6 +- test/peer_test.dart | 6 +- test/server/stream_test.dart | 45 +++------ test/server/utils.dart | 6 +- 14 files changed, 360 insertions(+), 424 deletions(-) create mode 100644 lib/src/channel_manager.dart delete mode 100644 lib/src/two_way_stream.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 524cbad..649c738 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,21 @@ +## 2.0.0 + +* **Breaking change:** all constructors now take a `StreamChannel` rather than a + `Stream`/`StreamSink` pair. + +* `Client.sendRequest()` and `Client.sendNotification()` no longer throw + `StateError`s after the connection has been closed but before `Client.close()` + has been called. + +* The various `close()` methods may now be called before their corresponding + `listen()` methods. + +* The various `close()` methods now wait on the result of closing the underlying + `StreamSink`. Be aware that [in some circumstances][issue 19095] + `StreamController`s' `Sink.close()` futures may never complete. + +[issue 19095]: https://github.com/dart-lang/sdk/issues/19095 + ## 1.2.0 * Add `Client.isClosed` and `Server.isClosed`, which make it possible to diff --git a/README.md b/README.md index 67afc31..8078fa2 100644 --- a/README.md +++ b/README.md @@ -9,68 +9,66 @@ These methods can be registered using `Server.registerMethod`: ```dart import "package:json_rpc_2/json_rpc_2.dart" as json_rpc; +import "package:stream_channel/stream_channel.dart"; + +main() async { + var socket = await WebSocket.connect('ws://localhost:4321'); + var server = new json_rpc.Server(new StreamChannel(socket, socket)); + + // Any string may be used as a method name. JSON-RPC 2.0 methods are + // case-sensitive. + var i = 0; + server.registerMethod("count", () { + // Just return the value to be sent as a response to the client. This can + // be anything JSON-serializable, or a Future that completes to something + // JSON-serializable. + return i++; + }); + + // Methods can take parameters. They're presented as a [Parameters] object + // which makes it easy to validate that the expected parameters exist. + server.registerMethod("echo", (params) { + // If the request doesn't have a "message" parameter, this will + // automatically send a response notifying the client that the request + // was invalid. + return params.getNamed("message"); + }); + + // [Parameters] has methods for verifying argument types. + server.registerMethod("subtract", (params) { + // If "minuend" or "subtrahend" aren't numbers, this will reject the + // request. + return params.getNum("minuend") - params.getNum("subtrahend"); + }); -void main() { - WebSocket.connect('ws://localhost:4321').then((socket) { - // You can start the server with a Stream for requests and a StreamSink for - // responses, or with an object that's both, like a WebSocket. - var server = new json_rpc.Server(socket); - - // Any string may be used as a method name. JSON-RPC 2.0 methods are - // case-sensitive. - var i = 0; - server.registerMethod("count", () { - // Just return the value to be sent as a response to the client. This can - // be anything JSON-serializable, or a Future that completes to something - // JSON-serializable. - return i++; - }); - - // Methods can take parameters. They're presented as a [Parameters] object - // which makes it easy to validate that the expected parameters exist. - server.registerMethod("echo", (params) { - // If the request doesn't have a "message" parameter, this will - // automatically send a response notifying the client that the request - // was invalid. - return params.getNamed("message"); - }); - - // [Parameters] has methods for verifying argument types. - server.registerMethod("subtract", (params) { - // If "minuend" or "subtrahend" aren't numbers, this will reject the - // request. - return params.getNum("minuend") - params.getNum("subtrahend"); - }); - - // [Parameters] also supports optional arguments. - server.registerMethod("sort", (params) { - var list = params.getList("list"); - list.sort(); - if (params.getBool("descending", orElse: () => false)) { - return params.list.reversed; - } else { - return params.list; - } - }); - - // A method can send an error response by throwing a - // `json_rpc.RpcException`. Any positive number may be used as an - // application- defined error code. - const DIVIDE_BY_ZERO = 1; - server.registerMethod("divide", (params) { - var divisor = params.getNum("divisor"); - if (divisor == 0) { - throw new json_rpc.RpcException( - DIVIDE_BY_ZERO, "Cannot divide by zero."); - } - - return params.getNum("dividend") / divisor; - }); - - // To give you time to register all your methods, the server won't actually - // start listening for requests until you call `listen`. - server.listen(); + // [Parameters] also supports optional arguments. + server.registerMethod("sort", (params) { + var list = params.getList("list"); + list.sort(); + if (params.getBool("descending", orElse: () => false)) { + return params.list.reversed; + } else { + return params.list; + } }); + + // A method can send an error response by throwing a + // `json_rpc.RpcException`. Any positive number may be used as an + // application- defined error code. + const DIVIDE_BY_ZERO = 1; + server.registerMethod("divide", (params) { + var divisor = params.getNum("divisor"); + if (divisor == 0) { + throw new json_rpc.RpcException( + DIVIDE_BY_ZERO, "Cannot divide by zero."); + } + + return params.getNum("dividend") / divisor; + }); + + // To give you time to register all your methods, the server won't actually + // start listening for requests until you call `listen`. + server.listen(); } ``` @@ -82,38 +80,36 @@ responses to those method calls. These methods can be called using ```dart import "package:json_rpc_2/json_rpc_2.dart" as json_rpc; - -void main() { - WebSocket.connect('ws://localhost:4321').then((socket) { - // Just like the server, a client takes a Stream and a StreamSink or a - // single object that's both. - var client = new json_rpc.Client(socket); - - // This calls the "count" method on the server. A Future is returned that - // will complete to the value contained in the server's response. - client.sendRequest("count").then((result) => print("Count is $result.")); - - // Parameters are passed as a simple Map or, for positional parameters, an - // Iterable. Make sure they're JSON-serializable! - client.sendRequest("echo", {"message": "hello"}) - .then((echo) => print('Echo says "$echo"!')); - - // A notification is a way to call a method that tells the server that no - // result is expected. Its return type is `void`; even if it causes an - // error, you won't hear back. - client.sendNotification("count"); - - // If the server sends an error response, the returned Future will complete - // with an RpcException. You can catch this error and inspect its error - // code, message, and any data that the server sent along with it. - client.sendRequest("divide", {"dividend": 2, "divisor": 0}) - .catchError((error) { - print("RPC error ${error.code}: ${error.message}"); - }); - - // The client won't subscribe to the input stream until you call `listen`. - client.listen(); +import "package:stream_channel/stream_channel.dart"; + +main() async { + var socket = await WebSocket.connect('ws://localhost:4321'); + var client = new json_rpc.Client(new StreamChannel(socket, socket)); + + // This calls the "count" method on the server. A Future is returned that + // will complete to the value contained in the server's response. + client.sendRequest("count").then((result) => print("Count is $result.")); + + // Parameters are passed as a simple Map or, for positional parameters, an + // Iterable. Make sure they're JSON-serializable! + client.sendRequest("echo", {"message": "hello"}) + .then((echo) => print('Echo says "$echo"!')); + + // A notification is a way to call a method that tells the server that no + // result is expected. Its return type is `void`; even if it causes an + // error, you won't hear back. + client.sendNotification("count"); + + // If the server sends an error response, the returned Future will complete + // with an RpcException. You can catch this error and inspect its error + // code, message, and any data that the server sent along with it. + client.sendRequest("divide", {"dividend": 2, "divisor": 0}) + .catchError((error) { + print("RPC error ${error.code}: ${error.message}"); }); + + // The client won't subscribe to the input stream until you call `listen`. + client.listen(); } ``` diff --git a/lib/src/channel_manager.dart b/lib/src/channel_manager.dart new file mode 100644 index 0000000..35d75a6 --- /dev/null +++ b/lib/src/channel_manager.dart @@ -0,0 +1,79 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_channel/stream_channel.dart'; + +/// Wraps a [StreamChannel] and handles logic that's shared between [Server], +/// [Client], and [Peer]. +/// +/// These classes don't provide the user direct access to a +/// [StreamSubscription]. Instead, they use the future returned by [listen] to +/// notify the user of the remote endpoint closing or producing an error. +class ChannelManager { + /// The name of the component whose channel is wrapped (e.g. "Server"). + /// + /// Used for error reporting. + final String _name; + + /// The underlying channel. + final StreamChannel _channel; + + /// Returns a [Future] that completes when the connection is closed. + /// + /// This is the same future that's returned by [listen]. + Future get done => _doneCompleter.future; + final _doneCompleter = new Completer(); + + /// Whether the underlying communication channel is closed. + bool get isClosed => _doneCompleter.isCompleted; + + /// Whether [listen] has been called. + bool _listenCalled = false; + + /// Whether [close] has been called. + /// + /// Note that [isClosed] tracks whether the underlying connection is closed, + /// whereas this tracks only whether it was explicitly closed from this end. + bool _closeCalled = false; + + ChannelManager(this._name, this._channel); + + /// Starts listening to the channel. + /// + /// The returned Future will complete when the input stream is closed. If the + /// input stream emits an error, that will be piped to the returned Future. + Future listen(void handleInput(input)) { + if (_listenCalled) { + throw new StateError("Can only call $_name.listen() once."); + } + _listenCalled = true; + + _channel.stream.listen(handleInput, + onError: (error, stackTrace) { + _doneCompleter.completeError(error, stackTrace); + _channel.sink.close(); + }, + onDone: _doneCompleter.complete, + cancelOnError: true); + + return done; + } + + /// Emit [event]. + void add(event) { + if (isClosed && !_closeCalled) return; + _channel.sink.add(event); + } + + /// Closes the channel. + Future close() { + _closeCalled = true; + if (!_doneCompleter.isCompleted) { + _doneCompleter.complete(_channel.sink.close()); + } + return done; + } +} diff --git a/lib/src/client.dart b/lib/src/client.dart index e75e7f3..5df21e4 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -5,9 +5,10 @@ import 'dart:async'; import 'package:stack_trace/stack_trace.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'channel_manager.dart'; import 'exception.dart'; -import 'two_way_stream.dart'; import 'utils.dart'; /// A JSON-RPC 2.0 client. @@ -16,7 +17,7 @@ import 'utils.dart'; /// those method calls. Methods can be called with [sendRequest], or with /// [sendNotification] if no response is expected. class Client { - final TwoWayStream _streams; + final ChannelManager _manager; /// The next request id. var _id = 0; @@ -29,55 +30,53 @@ class Client { /// The map of request ids to pending requests. final _pendingRequests = new Map(); - /// Returns a [Future] that completes when the connection is closed. + /// Returns a [Future] that completes when the underlying connection is + /// closed. /// - /// This is the same future that's returned by [listen]. - Future get done => _streams.done; + /// This is the same future that's returned by [listen] and [close]. It may + /// complete before [close] is called if the remote endpoint closes the + /// connection. + Future get done => _manager.done; - /// Whether the connection is closed. - bool get isClosed => _streams.isClosed; - - /// Creates a [Client] that writes requests to [requests] and reads responses - /// from [responses]. + /// Whether the underlying connection is closed. /// - /// If [responses] is a [StreamSink] as well as a [Stream] (for example, a - /// `WebSocket`), [requests] may be omitted. + /// Note that this will be `true` before [close] is called if the remote + /// endpoint closes the connection. + bool get isClosed => _manager.isClosed; + + /// Creates a [Client] that communicates over [channel]. /// /// Note that the client won't begin listening to [responses] until /// [Client.listen] is called. - Client(Stream responses, [StreamSink requests]) - : _streams = new TwoWayStream( - "Client", responses, "responses", requests, "requests"); + Client(StreamChannel channel) + : this.withoutJson(channel + .transform(jsonDocument) + .transformStream(ignoreFormatExceptions)); - /// Creates a [Client] that writes decoded responses to [responses] and reads - /// decoded requests from [requests]. + /// Creates a [Client] that communicates using decoded messages over + /// [channel]. /// /// Unlike [new Client], this doesn't read or write JSON strings. Instead, it /// reads and writes decoded maps or lists. /// - /// If [responses] is a [StreamSink] as well as a [Stream], [requests] may be - /// omitted. - /// /// Note that the client won't begin listening to [responses] until /// [Client.listen] is called. - Client.withoutJson(Stream responses, [StreamSink requests]) - : _streams = new TwoWayStream.withoutJson( - "Client", responses, "responses", requests, "requests"); + Client.withoutJson(StreamChannel channel) + : _manager = new ChannelManager("Client", channel); /// Starts listening to the underlying stream. /// - /// Returns a [Future] that will complete when the stream is closed or when it - /// has an error. + /// Returns a [Future] that will complete when the connection is closed or + /// when it has an error. This is the same as [done]. /// /// [listen] may only be called once. - Future listen() => _streams.listen(_handleResponse); + Future listen() => _manager.listen(_handleResponse); - /// Closes the server's request sink and response subscription. + /// Closes the underlying connection. /// /// Returns a [Future] that completes when all resources have been released. - /// - /// A client can't be closed before [listen] has been called. - Future close() => _streams.close(); + /// This is the same as [done]. + Future close() => _manager.close(); /// Sends a JSON-RPC 2 request to invoke the given [method]. /// @@ -132,7 +131,7 @@ class Client { if (_batch != null) { _batch.add(message); } else { - _streams.add(message); + _manager.add(message); } } @@ -153,7 +152,7 @@ class Client { _batch = []; return tryFinally(callback, () { - _streams.add(_batch); + _manager.add(_batch); _batch = null; }); } diff --git a/lib/src/peer.dart b/lib/src/peer.dart index a6707e2..810d173 100644 --- a/lib/src/peer.dart +++ b/lib/src/peer.dart @@ -4,12 +4,13 @@ import 'dart:async'; -import '../error_code.dart' as error_code; +import 'package:stream_channel/stream_channel.dart'; + +import 'channel_manager.dart'; import 'client.dart'; -import 'exception.dart'; import 'parameters.dart'; import 'server.dart'; -import 'two_way_stream.dart'; +import 'utils.dart'; /// A JSON-RPC 2.0 client *and* server. /// @@ -17,7 +18,7 @@ import 'two_way_stream.dart'; /// 2.0 endpoint. It sends both requests and responses across the same /// communication channel and expects to connect to a peer that does the same. class Peer implements Client, Server { - TwoWayStream _streams; + final ChannelManager _manager; /// The underlying client that handles request-sending and response-receiving /// logic. @@ -35,55 +36,31 @@ class Peer implements Client, Server { /// they're responses. final _clientIncomingForwarder = new StreamController(sync: true); - /// A stream controller that forwards outgoing messages from both [_server] - /// and [_client]. - final _outgoingForwarder = new StreamController(sync: true); - - Future get done => _streams.done; - bool get isClosed => _streams.isClosed; + Future get done => _manager.done; + bool get isClosed => _manager.isClosed; - /// Creates a [Peer] that reads incoming messages from [incoming] and writes - /// outgoing messages to [outgoing]. + /// Creates a [Peer] that communicates over [channel]. /// - /// If [incoming] is a [StreamSink] as well as a [Stream] (for example, a - /// `WebSocket`), [outgoing] may be omitted. - /// - /// Note that the peer won't begin listening to [incoming] until [Peer.listen] + /// Note that the peer won't begin listening to [channel] until [Peer.listen] /// is called. - Peer(Stream incoming, [StreamSink outgoing]) { - _streams = new TwoWayStream("Peer", incoming, "incoming", - outgoing, "outgoing", onInvalidInput: (message, error) { - _streams.add(new RpcException(error_code.PARSE_ERROR, - 'Invalid JSON: ${error.message}').serialize(message)); - }); + Peer(StreamChannel channel) + : this.withoutJson(channel + .transform(jsonDocument) + .transform(respondToFormatExceptions)); - _outgoingForwarder.stream.listen(_streams.add); - _server = new Server.withoutJson( - _serverIncomingForwarder.stream, _outgoingForwarder); - _client = new Client.withoutJson( - _clientIncomingForwarder.stream, _outgoingForwarder); - } - - /// Creates a [Peer] that reads incoming decoded messages from [incoming] and - /// writes outgoing decoded messages to [outgoing]. + /// Creates a [Peer] that communicates using decoded messages over [channel]. /// /// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it /// reads and writes decoded maps or lists. /// - /// If [incoming] is a [StreamSink] as well as a [Stream], [outgoing] may be - /// omitted. - /// - /// Note that the peer won't begin listening to [incoming] until + /// Note that the peer won't begin listening to [channel] until /// [Peer.listen] is called. - Peer.withoutJson(Stream incoming, [StreamSink outgoing]) { - _streams = new TwoWayStream.withoutJson("Peer", incoming, "incoming", - outgoing, "outgoing"); - - _outgoingForwarder.stream.listen(_streams.add); - _server = new Server.withoutJson( - _serverIncomingForwarder.stream, _outgoingForwarder); - _client = new Client.withoutJson( - _clientIncomingForwarder.stream, _outgoingForwarder); + Peer.withoutJson(StreamChannel channel) + : _manager = new ChannelManager("Peer", channel) { + _server = new Server.withoutJson(new StreamChannel( + _serverIncomingForwarder.stream, channel.sink)); + _client = new Client.withoutJson(new StreamChannel( + _clientIncomingForwarder.stream, channel.sink)); } // Client methods. @@ -109,7 +86,7 @@ class Peer implements Client, Server { Future listen() { _client.listen(); _server.listen(); - return _streams.listen((message) { + return _manager.listen((message) { if (message is Map) { if (message.containsKey('result') || message.containsKey('error')) { _clientIncomingForwarder.add(message); @@ -133,5 +110,5 @@ class Peer implements Client, Server { } Future close() => - Future.wait([_client.close(), _server.close(), _streams.close()]); + Future.wait([_client.close(), _server.close(), _manager.close()]); } diff --git a/lib/src/server.dart b/lib/src/server.dart index 06dd0b9..3ef59ed 100644 --- a/lib/src/server.dart +++ b/lib/src/server.dart @@ -7,11 +7,12 @@ import 'dart:collection'; import 'dart:convert'; import 'package:stack_trace/stack_trace.dart'; +import 'package:stream_channel/stream_channel.dart'; import '../error_code.dart' as error_code; +import 'channel_manager.dart'; import 'exception.dart'; import 'parameters.dart'; -import 'two_way_stream.dart'; import 'utils.dart'; /// A JSON-RPC 2.0 server. @@ -25,7 +26,7 @@ import 'utils.dart'; /// asynchronously, it's possible for multiple methods to be invoked at the same /// time, or even for a single method to be invoked multiple times at once. class Server { - TwoWayStream _streams; + final ChannelManager _manager; /// The methods registered for this server. final _methods = new Map(); @@ -36,59 +37,53 @@ class Server { /// [RpcException.methodNotFound] exception. final _fallbacks = new Queue(); - /// Returns a [Future] that completes when the connection is closed. + /// Returns a [Future] that completes when the underlying connection is + /// closed. /// - /// This is the same future that's returned by [listen]. - Future get done => _streams.done; + /// This is the same future that's returned by [listen] and [close]. It may + /// complete before [close] is called if the remote endpoint closes the + /// connection. + Future get done => _manager.done; - /// Whether the connection is closed. - bool get isClosed => _streams.isClosed; - - /// Creates a [Server] that reads requests from [requests] and writes - /// responses to [responses]. + /// Whether the underlying connection is closed. /// - /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a - /// `WebSocket`), [responses] may be omitted. + /// Note that this will be `true` before [close] is called if the remote + /// endpoint closes the connection. + bool get isClosed => _manager.isClosed; + + /// Creates a [Server] that communicates over [channel]. /// /// Note that the server won't begin listening to [requests] until /// [Server.listen] is called. - Server(Stream requests, [StreamSink responses]) { - _streams = new TwoWayStream("Server", requests, "requests", - responses, "responses", onInvalidInput: (message, error) { - _streams.add(new RpcException(error_code.PARSE_ERROR, - 'Invalid JSON: ${error.message}').serialize(message)); - }); - } + Server(StreamChannel channel) + : this.withoutJson(channel + .transform(jsonDocument) + .transform(respondToFormatExceptions)); - /// Creates a [Server] that reads decoded requests from [requests] and writes - /// decoded responses to [responses]. + /// Creates a [Server] that communicates using decoded messages over + /// [channel]. /// /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it /// reads and writes decoded maps or lists. /// - /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be - /// omitted. - /// /// Note that the server won't begin listening to [requests] until /// [Server.listen] is called. - Server.withoutJson(Stream requests, [StreamSink responses]) - : _streams = new TwoWayStream.withoutJson( - "Server", requests, "requests", responses, "responses"); + Server.withoutJson(StreamChannel channel) + : _manager = new ChannelManager("Server", channel); /// Starts listening to the underlying stream. /// - /// Returns a [Future] that will complete when the stream is closed or when it - /// has an error. + /// Returns a [Future] that will complete when the connection is closed or + /// when it has an error. This is the same as [done]. /// /// [listen] may only be called once. - Future listen() => _streams.listen(_handleRequest); + Future listen() => _manager.listen(_handleRequest); - /// Closes the server's request subscription and response sink. + /// Closes the underlying connection. /// /// Returns a [Future] that completes when all resources have been released. - /// - /// A server can't be closed before [listen] has been called. - Future close() => _streams.close(); + /// This is the same as [done]. + Future close() => _manager.close(); /// Registers a method named [name] on this server. /// @@ -129,21 +124,24 @@ class Server { /// handling that request and returns a JSON-serializable response, or `null` /// if no response should be sent. [callback] may send custom /// errors by throwing an [RpcException]. - Future _handleRequest(request) { - return syncFuture(() { - if (request is! List) return _handleSingleRequest(request); - if (request.isEmpty) { - return new RpcException(error_code.INVALID_REQUEST, 'A batch must ' - 'contain at least one request.').serialize(request); - } + Future _handleRequest(request) async { + var response; + if (request is! List) { + response = await _handleSingleRequest(request); + if (response == null) return; + } else if (request.isEmpty) { + response = new RpcException( + error_code.INVALID_REQUEST, + 'A batch must contain at least one request.') + .serialize(request); + } else { + var results = await Future.wait(request.map(_handleSingleRequest)); + var nonNull = results.where((result) => result != null); + if (nonNull.isEmpty) return; + response = nonNull.toList(); + } - return Future.wait(request.map(_handleSingleRequest)).then((results) { - var nonNull = results.where((result) => result != null); - return nonNull.isEmpty ? null : nonNull.toList(); - }); - }).then((response) { - if (!_streams.isClosed && response != null) _streams.add(response); - }); + if (!isClosed) _manager.add(response); } /// Handles an individual parsed request. diff --git a/lib/src/two_way_stream.dart b/lib/src/two_way_stream.dart deleted file mode 100644 index 4f20686..0000000 --- a/lib/src/two_way_stream.dart +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import 'dart:async'; -import 'dart:convert'; - -import 'utils.dart'; - -/// A class for managing a stream of input messages and a sink for output -/// messages. -/// -/// This contains stream logic that's shared between [Server] and [Client]. -class TwoWayStream { - /// The name of the component whose streams are being managed (e.g. "Server"). - /// - /// Used for error reporting. - final String _name; - - /// The input stream. - /// - /// This is a stream of decoded JSON objects. - final Stream _input; - - /// The subscription to [_input]. - StreamSubscription _inputSubscription; - - /// The output sink. - /// - /// This takes decoded JSON objects. - final StreamSink _output; - - /// Returns a [Future] that completes when the connection is closed. - /// - /// This is the same future that's returned by [listen]. - Future get done => _doneCompleter.future; - final _doneCompleter = new Completer(); - - /// Whether the stream has been closed. - bool get isClosed => _doneCompleter.isCompleted; - - /// Creates a two-way stream. - /// - /// [input] and [output] should emit and take (respectively) JSON-encoded - /// strings. - /// - /// [inputName] is used in error messages as the name of the input parameter. - /// [outputName] is likewise used as the name of the output parameter. - /// - /// If [onInvalidInput] is passed, any errors parsing messages from [input] - /// are passed to it. Otherwise, they're ignored and the input is discarded. - factory TwoWayStream(String name, Stream input, String inputName, - StreamSink output, String outputName, - {void onInvalidInput(String message, FormatException error)}) { - if (output == null) { - if (input is! StreamSink) { - throw new ArgumentError("Either `$inputName` must be a StreamSink or " - "`$outputName` must be passed."); - } - output = input as StreamSink; - } - - var wrappedOutput = mapStreamSink(output, JSON.encode); - return new TwoWayStream.withoutJson(name, input.expand((message) { - var decodedMessage; - try { - decodedMessage = JSON.decode(message); - } on FormatException catch (error) { - if (onInvalidInput != null) onInvalidInput(message, error); - return []; - } - - return [decodedMessage]; - }), inputName, wrappedOutput, outputName); - } - - /// Creates a two-way stream that reads decoded input and writes decoded - /// responses. - /// - /// [input] and [output] should emit and take (respectively) decoded JSON - /// objects. - /// - /// [inputName] is used in error messages as the name of the input parameter. - /// [outputName] is likewise used as the name of the output parameter. - TwoWayStream.withoutJson(this._name, Stream input, String inputName, - StreamSink output, String outputName) - : _input = input, - _output = output == null && input is StreamSink ? input : output { - if (_output == null) { - throw new ArgumentError("Either `$inputName` must be a StreamSink or " - "`$outputName` must be passed."); - } - } - - /// Starts listening to the input stream. - /// - /// The returned Future will complete when the input stream is closed. If the - /// input stream emits an error, that will be piped to the returned Future. - Future listen(void handleInput(input)) { - if (_inputSubscription != null) { - throw new StateError("Can only call $_name.listen once."); - } - - _inputSubscription = _input.listen(handleInput, - onError: (error, stackTrace) { - if (_doneCompleter.isCompleted) return; - _doneCompleter.completeError(error, stackTrace); - _output.close(); - }, onDone: () { - if (_doneCompleter.isCompleted) return; - _doneCompleter.complete(); - _output.close(); - }, cancelOnError: true); - - return _doneCompleter.future; - } - - /// Emit [event] on the output stream. - void add(event) => _output.add(event); - - /// Stops listening to the input stream and closes the output stream. - Future close() { - if (_inputSubscription == null) { - throw new StateError("Can't call $_name.close before $_name.listen."); - } - - if (!_doneCompleter.isCompleted) _doneCompleter.complete(); - - var inputFuture = _inputSubscription.cancel(); - // TODO(nweiz): include the output future in the return value when issue - // 19095 is fixed. - _output.close(); - return inputFuture == null ? new Future.value() : inputFuture; - } -} diff --git a/lib/src/utils.dart b/lib/src/utils.dart index dde6b4a..8871b56 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -5,6 +5,10 @@ import 'dart:async'; import 'package:stack_trace/stack_trace.dart'; +import 'package:stream_channel/stream_channel.dart'; + +import '../error_code.dart' as error_code; +import 'exception.dart'; typedef ZeroArgumentFunction(); @@ -64,6 +68,35 @@ tryFinally(body(), whenComplete()) { } } +/// A transformer that silently drops [FormatException]s. +final ignoreFormatExceptions = new StreamTransformer.fromHandlers( + handleError: (error, stackTrace, sink) { + if (error is FormatException) return; + sink.addError(error, stackTrace); +}); + +/// A transformer that sends error responses on [FormatException]s. +final StreamChannelTransformer respondToFormatExceptions = + new _RespondToFormatExceptionsTransformer(); + +/// The implementation of [respondToFormatExceptions]. +class _RespondToFormatExceptionsTransformer + implements StreamChannelTransformer { + StreamChannel bind(StreamChannel channel) { + var transformed; + transformed = channel.changeStream((stream) { + return stream.handleError((error) { + if (error is! FormatException) throw error; + + var exception = new RpcException( + error_code.PARSE_ERROR, 'Invalid JSON: ${error.message}'); + transformed.sink.add(exception.serialize(error.source)); + }); + }); + return transformed; + } +} + /// Returns a [StreamSink] that wraps [sink] and maps each event added using /// [callback]. StreamSink mapStreamSink(StreamSink sink, callback(event)) => diff --git a/pubspec.yaml b/pubspec.yaml index 348dc35..630b9cc 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,12 +1,12 @@ name: json_rpc_2 -version: 1.2.0 +version: 2.0.0 author: Dart Team description: An implementation of the JSON-RPC 2.0 spec. homepage: http://github.com/dart-lang/json_rpc_2 dependencies: stack_trace: '>=0.9.1 <2.0.0' + stream_channel: '^1.1.0' dev_dependencies: test: ">=0.12.0 <0.13.0" environment: - sdk: ">=1.2.0 <2.0.0" - + sdk: ">=1.8.0 <2.0.0" diff --git a/test/client/stream_test.dart b/test/client/stream_test.dart index 146e753..b9a31c6 100644 --- a/test/client/stream_test.dart +++ b/test/client/stream_test.dart @@ -4,17 +4,25 @@ import 'dart:async'; +import 'package:stream_channel/stream_channel.dart'; import 'package:test/test.dart'; + import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; import 'utils.dart'; void main() { + var responseController; + var requestController; + var client; + setUp(() { + responseController = new StreamController(); + requestController = new StreamController(); + client = new json_rpc.Client.withoutJson( + new StreamChannel(responseController.stream, requestController.sink)); + }); + test(".withoutJson supports decoded stream and sink", () { - var responseController = new StreamController(); - var requestController = new StreamController(); - var client = new json_rpc.Client.withoutJson( - responseController.stream, requestController.sink); client.listen(); expect(requestController.stream.first.then((request) { @@ -34,11 +42,6 @@ void main() { }); test(".listen returns when the controller is closed", () { - var responseController = new StreamController(); - var requestController = new StreamController(); - var client = new json_rpc.Client.withoutJson( - responseController.stream, requestController.sink); - var hasListenCompeted = false; expect(client.listen().then((_) => hasListenCompeted = true), completes); @@ -51,30 +54,18 @@ void main() { }); test(".listen returns a stream error", () { - var responseController = new StreamController(); - var requestController = new StreamController(); - var client = new json_rpc.Client( - responseController.stream, requestController.sink); - expect(client.listen(), throwsA('oh no')); responseController.addError('oh no'); }); test(".listen can't be called twice", () { - var responseController = new StreamController(); - var requestController = new StreamController(); - var client = new json_rpc.Client( - responseController.stream, requestController.sink); client.listen(); - expect(() => client.listen(), throwsStateError); }); test(".close cancels the stream subscription and closes the sink", () { - var responseController = new StreamController(); - var requestController = new StreamController(); - var client = new json_rpc.Client( - responseController.stream, requestController.sink); + // Work around sdk#19095. + requestController.stream.listen(null); expect(client.listen(), completes); @@ -85,13 +76,4 @@ void main() { expect(() => responseController.stream.listen((_) {}), throwsStateError); expect(requestController.isClosed, isTrue); }); - - test(".close can't be called before .listen", () { - var responseController = new StreamController(); - var requestController = new StreamController(); - var client = new json_rpc.Client( - responseController.stream, requestController.sink); - - expect(() => client.close(), throwsStateError); - }); } diff --git a/test/client/utils.dart b/test/client/utils.dart index 8684892..5eb0b60 100644 --- a/test/client/utils.dart +++ b/test/client/utils.dart @@ -5,9 +5,11 @@ import 'dart:async'; import 'dart:convert'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; import 'package:json_rpc_2/error_code.dart' as error_code; -import 'package:test/test.dart'; /// A controller used to test a [json_rpc.Client]. class ClientController { @@ -23,7 +25,7 @@ class ClientController { ClientController() { _client = new json_rpc.Client( - _responseController.stream, _requestController.sink); + new StreamChannel(_responseController.stream, _requestController.sink)); _client.listen(); } diff --git a/test/peer_test.dart b/test/peer_test.dart index 7008b72..89ab2d6 100644 --- a/test/peer_test.dart +++ b/test/peer_test.dart @@ -5,7 +5,9 @@ import 'dart:async'; import 'dart:convert'; +import 'package:stream_channel/stream_channel.dart'; import 'package:test/test.dart'; + import 'package:json_rpc_2/error_code.dart' as error_code; import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; @@ -19,7 +21,7 @@ void main() { var outgoingController = new StreamController(); outgoing = outgoingController.stream; peer = new json_rpc.Peer.withoutJson( - incomingController.stream, outgoingController); + new StreamChannel(incomingController.stream, outgoingController)); }); group("like a client,", () { @@ -165,7 +167,7 @@ void main() { var incomingController = new StreamController(); var outgoingController = new StreamController(); var jsonPeer = new json_rpc.Peer( - incomingController.stream, outgoingController); + new StreamChannel(incomingController.stream, outgoingController)); expect(outgoingController.stream.first.then(JSON.decode), completion({ "jsonrpc": "2.0", diff --git a/test/server/stream_test.dart b/test/server/stream_test.dart index 58c5e62..7be0001 100644 --- a/test/server/stream_test.dart +++ b/test/server/stream_test.dart @@ -4,17 +4,25 @@ import 'dart:async'; +import 'package:stream_channel/stream_channel.dart'; import 'package:test/test.dart'; + import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; import 'utils.dart'; void main() { + var requestController; + var responseController; + var server; + setUp(() { + requestController = new StreamController(); + responseController = new StreamController(); + server = new json_rpc.Server.withoutJson( + new StreamChannel(requestController.stream, responseController.sink)); + }); + test(".withoutJson supports decoded stream and sink", () { - var requestController = new StreamController(); - var responseController = new StreamController(); - var server = new json_rpc.Server.withoutJson( - requestController.stream, responseController.sink); server.listen(); server.registerMethod('foo', (params) { @@ -36,11 +44,6 @@ void main() { }); test(".listen returns when the controller is closed", () { - var requestController = new StreamController(); - var responseController = new StreamController(); - var server = new json_rpc.Server( - requestController.stream, responseController.sink); - var hasListenCompeted = false; expect(server.listen().then((_) => hasListenCompeted = true), completes); @@ -53,30 +56,19 @@ void main() { }); test(".listen returns a stream error", () { - var requestController = new StreamController(); - var responseController = new StreamController(); - var server = new json_rpc.Server( - requestController.stream, responseController.sink); - expect(server.listen(), throwsA('oh no')); requestController.addError('oh no'); }); test(".listen can't be called twice", () { - var requestController = new StreamController(); - var responseController = new StreamController(); - var server = new json_rpc.Server( - requestController.stream, responseController.sink); server.listen(); expect(() => server.listen(), throwsStateError); }); test(".close cancels the stream subscription and closes the sink", () { - var requestController = new StreamController(); - var responseController = new StreamController(); - var server = new json_rpc.Server( - requestController.stream, responseController.sink); + // Work around sdk#19095. + responseController.stream.listen(null); expect(server.listen(), completes); @@ -87,13 +79,4 @@ void main() { expect(() => requestController.stream.listen((_) {}), throwsStateError); expect(responseController.isClosed, isTrue); }); - - test(".close can't be called before .listen", () { - var requestController = new StreamController(); - var responseController = new StreamController(); - var server = new json_rpc.Server( - requestController.stream, responseController.sink); - - expect(() => server.close(), throwsStateError); - }); } diff --git a/test/server/utils.dart b/test/server/utils.dart index 9b4b020..f33cb13 100644 --- a/test/server/utils.dart +++ b/test/server/utils.dart @@ -5,9 +5,11 @@ import 'dart:async'; import 'dart:convert'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; import 'package:json_rpc_2/error_code.dart' as error_code; -import 'package:test/test.dart'; /// A controller used to test a [json_rpc.Server]. class ServerController { @@ -23,7 +25,7 @@ class ServerController { ServerController() { _server = new json_rpc.Server( - _requestController.stream, _responseController.sink); + new StreamChannel(_requestController.stream, _responseController.sink)); _server.listen(); }