From dc05e4658eeec2707b9795024cdf9b1945ecbcdd Mon Sep 17 00:00:00 2001 From: Giorgio Date: Mon, 29 Mar 2021 12:36:51 +0200 Subject: [PATCH] Fiixed batchSize 0 on cursor --- CHANGELOG.md | 7 ++ example/manual/crud/cursor.dart | 95 +++++++++++++++++++ .../wrapper/bulk/unordered_bulk.dart | 15 ++- lib/src/database/cursor/modern_cursor.dart | 42 +++++++- pubspec.yaml | 2 +- 5 files changed, 155 insertions(+), 6 deletions(-) create mode 100644 example/manual/crud/cursor.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index ebd179b5..412a4c8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ ## Recent change notes +### 0.5.0 + +- Added batchSize variable to ModernCursor class +- Added cursor example +- Fixed index count problem in unordered bulk write operations +- Fixed setting batchSize to 0 in Cursor operaation. + ### 0.5.0-beta - Added tls client and server certificate management diff --git a/example/manual/crud/cursor.dart b/example/manual/crud/cursor.dart new file mode 100644 index 00000000..32da1bd5 --- /dev/null +++ b/example/manual/crud/cursor.dart @@ -0,0 +1,95 @@ +// Example Tested on release 0.7.0 +import 'package:mongo_dart/mongo_dart.dart'; +import 'package:mongo_dart/src/database/cursor/modern_cursor.dart'; + +const dbName = 'mongo-dart-example'; +const dbAddress = '127.0.0.1'; + +const DefaultUri = 'mongodb://$dbAddress:27017/$dbName'; + +void main() async { + var db = Db(DefaultUri); + await db.open(); + + Future cleanupDatabase() async { + await db.close(); + } + + if (!db.masterConnection.serverCapabilities.supportsOpMsg) { + return; + } + + var collectionName = 'cursor'; + await db.dropCollection(collectionName); + var collection = db.collection(collectionName); + + var documents = >[ + for (var idx = 0; idx < 1000; idx++) {'key': idx} + ]; + + var ret = await collection.insertMany(documents); + if (!ret.isSuccess) { + print('Error detected in record insertion'); + } + + /// The batch size refers to the number of records that are fetched + /// togheter from the server. + /// The cursor class store the records fetched and returns one by one + /// when calling the nextObject() method. + /// + /// The system differentiate between two different kinds of "fetch": + /// - the first batch + /// - all the other batches + /// + /// You can set the size of the first batch setting the operation + /// option (in this example "100"). The default is 101. + /// For all subsequent fetches the cursor batch size will be used, + /// You can (if needed) set it as a parameter when creating the cursor + /// (here 150) or changing it later, setting the cursor.batchSize + /// variable (here 200). + /// + /// By default the cursor batch size is set equal to the operation one. + /// In our example, without setting 150 and then 200 it would heve been 100). + /// Please note. This behavior differs from the mongodb shell, + /// where the first bach has a size of 101 and the following + /// read (if not set otherwise) has a size, i.e. reads all other + /// records in one batch. + /// + /// The batch size must be a positive integer. + /// The exception is setting zero in the operation + /// In this case the meaning is: + /// "Simply prepare the cursor for further reading". + /// This can be useful if you use the low level operation `execute()` + /// and then a subsequent `getMore` command, otherwise it is transparent in + /// the `nextObject()` call. + var cursor = ModernCursor( + FindOperation(collection, + filter: { + 'key': {r'$gte': 0} + }, + findOptions: FindOptions(batchSize: 100)), + batchSize: 150); + + var sum = 0; + // Just an example, normally it is not needed. + cursor.batchSize = 200; + + while (true) { + var doc = await cursor.nextObject(); + if (doc == null) { + await cursor.close(); + break; + } + + // do something with "doc" + var idx = doc['key'] as int ?? 0; + if (idx > 10) { + continue; + } + sum += idx; + } + + print('The sum of the first 10 records is $sum'); // 55; + + await cleanupDatabase(); +} diff --git a/lib/src/database/commands/query_and_write_operation_commands/wrapper/bulk/unordered_bulk.dart b/lib/src/database/commands/query_and_write_operation_commands/wrapper/bulk/unordered_bulk.dart index 6e292303..408bc730 100644 --- a/lib/src/database/commands/query_and_write_operation_commands/wrapper/bulk/unordered_bulk.dart +++ b/lib/src/database/commands/query_and_write_operation_commands/wrapper/bulk/unordered_bulk.dart @@ -117,8 +117,17 @@ class UnorderedBulk extends Bulk { @override List> getBulkInputOrigins() => >[ - ...splitInputOrigins(insertCommandsOrigin, insertCommand.length), - ...splitInputOrigins(updateCommandsOrigin, updateCommand.length), - ...splitInputOrigins(deleteCommandsOrigin, deleteCommand.length), + if (insertCommand[keyInsertArgument] != null && + (insertCommand[keyInsertArgument] as List).isNotEmpty) + ...splitInputOrigins(insertCommandsOrigin, + (insertCommand[keyInsertArgument] as List).length), + if (updateCommand[keyUpdateArgument] != null && + (updateCommand[keyUpdateArgument] as List).isNotEmpty) + ...splitInputOrigins(updateCommandsOrigin, + (updateCommand[keyUpdateArgument] as List).length), + if (deleteCommand[keyDeleteArgument] != null && + (deleteCommand[keyDeleteArgument] as List).isNotEmpty) + ...splitInputOrigins(deleteCommandsOrigin, + (deleteCommand[keyDeleteArgument] as List).length), ]; } diff --git a/lib/src/database/cursor/modern_cursor.dart b/lib/src/database/cursor/modern_cursor.dart index 456c3caf..2f12a73c 100644 --- a/lib/src/database/cursor/modern_cursor.dart +++ b/lib/src/database/cursor/modern_cursor.dart @@ -11,15 +11,21 @@ import 'package:mongo_dart/src/database/commands/aggreagation_commands/wrapper/c import 'package:mongo_dart/src/database/commands/aggreagation_commands/wrapper/change_stream/change_stream_operation.dart'; import 'package:mongo_dart/src/database/commands/query_and_write_operation_commands/get_more_command/get_more_command.dart'; import 'package:mongo_dart/src/database/commands/query_and_write_operation_commands/find_operation/find_operation.dart'; +import 'package:mongo_dart/src/database/commands/query_and_write_operation_commands/get_more_command/get_more_options.dart'; import 'package:mongo_dart/src/database/utils/map_keys.dart'; import '../../../mongo_dart.dart'; +const defaultBatchSize = 101; + typedef MonadicBlock = void Function(Map value); class ModernCursor { ModernCursor(this.operation, - {this.checksumPresent, this.moreToCome, this.exhaustAllowed}) + {this.checksumPresent, + this.moreToCome, + this.exhaustAllowed, + int batchSize}) : collection = operation.collection, db = operation.collection?.db ?? operation.db { if (operation is FindOperation && collection == null) { @@ -31,6 +37,15 @@ class ModernCursor { } else if (operation is ChangeStreamOperation) { isChangeStream = tailable = awaitData = true; } + var internalBatchSize = batchSize; + if (internalBatchSize == null) { + var operationBatchSize = operation.options[keyBatchSize] as int; + if (operationBatchSize != null && operationBatchSize != 0) { + internalBatchSize = operationBatchSize; + } + } + + _batchSize = internalBatchSize ?? defaultBatchSize; } /// This method allows the creation of the cursor from the Id and the @@ -61,6 +76,7 @@ class ModernCursor { if (isChangeStream) { tailable = awaitData = true; } + _batchSize = defaultBatchSize; } State state = State.INIT; @@ -72,6 +88,16 @@ class ModernCursor { bool awaitData = false; bool isChangeStream = false; + // Batch size for the getMore command if different from the default + int _batchSize; + int get batchSize => _batchSize; + set batchSize(int _value) { + if (_value < 0) { + throw MongoDartError('Batch size must be a non negative value'); + } + _batchSize = _value; + } + // in case of collection agnostic commands (aggregate) is the name // of the collecton as returns from the first batch (taken from ns) String collectionName; @@ -147,8 +173,13 @@ class ModernCursor { return _getNextItem(); } + var justPrepareCursor = false; Map result; if (state == State.INIT) { + if (operation.options[keyBatchSize] != null && + operation.options[keyBatchSize] == 0) { + justPrepareCursor = true; + } result = await operation.execute(); state = State.OPEN; } else if (state == State.OPEN) { @@ -156,7 +187,9 @@ class ModernCursor { return _serverSideCursorClose(); } var command = GetMoreCommand(collection, cursorId, - db: db, collectionName: collectionName); + db: db, + collectionName: collectionName, + getMoreOptions: GetMoreOptions(batchSize: _batchSize)); result = await command.execute(); } if (result[keyOk] == 0.0) { @@ -168,6 +201,11 @@ class ModernCursor { cursorId = cursorMap == null ? 0 : BsonLong(cursorMap[keyId] ?? 0); // The result map returns last records while setting cursorId to zero. extractCursorData(result); + // batch size for "first batch" was 0, no data returned. + // Just prepared the cursor for further fetching + if (justPrepareCursor) { + return nextObject(); + } if (items.isNotEmpty) { return _getNextItem(); } diff --git a/pubspec.yaml b/pubspec.yaml index 2d0810e7..1f869489 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: mongo_dart -version: 0.5.0-rc.1 +version: 0.5.0 description: MongoDB driver, implemented in pure Dart. All CRUD operations, aggregation pipeline and more! homepage: https://github.com/mongo-dart/mongo_dart environment: