Skip to content

Commit

Permalink
Add MongoCommandStartedEventTags for Mongodb metrics (#3069)
Browse files Browse the repository at this point in the history
Support database tag at MongoCommandEvent
Resolves gh-2932
  • Loading branch information
SeHun Shin authored Mar 22, 2022
1 parent e4dfd43 commit 8d9db81
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,16 @@
*/
package io.micrometer.binder.mongodb;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.mongodb.event.CommandEvent;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.CommandSucceededEvent;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.util.StringUtils;
import io.micrometer.core.util.internal.logging.WarnThenDebugLogger;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonValue;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Default implementation for {@link MongoCommandTagsProvider}.
Expand All @@ -41,80 +34,34 @@
*/
public class DefaultMongoCommandTagsProvider implements MongoCommandTagsProvider {

// See https://docs.mongodb.com/manual/reference/command for the command reference
private static final Set<String> COMMANDS_WITH_COLLECTION_NAME = new HashSet<>(Arrays.asList(
"aggregate", "count", "distinct", "mapReduce", "geoSearch", "delete", "find", "findAndModify",
"insert", "update", "collMod", "compact", "convertToCapped", "create", "createIndexes", "drop",
"dropIndexes", "killCursors", "listIndexes", "reIndex"));

private static final WarnThenDebugLogger WARN_THEN_DEBUG_LOGGER = new WarnThenDebugLogger(DefaultMongoCommandTagsProvider.class);

private final ConcurrentMap<Integer, String> inFlightCommandCollectionNames = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, MongoCommandStartedEventTags> inFlightCommandStartedEventTags = new ConcurrentHashMap<>();

@Override
public Iterable<Tag> commandTags(CommandEvent event) {
Optional<MongoCommandStartedEventTags> mongoCommandStartedEventTags = Optional.ofNullable(inFlightCommandStartedEventTags.remove(event.getRequestId()));
return Tags.of(
Tag.of("command", event.getCommandName()),
Tag.of("collection", getAndRemoveCollectionNameForCommand(event)),
Tag.of("database", mongoCommandStartedEventTags.map(MongoCommandStartedEventTags::getDatabase).orElse("unknown")),
Tag.of("collection", mongoCommandStartedEventTags.map(MongoCommandStartedEventTags::getCollection).orElse("unknown")),
Tag.of("cluster.id", event.getConnectionDescription().getConnectionId().getServerId().getClusterId().getValue()),
Tag.of("server.address", event.getConnectionDescription().getServerAddress().toString()),
Tag.of("status", (event instanceof CommandSucceededEvent) ? "SUCCESS" : "FAILED"));
}

@Override
public void commandStarted(CommandStartedEvent event) {
determineCollectionName(event.getCommandName(), event.getCommand())
.ifPresent(collectionName -> addCollectionNameForCommand(event, collectionName));
MongoCommandStartedEventTags tags = new MongoCommandStartedEventTags(event);
addTagsForStartedCommandEvent(event, tags);
}

private void addCollectionNameForCommand(CommandEvent event, String collectionName) {
if (inFlightCommandCollectionNames.size() < 1000) {
inFlightCommandCollectionNames.put(event.getRequestId(), collectionName);
private void addTagsForStartedCommandEvent(CommandEvent event, MongoCommandStartedEventTags tags) {
if (inFlightCommandStartedEventTags.size() < 1000) {
inFlightCommandStartedEventTags.put(event.getRequestId(), tags);
return;
}
// Cache over capacity
WARN_THEN_DEBUG_LOGGER.log("Collection names cache is full - Mongo is not calling listeners properly");
}

private String getAndRemoveCollectionNameForCommand(CommandEvent event) {
String collectionName = inFlightCommandCollectionNames.remove(event.getRequestId());
return collectionName != null ? collectionName : "unknown";
}

/**
* Attempts to determine the name of the collection a command is operating on.
*
* <p>Because some commands either do not have collection info or it is problematic to determine the collection info,
* there is an allow list of command names {@code COMMANDS_WITH_COLLECTION_NAME} used. If {@code commandName} is
* not in the allow list or there is no collection info in {@code command}, it will use the content of the
* {@code 'collection'} field on {@code command}, if it exists.
*
* <p>Taken from <a href="https://github.com/openzipkin/brave/blob/master/instrumentation/mongodb/src/main/java/brave/mongodb/TraceMongoCommandListener.java#L115">TraceMongoCommandListener.java in Brave</a>
*
* @param commandName name of the mongo command
* @param command mongo command object
* @return optional collection name or empty if could not be determined or not in the allow list of command names
*/
protected Optional<String> determineCollectionName(String commandName, BsonDocument command) {
if (COMMANDS_WITH_COLLECTION_NAME.contains(commandName)) {
Optional<String> collectionName = getNonEmptyBsonString(command.get(commandName));
if (collectionName.isPresent()) {
return collectionName;
}
}
// Some other commands, like getMore, have a field like {"collection": collectionName}.
return getNonEmptyBsonString(command.get("collection"));
}

/**
* @return trimmed string from {@code bsonValue} in the Optional or empty Optional if value was not a non-empty string
*/
private Optional<String> getNonEmptyBsonString(BsonValue bsonValue) {
return Optional.ofNullable(bsonValue)
.filter(BsonValue::isString)
.map(BsonValue::asString)
.map(BsonString::getValue)
.map(String::trim)
.filter(StringUtils::isNotEmpty);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2021 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.binder.mongodb;

import com.mongodb.event.CommandStartedEvent;
import io.micrometer.core.instrument.util.StringUtils;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonValue;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

class MongoCommandStartedEventTags {

// See https://docs.mongodb.com/manual/reference/command for the command reference
private static final Set<String> COMMANDS_WITH_COLLECTION_NAME = new HashSet<>(Arrays.asList(
"aggregate", "count", "distinct", "mapReduce", "geoSearch", "delete", "find", "findAndModify",
"insert", "update", "collMod", "compact", "convertToCapped", "create", "createIndexes", "drop",
"dropIndexes", "killCursors", "listIndexes", "reIndex"));
public static final String UNKNOWN = "unknown";

public MongoCommandStartedEventTags(CommandStartedEvent event) {
this.database = event.getDatabaseName();
this.collection = this.determineCollectionName(event.getCommandName(), event.getCommand())
.orElse(UNKNOWN);
}

private final String collection;
private final String database;

public String getDatabase() {
return database;
}

public String getCollection() {
return collection;
}

/**
* Attempts to determine the name of the collection a command is operating on.
*
* <p>Because some commands either do not have collection info or it is problematic to determine the collection info,
* there is an allow list of command names {@code COMMANDS_WITH_COLLECTION_NAME} used. If {@code commandName} is
* not in the allow list or there is no collection info in {@code command}, it will use the content of the
* {@code 'collection'} field on {@code command}, if it exists.
*
* <p>Taken from <a href="https://github.com/openzipkin/brave/blob/master/instrumentation/mongodb/src/main/java/brave/mongodb/TraceMongoCommandListener.java#L115">TraceMongoCommandListener.java in Brave</a>
*
* @param commandName name of the mongo command
* @param command mongo command object
* @return optional collection name or empty if could not be determined or not in the allow list of command names
*/
private Optional<String> determineCollectionName(String commandName, BsonDocument command) {
Optional<String> collectionName = Optional.ofNullable(commandName)
.filter(COMMANDS_WITH_COLLECTION_NAME::contains)
.map(command::get)
.flatMap(this::getNonEmptyBsonString);

if (collectionName.isPresent()) {
return collectionName;
}

return getNonEmptyBsonString(command.get("collection"));
}

/**
* @return trimmed string from {@code bsonValue} in the Optional or empty Optional if value was not a non-empty string
*/
private Optional<String> getNonEmptyBsonString(BsonValue bsonValue) {
return Optional.ofNullable(bsonValue)
.filter(BsonValue::isString)
.map(BsonValue::asString)
.map(BsonString::getValue)
.map(String::trim)
.filter(StringUtils::isNotEmpty);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,15 @@
*/
package io.micrometer.binder.mongodb;

import java.util.Arrays;

import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterId;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ServerId;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.CommandSucceededEvent;
import io.micrometer.core.instrument.Tag;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonElement;
import org.bson.BsonString;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -40,6 +35,7 @@
*/
class DefaultMongoCommandTagsProviderTest {


private final ConnectionDescription connectionDesc = new ConnectionDescription(
new ServerId(new ClusterId("cluster1"), new ServerAddress("localhost", 5150)));

Expand All @@ -51,6 +47,7 @@ void defaultCommandTags() {
Iterable<Tag> tags = tagsProvider.commandTags(event);
assertThat(tags).containsExactlyInAnyOrder(
Tag.of("command", "find"),
Tag.of("database", "unknown"),
Tag.of("collection", "unknown"),
Tag.of("cluster.id", connectionDesc.getConnectionId().getServerId().getClusterId().getValue()),
Tag.of("server.address", "localhost:5150"),
Expand All @@ -68,11 +65,11 @@ void handlesCommandsOverLimitGracefully() {
// 1001 will not be added to state map and therefore will use 'unknown'
tagsProvider.commandStarted(commandStartedEvent(1001));
Iterable<Tag> tags = tagsProvider.commandTags(commandSucceededEvent(1001));
assertThat(tags).contains(Tag.of("collection", "unknown"));
assertThat(tags).contains(Tag.of("database", "unknown"), Tag.of("collection", "unknown"));

// Complete 1000 - which will remove previously added entry from state map
tags = tagsProvider.commandTags(commandSucceededEvent(1000));
assertThat(tags).contains(Tag.of("collection", "collection-1000"));
assertThat(tags).contains(Tag.of("database", "db1"), Tag.of("collection", "collection-1000"));

// 1001 will now be put in state map (since 1000 removed and made room for it)
tagsProvider.commandStarted(commandStartedEvent(1001));
Expand All @@ -81,10 +78,10 @@ void handlesCommandsOverLimitGracefully() {
tagsProvider.commandStarted(commandStartedEvent(1002));

tags = tagsProvider.commandTags(commandSucceededEvent(1001));
assertThat(tags).contains(Tag.of("collection", "collection-1001"));
assertThat(tags).contains(Tag.of("database", "db1"), Tag.of("collection", "collection-1001"));

tags = tagsProvider.commandTags(commandSucceededEvent(1002));
assertThat(tags).contains(Tag.of("collection", "unknown"));
assertThat(tags).contains(Tag.of("database", "unknown"), Tag.of("collection", "unknown"));
}

private CommandStartedEvent commandStartedEvent(int requestId) {
Expand All @@ -104,57 +101,4 @@ private CommandSucceededEvent commandSucceededEvent(int requestId) {
new BsonDocument(),
1200L);
}

@Nested
class DetermineCollectionName {

@Test
void withNameInAllowList() {
assertThat(tagsProvider.determineCollectionName("find", new BsonDocument("find", new BsonString(" bar ")))).hasValue("bar");
}

@Test
void withNameNotInAllowList() {
assertThat(tagsProvider.determineCollectionName("cmd", new BsonDocument("cmd", new BsonString(" bar ")))).isEmpty();
}

@Test
void withNameNotInCommand() {
assertThat(tagsProvider.determineCollectionName("find", new BsonDocument())).isEmpty();
}

@Test
void withNonStringCommand() {
assertThat(tagsProvider.determineCollectionName("find", new BsonDocument("find", BsonBoolean.TRUE))).isEmpty();
}

@Test
void withEmptyStringCommand() {
assertThat(tagsProvider.determineCollectionName("find", new BsonDocument("find", new BsonString(" ")))).isEmpty();
}

@Test
void withCollectionFieldOnly() {
assertThat(tagsProvider.determineCollectionName("find", new BsonDocument("collection", new BsonString(" bar ")))).hasValue("bar");
}

@Test
void withCollectionFieldAndAllowListedCommand() {
BsonDocument command = new BsonDocument(Arrays.asList(
new BsonElement("collection", new BsonString("coll")),
new BsonElement("find", new BsonString("bar"))
));
assertThat(tagsProvider.determineCollectionName("find", command)).hasValue("bar");
}

@Test
void withCollectionFieldAndNotAllowListedCommand() {
BsonDocument command = new BsonDocument(Arrays.asList(
new BsonElement("collection", new BsonString("coll")),
new BsonElement("cmd", new BsonString("bar"))
));
assertThat(tagsProvider.determineCollectionName("find", command)).hasValue("coll");
}
}

}
Loading

0 comments on commit 8d9db81

Please sign in to comment.