diff --git a/src/cmap/wire_protocol/responses.ts b/src/cmap/wire_protocol/responses.ts index 6c166afd61e..18afde92e72 100644 --- a/src/cmap/wire_protocol/responses.ts +++ b/src/cmap/wire_protocol/responses.ts @@ -354,4 +354,8 @@ export class ClientBulkWriteCursorResponse extends CursorResponse { get deletedCount() { return this.get('nDeleted', BSONType.int, true); } + + get writeConcernError() { + return this.get('writeConcernError', BSONType.object, false); + } } diff --git a/src/error.ts b/src/error.ts index 4aed6b93146..4e3679bd9a7 100644 --- a/src/error.ts +++ b/src/error.ts @@ -643,6 +643,33 @@ export class MongoClientBulkWriteCursorError extends MongoRuntimeError { } } +/** + * An error indicating that an error occurred when generating a bulk write update. + * + * @public + * @category Error + */ +export class MongoClientBulkWriteUpdateError extends MongoRuntimeError { + /** + * **Do not use this constructor!** + * + * Meant for internal use only. + * + * @remarks + * This class is only meant to be constructed within the driver. This constructor is + * not subject to semantic versioning compatibility guarantees and may change at any time. + * + * @public + **/ + constructor(message: string) { + super(message); + } + + override get name(): string { + return 'MongoClientBulkWriteUpdateError'; + } +} + /** * An error indicating that an error occurred on the client when executing a client bulk write. * diff --git a/src/operations/client_bulk_write/command_builder.ts b/src/operations/client_bulk_write/command_builder.ts index 6b809a08c58..bd6aee836bb 100644 --- a/src/operations/client_bulk_write/command_builder.ts +++ b/src/operations/client_bulk_write/command_builder.ts @@ -1,5 +1,6 @@ import { BSON, type Document } from '../../bson'; import { DocumentSequence } from '../../cmap/commands'; +import { MongoClientBulkWriteUpdateError } from '../../error'; import { type PkFactory } from '../../mongo_client'; import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types'; import { DEFAULT_PK_FACTORY } from '../../utils'; @@ -343,6 +344,22 @@ export const buildUpdateManyOperation = ( return createUpdateOperation(model, index, true); }; +/** + * Validate the update document. + * @param update - The update document. + */ +function validateUpdate(update: Document) { + const keys = Object.keys(update); + if (keys.length === 0) { + throw new MongoClientBulkWriteUpdateError('Client bulk write update models may not be empty.'); + } + if (!keys[0].startsWith('$')) { + throw new MongoClientBulkWriteUpdateError( + 'Client bulk write update models must only contain atomic modifiers (start with $).' + ); + } +} + /** * Creates a delete operation based on the parameters. */ @@ -351,6 +368,22 @@ function createUpdateOperation( index: number, multi: boolean ): ClientUpdateOperation { + // Update documents provided in UpdateOne and UpdateMany write models are + // required only to contain atomic modifiers (i.e. keys that start with "$"). + // Drivers MUST throw an error if an update document is empty or if the + // document's first key does not start with "$". + if (Array.isArray(model.update)) { + if (model.update.length === 0) { + throw new MongoClientBulkWriteUpdateError( + 'Client bulk write update model pipelines may not be empty.' + ); + } + for (const update of model.update) { + validateUpdate(update); + } + } else { + validateUpdate(model.update); + } const document: ClientUpdateOperation = { update: index, multi: multi, @@ -393,6 +426,16 @@ export const buildReplaceOneOperation = ( model: ClientReplaceOneModel, index: number ): ClientReplaceOneOperation => { + const keys = Object.keys(model.replacement); + if (keys.length === 0) { + throw new MongoClientBulkWriteUpdateError('Client bulk write replace models may not be empty.'); + } + if (keys[0].startsWith('$')) { + throw new MongoClientBulkWriteUpdateError( + 'Client bulk write replace models must not contain atomic modifiers (start with $).' + ); + } + const document: ClientReplaceOneOperation = { update: index, multi: false, diff --git a/src/operations/client_bulk_write/common.ts b/src/operations/client_bulk_write/common.ts index c41d971f020..29d3e5e04f8 100644 --- a/src/operations/client_bulk_write/common.ts +++ b/src/operations/client_bulk_write/common.ts @@ -1,4 +1,5 @@ import { type Document } from '../../bson'; +import { type ErrorDescription, type MongoRuntimeError, MongoServerError } from '../../error'; import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types'; import type { CollationOptions, CommandOperationOptions } from '../../operations/command'; import type { Hint } from '../../operations/operation'; @@ -181,6 +182,55 @@ export interface ClientBulkWriteResult { deleteResults?: Map; } +export interface ClientBulkWriteError { + code: number; + message: string; +} + +/** + * An error indicating that an error occurred when executing the bulk write. + * + * @public + * @category Error + */ +export class MongoClientBulkWriteError extends MongoServerError { + /** + * A top-level error that occurred when attempting to communicate with the server or execute + * the bulk write. This value may not be populated if the exception was thrown due to errors + * occurring on individual writes. + */ + error?: MongoRuntimeError; + /** + * Write concern errors that occurred while executing the bulk write. This list may have + * multiple items if more than one server command was required to execute the bulk write. + */ + writeConcernErrors: Document[]; + /** + * Errors that occurred during the execution of individual write operations. This map will + * contain at most one entry if the bulk write was ordered. + */ + writeErrors: Map; + /** + * The results of any successful operations that were performed before the error was + * encountered. + */ + partialResult?: ClientBulkWriteResult; + + /** + * Initialize the client bulk write error. + * @param message - The error message. + */ + constructor(message: ErrorDescription) { + super(message); + this.writeConcernErrors = []; + this.writeErrors = new Map(); + } + + override get name(): string { + return 'MongoClientBulkWriteError'; + } +} + /** @public */ export interface ClientInsertOneResult { /** diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index 1c02a42add6..0925f7661bb 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,7 +1,7 @@ import { type Document } from 'bson'; import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; -import { MongoClientBulkWriteExecutionError } from '../../error'; +import { MongoClientBulkWriteExecutionError, MongoWriteConcernError } from '../../error'; import { type MongoClient } from '../../mongo_client'; import { WriteConcern } from '../../write_concern'; import { executeOperation } from '../execute_operation'; @@ -10,7 +10,8 @@ import { type ClientBulkWriteCommand, ClientBulkWriteCommandBuilder } from './co import { type AnyClientBulkWriteModel, type ClientBulkWriteOptions, - type ClientBulkWriteResult + type ClientBulkWriteResult, + MongoClientBulkWriteError } from './common'; import { ClientBulkWriteResultsMerger } from './results_merger'; @@ -34,9 +35,13 @@ export class ClientBulkWriteExecutor { operations: AnyClientBulkWriteModel[], options?: ClientBulkWriteOptions ) { + if (operations.length === 0) { + throw new MongoClientBulkWriteExecutionError('No client bulk write models were provided.'); + } + this.client = client; this.operations = operations; - this.options = { ...options }; + this.options = { ordered: true, ...options }; // If no write concern was provided, we inherit one from the client. if (!this.options.writeConcern) { @@ -96,12 +101,46 @@ async function executeAcknowledged( let currentBatchOffset = 0; for (const command of commands) { const cursor = new ClientBulkWriteCursor(client, command, options); - const docs = await cursor.toArray(); + let docs = []; + let writeConcernErrorResult; + try { + docs = await cursor.toArray(); + } catch (error) { + // Write concern errors are recorded in the writeConcernErrors field on MongoClientBulkWriteError. + // When a write concern error is encountered, it should not terminate execution of the bulk write + // for either ordered or unordered bulk writes. However, drivers MUST throw an exception at the end + // of execution if any write concern errors were observed. + if (error instanceof MongoWriteConcernError) { + const result = error.result; + writeConcernErrorResult = { + insertedCount: result.nInserted, + upsertedCount: result.nUpserted, + matchedCount: result.nMatched, + modifiedCount: result.nModified, + deletedCount: result.nDeleted, + writeConcernError: result.writeConcernError + }; + docs = result.cursor.firstBatch; + } else { + throw error; + } + } + // Note if we have a write concern error there will be no cursor response present. + const response = writeConcernErrorResult ?? cursor.response; const operations = command.ops.documents; - resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); + resultsMerger.merge(currentBatchOffset, operations, response, docs); // Set the new batch index so we can back back to the index in the original models. currentBatchOffset += operations.length; } + + if (resultsMerger.writeConcernErrors.length > 0) { + const error = new MongoClientBulkWriteError({ + message: 'Mongo client bulk write encountered write concern errors during execution.' + }); + error.writeConcernErrors = resultsMerger.writeConcernErrors; + error.partialResult = resultsMerger.result; + throw error; + } return resultsMerger.result; } diff --git a/src/operations/client_bulk_write/results_merger.ts b/src/operations/client_bulk_write/results_merger.ts index ca5f3f16048..34bf615e89f 100644 --- a/src/operations/client_bulk_write/results_merger.ts +++ b/src/operations/client_bulk_write/results_merger.ts @@ -5,7 +5,8 @@ import { type ClientBulkWriteResult, type ClientDeleteResult, type ClientInsertOneResult, - type ClientUpdateResult + type ClientUpdateResult, + MongoClientBulkWriteError } from './common'; /** @@ -15,6 +16,7 @@ import { export class ClientBulkWriteResultsMerger { result: ClientBulkWriteResult; options: ClientBulkWriteOptions; + writeConcernErrors: Document[]; /** * Instantiate the merger. @@ -22,6 +24,7 @@ export class ClientBulkWriteResultsMerger { */ constructor(options: ClientBulkWriteOptions) { this.options = options; + this.writeConcernErrors = []; this.result = { insertedCount: 0, upsertedCount: 0, @@ -50,7 +53,7 @@ export class ClientBulkWriteResultsMerger { merge( currentBatchOffset: number, operations: Document[], - response: ClientBulkWriteCursorResponse, + response: ClientBulkWriteCursorResponse | Document, documents: Document[] ): ClientBulkWriteResult { // Update the counts from the cursor response. @@ -60,42 +63,77 @@ export class ClientBulkWriteResultsMerger { this.result.modifiedCount += response.modifiedCount; this.result.deletedCount += response.deletedCount; - if (this.options.verboseResults) { - // Iterate all the documents in the cursor and update the result. - for (const document of documents) { - // Only add to maps if ok: 1 - if (document.ok === 1) { - // Get the corresponding operation from the command. - const operation = operations[document.idx]; - // Handle insert results. - if ('insert' in operation) { - this.result.insertResults?.set(document.idx + currentBatchOffset, { - insertedId: operation.document._id - }); - } - // Handle update results. - if ('update' in operation) { - const result: ClientUpdateResult = { - matchedCount: document.n, - modifiedCount: document.nModified ?? 0, - // Check if the bulk did actually upsert. - didUpsert: document.upserted != null - }; - if (document.upserted) { - result.upsertedId = document.upserted._id; - } - this.result.updateResults?.set(document.idx + currentBatchOffset, result); - } - // Handle delete results. - if ('delete' in operation) { - this.result.deleteResults?.set(document.idx + currentBatchOffset, { - deletedCount: document.n - }); + if (response.writeConcernError) { + this.writeConcernErrors.push({ + code: response.writeConcernError.code, + message: response.writeConcernError.errmsg + }); + } + // Iterate all the documents in the cursor and update the result. + const writeErrors = new Map(); + for (const document of documents) { + // Only add to maps if ok: 1 + if (document.ok === 1 && this.options.verboseResults) { + // Get the corresponding operation from the command. + const operation = operations[document.idx]; + // Handle insert results. + if ('insert' in operation) { + this.result.insertResults?.set(document.idx + currentBatchOffset, { + insertedId: operation.document._id + }); + } + // Handle update results. + if ('update' in operation) { + const result: ClientUpdateResult = { + matchedCount: document.n, + modifiedCount: document.nModified ?? 0, + // Check if the bulk did actually upsert. + didUpsert: document.upserted != null + }; + if (document.upserted) { + result.upsertedId = document.upserted._id; } + this.result.updateResults?.set(document.idx + currentBatchOffset, result); + } + // Handle delete results. + if ('delete' in operation) { + this.result.deleteResults?.set(document.idx + currentBatchOffset, { + deletedCount: document.n + }); + } + } else { + // If an individual write error is encountered during an ordered bulk write, drivers MUST + // record the error in writeErrors and immediately throw the exception. Otherwise, drivers + // MUST continue to iterate the results cursor and execute any further bulkWrite batches. + if (this.options.ordered) { + const error = new MongoClientBulkWriteError({ + message: 'Mongo client ordered bulk write encountered a write error.' + }); + error.writeErrors.set(document.idx + currentBatchOffset, { + code: document.code, + message: document.errmsg + }); + error.partialResult = this.result; + throw error; + } else { + writeErrors.set(document.idx + currentBatchOffset, { + code: document.code, + message: document.errmsg + }); } } } + // Handle the unordered bulk write errors here. + if (writeErrors.size > 0) { + const error = new MongoClientBulkWriteError({ + message: 'Mongo client unordered bulk write encountered write errors.' + }); + error.writeErrors = writeErrors; + error.partialResult = this.result; + throw error; + } + return this.result; } } diff --git a/test/integration/crud/crud.spec.test.ts b/test/integration/crud/crud.spec.test.ts index a8a0d2987fe..5439c775236 100644 --- a/test/integration/crud/crud.spec.test.ts +++ b/test/integration/crud/crud.spec.test.ts @@ -3,22 +3,6 @@ import * as path from 'path'; import { loadSpecTests } from '../../spec/index'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; -const clientBulkWriteTests = new RegExp( - [ - 'client bulkWrite operations support errorResponse assertions', - 'an individual operation fails during an ordered bulkWrite', - 'an individual operation fails during an unordered bulkWrite', - 'detailed results are omitted from error when verboseResults is false', - 'a top-level failure occurs during a bulkWrite', - 'a bulk write with only errors does not report a partial result', - 'an empty list of write models is a client-side error', - 'a write concern error occurs during a bulkWrite', - 'client bulkWrite replaceOne prohibits atomic modifiers', - 'client bulkWrite updateOne requires atomic modifiers', - 'client bulkWrite updateMany requires atomic modifiers' - ].join('|') -); - const unacknowledgedHintTests = [ 'Unacknowledged updateOne with hint document on 4.2+ server', 'Unacknowledged updateOne with hint string on 4.2+ server', @@ -59,13 +43,11 @@ describe('CRUD unified', function () { runUnifiedSuite( loadSpecTests(path.join('crud', 'unified')), ({ description }, { isLoadBalanced }) => { - return description.match(clientBulkWriteTests) - ? 'TODO(NODE-6257): implement client level bulk write' - : unacknowledgedHintTests.includes(description) - ? `TODO(NODE-3541)` - : isLoadBalanced && loadBalancedCollationTests.includes(description) - ? `TODO(NODE-6280): fix collation for find and modify commands on load balanced mode` - : false; + return unacknowledgedHintTests.includes(description) + ? `TODO(NODE-3541)` + : isLoadBalanced && loadBalancedCollationTests.includes(description) + ? `TODO(NODE-6280): fix collation for find and modify commands on load balanced mode` + : false; } ); }); diff --git a/test/tools/unified-spec-runner/match.ts b/test/tools/unified-spec-runner/match.ts index f92004c7760..cb1266d006d 100644 --- a/test/tools/unified-spec-runner/match.ts +++ b/test/tools/unified-spec-runner/match.ts @@ -240,6 +240,7 @@ export function resultCheck( } if (typeof actual !== 'object') { + console.log(expected, actual); expect.fail('Expected actual value to be an object'); } @@ -793,7 +794,11 @@ export function expectErrorCheck( } if (expected.expectResult != null) { - resultCheck(error, expected.expectResult as any, entities); + if ('partialResult' in error) { + resultCheck(error.partialResult, expected.expectResult as any, entities); + } else { + resultCheck(error, expected.expectResult as any, entities); + } } if (expected.errorResponse != null) {