Skip to content

Commit

Permalink
Format all sources with dartfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
achilleasa committed Feb 18, 2016
1 parent 05ab26f commit e82a0c4
Show file tree
Hide file tree
Showing 80 changed files with 2,831 additions and 2,703 deletions.
18 changes: 16 additions & 2 deletions lib/dart_cassandra_cql.dart
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
library dart_cassandra_cql;

// Client API exports
export "src/exceptions.dart" show AuthenticationException, CassandraException, NoHealthyConnectionsException, DriverException;
export "src/exceptions.dart"
show
AuthenticationException,
CassandraException,
NoHealthyConnectionsException,
DriverException;
export "src/types.dart";
export "src/query.dart";
export "src/protocol.dart" show ResultMessage, RowsResultMessage, VoidResultMessage, SetKeyspaceResultMessage, SchemaChangeResultMessage, EventMessage, Authenticator, PasswordAuthenticator;
export "src/protocol.dart"
show
ResultMessage,
RowsResultMessage,
VoidResultMessage,
SetKeyspaceResultMessage,
SchemaChangeResultMessage,
EventMessage,
Authenticator,
PasswordAuthenticator;
export "src/connection.dart" hide AsyncQueue;
export "src/client.dart";
export "src/stream.dart";
105 changes: 55 additions & 50 deletions lib/src/client/client.dart
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
part of dart_cassandra_cql.client;

class Client {

ConnectionPool connectionPool;
final Map<String, Future<PreparedResultMessage>> preparedQueries = new Map<String, Future<PreparedResultMessage>>();
final Map<String, Future<PreparedResultMessage>> preparedQueries =
new Map<String, Future<PreparedResultMessage>>();

/**
* Create a new client and a [SimpleConnectionPool] to the supplied [hosts] optionally using
* the supplied [poolConfig]. If [poolConfig] is not specified, a default configuration will be used insted.
* If a [defaultKeyspace] is provided, it will be autoselected during the handshake phase of each pool connection
*/

Client.fromHostList(List<String> hosts, {String defaultKeyspace, PoolConfiguration poolConfig}) {
Client.fromHostList(List<String> hosts,
{String defaultKeyspace, PoolConfiguration poolConfig}) {
connectionPool = new SimpleConnectionPool.fromHostList(
hosts
, poolConfig == null
? new PoolConfiguration()
: poolConfig
, defaultKeyspace : defaultKeyspace
);
hosts, poolConfig == null ? new PoolConfiguration() : poolConfig,
defaultKeyspace: defaultKeyspace);
}

/**
Expand All @@ -33,10 +30,12 @@ class Client {
* [SetKeyspaceResultMessage] or [SchemaChangeResultMessage]. The optional [pageSize] and [pagingState]
* params may be supplied to enable pagination when performing single queries.
*/
Future<ResultMessage> execute(QueryInterface query, {int pageSize : null, Uint8List pagingState : null}) {
Future<ResultMessage> execute(QueryInterface query,
{int pageSize: null, Uint8List pagingState: null}) {
return (query is BatchQuery)
? _executeBatch(query)
: _executeSingle(query as Query, pageSize : pageSize, pagingState : pagingState);
? _executeBatch(query)
: _executeSingle(query as Query,
pageSize: pageSize, pagingState: pagingState);
}

/**
Expand All @@ -53,7 +52,7 @@ class Client {
* event per result row. The client uses cassandra's pagination API to load additional result pages on
* demand. The result page size is controlled by the [pageSize] parameter (defaults to 100 rows).
*/
Stream<Map<String, Object>> stream(Query query, { int pageSize : 100}) {
Stream<Map<String, Object>> stream(Query query, {int pageSize: 100}) {
return new ResultStream(_executeSingle, query, pageSize).stream;
}

Expand All @@ -63,22 +62,24 @@ class Client {
* that will complete when all connections are drained. If [drain] is false then the returned [Future]
* will be already completed.
*/
Future shutdown({ bool drain : true, Duration drainTimeout : const Duration( seconds : 5 )}) {
return connectionPool.disconnect(drain: drain, drainTimeout : drainTimeout);
Future shutdown(
{bool drain: true, Duration drainTimeout: const Duration(seconds: 5)}) {
return connectionPool.disconnect(drain: drain, drainTimeout: drainTimeout);
}

/**
* Prepare the given query and return back a [Future] with a [PreparedResultMessage]
*/
Future<PreparedResultMessage> _prepare(Query query) {

// If the query is preparing/already prepared, return its future
if (preparedQueries.containsKey(query.query)) {
return preparedQueries[query.query];
}

// Queue for preparation and return back a future
Future deferred = connectionPool.getConnection().then((Connection conn) => conn.prepare(query));
Future deferred = connectionPool
.getConnection()
.then((Connection conn) => conn.prepare(query));
preparedQueries[query.query] = deferred;
return deferred;
}
Expand All @@ -87,22 +88,25 @@ class Client {
* Execute a single [query] with optional [pageSize] and [pagingState] data
* and return back a [Future<ResultMessage>]
*/
Future<ResultMessage> _executeSingle(Query query, {int pageSize : null, Uint8List pagingState : null}) {
Future<ResultMessage> _executeSingle(Query query,
{int pageSize: null, Uint8List pagingState: null}) {
Completer completer = new Completer();

// If this is a normal query, pick the next available pool connection and execute it
if (!query.prepared) {
void _execute() {
connectionPool.getConnection()
.then((Connection conn) => conn.execute(
query
, pageSize: pageSize
, pagingState : pagingState))
.then(completer.complete)
// If we lose our connection OR we cannot reserve a connection stream, retry on another connection
.catchError((_) => _execute(), test: (e) => e is ConnectionLostException || e is StreamReservationException)
// Any other error will cause the future to fail
.catchError(completer.completeError);
connectionPool
.getConnection()
.then((Connection conn) => conn.execute(query,
pageSize: pageSize, pagingState: pagingState))
.then(completer.complete)
// If we lose our connection OR we cannot reserve a connection stream, retry on another connection
.catchError((_) => _execute(),
test: (e) =>
e is ConnectionLostException ||
e is StreamReservationException)
// Any other error will cause the future to fail
.catchError(completer.completeError);
}

_execute();
Expand All @@ -112,26 +116,26 @@ class Client {
void _prepareAndExecute() {
// Prepare query; any error will make our returned future fail
_prepare(query)
// Fetch a connection for the node this query was prepared at and execute it
.then((PreparedResultMessage preparedResult) => connectionPool.getConnectionToHost(
preparedResult.host
, preparedResult.port
).then((Connection conn) => conn.execute(
query
, preparedResult : preparedResult
, pageSize: pageSize
, pagingState : pagingState
))
.then(completer.complete)
// If we lose our connection OR we cannot reserve a connection stream, retry on another connection to the same host
.catchError((_) => _prepareAndExecute(), test: (e) => e is ConnectionLostException || e is StreamReservationException)
// We run out of connections to use this prepared result so we need to prepare it again on a new node
.catchError((_) {
preparedQueries.remove(query.query);
_prepareAndExecute();
}, test : (e) => e is NoHealthyConnectionsException))
// Any other error will cause the future to fail
.catchError(completer.completeError);
// Fetch a connection for the node this query was prepared at and execute it
.then((PreparedResultMessage preparedResult) => connectionPool
.getConnectionToHost(preparedResult.host, preparedResult.port)
.then((Connection conn) => conn.execute(query,
preparedResult: preparedResult,
pageSize: pageSize,
pagingState: pagingState))
.then(completer.complete)
// If we lose our connection OR we cannot reserve a connection stream, retry on another connection to the same host
.catchError((_) => _prepareAndExecute(),
test: (e) =>
e is ConnectionLostException ||
e is StreamReservationException)
// We run out of connections to use this prepared result so we need to prepare it again on a new node
.catchError((_) {
preparedQueries.remove(query.query);
_prepareAndExecute();
}, test: (e) => e is NoHealthyConnectionsException))
// Any other error will cause the future to fail
.catchError(completer.completeError);
}
_prepareAndExecute();
return completer.future;
Expand All @@ -141,7 +145,8 @@ class Client {
* Execute a batch [query] and return back a [Future<ResultMessage>]
*/
Future<ResultMessage> _executeBatch(BatchQuery query) {
return connectionPool.getConnection().then((Connection conn) => conn.executeBatch(query));
return connectionPool
.getConnection()
.then((Connection conn) => conn.executeBatch(query));
}

}
70 changes: 32 additions & 38 deletions lib/src/client/result_stream.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
part of dart_cassandra_cql.client;

typedef Future PagedQueryExecutor(Query query, {int pageSize, Uint8List pagingState});
typedef Future PagedQueryExecutor(Query query,
{int pageSize, Uint8List pagingState});

class ResultStream {
PagedQueryExecutor _queryExecutor;
Expand All @@ -17,43 +18,39 @@ class ResultStream {
}
_buffering = true;

_queryExecutor(
_query
, pageSize : _pageSize
, pagingState : _pagingState
).then((RowsResultMessage data) {
// If the stream has been closed, clean up
if (_streamController.isClosed) {
return;
}

_buffering = false;

// Append incoming rows to current result list and update our paging state
_bufferedData = new Queue.from(data.rows);
data.rows = null;
_pagingState = data.metadata.pagingState;

_emitRows();
})
.catchError(_streamController.addError, test : (e) => e is NoHealthyConnectionsException)
.catchError((_) {
// Treat any other kind of error as a 'connection lost' event and try to rebuffer again
_buffering = false;
_bufferNextPage();
});
_queryExecutor(_query, pageSize: _pageSize, pagingState: _pagingState)
.then((RowsResultMessage data) {
// If the stream has been closed, clean up
if (_streamController.isClosed) {
return;
}

_buffering = false;

// Append incoming rows to current result list and update our paging state
_bufferedData = new Queue.from(data.rows);
data.rows = null;
_pagingState = data.metadata.pagingState;

_emitRows();
})
.catchError(_streamController.addError,
test: (e) => e is NoHealthyConnectionsException)
.catchError((_) {
// Treat any other kind of error as a 'connection lost' event and try to rebuffer again
_buffering = false;
_bufferNextPage();
});
}

void _emitRows() {

// If stream is paused, do not emit any events
if (_streamController.isPaused) {
return;
}

// Emit each available row
while (_bufferedData != null && _bufferedData.isNotEmpty) {

Map<String, Object> row = _bufferedData.removeFirst();
_streamController.add(row);

Expand All @@ -67,9 +64,7 @@ class ResultStream {
// or close the stream if we are done
if (!_streamController.isClosed &&
!_streamController.isPaused &&
_bufferedData.isEmpty
) {

_bufferedData.isEmpty) {
if (_pagingState == null) {
_streamController.close();
} else {
Expand All @@ -81,20 +76,19 @@ class ResultStream {
void _cleanup() {
_bufferedData = null;
_pagingState = null;

}

Stream<Map<String, Object>> get stream => _streamController.stream;

/**
* Create a new [ResultStream] by paging through [this._query] object with a page size of [this._pageSize].
*/
ResultStream(PagedQueryExecutor this._queryExecutor, Query this._query, int this._pageSize) {
ResultStream(PagedQueryExecutor this._queryExecutor, Query this._query,
int this._pageSize) {
_streamController = new StreamController<Map<String, Object>>(
onListen : _bufferNextPage
, onResume : _emitRows
, onCancel : _cleanup
, sync : true
);
onListen: _bufferNextPage,
onResume: _emitRows,
onCancel: _cleanup,
sync: true);
}
}
6 changes: 3 additions & 3 deletions lib/src/connection/async_queue.dart
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
part of dart_cassandra_cql.connection;

class AsyncQueue<T> {

Queue<T> _resources;
Queue<Completer<T>> _reservations = new Queue<Completer<T>>();
Duration reservationTimeout;

AsyncQueue.from(Iterable<T> resources){
AsyncQueue.from(Iterable<T> resources) {
_resources = new Queue<T>.from(resources);
}

Expand All @@ -24,7 +23,8 @@ class AsyncQueue<T> {
if (reservationTimeout != null && reservationTimeout.inMilliseconds > 0) {
new Timer(reservationTimeout, () {
if (!reservation.isCompleted) {
reservation.completeError(new StreamReservationException('Timed out waiting for stream reservation'));
reservation.completeError(new StreamReservationException(
'Timed out waiting for stream reservation'));
}
});
}
Expand Down
Loading

0 comments on commit e82a0c4

Please sign in to comment.