Skip to content

Commit

Permalink
Fiixed batchSize 0 on cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
Giorgio committed Mar 29, 2021
1 parent 5bb73a8 commit dc05e46
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 6 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## Recent change notes

### 0.5.0

- Added batchSize variable to ModernCursor class
- Added cursor example
- Fixed index count problem in unordered bulk write operations
- Fixed setting batchSize to 0 in Cursor operaation.

### 0.5.0-beta

- Added tls client and server certificate management
Expand Down
95 changes: 95 additions & 0 deletions example/manual/crud/cursor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Example Tested on release 0.7.0
import 'package:mongo_dart/mongo_dart.dart';
import 'package:mongo_dart/src/database/cursor/modern_cursor.dart';

const dbName = 'mongo-dart-example';
const dbAddress = '127.0.0.1';

const DefaultUri = 'mongodb://$dbAddress:27017/$dbName';

void main() async {
var db = Db(DefaultUri);
await db.open();

Future cleanupDatabase() async {
await db.close();
}

if (!db.masterConnection.serverCapabilities.supportsOpMsg) {
return;
}

var collectionName = 'cursor';
await db.dropCollection(collectionName);
var collection = db.collection(collectionName);

var documents = <Map<String, dynamic>>[
for (var idx = 0; idx < 1000; idx++) {'key': idx}
];

var ret = await collection.insertMany(documents);
if (!ret.isSuccess) {
print('Error detected in record insertion');
}

/// The batch size refers to the number of records that are fetched
/// togheter from the server.
/// The cursor class store the records fetched and returns one by one
/// when calling the nextObject() method.
///
/// The system differentiate between two different kinds of "fetch":
/// - the first batch
/// - all the other batches
///
/// You can set the size of the first batch setting the operation
/// option (in this example "100"). The default is 101.
/// For all subsequent fetches the cursor batch size will be used,
/// You can (if needed) set it as a parameter when creating the cursor
/// (here 150) or changing it later, setting the cursor.batchSize
/// variable (here 200).
///
/// By default the cursor batch size is set equal to the operation one.
/// In our example, without setting 150 and then 200 it would heve been 100).
/// Please note. This behavior differs from the mongodb shell,
/// where the first bach has a size of 101 and the following
/// read (if not set otherwise) has a <no-limit> size, i.e. reads all other
/// records in one batch.
///
/// The batch size must be a positive integer.
/// The exception is setting zero in the operation
/// In this case the meaning is:
/// "Simply prepare the cursor for further reading".
/// This can be useful if you use the low level operation `execute()`
/// and then a subsequent `getMore` command, otherwise it is transparent in
/// the `nextObject()` call.
var cursor = ModernCursor(
FindOperation(collection,
filter: {
'key': {r'$gte': 0}
},
findOptions: FindOptions(batchSize: 100)),
batchSize: 150);

var sum = 0;
// Just an example, normally it is not needed.
cursor.batchSize = 200;

while (true) {
var doc = await cursor.nextObject();
if (doc == null) {
await cursor.close();
break;
}

// do something with "doc"
var idx = doc['key'] as int ?? 0;
if (idx > 10) {
continue;
}
sum += idx;
}

print('The sum of the first 10 records is $sum'); // 55;

await cleanupDatabase();
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,17 @@ class UnorderedBulk extends Bulk {

@override
List<Map<int, int>> getBulkInputOrigins() => <Map<int, int>>[
...splitInputOrigins(insertCommandsOrigin, insertCommand.length),
...splitInputOrigins(updateCommandsOrigin, updateCommand.length),
...splitInputOrigins(deleteCommandsOrigin, deleteCommand.length),
if (insertCommand[keyInsertArgument] != null &&
(insertCommand[keyInsertArgument] as List).isNotEmpty)
...splitInputOrigins(insertCommandsOrigin,
(insertCommand[keyInsertArgument] as List).length),
if (updateCommand[keyUpdateArgument] != null &&
(updateCommand[keyUpdateArgument] as List).isNotEmpty)
...splitInputOrigins(updateCommandsOrigin,
(updateCommand[keyUpdateArgument] as List).length),
if (deleteCommand[keyDeleteArgument] != null &&
(deleteCommand[keyDeleteArgument] as List).isNotEmpty)
...splitInputOrigins(deleteCommandsOrigin,
(deleteCommand[keyDeleteArgument] as List).length),
];
}
42 changes: 40 additions & 2 deletions lib/src/database/cursor/modern_cursor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,21 @@ import 'package:mongo_dart/src/database/commands/aggreagation_commands/wrapper/c
import 'package:mongo_dart/src/database/commands/aggreagation_commands/wrapper/change_stream/change_stream_operation.dart';
import 'package:mongo_dart/src/database/commands/query_and_write_operation_commands/get_more_command/get_more_command.dart';
import 'package:mongo_dart/src/database/commands/query_and_write_operation_commands/find_operation/find_operation.dart';
import 'package:mongo_dart/src/database/commands/query_and_write_operation_commands/get_more_command/get_more_options.dart';
import 'package:mongo_dart/src/database/utils/map_keys.dart';

import '../../../mongo_dart.dart';

const defaultBatchSize = 101;

typedef MonadicBlock = void Function(Map<String, dynamic> value);

class ModernCursor {
ModernCursor(this.operation,
{this.checksumPresent, this.moreToCome, this.exhaustAllowed})
{this.checksumPresent,
this.moreToCome,
this.exhaustAllowed,
int batchSize})
: collection = operation.collection,
db = operation.collection?.db ?? operation.db {
if (operation is FindOperation && collection == null) {
Expand All @@ -31,6 +37,15 @@ class ModernCursor {
} else if (operation is ChangeStreamOperation) {
isChangeStream = tailable = awaitData = true;
}
var internalBatchSize = batchSize;
if (internalBatchSize == null) {
var operationBatchSize = operation.options[keyBatchSize] as int;
if (operationBatchSize != null && operationBatchSize != 0) {
internalBatchSize = operationBatchSize;
}
}

_batchSize = internalBatchSize ?? defaultBatchSize;
}

/// This method allows the creation of the cursor from the Id and the
Expand Down Expand Up @@ -61,6 +76,7 @@ class ModernCursor {
if (isChangeStream) {
tailable = awaitData = true;
}
_batchSize = defaultBatchSize;
}

State state = State.INIT;
Expand All @@ -72,6 +88,16 @@ class ModernCursor {
bool awaitData = false;
bool isChangeStream = false;

// Batch size for the getMore command if different from the default
int _batchSize;
int get batchSize => _batchSize;
set batchSize(int _value) {
if (_value < 0) {
throw MongoDartError('Batch size must be a non negative value');
}
_batchSize = _value;
}

// in case of collection agnostic commands (aggregate) is the name
// of the collecton as returns from the first batch (taken from ns)
String collectionName;
Expand Down Expand Up @@ -147,16 +173,23 @@ class ModernCursor {
return _getNextItem();
}

var justPrepareCursor = false;
Map<String, Object> result;
if (state == State.INIT) {
if (operation.options[keyBatchSize] != null &&
operation.options[keyBatchSize] == 0) {
justPrepareCursor = true;
}
result = await operation.execute();
state = State.OPEN;
} else if (state == State.OPEN) {
if (cursorId.data == 0) {
return _serverSideCursorClose();
}
var command = GetMoreCommand(collection, cursorId,
db: db, collectionName: collectionName);
db: db,
collectionName: collectionName,
getMoreOptions: GetMoreOptions(batchSize: _batchSize));
result = await command.execute();
}
if (result[keyOk] == 0.0) {
Expand All @@ -168,6 +201,11 @@ class ModernCursor {
cursorId = cursorMap == null ? 0 : BsonLong(cursorMap[keyId] ?? 0);
// The result map returns last records while setting cursorId to zero.
extractCursorData(result);
// batch size for "first batch" was 0, no data returned.
// Just prepared the cursor for further fetching
if (justPrepareCursor) {
return nextObject();
}
if (items.isNotEmpty) {
return _getNextItem();
}
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: mongo_dart
version: 0.5.0-rc.1
version: 0.5.0
description: MongoDB driver, implemented in pure Dart. All CRUD operations, aggregation pipeline and more!
homepage: https://github.com/mongo-dart/mongo_dart
environment:
Expand Down

0 comments on commit dc05e46

Please sign in to comment.