Skip to content

Commit

Permalink
feat(NODE-6338): implement client bulk write error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Sep 26, 2024
1 parent 9f63397 commit dff7736
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 62 deletions.
4 changes: 4 additions & 0 deletions src/cmap/wire_protocol/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,4 +354,8 @@ export class ClientBulkWriteCursorResponse extends CursorResponse {
get deletedCount() {
return this.get('nDeleted', BSONType.int, true);
}

get writeConcernError() {
return this.get('writeConcernError', BSONType.object, false);
}
}
27 changes: 27 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,33 @@ export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
}
}

/**
* An error indicating that an error occurred when generating a bulk write update.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteUpdateError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message: string) {
super(message);
}

override get name(): string {
return 'MongoClientBulkWriteUpdateError';
}
}

/**
* An error indicating that an error occurred on the client when executing a client bulk write.
*
Expand Down
43 changes: 43 additions & 0 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BSON, type Document } from '../../bson';
import { DocumentSequence } from '../../cmap/commands';
import { MongoClientBulkWriteUpdateError } from '../../error';
import { type PkFactory } from '../../mongo_client';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
import { DEFAULT_PK_FACTORY } from '../../utils';
Expand Down Expand Up @@ -343,6 +344,22 @@ export const buildUpdateManyOperation = (
return createUpdateOperation(model, index, true);
};

/**
* Validate the update document.
* @param update - The update document.
*/
function validateUpdate(update: Document) {
const keys = Object.keys(update);
if (keys.length === 0) {
throw new MongoClientBulkWriteUpdateError('Client bulk write update models may not be empty.');
}
if (!keys[0].startsWith('$')) {
throw new MongoClientBulkWriteUpdateError(
'Client bulk write update models must only contain atomic modifiers (start with $).'
);
}
}

/**
* Creates a delete operation based on the parameters.
*/
Expand All @@ -351,6 +368,22 @@ function createUpdateOperation(
index: number,
multi: boolean
): ClientUpdateOperation {
// Update documents provided in UpdateOne and UpdateMany write models are
// required only to contain atomic modifiers (i.e. keys that start with "$").
// Drivers MUST throw an error if an update document is empty or if the
// document's first key does not start with "$".
if (Array.isArray(model.update)) {
if (model.update.length === 0) {
throw new MongoClientBulkWriteUpdateError(
'Client bulk write update model pipelines may not be empty.'
);
}
for (const update of model.update) {
validateUpdate(update);
}
} else {
validateUpdate(model.update);
}
const document: ClientUpdateOperation = {
update: index,
multi: multi,
Expand Down Expand Up @@ -393,6 +426,16 @@ export const buildReplaceOneOperation = (
model: ClientReplaceOneModel,
index: number
): ClientReplaceOneOperation => {
const keys = Object.keys(model.replacement);
if (keys.length === 0) {
throw new MongoClientBulkWriteUpdateError('Client bulk write replace models may not be empty.');
}
if (keys[0].startsWith('$')) {
throw new MongoClientBulkWriteUpdateError(
'Client bulk write replace models must not contain atomic modifiers (start with $).'
);
}

const document: ClientReplaceOneOperation = {
update: index,
multi: false,
Expand Down
50 changes: 50 additions & 0 deletions src/operations/client_bulk_write/common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { type Document } from '../../bson';
import { type ErrorDescription, type MongoRuntimeError, MongoServerError } from '../../error';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
import type { CollationOptions, CommandOperationOptions } from '../../operations/command';
import type { Hint } from '../../operations/operation';
Expand Down Expand Up @@ -181,6 +182,55 @@ export interface ClientBulkWriteResult {
deleteResults?: Map<number, ClientDeleteResult>;
}

export interface ClientBulkWriteError {
code: number;
message: string;
}

/**
* An error indicating that an error occurred when executing the bulk write.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteError extends MongoServerError {
/**
* A top-level error that occurred when attempting to communicate with the server or execute
* the bulk write. This value may not be populated if the exception was thrown due to errors
* occurring on individual writes.
*/
error?: MongoRuntimeError;
/**
* Write concern errors that occurred while executing the bulk write. This list may have
* multiple items if more than one server command was required to execute the bulk write.
*/
writeConcernErrors: Document[];
/**
* Errors that occurred during the execution of individual write operations. This map will
* contain at most one entry if the bulk write was ordered.
*/
writeErrors: Map<number, ClientBulkWriteError>;
/**
* The results of any successful operations that were performed before the error was
* encountered.
*/
partialResult?: ClientBulkWriteResult;

/**
* Initialize the client bulk write error.
* @param message - The error message.
*/
constructor(message: ErrorDescription) {
super(message);
this.writeConcernErrors = [];
this.writeErrors = new Map();
}

override get name(): string {
return 'MongoClientBulkWriteError';
}
}

/** @public */
export interface ClientInsertOneResult {
/**
Expand Down
49 changes: 44 additions & 5 deletions src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type Document } from 'bson';

import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
import { MongoClientBulkWriteExecutionError } from '../../error';
import { MongoClientBulkWriteExecutionError, MongoWriteConcernError } from '../../error';
import { type MongoClient } from '../../mongo_client';
import { WriteConcern } from '../../write_concern';
import { executeOperation } from '../execute_operation';
Expand All @@ -10,7 +10,8 @@ import { type ClientBulkWriteCommand, ClientBulkWriteCommandBuilder } from './co
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
type ClientBulkWriteResult,
MongoClientBulkWriteError
} from './common';
import { ClientBulkWriteResultsMerger } from './results_merger';

Expand All @@ -34,9 +35,13 @@ export class ClientBulkWriteExecutor {
operations: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
) {
if (operations.length === 0) {
throw new MongoClientBulkWriteExecutionError('No client bulk write models were provided.');
}

this.client = client;
this.operations = operations;
this.options = { ...options };
this.options = { ordered: true, ...options };

// If no write concern was provided, we inherit one from the client.
if (!this.options.writeConcern) {
Expand Down Expand Up @@ -96,12 +101,46 @@ async function executeAcknowledged(
let currentBatchOffset = 0;
for (const command of commands) {
const cursor = new ClientBulkWriteCursor(client, command, options);
const docs = await cursor.toArray();
let docs = [];
let writeConcernErrorResult;
try {
docs = await cursor.toArray();
} catch (error) {
// Write concern errors are recorded in the writeConcernErrors field on MongoClientBulkWriteError.
// When a write concern error is encountered, it should not terminate execution of the bulk write
// for either ordered or unordered bulk writes. However, drivers MUST throw an exception at the end
// of execution if any write concern errors were observed.
if (error instanceof MongoWriteConcernError) {
const result = error.result;
writeConcernErrorResult = {
insertedCount: result.nInserted,
upsertedCount: result.nUpserted,
matchedCount: result.nMatched,
modifiedCount: result.nModified,
deletedCount: result.nDeleted,
writeConcernError: result.writeConcernError
};
docs = result.cursor.firstBatch;
} else {
throw error;
}
}
// Note if we have a write concern error there will be no cursor response present.
const response = writeConcernErrorResult ?? cursor.response;
const operations = command.ops.documents;
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
resultsMerger.merge(currentBatchOffset, operations, response, docs);
// Set the new batch index so we can back back to the index in the original models.
currentBatchOffset += operations.length;
}

if (resultsMerger.writeConcernErrors.length > 0) {
const error = new MongoClientBulkWriteError({
message: 'Mongo client bulk write encountered write concern errors during execution.'
});
error.writeConcernErrors = resultsMerger.writeConcernErrors;
error.partialResult = resultsMerger.result;
throw error;
}
return resultsMerger.result;
}

Expand Down
Loading

0 comments on commit dff7736

Please sign in to comment.