diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index c4f349500a1..8b85b5ea7e3 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -3,12 +3,10 @@ import { Readable, Transform } from 'stream'; import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson'; import { CursorResponse } from '../cmap/wire_protocol/responses'; import { - type AnyError, MongoAPIError, MongoCursorExhaustedError, MongoCursorInUseError, MongoInvalidArgumentError, - MongoNetworkError, MongoRuntimeError, MongoTailableCursorError } from '../error'; @@ -45,8 +43,6 @@ const kInitialized = Symbol('initialized'); const kClosed = Symbol('closed'); /** @internal */ const kKilled = Symbol('killed'); -/** @internal */ -const kInit = Symbol('kInit'); /** @public */ export const CURSOR_FLAGS = [ @@ -137,13 +133,13 @@ export abstract class AbstractCursor< CursorEvents extends AbstractCursorEvents = AbstractCursorEvents > extends TypedEventEmitter { /** @internal */ - [kId]: Long | null; + private [kId]: Long | null; /** @internal */ - [kSession]: ClientSession; + private [kSession]: ClientSession; /** @internal */ - [kServer]?: Server; + private [kServer]?: Server; /** @internal */ - [kNamespace]: MongoDBNamespace; + private [kNamespace]: MongoDBNamespace; /** @internal */ [kDocuments]: { length: number; @@ -153,23 +149,23 @@ export abstract class AbstractCursor< push(item: TSchema): void; }; /** @internal */ - [kClient]: MongoClient; + private [kClient]: MongoClient; /** @internal */ - [kTransform]?: (doc: TSchema) => any; + private [kTransform]?: (doc: TSchema) => any; /** @internal */ - [kInitialized]: boolean; + private [kInitialized]: boolean; /** @internal */ - [kClosed]: boolean; + private [kClosed]: boolean; /** @internal */ - [kKilled]: boolean; + private [kKilled]: boolean; /** @internal */ - [kOptions]: InternalAbstractCursorOptions; + private [kOptions]: InternalAbstractCursorOptions; /** @event */ static readonly CLOSE = 'close' as const; /** @internal */ - constructor( + protected constructor( client: MongoClient, namespace: MongoDBNamespace, options: AbstractCursorOptions = {} @@ -309,33 +305,26 @@ export abstract class AbstractCursor< try { while (true) { + if (this.killed) { + return; + } + + if (this.closed && this[kDocuments].length === 0) { + return; + } + + if (this[kId] != null && this.isDead && this[kDocuments].length === 0) { + return; + } + const document = await this.next(); - // Intentional strict null check, because users can map cursors to falsey values. - // We allow mapping to all values except for null. // eslint-disable-next-line no-restricted-syntax if (document === null) { - if (!this.closed) { - const message = - 'Cursor returned a `null` document, but the cursor is not exhausted. Mapping documents to `null` is not supported in the cursor transform.'; - - try { - await cleanupCursor(this, { needsToEmitClosed: true }); - } catch (error) { - squashError(error); - } - - throw new MongoAPIError(message); - } - break; + return; } yield document; - - if (this[kId] === Long.ZERO) { - // Cursor exhausted - break; - } } } finally { // Only close the cursor if it has not already been closed. This finally clause handles @@ -385,11 +374,14 @@ export abstract class AbstractCursor< return false; } - if (this[kDocuments].length !== 0) { - return true; - } + do { + if (this[kDocuments].length !== 0) { + return true; + } + await this.fetchBatch(); + } while (!this.isDead || this[kDocuments].length !== 0); - return await next(this, { blocking: true, transform: false, shift: false }); + return false; } /** Get the next available document from the cursor, returns null if no more documents are available. */ @@ -398,7 +390,16 @@ export abstract class AbstractCursor< throw new MongoCursorExhaustedError(); } - return await next(this, { blocking: true, transform: true, shift: true }); + do { + const doc = this[kDocuments].shift(); + if (doc != null) { + if (this[kTransform] != null) return await this.transformDocument(doc); + return doc; + } + await this.fetchBatch(); + } while (!this.isDead || this[kDocuments].length !== 0); + + return null; } /** @@ -409,7 +410,21 @@ export abstract class AbstractCursor< throw new MongoCursorExhaustedError(); } - return await next(this, { blocking: false, transform: true, shift: true }); + let doc = this[kDocuments].shift(); + if (doc != null) { + if (this[kTransform] != null) return await this.transformDocument(doc); + return doc; + } + + await this.fetchBatch(); + + doc = this[kDocuments].shift(); + if (doc != null) { + if (this[kTransform] != null) return await this.transformDocument(doc); + return doc; + } + + return null; } /** @@ -433,9 +448,7 @@ export abstract class AbstractCursor< } async close(): Promise { - const needsToEmitClosed = !this[kClosed]; - this[kClosed] = true; - await cleanupCursor(this, { needsToEmitClosed }); + await this.cleanup(); } /** @@ -459,7 +472,7 @@ export abstract class AbstractCursor< * @param value - The flag boolean value. */ addCursorFlag(flag: CursorFlag, value: boolean): this { - assertUninitialized(this); + this.throwIfInitialized(); if (!CURSOR_FLAGS.includes(flag)) { throw new MongoInvalidArgumentError(`Flag ${flag} is not one of ${CURSOR_FLAGS}`); } @@ -515,7 +528,7 @@ export abstract class AbstractCursor< * @param transform - The mapping transformation method. */ map(transform: (doc: TSchema) => T): AbstractCursor { - assertUninitialized(this); + this.throwIfInitialized(); const oldTransform = this[kTransform] as (doc: TSchema) => TSchema; // TODO(NODE-3283): Improve transform typing if (oldTransform) { this[kTransform] = doc => { @@ -534,7 +547,7 @@ export abstract class AbstractCursor< * @param readPreference - The new read preference for the cursor. */ withReadPreference(readPreference: ReadPreferenceLike): this { - assertUninitialized(this); + this.throwIfInitialized(); if (readPreference instanceof ReadPreference) { this[kOptions].readPreference = readPreference; } else if (typeof readPreference === 'string') { @@ -552,7 +565,7 @@ export abstract class AbstractCursor< * @param readPreference - The new read preference for the cursor. */ withReadConcern(readConcern: ReadConcernLike): this { - assertUninitialized(this); + this.throwIfInitialized(); const resolvedReadConcern = ReadConcern.fromOptions({ readConcern }); if (resolvedReadConcern) { this[kOptions].readConcern = resolvedReadConcern; @@ -567,7 +580,7 @@ export abstract class AbstractCursor< * @param value - Number of milliseconds to wait before aborting the query. */ maxTimeMS(value: number): this { - assertUninitialized(this); + this.throwIfInitialized(); if (typeof value !== 'number') { throw new MongoInvalidArgumentError('Argument for maxTimeMS must be a number'); } @@ -582,7 +595,7 @@ export abstract class AbstractCursor< * @param value - The number of documents to return per batch. See {@link https://www.mongodb.com/docs/manual/reference/command/find/|find command documentation}. */ batchSize(value: number): this { - assertUninitialized(this); + this.throwIfInitialized(); if (this[kOptions].tailable) { throw new MongoTailableCursorError('Tailable cursor does not support batchSize'); } @@ -652,7 +665,7 @@ export abstract class AbstractCursor< * operation. We cannot refactor to use the abstract _initialize method without * a significant refactor. */ - async [kInit](): Promise { + private async cursorInit(): Promise { try { const state = await this._initialize(this[kSession]); const response = state.response; @@ -663,24 +676,14 @@ export abstract class AbstractCursor< this[kDocuments] = response; } else if (response.cursor) { // TODO(NODE-2674): Preserve int64 sent from MongoDB - this[kId] = - typeof response.cursor.id === 'number' - ? Long.fromNumber(response.cursor.id) - : typeof response.cursor.id === 'bigint' - ? Long.fromBigInt(response.cursor.id) - : response.cursor.id; - - if (response.cursor.ns) { - this[kNamespace] = ns(response.cursor.ns); - } - + this[kId] = getCursorId(response); + if (response.cursor.ns) this[kNamespace] = ns(response.cursor.ns); this[kDocuments].pushMany(response.cursor.firstBatch); } - // When server responses return without a cursor document, we close this cursor - // and return the raw server response. This is often the case for explain commands - // for example if (this[kId] == null) { + // When server responses return without a cursor document, we close this cursor + // and return the raw server response. This is the case for explain commands this[kId] = Long.ZERO; // TODO(NODE-3286): ExecutionResult needs to accept a generic parameter this[kDocuments].push(state.response as TODO_NODE_3286); @@ -691,130 +694,56 @@ export abstract class AbstractCursor< } catch (error) { // the cursor is now initialized, even if an error occurred this[kInitialized] = true; - await cleanupCursor(this, { error }); + await this.cleanup(error); throw error; } if (this.isDead) { - await cleanupCursor(this, undefined); + await this.cleanup(); } return; } -} -/** - * @param cursor - the cursor on which to call `next` - * @param blocking - a boolean indicating whether or not the cursor should `block` until data - * is available. Generally, this flag is set to `false` because if the getMore returns no documents, - * the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and - * `tryNext`, for example) blocking is necessary because a getMore returning no documents does - * not indicate the end of the cursor. - * @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists) - * @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means - * the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer. - */ -async function next( - cursor: AbstractCursor, - { - blocking, - transform, - shift - }: { - blocking: boolean; - transform: boolean; - shift: false; - } -): Promise; - -async function next( - cursor: AbstractCursor, - { - blocking, - transform, - shift - }: { - blocking: boolean; - transform: boolean; - shift: true; - } -): Promise; - -async function next( - cursor: AbstractCursor, - { - blocking, - transform, - shift - }: { - blocking: boolean; - transform: boolean; - shift: boolean; - } -): Promise { - if (cursor.closed) { - if (!shift) return false; - return null; - } - - do { - if (cursor[kId] == null) { - // All cursors must operate within a session, one must be made implicitly if not explicitly provided - await cursor[kInit](); - } - - if (cursor[kDocuments].length !== 0) { - if (!shift) return true; - const doc = cursor[kDocuments].shift(cursor[kOptions]); - - if (doc != null && transform && cursor[kTransform]) { - try { - return cursor[kTransform](doc); - } catch (error) { - try { - await cleanupCursor(cursor, { error, needsToEmitClosed: true }); - } catch (error) { - // `cleanupCursor` should never throw, squash and throw the original error - squashError(error); - } - throw error; - } - } - - return doc; + /** @internal Attempt to obtain more documents */ + private async fetchBatch(): Promise { + if (this.closed) { + return; } - if (cursor.isDead) { + if (this.isDead) { // if the cursor is dead, we clean it up // cleanupCursor should never throw, but if it does it indicates a bug in the driver // and we should surface the error - await cleanupCursor(cursor, {}); - if (!shift) return false; - return null; + await this.cleanup(); + return; + } + + if (this[kId] == null) { + await this.cursorInit(); + // If the cursor died or returned documents, return + if (this[kDocuments].length !== 0 || this.isDead) return; + // Otherwise, run a getMore } // otherwise need to call getMore - const batchSize = cursor[kOptions].batchSize || 1000; + const batchSize = this[kOptions].batchSize || 1000; try { - const response = await cursor.getMore(batchSize); + const response = await this.getMore(batchSize); + // CursorResponse is disabled in this PR + // however the special `emptyGetMore` can be returned from find cursors if (CursorResponse.is(response)) { - cursor[kId] = response.id; - cursor[kDocuments] = response; - } else if (response) { - const cursorId = - typeof response.cursor.id === 'number' - ? Long.fromNumber(response.cursor.id) - : typeof response.cursor.id === 'bigint' - ? Long.fromBigInt(response.cursor.id) - : response.cursor.id; - - cursor[kDocuments].pushMany(response.cursor.nextBatch); - cursor[kId] = cursorId; + this[kId] = response.id; + this[kDocuments] = response; + } else if (response?.cursor) { + const cursorId = getCursorId(response); + this[kDocuments].pushMany(response.cursor.nextBatch); + this[kId] = cursorId; } } catch (error) { try { - await cleanupCursor(cursor, { error, needsToEmitClosed: true }); + await this.cleanup(error); } catch (error) { // `cleanupCursor` should never throw, squash and throw the original error squashError(error); @@ -822,7 +751,7 @@ async function next( throw error; } - if (cursor.isDead) { + if (this.isDead) { // If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted, // we intentionally clean up the cursor to release its session back into the pool before the cursor // is iterated. This prevents a cursor that is exhausted on the server from holding @@ -830,104 +759,94 @@ async function next( // // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver // and we should surface the error - await cleanupCursor(cursor, {}); - } - - if (cursor[kDocuments].length === 0 && blocking === false) { - if (!shift) return false; - return null; - } - } while (!cursor.isDead || cursor[kDocuments].length !== 0); - - if (!shift) return false; - return null; -} - -async function cleanupCursor( - cursor: AbstractCursor, - options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined -): Promise { - const cursorId = cursor[kId]; - const cursorNs = cursor[kNamespace]; - const server = cursor[kServer]; - const session = cursor[kSession]; - const error = options?.error; - - // Cursors only emit closed events once the client-side cursor has been exhausted fully or there - // was an error. Notably, when the server returns a cursor id of 0 and a non-empty batch, we - // cleanup the cursor but don't emit a `close` event. - const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0; - - if (error) { - if (cursor.loadBalanced && error instanceof MongoNetworkError) { - return await completeCleanup(); + await this.cleanup(); } } - if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) { - if (needsToEmitClosed) { - cursor[kClosed] = true; - cursor[kId] = Long.ZERO; - cursor.emit(AbstractCursor.CLOSE); - } - - if (session) { - if (session.owner === cursor) { + /** @internal */ + private async cleanup(error?: Error) { + this[kClosed] = true; + const session = this[kSession]; + try { + if ( + !this[kKilled] && + this[kId] && + !this[kId].isZero() && + this[kNamespace] && + this[kServer] && + !session.hasEnded + ) { + this[kKilled] = true; + await executeOperation( + this[kClient], + new KillCursorsOperation(this[kId], this[kNamespace], this[kServer], { session }) + ); + } + } catch (error) { + squashError(error); + } finally { + if (session?.owner === this) { await session.endSession({ error }); - return; } - - if (!session.inTransaction()) { + if (!session?.inTransaction()) { maybeClearPinnedConnection(session, { error }); } - } - return; + this.emitClose(); + } } - async function completeCleanup() { - if (session) { - if (session.owner === cursor) { - try { - await session.endSession({ error }); - } finally { - cursor.emit(AbstractCursor.CLOSE); - } - return; - } - - if (!session.inTransaction()) { - maybeClearPinnedConnection(session, { error }); + /** @internal */ + private hasEmittedClose = false; + /** @internal */ + private emitClose() { + try { + if (!this.hasEmittedClose && (this[kDocuments].length === 0 || this[kClosed])) { + // @ts-expect-error: CursorEvents is generic so Parameters may not be assignable to `[]`. Not sure how to require extenders do not add parameters. + this.emit('close'); } + } finally { + this.hasEmittedClose = true; } - - cursor.emit(AbstractCursor.CLOSE); - return; } - cursor[kKilled] = true; + /** @internal */ + private async transformDocument(document: NonNullable): Promise { + const transform = this[kTransform]; + if (transform == null) return document; - if (session.hasEnded) { - return await completeCleanup(); + try { + const transformedDocument = transform(document); + // eslint-disable-next-line no-restricted-syntax + if (transformedDocument === null) { + const TRANSFORM_TO_NULL_ERROR = + 'Cursor returned a `null` document, but the cursor is not exhausted. Mapping documents to `null` is not supported in the cursor transform.'; + throw new MongoAPIError(TRANSFORM_TO_NULL_ERROR); + } + return transformedDocument; + } catch (transformError) { + try { + await this.close(); + } catch (closeError) { + squashError(closeError); + } + throw transformError; + } } - try { - await executeOperation( - cursor[kClient], - new KillCursorsOperation(cursorId, cursorNs, server, { session }) - ); - } catch (error) { - squashError(error); - } finally { - await completeCleanup(); + /** @internal */ + protected throwIfInitialized() { + if (this[kInitialized]) throw new MongoCursorInUseError(); } } -/** @internal */ -export function assertUninitialized(cursor: AbstractCursor): void { - if (cursor[kInitialized]) { - throw new MongoCursorInUseError(); - } +/** A temporary helper to box up the many possible type issue of cursor ids */ +function getCursorId(response: Document) { + return typeof response.cursor.id === 'number' + ? Long.fromNumber(response.cursor.id) + : typeof response.cursor.id === 'bigint' + ? Long.fromBigInt(response.cursor.id) + : response.cursor.id; } class ReadableCursorStream extends Readable { @@ -960,8 +879,13 @@ class ReadableCursorStream extends Readable { } private _readNext() { + if (this._cursor[kId] === Long.ZERO) { + this.push(null); + return; + } + // eslint-disable-next-line github/no-then - next(this._cursor, { blocking: true, transform: true, shift: true }).then( + this._cursor.next().then( result => { if (result == null) { this.push(null); diff --git a/src/cursor/aggregation_cursor.ts b/src/cursor/aggregation_cursor.ts index cba77e9b52f..a36bee85582 100644 --- a/src/cursor/aggregation_cursor.ts +++ b/src/cursor/aggregation_cursor.ts @@ -8,7 +8,7 @@ import type { Sort } from '../sort'; import type { MongoDBNamespace } from '../utils'; import { mergeOptions } from '../utils'; import type { AbstractCursorOptions } from './abstract_cursor'; -import { AbstractCursor, assertUninitialized } from './abstract_cursor'; +import { AbstractCursor } from './abstract_cursor'; /** @public */ export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {} @@ -101,7 +101,7 @@ export class AggregationCursor extends AbstractCursor { addStage(stage: Document): this; addStage(stage: Document): AggregationCursor; addStage(stage: Document): AggregationCursor { - assertUninitialized(this); + this.throwIfInitialized(); this[kPipeline].push(stage); return this as unknown as AggregationCursor; } diff --git a/src/cursor/find_cursor.ts b/src/cursor/find_cursor.ts index bb21d49fbed..bac7675eacd 100644 --- a/src/cursor/find_cursor.ts +++ b/src/cursor/find_cursor.ts @@ -11,7 +11,7 @@ import type { Hint } from '../operations/operation'; import type { ClientSession } from '../sessions'; import { formatSort, type Sort, type SortDirection } from '../sort'; import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils'; -import { AbstractCursor, assertUninitialized } from './abstract_cursor'; +import { AbstractCursor } from './abstract_cursor'; /** @internal */ const kFilter = Symbol('filter'); @@ -163,7 +163,7 @@ export class FindCursor extends AbstractCursor { /** Set the cursor query */ filter(filter: Document): this { - assertUninitialized(this); + this.throwIfInitialized(); this[kFilter] = filter; return this; } @@ -174,7 +174,7 @@ export class FindCursor extends AbstractCursor { * @param hint - If specified, then the query system will only consider plans using the hinted index. */ hint(hint: Hint): this { - assertUninitialized(this); + this.throwIfInitialized(); this[kBuiltOptions].hint = hint; return this; } @@ -185,7 +185,7 @@ export class FindCursor extends AbstractCursor { * @param min - Specify a $min value to specify the inclusive lower bound for a specific index in order to constrain the results of find(). The $min specifies the lower bound for all keys of a specific index in order. */ min(min: Document): this { - assertUninitialized(this); + this.throwIfInitialized(); this[kBuiltOptions].min = min; return this; } @@ -196,7 +196,7 @@ export class FindCursor extends AbstractCursor { * @param max - Specify a $max value to specify the exclusive upper bound for a specific index in order to constrain the results of find(). The $max specifies the upper bound for all keys of a specific index in order. */ max(max: Document): this { - assertUninitialized(this); + this.throwIfInitialized(); this[kBuiltOptions].max = max; return this; } @@ -209,7 +209,7 @@ export class FindCursor extends AbstractCursor { * @param value - the returnKey value. */ returnKey(value: boolean): this { - assertUninitialized(this); + this.throwIfInitialized(); this[kBuiltOptions].returnKey = value; return this; } @@ -220,7 +220,7 @@ export class FindCursor extends AbstractCursor { * @param value - The $showDiskLoc option has now been deprecated and replaced with the showRecordId field. $showDiskLoc will still be accepted for OP_QUERY stye find. */ showRecordId(value: boolean): this { - assertUninitialized(this); + this.throwIfInitialized(); this[kBuiltOptions].showRecordId = value; return this; } @@ -232,7 +232,7 @@ export class FindCursor extends AbstractCursor { * @param value - The modifier value. */ addQueryModifier(name: string, value: string | boolean | number | Document): this { - assertUninitialized(this); + this.throwIfInitialized(); if (name[0] !== '$') { throw new MongoInvalidArgumentError(`${name} is not a valid query modifier`); } @@ -295,7 +295,7 @@ export class FindCursor extends AbstractCursor { * @param value - The comment attached to this query. */ comment(value: string): this { - assertUninitialized(this); + this.throwIfInitialized(); this[kBuiltOptions].comment = value; return this; } @@ -306,7 +306,7 @@ export class FindCursor extends AbstractCursor { * @param value - Number of milliseconds to wait before aborting the tailed query. */ maxAwaitTimeMS(value: number): this { - assertUninitialized(this); + this.throwIfInitialized(); if (typeof value !== 'number') { throw new MongoInvalidArgumentError('Argument for maxAwaitTimeMS must be a number'); } @@ -321,7 +321,7 @@ export class FindCursor extends AbstractCursor { * @param value - Number of milliseconds to wait before aborting the query. */ override maxTimeMS(value: number): this { - assertUninitialized(this); + this.throwIfInitialized(); if (typeof value !== 'number') { throw new MongoInvalidArgumentError('Argument for maxTimeMS must be a number'); } @@ -371,7 +371,7 @@ export class FindCursor extends AbstractCursor { * ``` */ project(value: Document): FindCursor { - assertUninitialized(this); + this.throwIfInitialized(); this[kBuiltOptions].projection = value; return this as unknown as FindCursor; } @@ -383,7 +383,7 @@ export class FindCursor extends AbstractCursor { * @param direction - The direction of the sorting (1 or -1). */ sort(sort: Sort | string, direction?: SortDirection): this { - assertUninitialized(this); + this.throwIfInitialized(); if (this[kBuiltOptions].tailable) { throw new MongoTailableCursorError('Tailable cursor does not support sorting'); } @@ -399,7 +399,7 @@ export class FindCursor extends AbstractCursor { * {@link https://www.mongodb.com/docs/manual/reference/command/find/#find-cmd-allowdiskuse | find command allowDiskUse documentation} */ allowDiskUse(allow = true): this { - assertUninitialized(this); + this.throwIfInitialized(); if (!this[kBuiltOptions].sort) { throw new MongoInvalidArgumentError('Option "allowDiskUse" requires a sort specification'); @@ -421,7 +421,7 @@ export class FindCursor extends AbstractCursor { * @param value - The cursor collation options (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields). */ collation(value: CollationOptions): this { - assertUninitialized(this); + this.throwIfInitialized(); this[kBuiltOptions].collation = value; return this; } @@ -432,7 +432,7 @@ export class FindCursor extends AbstractCursor { * @param value - The limit for the cursor query. */ limit(value: number): this { - assertUninitialized(this); + this.throwIfInitialized(); if (this[kBuiltOptions].tailable) { throw new MongoTailableCursorError('Tailable cursor does not support limit'); } @@ -451,7 +451,7 @@ export class FindCursor extends AbstractCursor { * @param value - The skip for the cursor query. */ skip(value: number): this { - assertUninitialized(this); + this.throwIfInitialized(); if (this[kBuiltOptions].tailable) { throw new MongoTailableCursorError('Tailable cursor does not support skip'); } diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 473b7879025..a6824432d48 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -7,7 +7,6 @@ import { PassThrough } from 'stream'; import { setTimeout } from 'timers'; import { - AbstractCursor, type ChangeStream, type ChangeStreamOptions, type Collection, @@ -33,9 +32,9 @@ import { import { delay, filterForCommands } from '../shared'; const initIteratorMode = async (cs: ChangeStream) => { - const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit'); const initEvent = once(cs.cursor, 'init'); - await cs.cursor[kInit](); + //@ts-expect-error: private method + await cs.cursor.cursorInit(); await initEvent; return; }; diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 24cb5c013fe..55c11591c51 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -4,7 +4,6 @@ import * as sinon from 'sinon'; import { setTimeout } from 'timers'; import { - AbstractCursor, type ChangeStream, type CommandFailedEvent, type CommandStartedEvent, @@ -18,7 +17,6 @@ import { Timestamp } from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; -import { getSymbolFrom } from '../../tools/utils'; import { setupDatabase } from '../shared'; /** @@ -72,9 +70,9 @@ function triggerResumableError( } const initIteratorMode = async (cs: ChangeStream) => { - const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit'); const initEvent = once(cs.cursor, 'init'); - await cs.cursor[kInit](); + //@ts-expect-error: private method + await cs.cursor.cursorInit(); await initEvent; return; }; diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index ed7e2f5695c..43babee612f 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1870,7 +1870,7 @@ describe('Cursor', function () { const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error); await client.close(); - expect(cursor).to.have.property('killed', true); + expect(cursor).to.have.property('closed', true); const error = await rejectedEarlyBecauseClientClosed; expect(error).to.be.instanceOf(MongoExpiredSessionError); diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index 435bfc4518e..65d93426767 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -153,6 +153,26 @@ describe('class AbstractCursor', function () { await client.close(); }); + it('wraps transform in result checking for each map call', async () => { + const control = { functionThatShouldReturnNull: 0 }; + const makeCursor = () => { + const cursor = collection.find(); + cursor + .map(doc => (control.functionThatShouldReturnNull === 0 ? null : doc)) + .map(doc => (control.functionThatShouldReturnNull === 1 ? null : doc)) + .map(doc => (control.functionThatShouldReturnNull === 2 ? null : doc)); + return cursor; + }; + + for (const testFn of [0, 1, 2]) { + control.functionThatShouldReturnNull = testFn; + const error = await makeCursor() + .toArray() + .catch(error => error); + expect(error).to.be.instanceOf(MongoAPIError); + } + }); + context('toArray() with custom transforms', function () { for (const value of falseyValues) { it(`supports mapping to falsey value '${inspect(value)}'`, async function () { diff --git a/test/integration/node-specific/cursor_async_iterator.test.js b/test/integration/node-specific/cursor_async_iterator.test.js index f0c5254914a..78330ff09c2 100644 --- a/test/integration/node-specific/cursor_async_iterator.test.js +++ b/test/integration/node-specific/cursor_async_iterator.test.js @@ -72,6 +72,18 @@ describe('Cursor Async Iterator Tests', function () { } }); + it('should not iterate if closed immediately', async function () { + const cursor = collection.find(); + await cursor.close(); + + let count = 0; + // eslint-disable-next-line no-unused-vars + for await (const _ of cursor) count++; + + expect(count).to.equal(0); + expect(cursor.closed).to.be.true; + }); + it('should properly stop when cursor is closed', async function () { const cursor = collection.find(); diff --git a/test/tools/unified-spec-runner/operations.ts b/test/tools/unified-spec-runner/operations.ts index 4e46d1772de..0619a5c1110 100644 --- a/test/tools/unified-spec-runner/operations.ts +++ b/test/tools/unified-spec-runner/operations.ts @@ -6,6 +6,7 @@ import { AssertionError, expect } from 'chai'; import { AbstractCursor, + type ChangeStream, Collection, CommandStartedEvent, Db, @@ -240,9 +241,9 @@ operations.set('createChangeStream', async ({ entities, operation }) => { } const { pipeline, ...args } = operation.arguments!; - const changeStream = watchable.watch(pipeline, args); - const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit'); - await changeStream.cursor[kInit](); + const changeStream: ChangeStream = watchable.watch(pipeline, args); + //@ts-expect-error: private method + await changeStream.cursor.cursorInit(); return changeStream; });