From 1715da57f5c6223bf590118bf5adad8a7afaf9da Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Wed, 18 Oct 2023 18:23:24 +0530 Subject: [PATCH] Repository stats for remote store (#10567) Signed-off-by: Bukhtawar Khan --- CHANGELOG.md | 1 + .../s3/S3BlobStoreRepositoryTests.java | 75 ++++++++ .../repositories/s3/S3BlobContainer.java | 5 +- .../repositories/s3/S3BlobStore.java | 12 ++ .../repositories/s3/StatsMetricPublisher.java | 182 +++++++++++++----- .../s3/async/AsyncTransferManager.java | 24 ++- .../s3/async/AsyncTransferManagerTests.java | 13 +- .../admin/cluster/node/stats/NodeStats.java | 24 ++- .../cluster/node/stats/NodesStatsRequest.java | 3 +- .../node/stats/TransportNodesStatsAction.java | 3 +- .../stats/TransportClusterStatsAction.java | 1 + .../common/blobstore/BlobStore.java | 31 +++ .../common/blobstore/EncryptedBlobStore.java | 10 + .../main/java/org/opensearch/node/Node.java | 3 +- .../java/org/opensearch/node/NodeService.java | 12 +- .../repositories/RepositoriesService.java | 19 +- .../repositories/RepositoriesStats.java | 52 +++++ .../RepositoriesStatsArchive.java | 14 -- .../repositories/RepositoryInfo.java | 54 +----- .../repositories/RepositoryStats.java | 69 ++++++- .../repositories/RepositoryStatsSnapshot.java | 22 +-- .../blobstore/BlobStoreRepository.java | 2 + .../blobstore/MeteredBlobStoreRepository.java | 18 +- .../cluster/node/stats/NodeStatsTests.java | 1 + .../opensearch/cluster/DiskUsageTests.java | 6 + .../RepositoriesServiceTests.java | 13 +- .../RepositoriesStatsArchiveTests.java | 8 +- .../MockInternalClusterInfoService.java | 3 +- ...chMockAPIBasedRepositoryIntegTestCase.java | 2 +- .../opensearch/test/InternalTestCluster.java | 1 + 30 files changed, 479 insertions(+), 204 deletions(-) create mode 100644 server/src/main/java/org/opensearch/repositories/RepositoriesStats.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 243423223ca52..0742215a2dfb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) +- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index 5f88ad7867513..1361f3165b653 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -37,6 +37,8 @@ import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SuppressForbidden; @@ -51,10 +53,15 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.RepositoryStats; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase; import org.opensearch.repositories.s3.utils.AwsRequestSigner; import org.opensearch.snapshots.mockstore.BlobStoreWrapper; +import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.threadpool.ThreadPool; @@ -63,12 +70,18 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.stream.StreamSupport; import fixture.s3.S3HttpHandler; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") // Need to set up a new cluster for each test because cluster settings use randomized authentication settings @@ -152,6 +165,66 @@ protected Settings nodeSettings(int nodeOrdinal) { return builder.build(); } + @Override + public void testRequestStats() throws Exception { + final String repository = createRepository(randomName()); + final String index = "index-no-merges"; + createIndex( + index, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + + final long nbDocs = randomLongBetween(10_000L, 20_000L); + try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) { + waitForDocs(nbDocs, indexer); + } + + flushAndRefresh(index); + ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get(); + assertThat(forceMerge.getSuccessfulShards(), equalTo(1)); + assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); + + final String snapshot = "snapshot"; + assertSuccessfulSnapshot( + client().admin().cluster().prepareCreateSnapshot(repository, snapshot).setWaitForCompletion(true).setIndices(index) + ); + + assertAcked(client().admin().indices().prepareDelete(index)); + + assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true)); + ensureGreen(index); + assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get()); + + final RepositoryStats repositoryStats = StreamSupport.stream( + internalCluster().getInstances(RepositoriesService.class).spliterator(), + false + ).map(repositoriesService -> { + try { + return repositoriesService.repository(repository); + } catch (RepositoryMissingException e) { + return null; + } + }).filter(Objects::nonNull).map(Repository::stats).reduce(RepositoryStats::merge).get(); + + Map> extendedStats = repositoryStats.extendedStats; + Map aggregatedStats = new HashMap<>(); + extendedStats.forEach((k, v) -> { + if (k == BlobStore.Metric.RETRY_COUNT || k == BlobStore.Metric.REQUEST_SUCCESS || k == BlobStore.Metric.REQUEST_FAILURE) { + for (Map.Entry entry : v.entrySet()) { + aggregatedStats.merge(entry.getKey(), entry.getValue(), Math::addExact); + } + } + + }); + final Map mockCalls = getMockRequestCounts(); + + String assertionErrorMsg = String.format("SDK sent [%s] calls and handler measured [%s] calls", aggregatedStats, mockCalls); + + assertEquals(assertionErrorMsg, mockCalls, aggregatedStats); + } + /** * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload. */ @@ -263,6 +336,8 @@ public void maybeTrack(final String request, Headers requestHeaders) { trackRequest("PutMultipartObject"); } else if (Regex.simpleMatch("PUT /*/*", request)) { trackRequest("PutObject"); + } else if (Regex.simpleMatch("POST /*?delete*", request)) { + trackRequest("DeleteObjects"); } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 9777bd974d56c..24aee99242957 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -199,7 +199,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp ? amazonS3Reference.get().priorityClient() : amazonS3Reference.get().client(); CompletableFuture completableFuture = blobStore.getAsyncTransferManager() - .uploadObject(s3AsyncClient, uploadRequest, streamContext); + .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher()); completableFuture.whenComplete((response, throwable) -> { if (throwable == null) { completionListener.onResponse(response); @@ -384,7 +384,7 @@ private void doDeleteBlobs(List blobNames, boolean relative) throws IOEx assert outstanding.isEmpty(); } - private static DeleteObjectsRequest bulkDelete(String bucket, List blobs) { + private DeleteObjectsRequest bulkDelete(String bucket, List blobs) { return DeleteObjectsRequest.builder() .bucket(bucket) .delete( @@ -393,6 +393,7 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List blobs .quiet(true) .build() ) + .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher)) .build(); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index 80005d92344a4..f568d871dd31a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -47,6 +47,8 @@ import org.opensearch.repositories.s3.async.AsyncTransferManager; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; import java.util.Locale; import java.util.Map; @@ -180,6 +182,16 @@ public Map stats() { return statsMetricPublisher.getStats().toMap(); } + @Override + public Map> extendedStats() { + if (statsMetricPublisher.getExtendedStats() == null || statsMetricPublisher.getExtendedStats().isEmpty()) { + return Collections.emptyMap(); + } + Map> extendedStats = new HashMap<>(); + statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap())); + return extendedStats; + } + public ObjectCannedACL getCannedACL() { return cannedACL; } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java index cad0037f99249..0c63bfdb1ff97 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java @@ -8,10 +8,13 @@ package org.opensearch.repositories.s3; -import software.amazon.awssdk.http.HttpMetric; import software.amazon.awssdk.metrics.MetricCollection; import software.amazon.awssdk.metrics.MetricPublisher; +import software.amazon.awssdk.metrics.MetricRecord; +import org.opensearch.common.blobstore.BlobStore; + +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -20,18 +23,67 @@ public class StatsMetricPublisher { private final Stats stats = new Stats(); + private final Map extendedStats = new HashMap<>() { + { + put(BlobStore.Metric.REQUEST_LATENCY, new Stats()); + put(BlobStore.Metric.REQUEST_SUCCESS, new Stats()); + put(BlobStore.Metric.REQUEST_FAILURE, new Stats()); + put(BlobStore.Metric.RETRY_COUNT, new Stats()); + } + }; + public MetricPublisher listObjectsMetricPublisher = new MetricPublisher() { @Override public void publish(MetricCollection metricCollection) { - stats.listCount.addAndGet( - metricCollection.children() - .stream() - .filter( - metricRecords -> metricRecords.name().equals("ApiCallAttempt") - && !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty() - ) - .count() - ); + for (MetricRecord metricRecord : metricCollection) { + switch (metricRecord.metric().name()) { + case "ApiCallDuration": + extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).listMetrics.addAndGet( + ((Duration) metricRecord.value()).toMillis() + ); + break; + case "RetryCount": + extendedStats.get(BlobStore.Metric.RETRY_COUNT).listMetrics.addAndGet(((Integer) metricRecord.value())); + break; + case "ApiCallSuccessful": + if ((Boolean) metricRecord.value()) { + extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).listMetrics.addAndGet(1); + } else { + extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).listMetrics.addAndGet(1); + } + stats.listMetrics.addAndGet(1); + break; + } + } + } + + @Override + public void close() {} + }; + + public MetricPublisher deleteObjectsMetricPublisher = new MetricPublisher() { + @Override + public void publish(MetricCollection metricCollection) { + for (MetricRecord metricRecord : metricCollection) { + switch (metricRecord.metric().name()) { + case "ApiCallDuration": + extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).deleteMetrics.addAndGet( + ((Duration) metricRecord.value()).toMillis() + ); + break; + case "RetryCount": + extendedStats.get(BlobStore.Metric.RETRY_COUNT).deleteMetrics.addAndGet(((Integer) metricRecord.value())); + break; + case "ApiCallSuccessful": + if ((Boolean) metricRecord.value()) { + extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).deleteMetrics.addAndGet(1); + } else { + extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).deleteMetrics.addAndGet(1); + } + stats.deleteMetrics.addAndGet(1); + break; + } + } } @Override @@ -41,15 +93,26 @@ public void close() {} public MetricPublisher getObjectMetricPublisher = new MetricPublisher() { @Override public void publish(MetricCollection metricCollection) { - stats.getCount.addAndGet( - metricCollection.children() - .stream() - .filter( - metricRecords -> metricRecords.name().equals("ApiCallAttempt") - && !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty() - ) - .count() - ); + for (MetricRecord metricRecord : metricCollection) { + switch (metricRecord.metric().name()) { + case "ApiCallDuration": + extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).getMetrics.addAndGet( + ((Duration) metricRecord.value()).toMillis() + ); + break; + case "RetryCount": + extendedStats.get(BlobStore.Metric.RETRY_COUNT).getMetrics.addAndGet(((Integer) metricRecord.value())); + break; + case "ApiCallSuccessful": + if ((Boolean) metricRecord.value()) { + extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).getMetrics.addAndGet(1); + } else { + extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).getMetrics.addAndGet(1); + } + stats.getMetrics.addAndGet(1); + break; + } + } } @Override @@ -59,15 +122,26 @@ public void close() {} public MetricPublisher putObjectMetricPublisher = new MetricPublisher() { @Override public void publish(MetricCollection metricCollection) { - stats.putCount.addAndGet( - metricCollection.children() - .stream() - .filter( - metricRecords -> metricRecords.name().equals("ApiCallAttempt") - && !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty() - ) - .count() - ); + for (MetricRecord metricRecord : metricCollection) { + switch (metricRecord.metric().name()) { + case "ApiCallDuration": + extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).putMetrics.addAndGet( + ((Duration) metricRecord.value()).toMillis() + ); + break; + case "RetryCount": + extendedStats.get(BlobStore.Metric.RETRY_COUNT).putMetrics.addAndGet(((Integer) metricRecord.value())); + break; + case "ApiCallSuccessful": + if ((Boolean) metricRecord.value()) { + extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).putMetrics.addAndGet(1); + } else { + extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).putMetrics.addAndGet(1); + } + stats.putMetrics.addAndGet(1); + break; + } + } } @Override @@ -77,15 +151,26 @@ public void close() {} public MetricPublisher multipartUploadMetricCollector = new MetricPublisher() { @Override public void publish(MetricCollection metricCollection) { - stats.postCount.addAndGet( - metricCollection.children() - .stream() - .filter( - metricRecords -> metricRecords.name().equals("ApiCallAttempt") - && !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty() - ) - .count() - ); + for (MetricRecord metricRecord : metricCollection) { + switch (metricRecord.metric().name()) { + case "ApiCallDuration": + extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).multiPartPutMetrics.addAndGet( + ((Duration) metricRecord.value()).toMillis() + ); + break; + case "RetryCount": + extendedStats.get(BlobStore.Metric.RETRY_COUNT).multiPartPutMetrics.addAndGet(((Integer) metricRecord.value())); + break; + case "ApiCallSuccessful": + if ((Boolean) metricRecord.value()) { + extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).multiPartPutMetrics.addAndGet(1); + } else { + extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).multiPartPutMetrics.addAndGet(1); + } + stats.multiPartPutMetrics.addAndGet(1); + break; + } + } } @Override @@ -96,22 +181,29 @@ public Stats getStats() { return stats; } + public Map getExtendedStats() { + return extendedStats; + } + static class Stats { - final AtomicLong listCount = new AtomicLong(); + final AtomicLong listMetrics = new AtomicLong(); + + final AtomicLong getMetrics = new AtomicLong(); - final AtomicLong getCount = new AtomicLong(); + final AtomicLong putMetrics = new AtomicLong(); - final AtomicLong putCount = new AtomicLong(); + final AtomicLong deleteMetrics = new AtomicLong(); - final AtomicLong postCount = new AtomicLong(); + final AtomicLong multiPartPutMetrics = new AtomicLong(); Map toMap() { final Map results = new HashMap<>(); - results.put("GetObject", getCount.get()); - results.put("ListObjects", listCount.get()); - results.put("PutObject", putCount.get()); - results.put("PutMultipartObject", postCount.get()); + results.put("GetObject", getMetrics.get()); + results.put("ListObjects", listMetrics.get()); + results.put("PutObject", putMetrics.get()); + results.put("DeleteObjects", deleteMetrics.get()); + results.put("PutMultipartObject", multiPartPutMetrics.get()); return results; } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index fb1e409388a69..db04636b89d50 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -35,6 +35,7 @@ import org.opensearch.common.util.ByteUtils; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.repositories.s3.SocketAccess; +import org.opensearch.repositories.s3.StatsMetricPublisher; import org.opensearch.repositories.s3.io.CheckedContainer; import java.io.BufferedInputStream; @@ -87,16 +88,21 @@ public AsyncTransferManager(long minimumPartSize, ExecutorService executorServic * @param streamContext The {@link StreamContext} to supply streams during upload * @return A {@link CompletableFuture} to listen for upload completion */ - public CompletableFuture uploadObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext) { + public CompletableFuture uploadObject( + S3AsyncClient s3AsyncClient, + UploadRequest uploadRequest, + StreamContext streamContext, + StatsMetricPublisher statsMetricPublisher + ) { CompletableFuture returnFuture = new CompletableFuture<>(); try { if (streamContext.getNumberOfParts() == 1) { log.debug(() -> "Starting the upload as a single upload part request"); - uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture); + uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture, statsMetricPublisher); } else { log.debug(() -> "Starting the upload as multipart upload request"); - uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture); + uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher); } } catch (Throwable throwable) { returnFuture.completeExceptionally(throwable); @@ -109,12 +115,14 @@ private void uploadInParts( S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext, - CompletableFuture returnFuture + CompletableFuture returnFuture, + StatsMetricPublisher statsMetricPublisher ) { CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder() .bucket(uploadRequest.getBucket()) - .key(uploadRequest.getKey()); + .key(uploadRequest.getKey()) + .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)); if (uploadRequest.doRemoteDataIntegrityCheck()) { createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); } @@ -287,12 +295,14 @@ private void uploadInOneChunk( S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, InputStreamContainer inputStreamContainer, - CompletableFuture returnFuture + CompletableFuture returnFuture, + StatsMetricPublisher statsMetricPublisher ) { PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() .bucket(uploadRequest.getBucket()) .key(uploadRequest.getKey()) - .contentLength(uploadRequest.getContentLength()); + .contentLength(uploadRequest.getContentLength()) + .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher)); if (uploadRequest.doRemoteDataIntegrityCheck()) { putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum())); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index 9c07b929052bc..607453986ab16 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -33,6 +33,7 @@ import org.opensearch.common.io.InputStreamContainer; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.repositories.blobstore.ZeroInputStream; +import org.opensearch.repositories.s3.StatsMetricPublisher; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -80,7 +81,8 @@ public void testOneChunkUpload() { ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1 - ) + ), + new StatsMetricPublisher() ); try { @@ -118,7 +120,8 @@ public void testOneChunkUploadCorruption() { ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1 - ) + ), + new StatsMetricPublisher() ); try { @@ -169,7 +172,8 @@ public void testMultipartUpload() { ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5 - ) + ), + new StatsMetricPublisher() ); try { @@ -219,7 +223,8 @@ public void testMultipartUploadCorruption() { ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5 - ) + ), + new StatsMetricPublisher() ); try { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 69efea186d927..6ce6ca40cbce4 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -57,6 +57,7 @@ import org.opensearch.monitor.process.ProcessStats; import org.opensearch.node.AdaptiveSelectionStats; import org.opensearch.node.NodesResourceUsageStats; +import org.opensearch.repositories.RepositoriesStats; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; import org.opensearch.search.backpressure.stats.SearchBackpressureStats; @@ -146,6 +147,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private NodesResourceUsageStats resourceUsageStats; + @Nullable + private RepositoriesStats repositoriesStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -207,6 +211,11 @@ public NodeStats(StreamInput in) throws IOException { } else { resourceUsageStats = null; } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + repositoriesStats = in.readOptionalWriteable(RepositoriesStats::new); + } else { + repositoriesStats = null; + } } public NodeStats( @@ -234,7 +243,8 @@ public NodeStats( @Nullable WeightedRoutingStats weightedRoutingStats, @Nullable FileCacheStats fileCacheStats, @Nullable TaskCancellationStats taskCancellationStats, - @Nullable SearchPipelineStats searchPipelineStats + @Nullable SearchPipelineStats searchPipelineStats, + @Nullable RepositoriesStats repositoriesStats ) { super(node); this.timestamp = timestamp; @@ -261,6 +271,7 @@ public NodeStats( this.fileCacheStats = fileCacheStats; this.taskCancellationStats = taskCancellationStats; this.searchPipelineStats = searchPipelineStats; + this.repositoriesStats = repositoriesStats; } public long getTimestamp() { @@ -403,6 +414,11 @@ public SearchPipelineStats getSearchPipelineStats() { return searchPipelineStats; } + @Nullable + public RepositoriesStats getRepositoriesStats() { + return repositoriesStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -449,6 +465,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport out.writeOptionalWriteable(resourceUsageStats); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(repositoriesStats); + } } @Override @@ -542,6 +561,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getResourceUsageStats() != null) { getResourceUsageStats().toXContent(builder, params); } + if (getRepositoriesStats() != null) { + getRepositoriesStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 99c9fb2d1e26a..88dff20354aa2 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -214,7 +214,8 @@ public enum Metric { FILE_CACHE_STATS("file_cache"), TASK_CANCELLATION("task_cancellation"), SEARCH_PIPELINE("search_pipeline"), - RESOURCE_USAGE_STATS("resource_usage_stats"); + RESOURCE_USAGE_STATS("resource_usage_stats"), + REPOSITORIES("repositories"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 204157236a282..aa02f8e580f4a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -125,7 +125,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics), NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics), NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics), - NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics) + NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics), + NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index d8323e209be23..f51fabbfb2388 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -169,6 +169,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java index 2ee3e9557b354..0f6646d37f950 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java @@ -49,6 +49,9 @@ public interface BlobStore extends Closeable { */ BlobContainer blobContainer(BlobPath path); + /** + * Returns statistics on the count of operations that have been performed on this blob store + */ /** * Returns statistics on the count of operations that have been performed on this blob store */ @@ -56,8 +59,36 @@ default Map stats() { return Collections.emptyMap(); } + /** + * Returns details statistics of operations that have been performed on this blob store + */ + default Map> extendedStats() { + return Collections.emptyMap(); + } + /** * Reload the blob store inplace */ default void reload(RepositoryMetadata repositoryMetadata) {} + + /** + * Metrics for BlobStore interactions + */ + enum Metric { + REQUEST_SUCCESS("request_success_total"), + REQUEST_FAILURE("request_failures_total"), + REQUEST_LATENCY("request_time_in_millis"), + RETRY_COUNT("request_retry_count_total"); + + private String metricName; + + Metric(String name) { + this.metricName = name; + } + + public String metricName() { + return this.metricName; + } + } + } diff --git a/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobStore.java b/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobStore.java index 4d2d69e473438..a18ca8b9d5c39 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobStore.java +++ b/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobStore.java @@ -75,6 +75,16 @@ public Map stats() { return blobStore.stats(); } + /** + * Retrieves extended statistics about the BlobStore. Delegates the call to the underlying BlobStore's extendedStats() method. + * + * @return A map containing extended statistics about the BlobStore. + */ + @Override + public Map> extendedStats() { + return blobStore.extendedStats(); + } + /** * Closes the EncryptedBlobStore by decrementing the reference count of the CryptoManager and closing the * underlying BlobStore. This ensures proper cleanup of resources. diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 69b80462bbf0b..c9148f382a028 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1115,7 +1115,8 @@ protected Node( searchPipelineService, fileCache, taskCancellationMonitoringService, - resourceUsageCollectorService + resourceUsageCollectorService, + repositoryService ); final SearchService searchService = newSearchService( diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 9bb07080fa717..e2d7bc2c86ba3 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -53,6 +53,7 @@ import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; import org.opensearch.plugins.PluginsService; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.AggregationUsageService; import org.opensearch.search.backpressure.SearchBackpressureService; @@ -93,6 +94,7 @@ public class NodeService implements Closeable { private final Discovery discovery; private final FileCache fileCache; private final TaskCancellationMonitoringService taskCancellationMonitoringService; + private final RepositoriesService repositoriesService; NodeService( Settings settings, @@ -116,7 +118,8 @@ public class NodeService implements Closeable { SearchPipelineService searchPipelineService, FileCache fileCache, TaskCancellationMonitoringService taskCancellationMonitoringService, - ResourceUsageCollectorService resourceUsageCollectorService + ResourceUsageCollectorService resourceUsageCollectorService, + RepositoriesService repositoriesService ) { this.settings = settings; this.threadPool = threadPool; @@ -140,6 +143,7 @@ public class NodeService implements Closeable { this.fileCache = fileCache; this.taskCancellationMonitoringService = taskCancellationMonitoringService; this.resourceUsageCollectorService = resourceUsageCollectorService; + this.repositoriesService = repositoriesService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); } @@ -221,7 +225,8 @@ public NodeStats stats( boolean fileCacheStats, boolean taskCancellation, boolean searchPipelineStats, - boolean resourceUsageStats + boolean resourceUsageStats, + boolean repositoriesStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -250,7 +255,8 @@ public NodeStats stats( weightedRoutingStats ? WeightedRoutingStats.getInstance() : null, fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, taskCancellation ? this.taskCancellationMonitoringService.stats() : null, - searchPipelineStats ? this.searchPipelineService.stats() : null + searchPipelineStats ? this.searchPipelineService.stats() : null, + repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null ); } diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 72266c053a1ae..68669feb16abc 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -457,7 +457,6 @@ public void applyClusterState(ClusterChangedEvent event) { logger.debug("unregistering repository [{}]", entry.getKey()); Repository repository = entry.getValue(); closeRepository(repository); - archiveRepositoryStats(repository, state.version()); } else { survivors.put(entry.getKey(), entry.getValue()); } @@ -485,7 +484,6 @@ public void applyClusterState(ClusterChangedEvent event) { } else { logger.debug("updating repository [{}]", repositoryMetadata.name()); closeRepository(repository); - archiveRepositoryStats(repository, state.version()); repository = null; try { repository = createRepository(repositoryMetadata, typesRegistry); @@ -575,12 +573,12 @@ public Repository repository(String repositoryName) { } public List repositoriesStats() { - List archivedRepoStats = repositoriesStatsArchive.getArchivedStats(); List activeRepoStats = getRepositoryStatsForActiveRepositories(); + return activeRepoStats; + } - List repositoriesStats = new ArrayList<>(archivedRepoStats); - repositoriesStats.addAll(activeRepoStats); - return repositoriesStats; + public RepositoriesStats getRepositoriesStats() { + return new RepositoriesStats(repositoriesStats()); } private List getRepositoryStatsForActiveRepositories() { @@ -640,15 +638,6 @@ public void closeRepository(Repository repository) { repository.close(); } - private void archiveRepositoryStats(Repository repository, long clusterStateVersion) { - if (repository instanceof MeteredBlobStoreRepository) { - RepositoryStatsSnapshot stats = ((MeteredBlobStoreRepository) repository).statsSnapshotForArchival(clusterStateVersion); - if (repositoriesStatsArchive.archive(stats) == false) { - logger.warn("Unable to archive the repository stats [{}] as the archive is full.", stats); - } - } - } - /** * Creates repository holder. This method starts the non-internal repository */ diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesStats.java b/server/src/main/java/org/opensearch/repositories/RepositoriesStats.java new file mode 100644 index 0000000000000..b24e0dddd852a --- /dev/null +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesStats.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.util.CollectionUtils; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +/** + * Encapsulates stats for multiple repositories* + */ +public class RepositoriesStats implements Writeable, ToXContentObject { + + List repositoryStatsSnapshots; + + public RepositoriesStats(List repositoryStatsSnapshots) { + this.repositoryStatsSnapshots = repositoryStatsSnapshots; + } + + public RepositoriesStats(StreamInput in) throws IOException { + this.repositoryStatsSnapshots = in.readList(RepositoryStatsSnapshot::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(repositoryStatsSnapshots); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray("repositories"); + if (CollectionUtils.isEmpty(repositoryStatsSnapshots) == false) { + for (RepositoryStatsSnapshot repositoryStatsSnapshot : repositoryStatsSnapshots) { + repositoryStatsSnapshot.toXContent(builder, params); + } + } + builder.endArray(); + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesStatsArchive.java b/server/src/main/java/org/opensearch/repositories/RepositoriesStatsArchive.java index b8f100706f81e..3d35f75176eaf 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesStatsArchive.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesStatsArchive.java @@ -70,11 +70,6 @@ public RepositoriesStatsArchive(TimeValue retentionPeriod, int maxCapacity, Long * @return {@code true} if the repository stats were archived, {@code false} otherwise. */ synchronized boolean archive(final RepositoryStatsSnapshot repositoryStats) { - assert containsRepositoryStats(repositoryStats) == false : "A repository with ephemeral id " - + repositoryStats.getRepositoryInfo().ephemeralId - + " is already archived"; - assert repositoryStats.isArchived(); - evict(); if (archive.size() >= maxCapacity) { @@ -116,15 +111,6 @@ private void evict() { } } - private boolean containsRepositoryStats(RepositoryStatsSnapshot repositoryStats) { - return archive.stream() - .anyMatch( - entry -> entry.repositoryStatsSnapshot.getRepositoryInfo().ephemeralId.equals( - repositoryStats.getRepositoryInfo().ephemeralId - ) - ); - } - private static class ArchiveEntry { private final RepositoryStatsSnapshot repositoryStatsSnapshot; private final long createdAtMillis; diff --git a/server/src/main/java/org/opensearch/repositories/RepositoryInfo.java b/server/src/main/java/org/opensearch/repositories/RepositoryInfo.java index 8aa86fc46d591..387a685bd6526 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoryInfo.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoryInfo.java @@ -32,7 +32,6 @@ package org.opensearch.repositories; -import org.opensearch.common.Nullable; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -51,64 +50,27 @@ * @opensearch.internal */ public final class RepositoryInfo implements Writeable, ToXContentFragment { - public final String ephemeralId; public final String name; public final String type; public final Map location; - public final long startedAt; - @Nullable - public final Long stoppedAt; - public RepositoryInfo(String ephemeralId, String name, String type, Map location, long startedAt) { - this(ephemeralId, name, type, location, startedAt, null); - } - - public RepositoryInfo( - String ephemeralId, - String name, - String type, - Map location, - long startedAt, - @Nullable Long stoppedAt - ) { - this.ephemeralId = ephemeralId; + public RepositoryInfo(String name, String type, Map location) { this.name = name; this.type = type; this.location = location; - this.startedAt = startedAt; - if (stoppedAt != null && startedAt > stoppedAt) { - throw new IllegalArgumentException("createdAt must be before or equal to stoppedAt"); - } - this.stoppedAt = stoppedAt; } public RepositoryInfo(StreamInput in) throws IOException { - this.ephemeralId = in.readString(); this.name = in.readString(); this.type = in.readString(); this.location = in.readMap(StreamInput::readString, StreamInput::readString); - this.startedAt = in.readLong(); - this.stoppedAt = in.readOptionalLong(); - } - - public RepositoryInfo stopped(long stoppedAt) { - assert isStopped() == false : "The repository is already stopped"; - - return new RepositoryInfo(ephemeralId, name, type, location, startedAt, stoppedAt); - } - - public boolean isStopped() { - return stoppedAt != null; } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(ephemeralId); out.writeString(name); out.writeString(type); out.writeMap(location, StreamOutput::writeString, StreamOutput::writeString); - out.writeLong(startedAt); - out.writeOptionalLong(stoppedAt); } @Override @@ -116,11 +78,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("repository_name", name); builder.field("repository_type", type); builder.field("repository_location", location); - builder.field("repository_ephemeral_id", ephemeralId); - builder.field("repository_started_at", startedAt); - if (stoppedAt != null) { - builder.field("repository_stopped_at", stoppedAt); - } return builder; } @@ -129,17 +86,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RepositoryInfo that = (RepositoryInfo) o; - return ephemeralId.equals(that.ephemeralId) - && name.equals(that.name) - && type.equals(that.type) - && location.equals(that.location) - && startedAt == that.startedAt - && Objects.equals(stoppedAt, that.stoppedAt); + return name.equals(that.name) && type.equals(that.type) && location.equals(that.location); } @Override public int hashCode() { - return Objects.hash(ephemeralId, name, type, location, startedAt, stoppedAt); + return Objects.hash(name, type, location); } @Override diff --git a/server/src/main/java/org/opensearch/repositories/RepositoryStats.java b/server/src/main/java/org/opensearch/repositories/RepositoryStats.java index efd5d6f8560b6..ab97c5eaa1f7a 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoryStats.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoryStats.java @@ -32,9 +32,13 @@ package org.opensearch.repositories; +import org.opensearch.common.Nullable; +import org.opensearch.common.blobstore.BlobStore; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; import java.io.IOException; import java.util.Collections; @@ -47,32 +51,63 @@ * * @opensearch.internal */ -public class RepositoryStats implements Writeable { +public class RepositoryStats implements Writeable, ToXContentFragment { public static final RepositoryStats EMPTY_STATS = new RepositoryStats(Collections.emptyMap()); + @Nullable public final Map requestCounts; + @Nullable + public final Map> extendedStats; + public final boolean detailed; public RepositoryStats(Map requestCounts) { this.requestCounts = Collections.unmodifiableMap(requestCounts); + this.extendedStats = Collections.emptyMap(); + this.detailed = false; + } + + public RepositoryStats(Map> extendedStats, boolean detailed) { + this.requestCounts = Collections.emptyMap(); + this.extendedStats = Collections.unmodifiableMap(extendedStats); + this.detailed = detailed; } public RepositoryStats(StreamInput in) throws IOException { this.requestCounts = in.readMap(StreamInput::readString, StreamInput::readLong); + this.extendedStats = in.readMap( + e -> e.readEnum(BlobStore.Metric.class), + i -> i.readMap(StreamInput::readString, StreamInput::readLong) + ); + this.detailed = in.readBoolean(); } public RepositoryStats merge(RepositoryStats otherStats) { - final Map result = new HashMap<>(); - result.putAll(requestCounts); - for (Map.Entry entry : otherStats.requestCounts.entrySet()) { - result.merge(entry.getKey(), entry.getValue(), Math::addExact); + assert this.detailed == otherStats.detailed; + if (detailed) { + final Map> result = new HashMap<>(); + result.putAll(extendedStats); + for (Map.Entry> entry : otherStats.extendedStats.entrySet()) { + for (Map.Entry nested : entry.getValue().entrySet()) { + result.get(entry.getKey()).merge(nested.getKey(), nested.getValue(), Math::addExact); + } + } + return new RepositoryStats(result, true); + } else { + final Map result = new HashMap<>(); + result.putAll(requestCounts); + for (Map.Entry entry : otherStats.requestCounts.entrySet()) { + result.merge(entry.getKey(), entry.getValue(), Math::addExact); + } + return new RepositoryStats(result); } - return new RepositoryStats(result); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeMap(requestCounts, StreamOutput::writeString, StreamOutput::writeLong); + out.writeMap(extendedStats, StreamOutput::writeEnum, (o, v) -> o.writeMap(v, StreamOutput::writeString, StreamOutput::writeLong)); + out.writeBoolean(detailed); } @Override @@ -80,16 +115,32 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RepositoryStats that = (RepositoryStats) o; - return requestCounts.equals(that.requestCounts); + return requestCounts.equals(that.requestCounts) && extendedStats.equals(that.extendedStats) && detailed == that.detailed; } @Override public int hashCode() { - return Objects.hash(requestCounts); + return Objects.hash(requestCounts, detailed, extendedStats); } @Override public String toString() { - return "RepositoryStats{" + "requestCounts=" + requestCounts + '}'; + return "RepositoryStats{" + "requestCounts=" + requestCounts + "extendedStats=" + extendedStats + "detailed =" + detailed + "}"; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (detailed == false) { + builder.field("request_counts", requestCounts); + } else { + extendedStats.forEach((k, v) -> { + try { + builder.field(k.metricName(), v); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + return builder; } } diff --git a/server/src/main/java/org/opensearch/repositories/RepositoryStatsSnapshot.java b/server/src/main/java/org/opensearch/repositories/RepositoryStatsSnapshot.java index 2b061cd2c2cc9..0a727980fad0d 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoryStatsSnapshot.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoryStatsSnapshot.java @@ -53,21 +53,17 @@ public final class RepositoryStatsSnapshot implements Writeable, ToXContentObjec private final RepositoryInfo repositoryInfo; private final RepositoryStats repositoryStats; private final long clusterVersion; - private final boolean archived; - public RepositoryStatsSnapshot(RepositoryInfo repositoryInfo, RepositoryStats repositoryStats, long clusterVersion, boolean archived) { - assert archived != (clusterVersion == UNKNOWN_CLUSTER_VERSION); + public RepositoryStatsSnapshot(RepositoryInfo repositoryInfo, RepositoryStats repositoryStats, long clusterVersion) { this.repositoryInfo = repositoryInfo; this.repositoryStats = repositoryStats; this.clusterVersion = clusterVersion; - this.archived = archived; } public RepositoryStatsSnapshot(StreamInput in) throws IOException { this.repositoryInfo = new RepositoryInfo(in); this.repositoryStats = new RepositoryStats(in); this.clusterVersion = in.readLong(); - this.archived = in.readBoolean(); } public RepositoryInfo getRepositoryInfo() { @@ -78,10 +74,6 @@ public RepositoryStats getRepositoryStats() { return repositoryStats; } - public boolean isArchived() { - return archived; - } - public long getClusterVersion() { return clusterVersion; } @@ -91,18 +83,13 @@ public void writeTo(StreamOutput out) throws IOException { repositoryInfo.writeTo(out); repositoryStats.writeTo(out); out.writeLong(clusterVersion); - out.writeBoolean(archived); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); repositoryInfo.toXContent(builder, params); - builder.field("request_counts", repositoryStats.requestCounts); - builder.field("archived", archived); - if (archived) { - builder.field("cluster_version", clusterVersion); - } + repositoryStats.toXContent(builder, params); builder.endObject(); return builder; } @@ -114,13 +101,12 @@ public boolean equals(Object o) { RepositoryStatsSnapshot that = (RepositoryStatsSnapshot) o; return repositoryInfo.equals(that.repositoryInfo) && repositoryStats.equals(that.repositoryStats) - && clusterVersion == that.clusterVersion - && archived == that.archived; + && clusterVersion == that.clusterVersion; } @Override public int hashCode() { - return Objects.hash(repositoryInfo, repositoryStats, clusterVersion, archived); + return Objects.hash(repositoryInfo, repositoryStats, clusterVersion); } @Override diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index c48efa9d3d2ee..8a2260e1f6d90 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -851,6 +851,8 @@ public RepositoryStats stats() { final BlobStore store = blobStore.get(); if (store == null) { return RepositoryStats.EMPTY_STATS; + } else if (store.extendedStats() != null && store.extendedStats().isEmpty() == false) { + return new RepositoryStats(store.extendedStats(), true); } return new RepositoryStats(store.stats()); } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java index d4921f4e6d2e7..0651ff586d412 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java @@ -34,12 +34,10 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.UUIDs; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.repositories.RepositoryInfo; import org.opensearch.repositories.RepositoryStatsSnapshot; -import org.opensearch.threadpool.ThreadPool; import java.util.Map; @@ -59,14 +57,7 @@ public MeteredBlobStoreRepository( Map location ) { super(metadata, namedXContentRegistry, clusterService, recoverySettings); - ThreadPool threadPool = clusterService.getClusterApplierService().threadPool(); - this.repositoryInfo = new RepositoryInfo( - UUIDs.randomBase64UUID(), - metadata.name(), - metadata.type(), - location, - threadPool.absoluteTimeInMillis() - ); + this.repositoryInfo = new RepositoryInfo(metadata.name(), metadata.type(), location); } @Override @@ -78,11 +69,6 @@ public void reload(RepositoryMetadata repositoryMetadata) { } public RepositoryStatsSnapshot statsSnapshot() { - return new RepositoryStatsSnapshot(repositoryInfo, stats(), RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION, false); - } - - public RepositoryStatsSnapshot statsSnapshotForArchival(long clusterVersion) { - RepositoryInfo stoppedRepoInfo = repositoryInfo.stopped(threadPool.absoluteTimeInMillis()); - return new RepositoryStatsSnapshot(stoppedRepoInfo, stats(), clusterVersion, true); + return new RepositoryStatsSnapshot(repositoryInfo, stats(), RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 4b865383ee007..3491f18da9550 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -852,6 +852,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { weightedRoutingStats, null, null, + null, null ); } diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index 64949cf861f70..6f03e87bf5824 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -191,6 +191,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -218,6 +219,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -245,6 +247,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -303,6 +306,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -330,6 +334,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -357,6 +362,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index c4599a6e7a00e..43ebb86fd5342 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -219,20 +219,17 @@ public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() { assertThat(repositoriesService.repositoriesStats().size(), equalTo(1)); repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", emptyState(), clusterStateWithRepoTypeA)); - assertThat(repositoriesService.repositoriesStats().size(), equalTo(1)); + assertThat(repositoriesService.repositoriesStats().size(), equalTo(0)); ClusterState clusterStateWithRepoTypeB = createClusterStateWithRepo(repoName, MeteredRepositoryTypeB.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", clusterStateWithRepoTypeB, emptyState())); List repositoriesStats = repositoriesService.repositoriesStats(); - assertThat(repositoriesStats.size(), equalTo(2)); + assertThat(repositoriesStats.size(), equalTo(1)); RepositoryStatsSnapshot repositoryStatsTypeA = repositoriesStats.get(0); - assertThat(repositoryStatsTypeA.getRepositoryInfo().type, equalTo(MeteredRepositoryTypeA.TYPE)); - assertThat(repositoryStatsTypeA.getRepositoryStats(), equalTo(MeteredRepositoryTypeA.STATS)); + assertThat(repositoryStatsTypeA.getRepositoryInfo().type, equalTo(MeteredRepositoryTypeB.TYPE)); + assertThat(repositoryStatsTypeA.getRepositoryStats(), equalTo(MeteredRepositoryTypeB.STATS)); - RepositoryStatsSnapshot repositoryStatsTypeB = repositoriesStats.get(1); - assertThat(repositoryStatsTypeB.getRepositoryInfo().type, equalTo(MeteredRepositoryTypeB.TYPE)); - assertThat(repositoryStatsTypeB.getRepositoryStats(), equalTo(MeteredRepositoryTypeB.STATS)); } public void testWithSameKeyProviderNames() { @@ -258,7 +255,7 @@ public void testWithSameKeyProviderNames() { kpTypeA ); repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", clusterStateWithRepoTypeB, emptyState())); - assertThat(repositoriesService.repositoriesStats().size(), equalTo(2)); + assertThat(repositoriesService.repositoriesStats().size(), equalTo(1)); MeteredRepositoryTypeB repositoryB = (MeteredRepositoryTypeB) repositoriesService.repository("repoName"); assertNotNull(repositoryB); assertNotNull(repository.cryptoHandler); diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesStatsArchiveTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesStatsArchiveTests.java index cf0b06a3f7d16..da0cbcb1d4b17 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesStatsArchiveTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesStatsArchiveTests.java @@ -32,7 +32,6 @@ package org.opensearch.repositories; -import org.opensearch.common.UUIDs; import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; @@ -122,14 +121,11 @@ private RepositoryStatsSnapshot createRepositoryStats(RepositoryStats repository private RepositoryStatsSnapshot createRepositoryStats(RepositoryStats repositoryStats, long clusterVersion) { RepositoryInfo repositoryInfo = new RepositoryInfo( - UUIDs.randomBase64UUID(), randomAlphaOfLength(10), randomAlphaOfLength(10), - Map.of("bucket", randomAlphaOfLength(10)), - System.currentTimeMillis(), - null + Map.of("bucket", randomAlphaOfLength(10)) ); - return new RepositoryStatsSnapshot(repositoryInfo, repositoryStats, clusterVersion, true); + return new RepositoryStatsSnapshot(repositoryInfo, repositoryStats, clusterVersion); } } diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index a520b6278ea47..60a54110fd0b4 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -121,7 +121,8 @@ List adjustNodesStats(List nodesStats) { nodeStats.getWeightedRoutingStats(), nodeStats.getFileCacheStats(), nodeStats.getTaskCancellationStats(), - nodeStats.getSearchPipelineStats() + nodeStats.getSearchPipelineStats(), + nodeStats.getRepositoriesStats() ); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchMockAPIBasedRepositoryIntegTestCase.java index dff9b997d87db..faa9d52b105b2 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchMockAPIBasedRepositoryIntegTestCase.java @@ -238,7 +238,7 @@ public void testRequestStats() throws Exception { assertEquals(assertionErrorMsg, mockCalls, sdkRequestCounts); } - private Map getMockRequestCounts() { + protected Map getMockRequestCounts() { for (HttpHandler h : handlers.values()) { while (h instanceof DelegatingHttpHandler) { if (h instanceof HttpStatsCollectorHandler) { diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index a290f3a1751a0..898e125b94954 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2721,6 +2721,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(