Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Race condition when executing multiple queries scheduled synchronously. #2

Merged
merged 11 commits into from
Feb 18, 2016
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
.buildlog
.pub/
build/
packages/
packages
~packages/
.packages

# Or the files created by dart2js.
*.dart.js
Expand All @@ -22,4 +23,4 @@ pubspec.lock
.DS_Store
._*
.Spotlight-V100
.Trashes
.Trashes
21 changes: 12 additions & 9 deletions lib/src/connection/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,18 @@ class Connection {
Completer reply = new Completer();
// Make sure we have flushed our socket data and then
// block till we get back a frame writer
_socketFlushed
.then( (_) => _frameWriterPool.reserve())
.then((FrameWriter writer) {
_reservedFrameWriters[ writer.getStreamId()] = writer;
_pendingResponses[ writer.getStreamId() ] = reply;
connectionLogger.fine("[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}");
writer.writeMessage(message, _socket, compression : _poolConfig.compression);
_socketFlushed = _socket.flush();
return _socketFlushed;
// We also assign returned future to _socketFlushed to avoid
// race conditions on consecutive calls to _writeMessage.
_socketFlushed = _socketFlushed
.then((_) => _frameWriterPool.reserve())
.then((FrameWriter writer) {
_reservedFrameWriters[writer.getStreamId()] = writer;
_pendingResponses[writer.getStreamId()] = reply;
connectionLogger.fine(
"[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}");
writer.writeMessage(message, _socket,
compression: _poolConfig.compression);
return _socket.flush();
})
.catchError((e, trace) {
// Wrap SocketExceptions
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ homepage: https://github.com/achilleasa/dart_cassandra_cql
dependencies:
uuid: ">=0.4.1 <0.5.0"
collection: ">=1.1.0 <2.0.0"
logging: ">=0.9.1+1 <0.10.0"
logging: "^0.11.2"
dev_dependencies:
unittest: ">=0.11.0+5 <0.12.0"
13 changes: 13 additions & 0 deletions test/lib/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,19 @@ main({bool enableLogger : true}) {
})
);
});

// Regression test for the race condition this change fixes: two queries
// scheduled back-to-back on the same connection, without awaiting the
// first, must both complete instead of corrupting the shared socket write.
test(
"it can handle execution of multiple queries scheduled synchronously",
() {
// Queue one canned server response frame per expected query.
// assumes select_v2.dump holds a valid v2 SELECT result — TODO confirm
server.setReplayList(["select_v2.dump", "select_v2.dump"]);
var client = new cql.Client.fromHostList(
["${SERVER_HOST}:${SERVER_PORT}"],
poolConfig: new cql.PoolConfiguration(autoDiscoverNodes: false));
// Issue both queries synchronously (no await between them) so their
// socket writes would interleave if the connection failed to chain
// _socketFlushed between consecutive _writeMessage calls.
var f1 = client.execute(new cql.Query('SELECT * from test.type_test'));
var f2 = client.execute(new cql.Query('SELECT * from test.type_test'));

// Both futures must resolve once the mock server replays both dumps.
expect(Future.wait([f1, f2]), completes);
});
});

group("query:", () {
Expand Down
32 changes: 0 additions & 32 deletions test/lib/connection_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -285,38 +285,6 @@ main({bool enableLogger : true}) {
.catchError(expectAsync(handleError));
});

// Verifies the error raised when the server sends a compressed frame after
// the client-side codec has been unregistered: decoding must fail with a
// descriptive message rather than silently dropping the frame.
test("server responds with compressed frame but we have unregistered the codec", () {

// Register codec for the mock server
cql.registerCodec(cql.Compression.SNAPPY.value, new compress.MockCompressionCodec());

// Single stream per connection keeps the replayed frame paired with the
// one in-flight query.
cql.PoolConfiguration config = new cql.PoolConfiguration(
protocolVersion : cql.ProtocolVersion.V2
, compression : cql.Compression.SNAPPY
, streamsPerConnection : 1
);

conn = new cql.Connection("conn-0", SERVER_HOST, SERVER_PORT, config : config);

// Expected failure: decoding a compressed frame without a registered codec.
void handleError(e) {
expect(e.message, equals("A compression codec needs to be registered via registerCodec() for type '${cql.Compression.SNAPPY}'"));
}

conn.open()
.then((_) {
Future res = conn.execute(new cql.Query("SELECT * from test.type_test"));

// Unregister the codec while the query is in flight, just before the
// server replays its (compressed) response; the 10ms delay gives the
// request time to be written first.
new Timer(new Duration(milliseconds: 10), () {
server.replayFile(0, "void_result_v2.dump");

cql.unregisterCodec(cql.Compression.SNAPPY.value);
});

return res;
})
.catchError(expectAsync(handleError));
});

test("Using a compression codec", () {
server.setReplayList(["void_result_v2.dump"]);

Expand Down
12 changes: 8 additions & 4 deletions test/lib/mocks/mocks.dart
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class MockServer {
List<String> _replayAuthDumpFileList;
String _pathToDumps;
Duration responseDelay;
Future _replayFuture = new Future.value();

MockServer() {
List<String> pathSegments = Platform.script.pathSegments.getRange(0, Platform.script.pathSegments.length - 1).toList();
Expand All @@ -112,6 +113,7 @@ class MockServer {
mockLogger.info("Shutting down server [${_server.address}:${_server.port}]");

List<Future> cleanupFutures = []
..add(_replayFuture)
..addAll(clients.map((Socket client) => new Future.value(client.destroy())))
..add(_server.close().then((_) => new Future.delayed(new Duration(milliseconds:20), () => true)));

Expand Down Expand Up @@ -210,9 +212,11 @@ class MockServer {

}

return responseDelay != null
return _replayFuture.then((_){
return responseDelay != null
? new Future.delayed(responseDelay, onReplay)
: onReplay();
});
}

Future listen(String host, int port) {
Expand Down Expand Up @@ -253,10 +257,10 @@ class MockServer {
writeMessage(client, Opcode.READY.value, streamId : frame.header.streamId);
} else if (_replayAuthDumpFileList != null && !_replayAuthDumpFileList.isEmpty) {
// Respond with the next payload in replay list
replayFile(clients.indexOf(client), _replayAuthDumpFileList.removeAt(0), frame.header.streamId);
_replayFuture = replayFile(clients.indexOf(client), _replayAuthDumpFileList.removeAt(0), frame.header.streamId);
} else if (_replayDumpFileList != null && !_replayDumpFileList.isEmpty) {
// Respond with the next payload in replay list
replayFile(clients.indexOf(client), _replayDumpFileList.removeAt(0), frame.header.streamId);
_replayFuture = replayFile(clients.indexOf(client), _replayDumpFileList.removeAt(0), frame.header.streamId);
}
}

Expand Down Expand Up @@ -298,4 +302,4 @@ class MockAuthenticator extends Authenticator {
return null;
}

}
}
22 changes: 11 additions & 11 deletions test/lib/serialization_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ main({bool enableLogger : true}) {

test("TIMESTAMP", () {

Object input = new DateTime.now();
Object input = new DateTime.fromMillisecondsSinceEpoch(1455746327000);
TypeSpec type = new TypeSpec(DataType.TIMESTAMP);
encoder.writeTypedValue('test', input, typeSpec : type, size : size);
Object output = mock.createDecoder(encoder).readTypedValue(type, size : size);
Expand Down Expand Up @@ -559,7 +559,7 @@ main({bool enableLogger : true}) {
});

test("LIST", () {
Object input = [ new DateTime.now(), new DateTime.now() ];
Object input = [ new DateTime.fromMillisecondsSinceEpoch(1455746327000), new DateTime.fromMillisecondsSinceEpoch(1455746327000) ];
TypeSpec type = new TypeSpec(DataType.LIST, valueSubType : new TypeSpec(DataType.TIMESTAMP));
encoder.writeTypedValue('test', input, typeSpec : type, size : size);
Object output = mock.createDecoder(encoder).readTypedValue(type, size : size);
Expand All @@ -569,8 +569,8 @@ main({bool enableLogger : true}) {

test("MAP", () {
Object input = {
"foo" : new DateTime.now(),
"bar" : new DateTime.now()
"foo" : new DateTime.fromMillisecondsSinceEpoch(1455746327000),
"bar" : new DateTime.fromMillisecondsSinceEpoch(1455746327000)
};
TypeSpec type = new TypeSpec(
DataType.MAP
Expand Down Expand Up @@ -669,7 +669,7 @@ main({bool enableLogger : true}) {

test("TIMESTAMP", () {

Object input = new DateTime.now();
Object input = new DateTime.fromMillisecondsSinceEpoch(1455746327000);
TypeSpec type = new TypeSpec(DataType.TIMESTAMP);
encoder.writeTypedValue('test', input, typeSpec : type, size : size);
Object output = mock.createDecoder(encoder).readTypedValue(type, size : size);
Expand Down Expand Up @@ -890,7 +890,7 @@ main({bool enableLogger : true}) {
});

test("LIST", () {
Object input = [ new DateTime.now(), new DateTime.now() ];
Object input = [ new DateTime.fromMillisecondsSinceEpoch(1455746327000), new DateTime.fromMillisecondsSinceEpoch(1455746327000) ];
TypeSpec type = new TypeSpec(DataType.LIST, valueSubType : new TypeSpec(DataType.TIMESTAMP));
encoder.writeTypedValue('test', input, typeSpec : type, size : size);
Object output = mock.createDecoder(encoder).readTypedValue(type, size : size);
Expand All @@ -900,8 +900,8 @@ main({bool enableLogger : true}) {

test("MAP", () {
Object input = {
"foo" : new DateTime.now(),
"bar" : new DateTime.now()
"foo" : new DateTime.fromMillisecondsSinceEpoch(1455746327000),
"bar" : new DateTime.fromMillisecondsSinceEpoch(1455746327000)
};
TypeSpec type = new TypeSpec(
DataType.MAP
Expand Down Expand Up @@ -930,7 +930,7 @@ main({bool enableLogger : true}) {
]
, "tags" : {
"home" : {
"when" : new DateTime.now()
"when" : new DateTime.fromMillisecondsSinceEpoch(1455746327000)
, "labels" : [ "red", "green", "blue" ]
}
}
Expand All @@ -957,7 +957,7 @@ main({bool enableLogger : true}) {
});

test("TUPLE", () {
Object input = new Tuple.fromIterable(["Test", 3.14, new DateTime.now()]);
Object input = new Tuple.fromIterable(["Test", 3.14, new DateTime.fromMillisecondsSinceEpoch(1455746327000)]);
TypeSpec type = new TypeSpec(DataType.TUPLE)
..tupleFields.add(new TypeSpec(DataType.TEXT))
..tupleFields.add(new TypeSpec(DataType.DOUBLE))
Expand All @@ -971,4 +971,4 @@ main({bool enableLogger : true}) {

});
});
}
}