Skip to content

Commit

Permalink
Merge pull request achilleasa#2 from pulyaevskiy/version-update
Browse files Browse the repository at this point in the history
Race condition when executing multiple queries scheduled synchronously.
  • Loading branch information
achilleasa committed Feb 18, 2016
2 parents a285612 + 3d9d3bf commit fcb858f
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 12 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
.buildlog
.pub/
build/
packages/
packages
~packages/
.packages

# Or the files created by dart2js.
*.dart.js
Expand All @@ -22,4 +23,4 @@ pubspec.lock
.DS_Store
._*
.Spotlight-V100
.Trashes
.Trashes
21 changes: 12 additions & 9 deletions lib/src/connection/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,18 @@ class Connection {
Completer reply = new Completer();
// Make sure we have flushed our socket data and then
// block till we get back a frame writer
_socketFlushed
.then( (_) => _frameWriterPool.reserve())
.then((FrameWriter writer) {
_reservedFrameWriters[ writer.getStreamId()] = writer;
_pendingResponses[ writer.getStreamId() ] = reply;
connectionLogger.fine("[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}");
writer.writeMessage(message, _socket, compression : _poolConfig.compression);
_socketFlushed = _socket.flush();
return _socketFlushed;
// We also assign returned future to _socketFlushed to avoid
// race conditions on consecutive calls to _writeMessage.
_socketFlushed = _socketFlushed
.then((_) => _frameWriterPool.reserve())
.then((FrameWriter writer) {
_reservedFrameWriters[writer.getStreamId()] = writer;
_pendingResponses[writer.getStreamId()] = reply;
connectionLogger.fine(
"[${connId}] [stream #${writer.getStreamId()}] Sending message of type ${Opcode.nameOf(message.opcode)} (${message.opcode}) ${message}");
writer.writeMessage(message, _socket,
compression: _poolConfig.compression);
return _socket.flush();
})
.catchError((e, trace) {
// Wrap SocketExceptions
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ homepage: https://github.com/achilleasa/dart_cassandra_cql
dependencies:
uuid: ">=0.4.1 <0.5.0"
collection: ">=1.1.0 <2.0.0"
logging: ">=0.9.1+1 <0.10.0"
logging: "^0.11.2"
dev_dependencies:
unittest: ">=0.11.0+5 <0.12.0"
13 changes: 13 additions & 0 deletions test/lib/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,19 @@ main({bool enableLogger : true}) {
})
);
});

test(
"it can handle execution of multiple queries scheduled synchronously",
() {
server.setReplayList(["select_v2.dump", "select_v2.dump"]);
var client = new cql.Client.fromHostList(
["${SERVER_HOST}:${SERVER_PORT}"],
poolConfig: new cql.PoolConfiguration(autoDiscoverNodes: false));
var f1 = client.execute(new cql.Query('SELECT * from test.type_test'));
var f2 = client.execute(new cql.Query('SELECT * from test.type_test'));

expect(Future.wait([f1, f2]), completes);
});
});

group("query:", () {
Expand Down

0 comments on commit fcb858f

Please sign in to comment.