Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-4686): Add log messages to CLAM #3955

Merged
merged 27 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6d32c32
rebased changes
aditi-khare-mongoDB Dec 20, 2023
5f5d77e
All tests passing, need to add in prose tests
aditi-khare-mongoDB Dec 20, 2023
209ba27
Added prose tests
aditi-khare-mongoDB Dec 20, 2023
3bee17a
Added forgotten edge case
aditi-khare-mongoDB Dec 20, 2023
fcfc98b
removed stray only
aditi-khare-mongoDB Dec 21, 2023
422c6a9
workaround for messy messy rebase
aditi-khare-mongoDB Dec 21, 2023
c360ec2
refixed connection.ts, tests passing again
aditi-khare-mongoDB Dec 21, 2023
054094f
PR requested changes
aditi-khare-mongoDB Jan 4, 2024
0d8d051
Error fix
aditi-khare-mongoDB Jan 4, 2024
383f890
use hello not serverConnectionId to ensure support for versions below…
aditi-khare-mongoDB Jan 4, 2024
7033dc8
test versioning fix and package.lock fix
aditi-khare-mongoDB Jan 5, 2024
1d70579
Merge branch 'main' into NODE-4686/CLAM-logging-with-serverConnectionid
durran Jan 8, 2024
0ae8792
test fix
aditi-khare-mongoDB Jan 9, 2024
e92849d
versioning fix
aditi-khare-mongoDB Jan 9, 2024
9776ec9
PR requested changes, all tests passing
aditi-khare-mongoDB Jan 11, 2024
3332f94
extraneous .only removal
aditi-khare-mongoDB Jan 11, 2024
cacaed2
Update src/mongo_logger.ts
aditi-khare-mongoDB Jan 11, 2024
fa223cf
lint fix
aditi-khare-mongoDB Jan 11, 2024
299e8a1
neal's pr requested changes
aditi-khare-mongoDB Jan 16, 2024
1181991
pr again
aditi-khare-mongoDB Jan 16, 2024
8f6c860
pr requested changes again
aditi-khare-mongoDB Jan 16, 2024
dc93151
pr requested changes 4
aditi-khare-mongoDB Jan 16, 2024
cbb5df6
bailey's pr requested changes
aditi-khare-mongoDB Jan 17, 2024
9ace7a0
reauth support for connection.established
aditi-khare-mongoDB Jan 17, 2024
e9d4f8d
lint fix
aditi-khare-mongoDB Jan 17, 2024
7892601
added getter and using authContext?.authenticating directly
aditi-khare-mongoDB Jan 18, 2024
3c18428
Merge branch 'main' into NODE-4686/CLAM-logging-with-serverConnectionid
durran Jan 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/bson.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export {
deserialize,
Document,
Double,
EJSON,
EJSONOptions,
Int32,
Long,
MaxKey,
Expand Down
4 changes: 4 additions & 0 deletions src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ async function performInitialHandshake(
throw error;
}
}

// Connection establishment is socket creation (tcp handshake, tls handshake, MongoDB handshake (saslStart, saslContinue))
// Once connection is established, command logging can log events (if enabled)
conn.established = true;
}

export interface HandshakeDocument extends Document {
Expand Down
53 changes: 46 additions & 7 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
MongoWriteConcernError
} from '../error';
import type { ServerApi, SupportedNodeConnectionOptions } from '../mongo_client';
import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mongo_logger';
import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { ReadPreferenceLike } from '../read_preference';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
Expand Down Expand Up @@ -114,6 +115,8 @@ export interface ConnectionOptions
socketTimeoutMS?: number;
cancellationToken?: CancellationToken;
metadata: ClientMetadata;
/** @internal */
mongoLogger?: MongoLogger | undefined;
}

/** @internal */
Expand Down Expand Up @@ -165,6 +168,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
public delayedTimeoutId: NodeJS.Timeout | null = null;
public generation: number;
public readonly description: Readonly<StreamDescription>;
/**
* @public
* Represents if the connection has been established:
* - TCP handshake
* - TLS negotiated
* - mongodb handshake (saslStart, saslContinue), includes authentication
* Once connection is established, command logging can log events (if enabled)
*/
public established: boolean;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the expected behavior with command logging when we re-authenticate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reauthentication will send commands we don't want to log I think. We may either need to

  • add logic that more specifically determines whether or not a command should be logged
  • set established to false when reauthenticating and then set it back to true after a successful reauthentication

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, pinged you on slack for more info on reauth

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I didn't notice this earlier ( I didn't actually check ) but instead of modifying established when we reauthenticate, could we just check the context.reauthenticating field?

one idea is a getter on the connection class (or we could just inline it):

get shouldEmitAndLog(): boolean {
  return this.monitorCommands ||
      (this.established && !this.context.reauthenticating && this.mongoLogger?.willLog(SeverityLevel.DEBUG, MongoLoggableComponent.COMMAND))
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it more accurate to edit established as well, though? Since during authentication, the connection is not established.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understood established to mean that the connection had been established, not that it was "ready"/currently established. What I would like to avoid is duplicate data that might get out of sync. If established=false can also represent a connection reauthenticating, then it's possible for us to end up in a scenario context.reauthenticating=true with established=false (or vice versa) - which is a poorly defined state for the driver.

One way to avoid this is to define established as whether or not the connection was initially established and rely on context.reauthenticating (my suggestion).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds good. I've removed the extra auth changes to conn.established and am using this.authContext?.authenticating in the new getter shouldEmitAndLogCommand instead.


private lastUseTime: number;
private socketTimeoutMS: number;
Expand All @@ -174,6 +186,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
private messageStream: Readable;
private socketWrite: (buffer: Uint8Array) => Promise<void>;
private clusterTime: Document | null = null;
/** @internal */
override component = MongoLoggableComponent.COMMAND;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting idea - I see it's used in TypedEventEmitter to know which logging component to use.

Will there ever be a case where emitAndLogCommand will be used to log for a component that is not MongoLoggableComponent.COMMAND?

Copy link
Contributor Author

@aditi-khare-mongoDB aditi-khare-mongoDB Jan 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there won't. The extra this.component check should ensure that an emitAndLog or variant function (emitAndLogCommand, emitAndLogHeartbeat) is not called from any class for which the logging component is not defined already.

Copy link
Contributor Author

@aditi-khare-mongoDB aditi-khare-mongoDB Jan 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add in an extra check such as: this.component === MongoLoggableComponent.COMMAND, remove the this.component check for emitAndLogCommand, or keep it the same. What do you think?

Copy link
Contributor

@baileympearson baileympearson Jan 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's strange to me that we have a dedicated method for logging commands, but to use it we also need to set the component on the class. I would expect that either:

  • we only have emitAndLogCommand(), and not the component property.
  • we have a generic emitAndLog() method that uses the component property and no specialized methods on the TypedEventEmitter

No, there won't. The extra this.component check should ensure that an emitAndLog or variant function (emitAndLogCommand, emitAndLogHeartbeat) is not called from any class for which the logging component is not defined already.

If we want to prevent the emitAndLog functions from being called, maybe we could use inheritance for this? that way we could only define the log methods where they can be used

class TypedEventEmitter {
  private logger: MongoLogger;
}

class CommandEventEmitter extends TypedEventEmitter {
  emitAndLogCommand(..) {}
}


class HeartbeatEventEmitter extends TypedEventEmitter {
  emitAndLogHeartbeat(..) {}
}

class Connection extends CommandEventEmitter { ...} 
class Monitor extends HeartbeatEventEmitter { ...} 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true. For the sake of this PR, I'll just hardcode in the component in emitAndLogCommand as MongoLoggableComponent.COMMAND. Since I realize emitAndLogCommand already implies that the command component is being utilized.

/** @internal */
override mongoLogger: MongoLogger | undefined;

/** @event */
static readonly COMMAND_STARTED = COMMAND_STARTED;
Expand All @@ -198,6 +214,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
this.monitorCommands = options.monitorCommands;
this.serverApi = options.serverApi;
this.mongoLogger = options.mongoLogger;
this.established = false;

this.description = new StreamDescription(this.address, options);
this.generation = options.generation;
Expand Down Expand Up @@ -441,10 +459,16 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
const message = this.prepareCommand(ns.db, command, options);

let started = 0;
if (this.monitorCommands) {
if (
this.monitorCommands ||
(this.established && this.mongoLogger?.willLog(SeverityLevel.DEBUG, this.component))
) {
started = now();
this.emit(
this.emitAndLogCommand(
this.monitorCommands,
Connection.COMMAND_STARTED,
message.databaseName,
this.established,
new CommandStartedEvent(this, message, this.description.serverConnectionId)
);
}
Expand All @@ -464,9 +488,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
throw new MongoServerError(document);
}

if (this.monitorCommands) {
this.emit(
if (
this.monitorCommands ||
(this.established && this.mongoLogger?.willLog(SeverityLevel.DEBUG, this.component))
) {
this.emitAndLogCommand(
this.monitorCommands,
Connection.COMMAND_SUCCEEDED,
message.databaseName,
this.established,
new CommandSucceededEvent(
this,
message,
Expand All @@ -481,10 +511,16 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.controller.signal.throwIfAborted();
}
} catch (error) {
if (this.monitorCommands) {
if (
this.monitorCommands ||
(this.established && this.mongoLogger?.willLog(SeverityLevel.DEBUG, this.component))
) {
if (error.name === 'MongoWriteConcernError') {
this.emit(
this.emitAndLogCommand(
this.monitorCommands,
Connection.COMMAND_SUCCEEDED,
message.databaseName,
this.established,
new CommandSucceededEvent(
this,
message,
Expand All @@ -494,8 +530,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
)
);
} else {
this.emit(
this.emitAndLogCommand(
this.monitorCommands,
Connection.COMMAND_FAILED,
message.databaseName,
this.established,
new CommandFailedEvent(
this,
message,
Expand Down
3 changes: 2 additions & 1 deletion src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
...this.options,
id: this[kConnectionCounter].next().value,
generation: this[kGeneration],
cancellationToken: this[kCancellationToken]
cancellationToken: this[kCancellationToken],
mongoLogger: this.mongoLogger
};

this[kPending]++;
Expand Down
220 changes: 220 additions & 0 deletions src/cmap/message_stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import { Duplex, type DuplexOptions } from 'stream';

import type { BSONSerializeOptions, Document } from '../bson';
import { MongoDecompressionError, MongoParseError } from '../error';
import type { ClientSession } from '../sessions';
import { BufferPool, type Callback } from '../utils';
import {
type MessageHeader,
OpCompressedRequest,
OpMsgResponse,
OpQueryResponse,
type WriteProtocolMessageType
} from './commands';
import { compress, Compressor, type CompressorName, decompress } from './wire_protocol/compression';
import { OP_COMPRESSED, OP_MSG } from './wire_protocol/constants';

const MESSAGE_HEADER_SIZE = 16;
const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID

const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
/** @internal */
const kBuffer = Symbol('buffer');

/** @internal */
export interface MessageStreamOptions extends DuplexOptions {
maxBsonMessageSize?: number;
}

/** @internal */
export interface OperationDescription extends BSONSerializeOptions {
started: number;
cb: Callback<Document>;
documentsReturnedIn?: string;
noResponse: boolean;
raw: boolean;
requestId: number;
session?: ClientSession;
agreedCompressor?: CompressorName;
zlibCompressionLevel?: number;
$clusterTime?: Document;
}

/**
* A duplex stream that is capable of reading and writing raw wire protocol messages, with
* support for optional compression
* @internal
*/
export class MessageStream extends Duplex {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
/** @internal */
maxBsonMessageSize: number;
/** @internal */
[kBuffer]: BufferPool;
/** @internal */
isMonitoringConnection = false;

constructor(options: MessageStreamOptions = {}) {
super(options);
this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;
this[kBuffer] = new BufferPool();
}

get buffer(): BufferPool {
return this[kBuffer];
}

override _write(chunk: Buffer, _: unknown, callback: Callback<Buffer>): void {
this[kBuffer].append(chunk);
processIncomingData(this, callback);
}

override _read(/* size */): void {
// NOTE: This implementation is empty because we explicitly push data to be read
// when `writeMessage` is called.
return;
}

writeCommand(
command: WriteProtocolMessageType,
operationDescription: OperationDescription
): void {
const agreedCompressor = operationDescription.agreedCompressor ?? 'none';
if (agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)) {
const data = command.toBin();
this.push(Array.isArray(data) ? Buffer.concat(data) : data);
return;
}
// otherwise, compress the message
const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);

// Extract information needed for OP_COMPRESSED from the uncompressed message
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);

const options = {
agreedCompressor,
zlibCompressionLevel: operationDescription.zlibCompressionLevel ?? 0
};
// Compress the message body
compress(options, messageToBeCompressed).then(
compressedMessage => {
// Create the msgHeader of OP_COMPRESSED
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
msgHeader.writeInt32LE(
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
0
); // messageLength
msgHeader.writeInt32LE(command.requestId, 4); // requestID
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode

// Create the compression details of OP_COMPRESSED
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
compressionDetails.writeUInt8(Compressor[agreedCompressor], 8); // compressorID
this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
},
error => {
operationDescription.cb(error);
}
);
}
}

function processIncomingData(stream: MessageStream, callback: Callback<Buffer>): void {
const buffer = stream[kBuffer];
const sizeOfMessage = buffer.getInt32();

if (sizeOfMessage == null) {
return callback();
}

if (sizeOfMessage < 0) {
return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
}

if (sizeOfMessage > stream.maxBsonMessageSize) {
return callback(
new MongoParseError(
`Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`
)
);
}

if (sizeOfMessage > buffer.length) {
return callback();
}

const message = buffer.read(sizeOfMessage);
const messageHeader: MessageHeader = {
length: message.readInt32LE(0),
requestId: message.readInt32LE(4),
responseTo: message.readInt32LE(8),
opCode: message.readInt32LE(12)
};

const monitorHasAnotherHello = () => {
if (stream.isMonitoringConnection) {
// Can we read the next message size?
const sizeOfMessage = buffer.getInt32();
if (sizeOfMessage != null && sizeOfMessage <= buffer.length) {
return true;
}
}
return false;
};

let ResponseType = messageHeader.opCode === OP_MSG ? OpMsgResponse : OpQueryResponse;
if (messageHeader.opCode !== OP_COMPRESSED) {
const messageBody = message.subarray(MESSAGE_HEADER_SIZE);

// If we are a monitoring connection message stream and
// there is more in the buffer that can be read, skip processing since we
// want the last hello command response that is in the buffer.
if (monitorHasAnotherHello()) {
return processIncomingData(stream, callback);
}

stream.emit('message', new ResponseType(message, messageHeader, messageBody));

if (buffer.length >= 4) {
return processIncomingData(stream, callback);
}
return callback();
}

messageHeader.fromCompressed = true;
messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
const compressorID = message[MESSAGE_HEADER_SIZE + 8];
const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);

// recalculate based on wrapped opcode
ResponseType = messageHeader.opCode === OP_MSG ? OpMsgResponse : OpQueryResponse;
decompress(compressorID, compressedBuffer).then(
messageBody => {
if (messageBody.length !== messageHeader.length) {
return callback(
new MongoDecompressionError('Message body and message header must be the same length')
);
}

// If we are a monitoring connection message stream and
// there is more in the buffer that can be read, skip processing since we
// want the last hello command response that is in the buffer.
if (monitorHasAnotherHello()) {
return processIncomingData(stream, callback);
}
stream.emit('message', new ResponseType(message, messageHeader, messageBody));

if (buffer.length >= 4) {
return processIncomingData(stream, callback);
}
return callback();
},
error => {
return callback(error);
}
);
}
3 changes: 3 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ export const CONNECTION_CHECKED_OUT = 'connectionCheckedOut' as const;
/** @internal */
export const CONNECTION_CHECKED_IN = 'connectionCheckedIn' as const;
export const CLUSTER_TIME_RECEIVED = 'clusterTimeReceived' as const;
/** @internal */
export const COMMAND_STARTED = 'commandStarted' as const;
/** @internal */
export const COMMAND_SUCCEEDED = 'commandSucceeded' as const;
/** @internal */
export const COMMAND_FAILED = 'commandFailed' as const;
/** @internal */
export const SERVER_HEARTBEAT_STARTED = 'serverHeartbeatStarted' as const;
Expand Down
Loading