Skip to content

Commit

Permalink
fix(ChangeStream): whitelist change stream resumable errors
Browse files Browse the repository at this point in the history
  - Changes which errors are considered resumable on change streams,
    adding support for the new ResumableChangeStreamError label.
  - Removes ElectionInProgress (216) from ResumableChangeStreamError.
  - Updates ChangeStream prose tests which described startAfter 
    behavior for unsupported server versions.
  - Fixes use of startAfter/resumeAfter when resuming from an
    invalidate event. Implement prose tests #17 and #18.

NODE-2478
NODE-2522
  • Loading branch information
emadum authored Apr 10, 2020
1 parent c5d60fc commit 8a9c108
Show file tree
Hide file tree
Showing 16 changed files with 6,088 additions and 714 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ language: node_js
branches:
only:
- master
- next
- 3.6

before_install:
# we have to install mongo-orchestration ourselves to get around permissions issues in subshells
Expand Down
60 changes: 34 additions & 26 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ class ChangeStreamCursor extends Cursor {
['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => delete result[key]);

if (this.resumeToken) {
result.resumeAfter = this.resumeToken;
const resumeKey =
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
result[resumeKey] = this.resumeToken;
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
result.startAtOperationTime = this.startAtOperationTime;
}
Expand All @@ -297,6 +299,26 @@ class ChangeStreamCursor extends Cursor {
return result;
}

cacheResumeToken(resumeToken) {
if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) {
this.resumeToken = this.cursorState.postBatchResumeToken;
} else {
this.resumeToken = resumeToken;
}
this.hasReceived = true;
}

_processBatch(batchName, response) {
const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor[batchName].length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
}

_initializeCursor(callback) {
super._initializeCursor((err, result) => {
if (err) {
Expand All @@ -315,15 +337,9 @@ class ChangeStreamCursor extends Cursor {
this.startAtOperationTime = response.operationTime;
}

const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor.firstBatch.length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
this._processBatch('firstBatch', response);

this.emit('init', result);
this.emit('response');
callback(err, result);
});
Expand All @@ -336,15 +352,9 @@ class ChangeStreamCursor extends Cursor {
return;
}

const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor.nextBatch.length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
this._processBatch('nextBatch', response);

this.emit('more', response);
this.emit('response');
callback(err, response);
});
Expand All @@ -367,6 +377,7 @@ function createChangeStreamCursor(self, options) {

const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline);
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);

const changeStreamCursor = new ChangeStreamCursor(
self.topology,
new AggregateOperation(self.parent, pipeline, options),
Expand Down Expand Up @@ -465,9 +476,10 @@ function processNewChange(args) {
const change = args.change;
const callback = args.callback;
const eventEmitter = args.eventEmitter || false;
const cursor = changeStream.cursor;

// If the changeStream is closed, then it should not process a change.
if (changeStream.isClosed()) {
// If the cursor is null, then it should not process a change.
if (cursor == null) {
// We do not error in the eventEmitter case.
if (eventEmitter) {
return;
Expand All @@ -479,12 +491,12 @@ function processNewChange(args) {
: changeStream.promiseLibrary.reject(error);
}

const cursor = changeStream.cursor;
const topology = changeStream.topology;
const options = changeStream.cursor.options;
const wireVersion = maxWireVersion(cursor.server);

if (error) {
if (isResumableError(error) && !changeStream.attemptingResume) {
if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) {
changeStream.attemptingResume = true;

// stop listening to all events from old cursor
Expand Down Expand Up @@ -550,11 +562,7 @@ function processNewChange(args) {
}

// cache the resume token
if (cursor.bufferedCount() === 0 && cursor.cursorState.postBatchResumeToken) {
cursor.resumeToken = cursor.cursorState.postBatchResumeToken;
} else {
cursor.resumeToken = change._id;
}
cursor.cacheResumeToken(change._id);

// wipe the startAtOperationTime if there was one so that there won't be a conflict
// between resumeToken and startAtOperationTime if we need to reconnect the cursor
Expand Down
24 changes: 13 additions & 11 deletions lib/core/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,24 @@ function retrieveEJSON() {
* @param {(Topology|Server)} topologyOrServer
*/
function maxWireVersion(topologyOrServer) {
  // Resolves the highest wire protocol version reported by a topology or a
  // server by probing, in order: a cached `ismaster` document, the result of
  // `lastIsMaster()`, and finally an SDAM `description`.
  //
  // Returns 0 when the argument is missing or carries none of those sources,
  // so callers can compare the result numerically (e.g. `>= 7`) without a
  // separate null check.
  if (!topologyOrServer) {
    return 0;
  }

  if (topologyOrServer.ismaster) {
    return topologyOrServer.ismaster.maxWireVersion;
  }

  if (typeof topologyOrServer.lastIsMaster === 'function') {
    const lastIsMaster = topologyOrServer.lastIsMaster();
    // lastIsMaster() may yield nothing; fall through to `description` then.
    if (lastIsMaster) {
      return lastIsMaster.maxWireVersion;
    }
  }

  if (topologyOrServer.description) {
    return topologyOrServer.description.maxWireVersion;
  }

  return 0;
}

/*
Expand Down
45 changes: 29 additions & 16 deletions lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,34 @@ const GET_MORE_NON_RESUMABLE_CODES = new Set([
11601 // Interrupted
]);

// From spec@https://github.com/mongodb/specifications/blob/7a2e93d85935ee4b1046a8d2ad3514c657dc74fa/source/change-streams/change-streams.rst#resumable-error:
//
// An error is considered resumable if it meets any of the following criteria:
// - any error encountered which is not a server error (e.g. a timeout error or network error)
// - any server error response from a getMore command excluding those containing the error label
// NonRetryableChangeStreamError and those containing the following error codes:
// - Interrupted: 11601
// - CappedPositionLost: 136
// - CursorKilled: 237
//
// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors.
// From spec@https://github.com/mongodb/specifications/blob/f93d78191f3db2898a59013a7ed5650352ef6da8/source/change-streams/change-streams.rst#resumable-error
// Server error codes for which a failed getMore on a change stream is treated
// as resumable. isResumableError consults this set only as the fallback for
// wire versions below 9; at 9 and above it relies on the
// ResumableChangeStreamError error label instead.
const GET_MORE_RESUMABLE_CODES = new Set([
6, // HostUnreachable
7, // HostNotFound
89, // NetworkTimeout
91, // ShutdownInProgress
189, // PrimarySteppedDown
262, // ExceededTimeLimit
9001, // SocketException
10107, // NotMaster
11600, // InterruptedAtShutdown
11602, // InterruptedDueToReplStateChange
13435, // NotMasterNoSlaveOk
13436, // NotMasterOrSecondary
63, // StaleShardVersion
150, // StaleEpoch
13388, // StaleConfig
234, // RetryChangeStream
133 // FailedToSatisfyReadPreference
]);

function isGetMoreError(error) {
  // Reads the `isGetMore` flag from the context object stored on the error
  // under `mongoErrorContextSymbol`. Yields `undefined` (falsy) when the error
  // carries no such context.
  const context = error[mongoErrorContextSymbol];
  return context ? context.isGetMore : undefined;
}

function isResumableError(error) {
function isResumableError(error, wireVersion) {
if (!isGetMoreError(error)) {
return false;
}
Expand All @@ -36,10 +45,14 @@ function isResumableError(error) {
return true;
}

return !(
GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
error.hasErrorLabel('NonRetryableChangeStreamError')
if (wireVersion >= 9) {
return error.hasErrorLabel('ResumableChangeStreamError');
}

return (
GET_MORE_RESUMABLE_CODES.has(error.code) &&
!error.hasErrorLabel('NonResumableChangeStreamError')
);
}

module.exports = { GET_MORE_NON_RESUMABLE_CODES, isResumableError };
module.exports = { GET_MORE_NON_RESUMABLE_CODES, GET_MORE_RESUMABLE_CODES, isResumableError };
Loading

0 comments on commit 8a9c108

Please sign in to comment.