Skip to content

Commit

Permalink
added preferBiggerTcpPackets option
Browse files Browse the repository at this point in the history
  • Loading branch information
Achilleas Anagnostopoulos committed Apr 17, 2015
1 parent ae860e4 commit a45a5fa
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 5 deletions.
1 change: 1 addition & 0 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ To instanciate a connection pool you need to provide a [PoolConfiguration](https
| autoDiscoverNodes | true | If this flag is set to ```true```, the connection pool will listen for server topology change events and automatically update the pool when new nodes come online/go offline. If set to ```false```, the pool will only process UP/DOWN events for nodes already in the pool
| authenticator | null | An authentication provider instance implementing the [Authenticator](https://github.com/achilleasa/dart_cassandra_cql/blob/master/lib/src/protocol/authentication/authenticator.dart) interface or **null** if no authentication is required. For more information see the section on [authentication](#cassandra-authentication)
| compression | null | The compression algorithm to be used with Cassandra nodes. Its value should be one of the [Compression](https://github.com/achilleasa/dart_cassandra_cql/blob/master/lib/src/types/enums/compression.dart) enums or **null** if no compression should be used. For more information see the section on [compression](#compression)
| preferBiggerTcpPackets | false | Join together frame data before piping them to the underlying TCP socket. Enabling this option will improve performance at the expense of slightly higher memory consumption

### Simple connection pool

Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.1.4 (April 17, 2015)

Added the **preferBiggerTcpPackets** option (defaults to false). When enabled, the driver will
join together protocol frame chunks before piping them to the underlying TCP socket. This option
will improve performance at the expense of slightly higher memory consumption.

## 0.1.3 (December 20, 2014)

Improved support for compression codecs
Expand Down
2 changes: 1 addition & 1 deletion lib/src/connection/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class Connection {
// Initialize our writer pool and set the reservation timeout
_reservedFrameWriters.clear();
_frameWriterPool = new AsyncQueue<FrameWriter>.from(
new List<FrameWriter>.generate(_poolConfig.streamsPerConnection, (int id) => new FrameWriter(id, _poolConfig.protocolVersion))
new List<FrameWriter>.generate(_poolConfig.streamsPerConnection, (int id) => new FrameWriter(id, _poolConfig.protocolVersion, preferBiggerTcpPackets : _poolConfig.preferBiggerTcpPackets))
);
_frameWriterPool.reservationTimeout = _poolConfig.streamReservationTimeout;

Expand Down
8 changes: 8 additions & 0 deletions lib/src/connection/pool_configuration.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ class PoolConfiguration {
*/
Authenticator authenticator;

/**
* Setting the [preferBiggerTcpPackets] option will join together
* protocol frame data before piping them to the underlying TCP socket.
* This option will improve performance at the expense of slightly higher memory consumption
*/
bool preferBiggerTcpPackets;

PoolConfiguration({
String this.cqlVersion : "3.0.0"
, ProtocolVersion this.protocolVersion : ProtocolVersion.V2
Expand All @@ -54,6 +61,7 @@ class PoolConfiguration {
, bool this.autoDiscoverNodes : true
, Compression this.compression
, Authenticator this.authenticator
, bool this.preferBiggerTcpPackets : false
}) {

validate();
Expand Down
5 changes: 3 additions & 2 deletions lib/src/protocol/frame/frame_writer.dart
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
part of dart_cassandra_cql.protocol;

class FrameWriter {
bool preferBiggerTcpPackets;
TypeEncoder _typeEncoder;
FrameHeader _header = new FrameHeader();

FrameWriter(int streamId, ProtocolVersion protocolVersion, {TypeEncoder withEncoder : null}) {
FrameWriter(int streamId, ProtocolVersion protocolVersion, {TypeEncoder withEncoder : null, bool this.preferBiggerTcpPackets : false}) {
_header
..version = protocolVersion == ProtocolVersion.V2 ? HeaderVersion.REQUEST_V2 : HeaderVersion.REQUEST_V3
..flags = 0
Expand Down Expand Up @@ -85,7 +86,7 @@ class FrameWriter {
//_typeEncoder.dumpToFile("frame-out.dump");

// Pipe everything to the sink
_typeEncoder.writer.pipe(targetSink);
_typeEncoder.writer.pipe(targetSink, preferBiggerTcpPackets : preferBiggerTcpPackets);
}
}

11 changes: 9 additions & 2 deletions lib/src/stream/chunked_output_writer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,20 @@ class ChunkedOutputWriter {

/**
* Pipe all buffered chunks to [destination] and clear the buffer queue
* [preferBiggerTcpPackets] may be set to true to pre-join the chunks and pipe them
* as a contiguous chunk. This reduces the number of transmitted TCP packets
* and should improve performance at the expense of a slightly higher memory usage
*/

void pipe(Sink destination) {
void pipe(Sink destination, {bool preferBiggerTcpPackets : false}) {
if (destination == null) {
return;
}
_bufferedChunks.forEach((Uint8List block) => destination.add(block));
if( preferBiggerTcpPackets ){
destination.add(joinChunks());
} else {
_bufferedChunks.forEach((Uint8List block) => destination.add(block));
}
clear();
}

Expand Down
19 changes: 19 additions & 0 deletions test/lib/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,25 @@ main({bool enableLogger : true}) {
).listen(expectAsync(streamCallback, count : 10, max : 10));
});

test("process streamed rows (preferBiggerTcpPackets)", () {
server.setReplayList([
"stream_v2_1of3.dump"
, "stream_v2_2of3.dump"
, "stream_v2_3of3.dump"
]);
client = new cql.Client.fromHostList([ "${SERVER_HOST}:${SERVER_PORT}"]
, poolConfig : new cql.PoolConfiguration(autoDiscoverNodes : false, preferBiggerTcpPackets : true)
);

void streamCallback(Map<String, Object> row) {
}

client.stream(
new cql.Query("SELECT * FROM test.page_view_counts")
, pageSize: 4
).listen(expectAsync(streamCallback, count : 10, max : 10));
});

test("pause/resume", () {
server.setReplayList([
"stream_v2_1of3.dump"
Expand Down

0 comments on commit a45a5fa

Please sign in to comment.