Skip to content

Commit

Permalink
Merge branch 'main' into NODE-4686/CLAM-logging-with-serverConnectionid
Browse files Browse the repository at this point in the history
  • Loading branch information
durran authored Jan 18, 2024
2 parents e9d4f8d + 86e2659 commit 517a826
Show file tree
Hide file tree
Showing 64 changed files with 61,818 additions and 197 deletions.
11 changes: 5 additions & 6 deletions .github/workflows/release-nightly.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
on:
# TODO: We can reenable cron when needed
# schedule:
# # Timezone is UTC
# # https://crontab.guru/#0_0_*_*_*
# # At 00:00 every day.
# - cron: '0 0 * * *'
schedule:
# Timezone is UTC
# https://crontab.guru/#0_0_*_*_*
# At 00:00 every day.
- cron: '0 0 * * *'

# Allows us to manually trigger a nightly
# Since npm prevents duplicate releases we can run this at any time
Expand Down
29 changes: 12 additions & 17 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,23 +579,18 @@ function executeCommands(
}

try {
if (isInsertBatch(batch)) {
executeOperation(
bulkOperation.s.collection.client,
new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
resultHandler
);
} else if (isUpdateBatch(batch)) {
executeOperation(
bulkOperation.s.collection.client,
new UpdateOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
resultHandler
);
} else if (isDeleteBatch(batch)) {
executeOperation(
bulkOperation.s.collection.client,
new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
resultHandler
const operation = isInsertBatch(batch)
? new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
: isUpdateBatch(batch)
? new UpdateOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
: isDeleteBatch(batch)
? new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
: null;

if (operation != null) {
executeOperation(bulkOperation.s.collection.client, operation).then(
result => resultHandler(undefined, result),
error => resultHandler(error)
);
}
} catch (err) {
Expand Down
41 changes: 14 additions & 27 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
} from '../sdam/server_selection';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { type Callback, maybeCallback, supportsRetryableWrites } from '../utils';
import { supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
Expand All @@ -51,36 +51,23 @@ export interface ExecutionResult {
* @internal
*
* @remarks
* This method reduces large amounts of duplication in the entire codebase by providing
* a single point for determining whether callbacks or promises should be used. Additionally
* it allows for a single point of entry to provide features such as implicit sessions, which
* Allows for a single point of entry to provide features such as implicit sessions, which
* are required by the Driver Sessions specification in the event that a ClientSession is
* not provided
* not provided.
*
* @param topology - The topology to execute this operation on
* The expectation is that this function:
* - Connects the MongoClient if it has not already been connected
* - Creates a session if none is provided and cleans up the session it creates
* - Selects a server based on readPreference or various factors
* - Retries an operation if it fails for certain errors, see {@link retryOperation}
*
* @typeParam T - The operation's type
* @typeParam TResult - The type of the operation's result, calculated from T
*
* @param client - The MongoClient to execute this operation with
* @param operation - The operation to execute
* @param callback - The command result callback
*/
export function executeOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T): Promise<TResult>;
export function executeOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback: Callback<TResult>): void;
export function executeOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void;
export function executeOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void {
return maybeCallback(() => executeOperationAsync(client, operation), callback);
}

async function executeOperationAsync<
export async function executeOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T): Promise<TResult> {
Expand Down
65 changes: 33 additions & 32 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -754,45 +754,46 @@ function endTransaction(
command.recoveryToken = session.transaction.recoveryToken;
}

const handleFirstCommandAttempt = (error?: Error) => {
if (command.abortTransaction) {
// always unpin on abort regardless of command outcome
session.unpin();
}

if (error instanceof MongoError && isRetryableWriteError(error)) {
// SPEC-1185: apply majority write concern when retrying commitTransaction
if (command.commitTransaction) {
// per txns spec, must unpin session in this case
session.unpin({ force: true });

command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
w: 'majority'
});
}

executeOperation(
session.client,
new RunAdminCommandOperation(command, {
session,
readPreference: ReadPreference.primary,
bypassPinningCheck: true
})
).then(() => commandHandler(), commandHandler);
return;
}

commandHandler(error);
};

// send the command
executeOperation(
session.client,
new RunAdminCommandOperation(command, {
session,
readPreference: ReadPreference.primary,
bypassPinningCheck: true
}),
error => {
if (command.abortTransaction) {
// always unpin on abort regardless of command outcome
session.unpin();
}

if (error instanceof MongoError && isRetryableWriteError(error)) {
// SPEC-1185: apply majority write concern when retrying commitTransaction
if (command.commitTransaction) {
// per txns spec, must unpin session in this case
session.unpin({ force: true });

command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
w: 'majority'
});
}

return executeOperation(
session.client,
new RunAdminCommandOperation(command, {
session,
readPreference: ReadPreference.primary,
bypassPinningCheck: true
}),
commandHandler
);
}

commandHandler(error);
}
);
})
).then(() => handleFirstCommandAttempt(), handleFirstCommandAttempt);
}

/** @public */
Expand Down
24 changes: 0 additions & 24 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,30 +330,6 @@ export function* makeCounter(seed = 0): Generator<number> {
}
}

/**
* Helper for handling legacy callback support.
*/
export function maybeCallback<T>(promiseFn: () => Promise<T>, callback: null): Promise<T>;
export function maybeCallback<T>(
promiseFn: () => Promise<T>,
callback?: Callback<T>
): Promise<T> | void;
export function maybeCallback<T>(
promiseFn: () => Promise<T>,
callback?: Callback<T> | null
): Promise<T> | void {
const promise = promiseFn();
if (callback == null) {
return promise;
}

promise.then(
result => callback(undefined, result),
error => callback(error)
);
return;
}

/**
* Synchronously Generate a UUIDv4
* @internal
Expand Down
Empty file.
Loading

0 comments on commit 517a826

Please sign in to comment.