Skip to content

Commit

Permalink
Fix error in dart 2.9.0+ and fully migrate to dart 2 and added instal…
Browse files Browse the repository at this point in the history
…lation instructions.
  • Loading branch information
Sparks1998 committed Oct 19, 2020
1 parent 6f94351 commit 1511801
Show file tree
Hide file tree
Showing 35 changed files with 345 additions and 374 deletions.
32 changes: 16 additions & 16 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ In all other cases, the driver performs *automatic inline expansion* of each bou
|ascii | String
|bigint | int
|blob | [Uint8List](https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart-typed_data.Uint8List)
|boolea | bool
|boolean | bool
|counter | int
|decimal | int or double
|double | double
Expand All @@ -29,7 +29,7 @@ In all other cases, the driver performs *automatic inline expansion* of each bou
|uuid | [Uuid](#uuids)
|timeuuid | [Uuid](#uuids)
|varchar | String
|varint | int
|varint | BigInt
|UDT | LinkedHashMap. See the section on [UDTs](#user-defined-types)
|tuple | [Tuple](#tuples)
|custom | [Uint8List](https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart-typed_data.Uint8List) or type instance implementing [CustomType](#custom-types)
Expand All @@ -39,8 +39,8 @@ In all other cases, the driver performs *automatic inline expansion* of each bou
The [Uuid](https://github.com/achilleasa/dart_cassandra_cql/blob/master/lib/src/types/uuid.dart) class provides a wrapper around UUIDs and provides factory constructors for generating simple and time-based UUIDs.

```dart
Uuid simpleUuid = new Uuid.simple();
Uuid timeUuid = new Uuid.timeBased();
Uuid simpleUuid = Uuid.simple();
Uuid timeUuid = Uuid.timeBased();
```

If you have some externally generated UUIDs that you wish to pass to a Cassandra query you can either pass them as a ```String``` or wrap them with a Uuid object:
Expand All @@ -60,7 +60,7 @@ Whenever you need to use a ```tuple``` type in your queries or read it from a qu
This class is essentially a decorated ```List<Object>```. You can instanciate a ```Tuple``` object from any ```Iterable``` using the ```fromIterable``` named constructor. Here is an example:

```dart
Tuple tuple = new Tuple.fromIterable([1, "test", 3.14]);
Tuple tuple = Tuple.fromIterable([1, "test", 3.14]);
```

### Custom types
Expand All @@ -82,7 +82,7 @@ This interface defines a *getter* for querying the fully qualified Java class na
You will also need to register a ```Codec<Uint8List, CustomType>``` for handing the actual serialization/de-serialization. To register the codec you will need to invoke the globally available ```registerCodec``` method as follows:

```dart
registerCodec('fully.qualified.java.class.name', new MyCustomTypeCodec() );
registerCodec('fully.qualified.java.class.name', MyCustomTypeCodec() );
```

After this step, the driver will automatically invoke the codec whenever it encounters a custom type with this class name while parsing query results or whenever an ```CustomType``` object instance of this type is bound to a query.
Expand Down Expand Up @@ -173,12 +173,12 @@ import "package:dart_cassandra_cql/dart_cassandra_cql.dart" as cql;
import "mocks/lz4.dart" as compress;
int main() {
cql.registerCodec(cql.Compression.LZ4.value, new compress.LZ4Codec());
cql.registerCodec(cql.Compression.LZ4.value, compress.LZ4Codec());
// This client will now use LZ4 compression when talking to Cassandra
cql.Client client = new cql.Client.fromHostList(
cql.Client client = cql.Client.fromHostList(
['10.0.0.1']
, poolConfig : new cql.PoolConfiguration(
, poolConfig : cql.PoolConfiguration(
protocolVersion : cql.ProtocolVersion.V2
, compression : cql.Compression.LZ4
)
Expand Down Expand Up @@ -207,12 +207,12 @@ To define a single query, the driver provides the [Query](https://github.com/ach
Here are some examples:

```dart
new cql.Query(
cql.Query(
"SELECT * FROM test.test_table WHERE id=? AND alt_id=?"
, bindings : [ 1, 2 ]
);
new cql.Query(
cql.Query(
"SELECT * FROM test.test_table WHERE id=:id AND alt_id=:id"
, bindings : { "id" : 1 }
, consistency : cql.Consistency.ONE
Expand All @@ -230,12 +230,12 @@ If you need to execute a batch query, the driver provides the [BatchQuery](https
The ```BatchQuery``` class provides the ```add( Query query)``` method for appending individual ```Query``` instances to the batch. Keep in mind that when useing batch queries, the ```consistency``` and ```serialConsistency``` settings of the ```BatchQuery``` object override any individual consistency settings specified by the appended ```Query``` objects. Here is an example:

```dart
new cql.BatchQuery(consistency: cql.Consistency.TWO)
..add(new cql.Query(
cql.BatchQuery(consistency: cql.Consistency.TWO)
..add(cql.Query(
"INSERT INTO test.test_table (id, alt_id) VALUES (?, ?)"
, bindings : [ 1, 2 ]
))
..add(new cql.Query(
..add(cql.Query(
"INSERT INTO test.test_table (id, alt_id) VALUES (:id, :id)"
, bindings : {"id" : 1}
));
Expand Down Expand Up @@ -266,7 +266,7 @@ To use native pagination you need to invoke the ```execute``` method with your s
To retrieve the next set of rows you need to invoke once again the ```execute``` method with the same ```pageSize``` value as before and the ```pagingState``` named parameter set to the value obtained by the previous method invocation. Here is an example:

```dart
cql.Query query = new cql.Query("SELECT * from really_big_dataset");
cql.Query query = cql.Query("SELECT * from really_big_dataset");
client
.execute(query, pageSize : 10)
.then((cql.RowsResultMessage result) {
Expand All @@ -291,7 +291,7 @@ The underlying StreamController buffers the rows for each page on-demand and emi

```dart
client.stream(
new cql.Query("SELECT * FROM really_big_dataset")
cql.Query("SELECT * FROM really_big_dataset")
, pageSize: 20
).listen( (Map<String, Object> row) => print );
```
Expand Down
30 changes: 19 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ Dart driver for [Apache Cassandra](http://Cassandra.apache.org/) that supports C

The driver has a small dependency tree and implements Cassandra binary protocol (versions 2 and 3) for communicating with Cassandra servers. The protocol and CQL versions to be used are both configurable by the user.

# Installation
add the following dependency to install this plugin:\
```yaml
dependency:
dart_cassandra_cql:
git: https://github.com/Sparks1998/dart_cassandra_cql
```
# Features
- Asynchronous API based on [Future](https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:async.Future) and [Streams](https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart-async.Stream)
- Connection management via connection pools
Expand All @@ -25,26 +33,26 @@ import 'package:dart_cassandra_cql/dart_cassandra_cql.dart' as cql;

void main() {
// Create a client for connecting to our cluster using native
// protocol V2 and sensible defaults. The client will setup
// protocol V3 and sensible defaults. The client will setup
// a connection pool for you and connect automatically when
// you execute a query.
cql.Client client = new cql.Client.fromHostList([
cql.Client client = cql.Client.fromHostList([
"10.0.0.1:9042"
, "10.0.0.2:9042"
]);

// Perform a select with positional bindings
client.query(
new cql.Query("SELECT * from test.type_test WHERE id=?", bindings : [123])
cql.Query("SELECT * from test.type_test WHERE id=?", bindings : [123])
).then((Iterable<Map<String, Object>> rows) {
// ...
});

// Perform an prepared insert with named bindings, a time-based uuid and tuneable consistency
client.execute(
new cql.Query("INSERT INTO test.type_test (id, uuid_value) VALUES (:id, :uuid)", bindings : {
cql.Query("INSERT INTO test.type_test (id, uuid_value) VALUES (:id, :uuid)", bindings : {
"id" : 1
, "uuid" : new cql.Uuid.timeBased()
, "uuid" : cql.Uuid.timeBased()
}, consistency : cql.Consistency.LOCAL_QUORUM
, prepared : true)
).then((cql.ResultMessage res) {
Expand All @@ -53,17 +61,17 @@ void main() {

// Perform a batch insert query
client.execute(
new cql.BatchQuery()
cql.BatchQuery()
..add(
new cql.Query("INSERT INTO test.type_test (id, uuid_value) VALUES (:id, :uuid)", bindings : {
cql.Query("INSERT INTO test.type_test (id, uuid_value) VALUES (:id, :uuid)", bindings : {
"id" : 1
, "uuid" : new cql.Uuid.timeBased()
, "uuid" : cql.Uuid.timeBased()
})
)
..add(
new cql.Query("INSERT INTO test.type_test (id, uuid_value) VALUES (:id, :uuid)", bindings : {
cql.Query("INSERT INTO test.type_test (id, uuid_value) VALUES (:id, :uuid)", bindings : {
"id" : 2
, "uuid" : new cql.Uuid.timeBased()
, "uuid" : cql.Uuid.timeBased()
})
)
..consistency = cql.Consistency.TWO
Expand All @@ -76,7 +84,7 @@ void main() {
// Stream (paginated) query
StreamSubscription sub;
sub = client.stream(
new cql.Query("SELECT * from test.type_test")
cql.Query("SELECT * from test.type_test")
, pageSize : 200
).listen((Map<String, Object> row) {
// Handle incoming row
Expand Down
17 changes: 8 additions & 9 deletions lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ part of dart_cassandra_cql.client;
class Client {
final ConnectionPool connectionPool;
final Map<String, Future<PreparedResultMessage>> preparedQueries =
new Map<String, Future<PreparedResultMessage>>();
Map<String, Future<PreparedResultMessage>>();

/**
* Create a new client and a [SimpleConnectionPool] to the supplied [hosts] optionally using
* the supplied [poolConfig]. If [poolConfig] is not specified, a default configuration will be used insted.
* If a [defaultKeyspace] is provided, it will be autoselected during the handshake phase of each pool connection
* the supplied [poolConfig]. If [poolConfig] is not specified, a default configuration will be used instead.
* If a [defaultKeyspace] is provided, it will be auto selected during the handshake phase of each pool connection
*/

factory Client.fromHostList(List<String> hosts,
{String defaultKeyspace, PoolConfiguration poolConfig}) {
final connectionPool = new SimpleConnectionPool.fromHostList(
hosts, poolConfig == null ? new PoolConfiguration() : poolConfig,
final connectionPool = SimpleConnectionPool.fromHostList(
hosts, poolConfig == null ? PoolConfiguration() : poolConfig,
defaultKeyspace: defaultKeyspace);
return new Client.withPool(connectionPool,
defaultKeyspace: defaultKeyspace);
Expand Down Expand Up @@ -46,8 +46,7 @@ class Client {
*/
Future<Iterable<Map<String, Object>>> query(Query query) async {
// Run query and return back
final res = await _executeSingle(query);
return (res as RowsResultMessage).rows;
return (await _executeSingle(query)).rows;
}

/**
Expand All @@ -56,7 +55,7 @@ class Client {
* demand. The result page size is controlled by the [pageSize] parameter (defaults to 100 rows).
*/
Stream<Map<String, Object>> stream(Query query, {int pageSize: 100}) {
return new ResultStream(_executeSingle, query, pageSize).stream;
return ResultStream(_executeSingle, query, pageSize).stream;
}

/**
Expand Down Expand Up @@ -93,7 +92,7 @@ class Client {
*/
Future<ResultMessage> _executeSingle(Query query,
{int pageSize: null, Uint8List pagingState: null}) async {
final completer = new Completer<Message>();
final completer = Completer<ResultMessage>();

// If this is a normal query, pick the next available pool connection and execute it
if (!query.prepared) {
Expand Down
19 changes: 6 additions & 13 deletions lib/src/client/result_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ class ResultStream {
_buffering = false;

// Append incoming rows to current result list and update our paging state
_bufferedData = new Queue.from(data.rows);
_bufferedData = Queue.from(data.rows);
data.rows = null;
_pagingState = data.metadata.pagingState;

_emitRows();
})
.catchError(_streamController.addError,
test: (e) => e is NoHealthyConnectionsException)
.catchError(_streamController.addError, test: (e) => e is NoHealthyConnectionsException)
.catchError((_) {
// Treat any other kind of error as a 'connection lost' event and try to rebuffer again
_buffering = false;
Expand All @@ -63,9 +62,7 @@ class ResultStream {

// If our stream is active and we emitted all page rows, fetch the next row
// or close the stream if we are done
if (!_streamController.isClosed &&
!_streamController.isPaused &&
_bufferedData.isEmpty) {
if (!_streamController.isClosed && !_streamController.isPaused && _bufferedData.isEmpty) {
if (_pagingState == null) {
_streamController.close();
} else {
Expand All @@ -84,12 +81,8 @@ class ResultStream {
/**
* Create a new [ResultStream] by paging through [this._query] object with a page size of [this._pageSize].
*/
ResultStream(PagedQueryExecutor this._queryExecutor, Query this._query,
int this._pageSize) {
_streamController = new StreamController<Map<String, Object>>(
onListen: _bufferNextPage,
onResume: _emitRows,
onCancel: _cleanup,
sync: true);
ResultStream(PagedQueryExecutor this._queryExecutor, Query this._query, int this._pageSize) {
_streamController = StreamController<Map<String, Object>>(
onListen: _bufferNextPage, onResume: _emitRows, onCancel: _cleanup, sync: true);
}
}
10 changes: 5 additions & 5 deletions lib/src/connection/async_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ part of dart_cassandra_cql.connection;

class AsyncQueue<T> {
Queue<T> _resources;
Queue<Completer<T>> _reservations = new Queue<Completer<T>>();
Queue<Completer<T>> _reservations = Queue<Completer<T>>();
Duration reservationTimeout;

AsyncQueue.from(Iterable<T> resources) {
_resources = new Queue<T>.from(resources);
_resources = Queue<T>.from(resources);
}

/**
Expand All @@ -15,15 +15,15 @@ class AsyncQueue<T> {
*/

Future<T> reserve() {
final reservation = new Completer<T>();
final reservation = Completer<T>();
_reservations.add(reservation);
_dequeue();

// Set reservation timeout if one is specified
if (reservationTimeout != null && reservationTimeout.inMilliseconds > 0) {
new Timer(reservationTimeout, () {
Timer(reservationTimeout, () {
if (!reservation.isCompleted) {
reservation.completeError(new StreamReservationException(
reservation.completeError(StreamReservationException(
'Timed out waiting for stream reservation'));
}
});
Expand Down
Loading

0 comments on commit 1511801

Please sign in to comment.