Merge branch 'tiramisu-stats-api' into tiramisu-stats-tsc
Peter Alfonsi committed Apr 9, 2024
2 parents 96c9493 + c4cea24 commit e7d9261
Showing 29 changed files with 1,035 additions and 106 deletions.
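In brief, the merge threads cache statistics through the stack: EhcacheDiskCache tags its StatsHolder with the tier's store name, NodeStats carries an optional NodeCacheStats section gated on Version.V_3_0_0, NodesStatsRequest grows a "caches" metric, and the ehcache and request-cache tests gain coverage that renders the stats to xContent and inspects them.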
EhcacheDiskCache.java
@@ -162,7 +162,7 @@ private EhcacheDiskCache(Builder<K, V> builder) {
this.ehCacheEventListener = new EhCacheEventListener(builder.getRemovalListener(), builder.getWeigher());
this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder);
List<String> dimensionNames = Objects.requireNonNull(builder.dimensionNames, "Dimension names can't be null");
-    this.statsHolder = new StatsHolder(dimensionNames);
+    this.statsHolder = new StatsHolder(dimensionNames, EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME);
}

@SuppressWarnings({ "rawtypes" })
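The constructor change above implies that StatsHolder now records which cache store produced the stats. A minimal, hypothetical sketch of that shape (the real class lives in org.opensearch.common.cache.stats and is not shown in this diff; the field handling here is an assumption):

import java.util.List;
import java.util.Objects;

// Sketch only: names and structure are assumptions, not the actual StatsHolder.
class StatsHolderSketch {
    private final List<String> dimensionNames; // e.g. ["dim1", "dim2"]
    private final String storeName; // e.g. EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME

    StatsHolderSketch(List<String> dimensionNames, String storeName) {
        this.dimensionNames = Objects.requireNonNull(dimensionNames, "Dimension names can't be null");
        this.storeName = Objects.requireNonNull(storeName, "Store name can't be null");
    }
}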
EhcacheDiskCacheTests.java
@@ -20,13 +20,19 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.CacheStats;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

@@ -797,6 +803,75 @@ public void testInvalidate() throws Exception {
}
}

// Modified from OpenSearchOnHeapCacheTests.java
public void testInvalidateWithDropDimensions() throws Exception {
Settings settings = Settings.builder().build();
List<String> dimensionNames = List.of("dim1", "dim2");
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehCacheDiskCachingTier = new EhcacheDiskCache.Builder<String, String>().setThreadPoolAlias("ehcacheTest")
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setDimensionNames(dimensionNames)
.setKeyType(String.class)
.setValueType(String.class)
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 20) // bigger so no evictions happen
.setExpireAfterAccess(TimeValue.MAX_VALUE)
.setRemovalListener(new MockRemovalListener<>())
.setWeigher((key, value) -> 1)
.build();

List<ICacheKey<String>> keysAdded = new ArrayList<>();

for (int i = 0; i < 20; i++) {
ICacheKey<String> key = new ICacheKey<>(UUID.randomUUID().toString(), getRandomDimensions(dimensionNames));
keysAdded.add(key);
ehCacheDiskCachingTier.put(key, UUID.randomUUID().toString());
}

ICacheKey<String> keyToDrop = keysAdded.get(0);

Map<String, Object> xContentMap = getStatsXContentMap(ehCacheDiskCachingTier.stats(), dimensionNames);
List<String> xContentMapKeys = getXContentMapKeys(keyToDrop, dimensionNames);
Map<String, Object> individualSnapshotMap = (Map<String, Object>) getValueFromNestedXContentMap(xContentMap, xContentMapKeys);
assertNotNull(individualSnapshotMap);
assertEquals(5, individualSnapshotMap.size()); // Assert all 5 stats are present and not null
for (Map.Entry<String, Object> entry : individualSnapshotMap.entrySet()) {
Integer value = (Integer) entry.getValue();
assertNotNull(value);
}

keyToDrop.setDropStatsForDimensions(true);
ehCacheDiskCachingTier.invalidate(keyToDrop);

// Now assert the stats are gone for any key that has this combination of dimensions, but still there otherwise
xContentMap = getStatsXContentMap(ehCacheDiskCachingTier.stats(), dimensionNames);
for (ICacheKey<String> keyAdded : keysAdded) {
xContentMapKeys = getXContentMapKeys(keyAdded, dimensionNames);
individualSnapshotMap = (Map<String, Object>) getValueFromNestedXContentMap(xContentMap, xContentMapKeys);
if (keyAdded.dimensions.equals(keyToDrop.dimensions)) {
assertNull(individualSnapshotMap);
} else {
assertNotNull(individualSnapshotMap);
}
}

ehCacheDiskCachingTier.close();
}
}
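What the test pins down: invalidating a key flagged via setDropStatsForDimensions(true) removes the stats bucket for that exact combination of dimension values, while every other combination keeps its stats. A minimal sketch of that bookkeeping, assuming stats are keyed by the ordered list of dimension values (class and method names here are illustrative, not the real StatsHolder API):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DropStatsSketch {
    // One stats bucket per ordered combination of dimension values.
    private final Map<List<String>, long[]> statsByDimensions = new ConcurrentHashMap<>();

    void onInvalidate(List<String> dimensions, boolean dropStatsForDimensions) {
        // The cache entry itself is removed elsewhere; this only handles stats.
        if (dropStatsForDimensions) {
            // Every key sharing this combination loses its stats at once,
            // which is what the assertions above verify.
            statsByDimensions.remove(dimensions);
        }
    }
}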

private List<String> getRandomDimensions(List<String> dimensionNames) {
Random rand = Randomness.get();
int bound = 3;
List<String> result = new ArrayList<>();
for (String dimName : dimensionNames) {
result.add(String.valueOf(rand.nextInt(bound)));
}
return result;
}

private static String generateRandomString(int length) {
String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
StringBuilder randomString = new StringBuilder(length);
@@ -831,6 +906,42 @@ private ToLongBiFunction<ICacheKey<String>, String> getWeigher() {
};
}

// Helper functions duplicated from server.test; we can't add a dependency on that module
private static Map<String, Object> getStatsXContentMap(CacheStats cacheStats, List<String> levels) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
Map<String, String> paramMap = Map.of("level", String.join(",", levels));
ToXContent.Params params = new ToXContent.MapParams(paramMap);

builder.startObject();
cacheStats.toXContent(builder, params);
builder.endObject();

String resultString = builder.toString();
return XContentHelper.convertToMap(MediaTypeRegistry.JSON.xContent(), resultString, true);
}

private List<String> getXContentMapKeys(ICacheKey<?> iCacheKey, List<String> dimensionNames) {
List<String> result = new ArrayList<>();
assert iCacheKey.dimensions.size() == dimensionNames.size();
for (int i = 0; i < dimensionNames.size(); i++) {
result.add(dimensionNames.get(i));
result.add(iCacheKey.dimensions.get(i));
}
return result;
}

public static Object getValueFromNestedXContentMap(Map<String, Object> xContentMap, List<String> keys) {
Map<String, Object> current = xContentMap;
for (int i = 0; i < keys.size() - 1; i++) {
Object next = current.get(keys.get(i));
if (next == null) {
return null;
}
current = (Map<String, Object>) next;
}
return current.get(keys.get(keys.size() - 1));
}
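// Putting the helpers together (illustrative values): for dimensionNames =
// ["dim1", "dim2"] and a key with dimensions ["0", "1"], getXContentMapKeys
// returns ["dim1", "0", "dim2", "1"]; getValueFromNestedXContentMap then
// descends xContentMap -> "dim1" -> "0" -> "dim2" and returns the entry at
// "1", i.e. the per-combination map holding the five stats.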

static class MockRemovalListener<K, V> implements RemovalListener<ICacheKey<K>, V> {

CounterMetric evictionMetric = new CounterMetric();
IndicesRequestCacheIT.java
@@ -34,31 +34,46 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.cache.service.NodeCacheStats;
import org.opensearch.common.cache.stats.CacheStats;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.search.aggregations.bucket.histogram.Histogram;
import org.opensearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.io.IOException;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
@@ -677,6 +692,34 @@ public void testCacheWithInvalidation() throws Exception {
assertCacheState(client, "index", 1, 2);
}

public void testCacheStatsAPI() throws Exception {
final String nodeId = internalCluster().startNode();
Client client = client(nodeId);
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
.get()
);
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
OpenSearchAssertions.assertAllSuccessful(resp);
assertThat(resp.getHits().getTotalHits().value, equalTo(1L));

Map<String, Object> xContentMap = getNodeCacheStatsXContentMap(client, nodeId, List.of());
assertNotNull(xContentMap); // smoke check only; the shape of the rendered stats map is not asserted here yet

}

private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
@@ -694,4 +737,26 @@ private static void assertCacheState(Client client, String index, long expectedH

}

private static Map<String, Object> getNodeCacheStatsXContentMap(Client client, String nodeId, List<String> aggregationLevels) throws IOException {
NodesStatsResponse nodeStatsResponse = client.admin().cluster()
.prepareNodesStats("data:true")
.addMetric(NodesStatsRequest.Metric.CACHE_STATS.metricName())
.get();
NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats();

XContentBuilder builder = XContentFactory.jsonBuilder();
Map<String, String> paramMap = Map.of("level", String.join(",", aggregationLevels));
ToXContent.Params params = new ToXContent.MapParams(paramMap);

builder.startObject();
ncs.toXContent(builder, params);
builder.endObject();

String resultString = builder.toString();
return XContentHelper.convertToMap(MediaTypeRegistry.JSON.xContent(), resultString, true);
}
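The "level" parameter mirrors the indices-stats convention: a comma-joined list of dimension names sets the aggregation granularity, so the empty list passed by testCacheStatsAPI above presumably yields fully aggregated totals (an inference from how paramMap is built here, not from the REST layer).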

}
NodeStats.java
@@ -39,6 +39,7 @@
import org.opensearch.cluster.routing.WeightedRoutingStats;
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
import org.opensearch.common.Nullable;
import org.opensearch.common.cache.service.NodeCacheStats;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.indices.breaker.AllCircuitBreakerStats;
@@ -158,6 +159,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private AdmissionControlStats admissionControlStats;

@Nullable
private NodeCacheStats nodeCacheStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
@@ -234,6 +238,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
admissionControlStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
nodeCacheStats = in.readOptionalWriteable(NodeCacheStats::new);
} else {
nodeCacheStats = null;
}
}
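// Note that the read here and the write in writeTo(StreamOutput) below are
// gated on the same Version.V_3_0_0 check, so nodes on older wire versions
// neither send nor expect the new section; readOptionalWriteable also
// tolerates a null nodeCacheStats on current-version wires.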

public NodeStats(
@@ -264,7 +273,8 @@ public NodeStats(
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@Nullable RepositoriesStats repositoriesStats,
-    @Nullable AdmissionControlStats admissionControlStats
+    @Nullable AdmissionControlStats admissionControlStats,
+    @Nullable NodeCacheStats nodeCacheStats
) {
super(node);
this.timestamp = timestamp;
@@ -294,6 +304,7 @@ public NodeStats(
this.segmentReplicationRejectionStats = segmentReplicationRejectionStats;
this.repositoriesStats = repositoriesStats;
this.admissionControlStats = admissionControlStats;
this.nodeCacheStats = nodeCacheStats;
}

public long getTimestamp() {
@@ -451,6 +462,11 @@ public AdmissionControlStats getAdmissionControlStats() {
return admissionControlStats;
}

@Nullable
public NodeCacheStats getNodeCacheStats() {
return nodeCacheStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -506,6 +522,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(admissionControlStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(nodeCacheStats);
}
}

@Override
@@ -609,6 +628,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getAdmissionControlStats() != null) {
getAdmissionControlStats().toXContent(builder, params);
}
if (getNodeCacheStats() != null) {
getNodeCacheStats().toXContent(builder, params);
}
return builder;
}
}
NodesStatsRequest.java
@@ -219,7 +219,8 @@ public enum Metric {
RESOURCE_USAGE_STATS("resource_usage_stats"),
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories"),
-    ADMISSION_CONTROL("admission_control");
+    ADMISSION_CONTROL("admission_control"),
+    CACHE_STATS("caches");

private String metricName;

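Since the new metric registers under the name "caches", it should presumably be addressable like the other node-stats sections (for example GET _nodes/stats/caches), though the REST wiring is not part of this diff.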
TransportNodesStatsAction.java
@@ -128,7 +128,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics),
-    NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics)
+    NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics),
+    NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics)
);
}

TransportClusterStatsAction.java
@@ -172,6 +172,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
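// (The extra "false" added above presumably maps to the new CACHE_STATS
// flag: cluster stats does not collect cache stats.)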
List<ShardStats> shardsStats = new ArrayList<>();
(Diffs for the remaining changed files were not loaded.)
