Skip to content

Commit

Permalink
refactor(NODE-6201): cursor to use fetchBatch when current batch is e…
Browse files Browse the repository at this point in the history
…mpty (#4093)
  • Loading branch information
nbbeeken authored Jun 7, 2024
1 parent aa429f8 commit 2a65d43
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 276 deletions.
416 changes: 170 additions & 246 deletions src/cursor/abstract_cursor.ts

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { Sort } from '../sort';
import type { MongoDBNamespace } from '../utils';
import { mergeOptions } from '../utils';
import type { AbstractCursorOptions } from './abstract_cursor';
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
import { AbstractCursor } from './abstract_cursor';

/** @public */
export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {}
Expand Down Expand Up @@ -101,7 +101,7 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
addStage(stage: Document): this;
addStage<T = Document>(stage: Document): AggregationCursor<T>;
addStage<T = Document>(stage: Document): AggregationCursor<T> {
assertUninitialized(this);
this.throwIfInitialized();
this[kPipeline].push(stage);
return this as unknown as AggregationCursor<T>;
}
Expand Down
34 changes: 17 additions & 17 deletions src/cursor/find_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { Hint } from '../operations/operation';
import type { ClientSession } from '../sessions';
import { formatSort, type Sort, type SortDirection } from '../sort';
import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils';
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
import { AbstractCursor } from './abstract_cursor';

/** @internal */
const kFilter = Symbol('filter');
Expand Down Expand Up @@ -163,7 +163,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {

/** Set the cursor query */
filter(filter: Document): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kFilter] = filter;
return this;
}
Expand All @@ -174,7 +174,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param hint - If specified, then the query system will only consider plans using the hinted index.
*/
hint(hint: Hint): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].hint = hint;
return this;
}
Expand All @@ -185,7 +185,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param min - Specify a $min value to specify the inclusive lower bound for a specific index in order to constrain the results of find(). The $min specifies the lower bound for all keys of a specific index in order.
*/
min(min: Document): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].min = min;
return this;
}
Expand All @@ -196,7 +196,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param max - Specify a $max value to specify the exclusive upper bound for a specific index in order to constrain the results of find(). The $max specifies the upper bound for all keys of a specific index in order.
*/
max(max: Document): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].max = max;
return this;
}
Expand All @@ -209,7 +209,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - the returnKey value.
*/
returnKey(value: boolean): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].returnKey = value;
return this;
}
Expand All @@ -220,7 +220,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The $showDiskLoc option has now been deprecated and replaced with the showRecordId field. $showDiskLoc will still be accepted for OP_QUERY stye find.
*/
showRecordId(value: boolean): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].showRecordId = value;
return this;
}
Expand All @@ -232,7 +232,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The modifier value.
*/
addQueryModifier(name: string, value: string | boolean | number | Document): this {
assertUninitialized(this);
this.throwIfInitialized();
if (name[0] !== '$') {
throw new MongoInvalidArgumentError(`${name} is not a valid query modifier`);
}
Expand Down Expand Up @@ -295,7 +295,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The comment attached to this query.
*/
comment(value: string): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].comment = value;
return this;
}
Expand All @@ -306,7 +306,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - Number of milliseconds to wait before aborting the tailed query.
*/
maxAwaitTimeMS(value: number): this {
assertUninitialized(this);
this.throwIfInitialized();
if (typeof value !== 'number') {
throw new MongoInvalidArgumentError('Argument for maxAwaitTimeMS must be a number');
}
Expand All @@ -321,7 +321,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - Number of milliseconds to wait before aborting the query.
*/
override maxTimeMS(value: number): this {
assertUninitialized(this);
this.throwIfInitialized();
if (typeof value !== 'number') {
throw new MongoInvalidArgumentError('Argument for maxTimeMS must be a number');
}
Expand Down Expand Up @@ -371,7 +371,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* ```
*/
project<T extends Document = Document>(value: Document): FindCursor<T> {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].projection = value;
return this as unknown as FindCursor<T>;
}
Expand All @@ -383,7 +383,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param direction - The direction of the sorting (1 or -1).
*/
sort(sort: Sort | string, direction?: SortDirection): this {
assertUninitialized(this);
this.throwIfInitialized();
if (this[kBuiltOptions].tailable) {
throw new MongoTailableCursorError('Tailable cursor does not support sorting');
}
Expand All @@ -399,7 +399,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* {@link https://www.mongodb.com/docs/manual/reference/command/find/#find-cmd-allowdiskuse | find command allowDiskUse documentation}
*/
allowDiskUse(allow = true): this {
assertUninitialized(this);
this.throwIfInitialized();

if (!this[kBuiltOptions].sort) {
throw new MongoInvalidArgumentError('Option "allowDiskUse" requires a sort specification');
Expand All @@ -421,7 +421,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The cursor collation options (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
*/
collation(value: CollationOptions): this {
assertUninitialized(this);
this.throwIfInitialized();
this[kBuiltOptions].collation = value;
return this;
}
Expand All @@ -432,7 +432,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The limit for the cursor query.
*/
limit(value: number): this {
assertUninitialized(this);
this.throwIfInitialized();
if (this[kBuiltOptions].tailable) {
throw new MongoTailableCursorError('Tailable cursor does not support limit');
}
Expand All @@ -451,7 +451,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
* @param value - The skip for the cursor query.
*/
skip(value: number): this {
assertUninitialized(this);
this.throwIfInitialized();
if (this[kBuiltOptions].tailable) {
throw new MongoTailableCursorError('Tailable cursor does not support skip');
}
Expand Down
5 changes: 2 additions & 3 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { PassThrough } from 'stream';
import { setTimeout } from 'timers';

import {
AbstractCursor,
type ChangeStream,
type ChangeStreamOptions,
type Collection,
Expand All @@ -33,9 +32,9 @@ import {
import { delay, filterForCommands } from '../shared';

const initIteratorMode = async (cs: ChangeStream) => {
const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit');
const initEvent = once(cs.cursor, 'init');
await cs.cursor[kInit]();
//@ts-expect-error: private method
await cs.cursor.cursorInit();
await initEvent;
return;
};
Expand Down
6 changes: 2 additions & 4 deletions test/integration/change-streams/change_streams.prose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import * as sinon from 'sinon';
import { setTimeout } from 'timers';

import {
AbstractCursor,
type ChangeStream,
type CommandFailedEvent,
type CommandStartedEvent,
Expand All @@ -18,7 +17,6 @@ import {
Timestamp
} from '../../mongodb';
import * as mock from '../../tools/mongodb-mock/index';
import { getSymbolFrom } from '../../tools/utils';
import { setupDatabase } from '../shared';

/**
Expand Down Expand Up @@ -72,9 +70,9 @@ function triggerResumableError(
}

const initIteratorMode = async (cs: ChangeStream) => {
const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit');
const initEvent = once(cs.cursor, 'init');
await cs.cursor[kInit]();
//@ts-expect-error: private method
await cs.cursor.cursorInit();
await initEvent;
return;
};
Expand Down
2 changes: 1 addition & 1 deletion test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1870,7 +1870,7 @@ describe('Cursor', function () {
const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error);

await client.close();
expect(cursor).to.have.property('killed', true);
expect(cursor).to.have.property('closed', true);

const error = await rejectedEarlyBecauseClientClosed;
expect(error).to.be.instanceOf(MongoExpiredSessionError);
Expand Down
20 changes: 20 additions & 0 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,26 @@ describe('class AbstractCursor', function () {
await client.close();
});

it('wraps transform in result checking for each map call', async () => {
const control = { functionThatShouldReturnNull: 0 };
const makeCursor = () => {
const cursor = collection.find();
cursor
.map(doc => (control.functionThatShouldReturnNull === 0 ? null : doc))
.map(doc => (control.functionThatShouldReturnNull === 1 ? null : doc))
.map(doc => (control.functionThatShouldReturnNull === 2 ? null : doc));
return cursor;
};

for (const testFn of [0, 1, 2]) {
control.functionThatShouldReturnNull = testFn;
const error = await makeCursor()
.toArray()
.catch(error => error);
expect(error).to.be.instanceOf(MongoAPIError);
}
});

context('toArray() with custom transforms', function () {
for (const value of falseyValues) {
it(`supports mapping to falsey value '${inspect(value)}'`, async function () {
Expand Down
12 changes: 12 additions & 0 deletions test/integration/node-specific/cursor_async_iterator.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ describe('Cursor Async Iterator Tests', function () {
}
});

it('should not iterate if closed immediately', async function () {
const cursor = collection.find();
await cursor.close();

let count = 0;
// eslint-disable-next-line no-unused-vars
for await (const _ of cursor) count++;

expect(count).to.equal(0);
expect(cursor.closed).to.be.true;
});

it('should properly stop when cursor is closed', async function () {
const cursor = collection.find();

Expand Down
7 changes: 4 additions & 3 deletions test/tools/unified-spec-runner/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { AssertionError, expect } from 'chai';

import {
AbstractCursor,
type ChangeStream,
Collection,
CommandStartedEvent,
Db,
Expand Down Expand Up @@ -240,9 +241,9 @@ operations.set('createChangeStream', async ({ entities, operation }) => {
}

const { pipeline, ...args } = operation.arguments!;
const changeStream = watchable.watch(pipeline, args);
const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit');
await changeStream.cursor[kInit]();
const changeStream: ChangeStream = watchable.watch(pipeline, args);
//@ts-expect-error: private method
await changeStream.cursor.cursorInit();
return changeStream;
});

Expand Down

0 comments on commit 2a65d43

Please sign in to comment.