Skip to content
This repository has been archived by the owner on Oct 28, 2024. It is now read-only.

Commit

Permalink
Use StreamChannel.
Browse files Browse the repository at this point in the history
This converts the constructors to take StreamChannels, and changes some
edge-case semantics to be more familiar to StreamChannel (and WebSocket)
users.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1652413002 .
  • Loading branch information
nex3 committed Feb 2, 2016
1 parent 7bc408d commit d6afedc
Show file tree
Hide file tree
Showing 14 changed files with 360 additions and 424 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
## 2.0.0

* **Breaking change:** all constructors now take a `StreamChannel` rather than a
`Stream`/`StreamSink` pair.

* `Client.sendRequest()` and `Client.sendNotification()` no longer throw
`StateError`s after the connection has been closed but before `Client.close()`
has been called.

* The various `close()` methods may now be called before their corresponding
`listen()` methods.

* The various `close()` methods now wait on the result of closing the underlying
`StreamSink`. Be aware that [in some circumstances][issue 19095]
`StreamController`s' `Sink.close()` futures may never complete.

[issue 19095]: https://github.com/dart-lang/sdk/issues/19095

## 1.2.0

* Add `Client.isClosed` and `Server.isClosed`, which make it possible to
Expand Down
178 changes: 87 additions & 91 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,68 +9,66 @@ These methods can be registered using `Server.registerMethod`:

```dart
import "package:json_rpc_2/json_rpc_2.dart" as json_rpc;
import "package:stream_channel/stream_channel.dart";
main() async {
var socket = await WebSocket.connect('ws://localhost:4321');
var server = new json_rpc.Server(new StreamChannel(socket, socket));
// Any string may be used as a method name. JSON-RPC 2.0 methods are
// case-sensitive.
var i = 0;
server.registerMethod("count", () {
// Just return the value to be sent as a response to the client. This can
// be anything JSON-serializable, or a Future that completes to something
// JSON-serializable.
return i++;
});
// Methods can take parameters. They're presented as a [Parameters] object
// which makes it easy to validate that the expected parameters exist.
server.registerMethod("echo", (params) {
// If the request doesn't have a "message" parameter, this will
// automatically send a response notifying the client that the request
// was invalid.
return params.getNamed("message");
});
// [Parameters] has methods for verifying argument types.
server.registerMethod("subtract", (params) {
// If "minuend" or "subtrahend" aren't numbers, this will reject the
// request.
return params.getNum("minuend") - params.getNum("subtrahend");
});
void main() {
WebSocket.connect('ws://localhost:4321').then((socket) {
// You can start the server with a Stream for requests and a StreamSink for
// responses, or with an object that's both, like a WebSocket.
var server = new json_rpc.Server(socket);
// Any string may be used as a method name. JSON-RPC 2.0 methods are
// case-sensitive.
var i = 0;
server.registerMethod("count", () {
// Just return the value to be sent as a response to the client. This can
// be anything JSON-serializable, or a Future that completes to something
// JSON-serializable.
return i++;
});
// Methods can take parameters. They're presented as a [Parameters] object
// which makes it easy to validate that the expected parameters exist.
server.registerMethod("echo", (params) {
// If the request doesn't have a "message" parameter, this will
// automatically send a response notifying the client that the request
// was invalid.
return params.getNamed("message");
});
// [Parameters] has methods for verifying argument types.
server.registerMethod("subtract", (params) {
// If "minuend" or "subtrahend" aren't numbers, this will reject the
// request.
return params.getNum("minuend") - params.getNum("subtrahend");
});
// [Parameters] also supports optional arguments.
server.registerMethod("sort", (params) {
var list = params.getList("list");
list.sort();
if (params.getBool("descending", orElse: () => false)) {
return params.list.reversed;
} else {
return params.list;
}
});
// A method can send an error response by throwing a
// `json_rpc.RpcException`. Any positive number may be used as an
// application- defined error code.
const DIVIDE_BY_ZERO = 1;
server.registerMethod("divide", (params) {
var divisor = params.getNum("divisor");
if (divisor == 0) {
throw new json_rpc.RpcException(
DIVIDE_BY_ZERO, "Cannot divide by zero.");
}
return params.getNum("dividend") / divisor;
});
// To give you time to register all your methods, the server won't actually
// start listening for requests until you call `listen`.
server.listen();
// [Parameters] also supports optional arguments.
server.registerMethod("sort", (params) {
var list = params.getList("list");
list.sort();
if (params.getBool("descending", orElse: () => false)) {
return params.list.reversed;
} else {
return params.list;
}
});
// A method can send an error response by throwing a
// `json_rpc.RpcException`. Any positive number may be used as an
// application- defined error code.
const DIVIDE_BY_ZERO = 1;
server.registerMethod("divide", (params) {
var divisor = params.getNum("divisor");
if (divisor == 0) {
throw new json_rpc.RpcException(
DIVIDE_BY_ZERO, "Cannot divide by zero.");
}
return params.getNum("dividend") / divisor;
});
// To give you time to register all your methods, the server won't actually
// start listening for requests until you call `listen`.
server.listen();
}
```

Expand All @@ -82,38 +80,36 @@ responses to those method calls. These methods can be called using

```dart
import "package:json_rpc_2/json_rpc_2.dart" as json_rpc;
void main() {
WebSocket.connect('ws://localhost:4321').then((socket) {
// Just like the server, a client takes a Stream and a StreamSink or a
// single object that's both.
var client = new json_rpc.Client(socket);
// This calls the "count" method on the server. A Future is returned that
// will complete to the value contained in the server's response.
client.sendRequest("count").then((result) => print("Count is $result."));
// Parameters are passed as a simple Map or, for positional parameters, an
// Iterable. Make sure they're JSON-serializable!
client.sendRequest("echo", {"message": "hello"})
.then((echo) => print('Echo says "$echo"!'));
// A notification is a way to call a method that tells the server that no
// result is expected. Its return type is `void`; even if it causes an
// error, you won't hear back.
client.sendNotification("count");
// If the server sends an error response, the returned Future will complete
// with an RpcException. You can catch this error and inspect its error
// code, message, and any data that the server sent along with it.
client.sendRequest("divide", {"dividend": 2, "divisor": 0})
.catchError((error) {
print("RPC error ${error.code}: ${error.message}");
});
// The client won't subscribe to the input stream until you call `listen`.
client.listen();
import "package:stream_channel/stream_channel.dart";
main() async {
var socket = await WebSocket.connect('ws://localhost:4321');
var client = new json_rpc.Client(new StreamChannel(socket, socket));
// This calls the "count" method on the server. A Future is returned that
// will complete to the value contained in the server's response.
client.sendRequest("count").then((result) => print("Count is $result."));
// Parameters are passed as a simple Map or, for positional parameters, an
// Iterable. Make sure they're JSON-serializable!
client.sendRequest("echo", {"message": "hello"})
.then((echo) => print('Echo says "$echo"!'));
// A notification is a way to call a method that tells the server that no
// result is expected. Its return type is `void`; even if it causes an
// error, you won't hear back.
client.sendNotification("count");
// If the server sends an error response, the returned Future will complete
// with an RpcException. You can catch this error and inspect its error
// code, message, and any data that the server sent along with it.
client.sendRequest("divide", {"dividend": 2, "divisor": 0})
.catchError((error) {
print("RPC error ${error.code}: ${error.message}");
});
// The client won't subscribe to the input stream until you call `listen`.
client.listen();
}
```

Expand Down
79 changes: 79 additions & 0 deletions lib/src/channel_manager.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:stream_channel/stream_channel.dart';

/// Wraps a [StreamChannel] and handles logic that's shared between [Server],
/// [Client], and [Peer].
///
/// These classes don't provide the user direct access to a
/// [StreamSubscription]. Instead, they use the future returned by [listen] to
/// notify the user of the remote endpoint closing or producing an error.
class ChannelManager {
/// The name of the component whose channel is wrapped (e.g. "Server").
///
/// Used for error reporting.
final String _name;

/// The underlying channel.
final StreamChannel _channel;

/// Returns a [Future] that completes when the connection is closed.
///
/// This is the same future that's returned by [listen].
Future get done => _doneCompleter.future;
final _doneCompleter = new Completer();

/// Whether the underlying communication channel is closed.
bool get isClosed => _doneCompleter.isCompleted;

/// Whether [listen] has been called.
bool _listenCalled = false;

/// Whether [close] has been called.
///
/// Note that [isClosed] tracks whether the underlying connection is closed,
/// whereas this tracks only whether it was explicitly closed from this end.
bool _closeCalled = false;

ChannelManager(this._name, this._channel);

/// Starts listening to the channel.
///
/// The returned Future will complete when the input stream is closed. If the
/// input stream emits an error, that will be piped to the returned Future.
Future listen(void handleInput(input)) {
if (_listenCalled) {
throw new StateError("Can only call $_name.listen() once.");
}
_listenCalled = true;

_channel.stream.listen(handleInput,
onError: (error, stackTrace) {
_doneCompleter.completeError(error, stackTrace);
_channel.sink.close();
},
onDone: _doneCompleter.complete,
cancelOnError: true);

return done;
}

/// Emit [event].
void add(event) {
if (isClosed && !_closeCalled) return;
_channel.sink.add(event);
}

/// Closes the channel.
Future close() {
_closeCalled = true;
if (!_doneCompleter.isCompleted) {
_doneCompleter.complete(_channel.sink.close());
}
return done;
}
}
Loading

0 comments on commit d6afedc

Please sign in to comment.