diff --git a/.gitignore b/.gitignore index fe580dd..b96433d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,8 +5,9 @@ .buildlog .pub/ build/ -packages/ +packages ~packages/ +.packages # Or the files created by dart2js. *.dart.js @@ -22,4 +23,4 @@ pubspec.lock .DS_Store ._* .Spotlight-V100 -.Trashes \ No newline at end of file +.Trashes diff --git a/lib/src/connection/connection.dart b/lib/src/connection/connection.dart index fcb641e..2cb00ab 100644 --- a/lib/src/connection/connection.dart +++ b/lib/src/connection/connection.dart @@ -231,15 +231,18 @@ class Connection { Completer reply = new Completer(); // Make sure we have flushed our socket data and then // block till we get back a frame writer - _socketFlushed - .then( (_) => _frameWriterPool.reserve()) - .then((FrameWriter writer) { - _reservedFrameWriters[ writer.getStreamId()] = writer; - _pendingResponses[ writer.getStreamId() ] = reply; - connectionLogger.fine("[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}"); - writer.writeMessage(message, _socket, compression : _poolConfig.compression); - _socketFlushed = _socket.flush(); - return _socketFlushed; + // We also assign returned future to _socketFlushed to avoid + // race conditions on consecutive calls to _writeMessage. + _socketFlushed = _socketFlushed + .then((_) => _frameWriterPool.reserve()) + .then((FrameWriter writer) { + _reservedFrameWriters[writer.getStreamId()] = writer; + _pendingResponses[writer.getStreamId()] = reply; + connectionLogger.fine( + "[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}"); + writer.writeMessage(message, _socket, + compression: _poolConfig.compression); + return _socket.flush(); }) .catchError((e, trace) { // Wrap SocketExceptions diff --git a/pubspec.yaml b/pubspec.yaml index 013b1b9..33c034d 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -6,6 +6,6 @@ homepage: https://github.com/achilleasa/dart_cassandra_cql dependencies: uuid: ">=0.4.1 <0.5.0" collection: ">=1.1.0 <2.0.0" - logging: ">=0.9.1+1 <0.10.0" + logging: "^0.11.2" dev_dependencies: unittest: ">=0.11.0+5 <0.12.0" diff --git a/test/lib/client_test.dart b/test/lib/client_test.dart index 287942a..9d5a3a3 100644 --- a/test/lib/client_test.dart +++ b/test/lib/client_test.dart @@ -297,6 +297,19 @@ main({bool enableLogger : true}) { }) ); }); + + test( + "it can handle execution of multiple queries scheduled synchronously", + () { + server.setReplayList(["select_v2.dump", "select_v2.dump"]); + var client = new cql.Client.fromHostList( + ["${SERVER_HOST}:${SERVER_PORT}"], + poolConfig: new cql.PoolConfiguration(autoDiscoverNodes: false)); + var f1 = client.execute(new cql.Query('SELECT * from test.type_test')); + var f2 = client.execute(new cql.Query('SELECT * from test.type_test')); + + expect(Future.wait([f1, f2]), completes); + }); }); group("query:", () { diff --git a/test/lib/connection_test.dart b/test/lib/connection_test.dart index 4c41fbb..ae28521 100644 --- a/test/lib/connection_test.dart +++ b/test/lib/connection_test.dart @@ -285,38 +285,6 @@ main({bool enableLogger : true}) { .catchError(expectAsync(handleError)); }); - test("server responds with compressed frame but we have unregistered the codec", () { - - // Register codec for the mock server - cql.registerCodec(cql.Compression.SNAPPY.value, new compress.MockCompressionCodec()); - - cql.PoolConfiguration config = new cql.PoolConfiguration( - protocolVersion : cql.ProtocolVersion.V2 - , compression : cql.Compression.SNAPPY - , streamsPerConnection : 1 - ); - - conn = new cql.Connection("conn-0", SERVER_HOST, SERVER_PORT, config : config); - - void handleError(e) { - expect(e.message, equals("A compression codec needs to be registered via registerCodec() for type '${cql.Compression.SNAPPY}'")); - } - - conn.open() - .then((_) { - Future res = conn.execute(new cql.Query("SELECT * from test.type_test")); - - new Timer(new Duration(milliseconds: 10), () { - server.replayFile(0, "void_result_v2.dump"); - - cql.unregisterCodec(cql.Compression.SNAPPY.value); - }); - - return res; - }) - .catchError(expectAsync(handleError)); - }); - test("Using a compression codec", () { server.setReplayList(["void_result_v2.dump"]); diff --git a/test/lib/mocks/mocks.dart b/test/lib/mocks/mocks.dart index d03264c..5aa4f68 100644 --- a/test/lib/mocks/mocks.dart +++ b/test/lib/mocks/mocks.dart @@ -93,6 +93,7 @@ class MockServer { List _replayAuthDumpFileList; String _pathToDumps; Duration responseDelay; + Future _replayFuture = new Future.value(); MockServer() { List pathSegments = Platform.script.pathSegments.getRange(0, Platform.script.pathSegments.length - 1).toList(); @@ -112,6 +113,7 @@ class MockServer { mockLogger.info("Shutting down server [${_server.address}:${_server.port}]"); List cleanupFutures = [] + ..add(_replayFuture) ..addAll(clients.map((Socket client) => new Future.value(client.destroy()))) ..add(_server.close().then((_) => new Future.delayed(new Duration(milliseconds:20), () => true))); @@ -210,9 +212,11 @@ class MockServer { } - return responseDelay != null + return _replayFuture.then((_){ + return responseDelay != null ? new Future.delayed(responseDelay, onReplay) : onReplay(); + }); } Future listen(String host, int port) { @@ -253,10 +257,10 @@ class MockServer { writeMessage(client, Opcode.READY.value, streamId : frame.header.streamId); } else if (_replayAuthDumpFileList != null && !_replayAuthDumpFileList.isEmpty) { // Respond with the next payload in replay list - replayFile(clients.indexOf(client), _replayAuthDumpFileList.removeAt(0), frame.header.streamId); + _replayFuture = replayFile(clients.indexOf(client), _replayAuthDumpFileList.removeAt(0), frame.header.streamId); } else if (_replayDumpFileList != null && !_replayDumpFileList.isEmpty) { // Respond with the next payload in replay list - replayFile(clients.indexOf(client), _replayDumpFileList.removeAt(0), frame.header.streamId); + _replayFuture = replayFile(clients.indexOf(client), _replayDumpFileList.removeAt(0), frame.header.streamId); } } @@ -298,4 +302,4 @@ class MockAuthenticator extends Authenticator { return null; } -} \ No newline at end of file +} diff --git a/test/lib/serialization_test.dart b/test/lib/serialization_test.dart index 378c662..dcdbd6c 100644 --- a/test/lib/serialization_test.dart +++ b/test/lib/serialization_test.dart @@ -338,7 +338,7 @@ main({bool enableLogger : true}) { test("TIMESTAMP", () { - Object input = new DateTime.now(); + Object input = new DateTime.fromMillisecondsSinceEpoch(1455746327000); TypeSpec type = new TypeSpec(DataType.TIMESTAMP); encoder.writeTypedValue('test', input, typeSpec : type, size : size); Object output = mock.createDecoder(encoder).readTypedValue(type, size : size); @@ -559,7 +559,7 @@ main({bool enableLogger : true}) { }); test("LIST", () { - Object input = [ new DateTime.now(), new DateTime.now() ]; + Object input = [ new DateTime.fromMillisecondsSinceEpoch(1455746327000), new DateTime.fromMillisecondsSinceEpoch(1455746327000) ]; TypeSpec type = new TypeSpec(DataType.LIST, valueSubType : new TypeSpec(DataType.TIMESTAMP)); encoder.writeTypedValue('test', input, typeSpec : type, size : size); Object output = mock.createDecoder(encoder).readTypedValue(type, size : size); @@ -569,8 +569,8 @@ main({bool enableLogger : true}) { test("MAP", () { Object input = { - "foo" : new DateTime.now(), - "bar" : new DateTime.now() + "foo" : new DateTime.fromMillisecondsSinceEpoch(1455746327000), + "bar" : new DateTime.fromMillisecondsSinceEpoch(1455746327000) }; TypeSpec type = new TypeSpec( DataType.MAP @@ -669,7 +669,7 @@ main({bool enableLogger : true}) { test("TIMESTAMP", () { - Object input = new DateTime.now(); + Object input = new DateTime.fromMillisecondsSinceEpoch(1455746327000); TypeSpec type = new TypeSpec(DataType.TIMESTAMP); encoder.writeTypedValue('test', input, typeSpec : type, size : size); Object output = mock.createDecoder(encoder).readTypedValue(type, size : size); @@ -890,7 +890,7 @@ main({bool enableLogger : true}) { }); test("LIST", () { - Object input = [ new DateTime.now(), new DateTime.now() ]; + Object input = [ new DateTime.fromMillisecondsSinceEpoch(1455746327000), new DateTime.fromMillisecondsSinceEpoch(1455746327000) ]; TypeSpec type = new TypeSpec(DataType.LIST, valueSubType : new TypeSpec(DataType.TIMESTAMP)); encoder.writeTypedValue('test', input, typeSpec : type, size : size); Object output = mock.createDecoder(encoder).readTypedValue(type, size : size); @@ -900,8 +900,8 @@ main({bool enableLogger : true}) { test("MAP", () { Object input = { - "foo" : new DateTime.now(), - "bar" : new DateTime.now() + "foo" : new DateTime.fromMillisecondsSinceEpoch(1455746327000), + "bar" : new DateTime.fromMillisecondsSinceEpoch(1455746327000) }; TypeSpec type = new TypeSpec( DataType.MAP @@ -930,7 +930,7 @@ main({bool enableLogger : true}) { ] , "tags" : { "home" : { - "when" : new DateTime.now() + "when" : new DateTime.fromMillisecondsSinceEpoch(1455746327000) , "labels" : [ "red", "green", "blue" ] } } @@ -957,7 +957,7 @@ main({bool enableLogger : true}) { }); test("TUPLE", () { - Object input = new Tuple.fromIterable(["Test", 3.14, new DateTime.now()]); + Object input = new Tuple.fromIterable(["Test", 3.14, new DateTime.fromMillisecondsSinceEpoch(1455746327000)]); TypeSpec type = new TypeSpec(DataType.TUPLE) ..tupleFields.add(new TypeSpec(DataType.TEXT)) ..tupleFields.add(new TypeSpec(DataType.DOUBLE)) @@ -971,4 +971,4 @@ main({bool enableLogger : true}) { }); }); -} \ No newline at end of file +}