From 28ffa3634fc7bbce90b69fcbc33c1d98d9caf363 Mon Sep 17 00:00:00 2001 From: Anatoly Pulyaevskiy Date: Tue, 16 Feb 2016 12:51:20 -0800 Subject: [PATCH 1/8] Updated dependency on logging package --- pubspec.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubspec.yaml b/pubspec.yaml index 013b1b9..33c034d 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -6,6 +6,6 @@ homepage: https://github.com/achilleasa/dart_cassandra_cql dependencies: uuid: ">=0.4.1 <0.5.0" collection: ">=1.1.0 <2.0.0" - logging: ">=0.9.1+1 <0.10.0" + logging: "^0.11.2" dev_dependencies: unittest: ">=0.11.0+5 <0.12.0" From 154e4a0eadb6a02683c60ad909b7f2d0dbc0d2d8 Mon Sep 17 00:00:00 2001 From: Anatoly Pulyaevskiy Date: Tue, 16 Feb 2016 17:46:18 -0800 Subject: [PATCH 2/8] Fix for race condition in Connection._writeMessage() in case multiple queries were scheduled synchronously --- .gitignore | 5 ++-- lib/src/connection/connection.dart | 21 +++++++++------- test/lib/race_condition_test.dart | 39 ++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 11 deletions(-) create mode 100644 test/lib/race_condition_test.dart diff --git a/.gitignore b/.gitignore index fe580dd..b96433d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,8 +5,9 @@ .buildlog .pub/ build/ -packages/ +packages ~packages/ +.packages # Or the files created by dart2js. *.dart.js @@ -22,4 +23,4 @@ pubspec.lock .DS_Store ._* .Spotlight-V100 -.Trashes \ No newline at end of file +.Trashes diff --git a/lib/src/connection/connection.dart b/lib/src/connection/connection.dart index fcb641e..2cb00ab 100644 --- a/lib/src/connection/connection.dart +++ b/lib/src/connection/connection.dart @@ -231,15 +231,18 @@ class Connection { Completer reply = new Completer(); // Make sure we have flushed our socket data and then // block till we get back a frame writer - _socketFlushed - .then( (_) => _frameWriterPool.reserve()) - .then((FrameWriter writer) { - _reservedFrameWriters[ writer.getStreamId()] = writer; - _pendingResponses[ writer.getStreamId() ] = reply; - connectionLogger.fine("[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}"); - writer.writeMessage(message, _socket, compression : _poolConfig.compression); - _socketFlushed = _socket.flush(); - return _socketFlushed; + // We also assign returned future to _socketFlushed to avoid + // race conditions on consecutive calls to _writeMessage. + _socketFlushed = _socketFlushed + .then((_) => _frameWriterPool.reserve()) + .then((FrameWriter writer) { + _reservedFrameWriters[writer.getStreamId()] = writer; + _pendingResponses[writer.getStreamId()] = reply; + connectionLogger.fine( + "[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}"); + writer.writeMessage(message, _socket, + compression: _poolConfig.compression); + return _socket.flush(); }) .catchError((e, trace) { // Wrap SocketExceptions diff --git a/test/lib/race_condition_test.dart b/test/lib/race_condition_test.dart new file mode 100644 index 0000000..8877f5b --- /dev/null +++ b/test/lib/race_condition_test.dart @@ -0,0 +1,39 @@ +library dart_cassandra_cql.tests.race_condition; + +import "dart:async"; +import "package:unittest/unittest.dart"; +import 'package:dart_cassandra_cql/dart_cassandra_cql.dart' as cql; + +main() { + group("Race Conditions:", () { + cql.Client client; + setUp(() async { + client = new cql.Client.fromHostList(['127.0.0.1:9042']); + await client.execute(new cql.Query(''' + CREATE KEYSPACE IF NOT EXISTS cassandra_test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; + ''')); + await client.execute(new cql.Query(''' + CREATE TABLE IF NOT EXISTS cassandra_test.test_table ( + id int, + data varchar, + PRIMARY KEY (id) + ) + WITH caching = '{"keys":"NONE", "rows_per_partition":"NONE"}' + ''')); + }); + + tearDown(() { + return client.connectionPool.disconnect(drain: false); + }); + + test("it can handle execution of multiple queries scheduled synchronously", + () { + var f1 = client + .execute(new cql.Query('SELECT * FROM cassandra_test.test_table')); + var f2 = client + .execute(new cql.Query('SELECT * FROM cassandra_test.test_table')); + + expect(Future.wait([f1, f2]), completes); + }); + }); +} From a69c0ac7b4f29a160250e410b4ad5c4c8ace8d65 Mon Sep 17 00:00:00 2001 From: Anatoly Pulyaevskiy Date: Wed, 17 Feb 2016 09:46:48 -0800 Subject: [PATCH 3/8] Updated test to use mock server --- test/lib/race_condition_test.dart | 41 ++++++++++++++----------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/test/lib/race_condition_test.dart b/test/lib/race_condition_test.dart index 8877f5b..ef3245a 100644 --- a/test/lib/race_condition_test.dart +++ b/test/lib/race_condition_test.dart @@ -3,35 +3,30 @@ library dart_cassandra_cql.tests.race_condition; import "dart:async"; import "package:unittest/unittest.dart"; import 'package:dart_cassandra_cql/dart_cassandra_cql.dart' as cql; +import "mocks/mocks.dart" as mock; -main() { - group("Race Conditions:", () { - cql.Client client; - setUp(() async { - client = new cql.Client.fromHostList(['127.0.0.1:9042']); - await client.execute(new cql.Query(''' - CREATE KEYSPACE IF NOT EXISTS cassandra_test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; - ''')); - await client.execute(new cql.Query(''' - CREATE TABLE IF NOT EXISTS cassandra_test.test_table ( - id int, - data varchar, - PRIMARY KEY (id) - ) - WITH caching = '{"keys":"NONE", "rows_per_partition":"NONE"}' - ''')); - }); +main({bool enableLogger: true}) { + if (enableLogger) { + mock.initLogger(); + } - tearDown(() { - return client.connectionPool.disconnect(drain: false); + const String SERVER_HOST = "127.0.0.1"; + const int SERVER_PORT = 32000; + mock.MockServer server = new mock.MockServer(); + + group("Race Conditions:", () { + setUp(() { + return server.listen(SERVER_HOST, SERVER_PORT); }); test("it can handle execution of multiple queries scheduled synchronously", () { - var f1 = client - .execute(new cql.Query('SELECT * FROM cassandra_test.test_table')); - var f2 = client - .execute(new cql.Query('SELECT * FROM cassandra_test.test_table')); + server.setReplayList(["select_v2.dump", "select_v2.dump"]); + var client = new cql.Client.fromHostList( + ["${SERVER_HOST}:${SERVER_PORT}"], + poolConfig: new cql.PoolConfiguration(autoDiscoverNodes: false)); + var f1 = client.execute(new cql.Query('SELECT * from test.type_test')); + var f2 = client.execute(new cql.Query('SELECT * from test.type_test')); expect(Future.wait([f1, f2]), completes); }); From b5dd7c42364243829fac4ae98652d2ccff46bad0 Mon Sep 17 00:00:00 2001 From: Anatoly Pulyaevskiy Date: Tue, 16 Feb 2016 12:51:20 -0800 Subject: [PATCH 4/8] Updated dependency on logging package --- pubspec.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubspec.yaml b/pubspec.yaml index 013b1b9..33c034d 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -6,6 +6,6 @@ homepage: https://github.com/achilleasa/dart_cassandra_cql dependencies: uuid: ">=0.4.1 <0.5.0" collection: ">=1.1.0 <2.0.0" - logging: ">=0.9.1+1 <0.10.0" + logging: "^0.11.2" dev_dependencies: unittest: ">=0.11.0+5 <0.12.0" From a11b60138d7748b2e71d3239fd2db466ebf1739e Mon Sep 17 00:00:00 2001 From: Anatoly Pulyaevskiy Date: Tue, 16 Feb 2016 17:46:18 -0800 Subject: [PATCH 5/8] Fix for race condition in Connection._writeMessage() in case multiple queries were scheduled synchronously --- .gitignore | 5 ++-- lib/src/connection/connection.dart | 21 +++++++++------- test/lib/race_condition_test.dart | 39 ++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 11 deletions(-) create mode 100644 test/lib/race_condition_test.dart diff --git a/.gitignore b/.gitignore index fe580dd..b96433d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,8 +5,9 @@ .buildlog .pub/ build/ -packages/ +packages ~packages/ +.packages # Or the files created by dart2js. *.dart.js @@ -22,4 +23,4 @@ pubspec.lock .DS_Store ._* .Spotlight-V100 -.Trashes \ No newline at end of file +.Trashes diff --git a/lib/src/connection/connection.dart b/lib/src/connection/connection.dart index fcb641e..2cb00ab 100644 --- a/lib/src/connection/connection.dart +++ b/lib/src/connection/connection.dart @@ -231,15 +231,18 @@ class Connection { Completer reply = new Completer(); // Make sure we have flushed our socket data and then // block till we get back a frame writer - _socketFlushed - .then( (_) => _frameWriterPool.reserve()) - .then((FrameWriter writer) { - _reservedFrameWriters[ writer.getStreamId()] = writer; - _pendingResponses[ writer.getStreamId() ] = reply; - connectionLogger.fine("[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}"); - writer.writeMessage(message, _socket, compression : _poolConfig.compression); - _socketFlushed = _socket.flush(); - return _socketFlushed; + // We also assign returned future to _socketFlushed to avoid + // race conditions on consecutive calls to _writeMessage. + _socketFlushed = _socketFlushed + .then((_) => _frameWriterPool.reserve()) + .then((FrameWriter writer) { + _reservedFrameWriters[writer.getStreamId()] = writer; + _pendingResponses[writer.getStreamId()] = reply; + connectionLogger.fine( + "[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}"); + writer.writeMessage(message, _socket, + compression: _poolConfig.compression); + return _socket.flush(); }) .catchError((e, trace) { // Wrap SocketExceptions diff --git a/test/lib/race_condition_test.dart b/test/lib/race_condition_test.dart new file mode 100644 index 0000000..8877f5b --- /dev/null +++ b/test/lib/race_condition_test.dart @@ -0,0 +1,39 @@ +library dart_cassandra_cql.tests.race_condition; + +import "dart:async"; +import "package:unittest/unittest.dart"; +import 'package:dart_cassandra_cql/dart_cassandra_cql.dart' as cql; + +main() { + group("Race Conditions:", () { + cql.Client client; + setUp(() async { + client = new cql.Client.fromHostList(['127.0.0.1:9042']); + await client.execute(new cql.Query(''' + CREATE KEYSPACE IF NOT EXISTS cassandra_test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; + ''')); + await client.execute(new cql.Query(''' + CREATE TABLE IF NOT EXISTS cassandra_test.test_table ( + id int, + data varchar, + PRIMARY KEY (id) + ) + WITH caching = '{"keys":"NONE", "rows_per_partition":"NONE"}' + ''')); + }); + + tearDown(() { + return client.connectionPool.disconnect(drain: false); + }); + + test("it can handle execution of multiple queries scheduled synchronously", + () { + var f1 = client + .execute(new cql.Query('SELECT * FROM cassandra_test.test_table')); + var f2 = client + .execute(new cql.Query('SELECT * FROM cassandra_test.test_table')); + + expect(Future.wait([f1, f2]), completes); + }); + }); +} From a4bef43323ef0423e09257d35a59a437d44b209e Mon Sep 17 00:00:00 2001 From: Anatoly Pulyaevskiy Date: Wed, 17 Feb 2016 09:46:48 -0800 Subject: [PATCH 6/8] Updated test to use mock server --- test/lib/race_condition_test.dart | 41 ++++++++++++++----------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/test/lib/race_condition_test.dart b/test/lib/race_condition_test.dart index 8877f5b..ef3245a 100644 --- a/test/lib/race_condition_test.dart +++ b/test/lib/race_condition_test.dart @@ -3,35 +3,30 @@ library dart_cassandra_cql.tests.race_condition; import "dart:async"; import "package:unittest/unittest.dart"; import 'package:dart_cassandra_cql/dart_cassandra_cql.dart' as cql; +import "mocks/mocks.dart" as mock; -main() { - group("Race Conditions:", () { - cql.Client client; - setUp(() async { - client = new cql.Client.fromHostList(['127.0.0.1:9042']); - await client.execute(new cql.Query(''' - CREATE KEYSPACE IF NOT EXISTS cassandra_test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; - ''')); - await client.execute(new cql.Query(''' - CREATE TABLE IF NOT EXISTS cassandra_test.test_table ( - id int, - data varchar, - PRIMARY KEY (id) - ) - WITH caching = '{"keys":"NONE", "rows_per_partition":"NONE"}' - ''')); - }); +main({bool enableLogger: true}) { + if (enableLogger) { + mock.initLogger(); + } - tearDown(() { - return client.connectionPool.disconnect(drain: false); + const String SERVER_HOST = "127.0.0.1"; + const int SERVER_PORT = 32000; + mock.MockServer server = new mock.MockServer(); + + group("Race Conditions:", () { + setUp(() { + return server.listen(SERVER_HOST, SERVER_PORT); }); test("it can handle execution of multiple queries scheduled synchronously", () { - var f1 = client - .execute(new cql.Query('SELECT * FROM cassandra_test.test_table')); - var f2 = client - .execute(new cql.Query('SELECT * FROM cassandra_test.test_table')); + server.setReplayList(["select_v2.dump", "select_v2.dump"]); + var client = new cql.Client.fromHostList( + ["${SERVER_HOST}:${SERVER_PORT}"], + poolConfig: new cql.PoolConfiguration(autoDiscoverNodes: false)); + var f1 = client.execute(new cql.Query('SELECT * from test.type_test')); + var f2 = client.execute(new cql.Query('SELECT * from test.type_test')); expect(Future.wait([f1, f2]), completes); }); From a04dd49417f46a1c19b8bbdab1a40298559b4353 Mon Sep 17 00:00:00 2001 From: Anatoly Pulyaevskiy Date: Wed, 17 Feb 2016 15:13:17 -0800 Subject: [PATCH 7/8] Rebased from master with mock server fix; moved test case to client_test execute group --- test/lib/client_test.dart | 13 ++++++++++++ test/lib/race_condition_test.dart | 34 ------------------------------- 2 files changed, 13 insertions(+), 34 deletions(-) delete mode 100644 test/lib/race_condition_test.dart diff --git a/test/lib/client_test.dart b/test/lib/client_test.dart index 287942a..9d5a3a3 100644 --- a/test/lib/client_test.dart +++ b/test/lib/client_test.dart @@ -297,6 +297,19 @@ main({bool enableLogger : true}) { }) ); }); + + test( + "it can handle execution of multiple queries scheduled synchronously", + () { + server.setReplayList(["select_v2.dump", "select_v2.dump"]); + var client = new cql.Client.fromHostList( + ["${SERVER_HOST}:${SERVER_PORT}"], + poolConfig: new cql.PoolConfiguration(autoDiscoverNodes: false)); + var f1 = client.execute(new cql.Query('SELECT * from test.type_test')); + var f2 = client.execute(new cql.Query('SELECT * from test.type_test')); + + expect(Future.wait([f1, f2]), completes); + }); }); group("query:", () { diff --git a/test/lib/race_condition_test.dart b/test/lib/race_condition_test.dart deleted file mode 100644 index ef3245a..0000000 --- a/test/lib/race_condition_test.dart +++ /dev/null @@ -1,34 +0,0 @@ -library dart_cassandra_cql.tests.race_condition; - -import "dart:async"; -import "package:unittest/unittest.dart"; -import 'package:dart_cassandra_cql/dart_cassandra_cql.dart' as cql; -import "mocks/mocks.dart" as mock; - -main({bool enableLogger: true}) { - if (enableLogger) { - mock.initLogger(); - } - - const String SERVER_HOST = "127.0.0.1"; - const int SERVER_PORT = 32000; - mock.MockServer server = new mock.MockServer(); - - group("Race Conditions:", () { - setUp(() { - return server.listen(SERVER_HOST, SERVER_PORT); - }); - - test("it can handle execution of multiple queries scheduled synchronously", - () { - server.setReplayList(["select_v2.dump", "select_v2.dump"]); - var client = new cql.Client.fromHostList( - ["${SERVER_HOST}:${SERVER_PORT}"], - poolConfig: new cql.PoolConfiguration(autoDiscoverNodes: false)); - var f1 = client.execute(new cql.Query('SELECT * from test.type_test')); - var f2 = client.execute(new cql.Query('SELECT * from test.type_test')); - - expect(Future.wait([f1, f2]), completes); - }); - }); -} From 3d9d3bf8a8b1fe4ddda558a715a2b08fdca58d5b Mon Sep 17 00:00:00 2001 From: Anatoly Pulyaevskiy Date: Wed, 17 Feb 2016 15:15:01 -0800 Subject: [PATCH 8/8] Deleted redundant test file --- test/lib/race_condition_test.dart | 34 ------------------------------- 1 file changed, 34 deletions(-) delete mode 100644 test/lib/race_condition_test.dart diff --git a/test/lib/race_condition_test.dart b/test/lib/race_condition_test.dart deleted file mode 100644 index ef3245a..0000000 --- a/test/lib/race_condition_test.dart +++ /dev/null @@ -1,34 +0,0 @@ -library dart_cassandra_cql.tests.race_condition; - -import "dart:async"; -import "package:unittest/unittest.dart"; -import 'package:dart_cassandra_cql/dart_cassandra_cql.dart' as cql; -import "mocks/mocks.dart" as mock; - -main({bool enableLogger: true}) { - if (enableLogger) { - mock.initLogger(); - } - - const String SERVER_HOST = "127.0.0.1"; - const int SERVER_PORT = 32000; - mock.MockServer server = new mock.MockServer(); - - group("Race Conditions:", () { - setUp(() { - return server.listen(SERVER_HOST, SERVER_PORT); - }); - - test("it can handle execution of multiple queries scheduled synchronously", - () { - server.setReplayList(["select_v2.dump", "select_v2.dump"]); - var client = new cql.Client.fromHostList( - ["${SERVER_HOST}:${SERVER_PORT}"], - poolConfig: new cql.PoolConfiguration(autoDiscoverNodes: false)); - var f1 = client.execute(new cql.Query('SELECT * from test.type_test')); - var f2 = client.execute(new cql.Query('SELECT * from test.type_test')); - - expect(Future.wait([f1, f2]), completes); - }); - }); -}