Skip to content

Commit

Permalink
Repository stats for remote store (opensearch-project#10567)
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
Bukhtawar authored and shiv0408 committed Apr 25, 2024
1 parent e0b34cb commit 2e252e7
Show file tree
Hide file tree
Showing 30 changed files with 479 additions and 204 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage;

import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
Expand All @@ -51,10 +53,15 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.RepositoryStats;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase;
import org.opensearch.repositories.s3.utils.AwsRequestSigner;
import org.opensearch.snapshots.mockstore.BlobStoreWrapper;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -63,12 +70,18 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.StreamSupport;

import fixture.s3.S3HttpHandler;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
// Need to set up a new cluster for each test because cluster settings use randomized authentication settings
Expand Down Expand Up @@ -152,6 +165,66 @@ protected Settings nodeSettings(int nodeOrdinal) {
return builder.build();
}

@Override
public void testRequestStats() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(
index,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);

final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}

flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

final String snapshot = "snapshot";
assertSuccessfulSnapshot(
client().admin().cluster().prepareCreateSnapshot(repository, snapshot).setWaitForCompletion(true).setIndices(index)
);

assertAcked(client().admin().indices().prepareDelete(index));

assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());

final RepositoryStats repositoryStats = StreamSupport.stream(
internalCluster().getInstances(RepositoriesService.class).spliterator(),
false
).map(repositoriesService -> {
try {
return repositoriesService.repository(repository);
} catch (RepositoryMissingException e) {
return null;
}
}).filter(Objects::nonNull).map(Repository::stats).reduce(RepositoryStats::merge).get();

Map<BlobStore.Metric, Map<String, Long>> extendedStats = repositoryStats.extendedStats;
Map<String, Long> aggregatedStats = new HashMap<>();
extendedStats.forEach((k, v) -> {
if (k == BlobStore.Metric.RETRY_COUNT || k == BlobStore.Metric.REQUEST_SUCCESS || k == BlobStore.Metric.REQUEST_FAILURE) {
for (Map.Entry<String, Long> entry : v.entrySet()) {
aggregatedStats.merge(entry.getKey(), entry.getValue(), Math::addExact);
}
}

});
final Map<String, Long> mockCalls = getMockRequestCounts();

String assertionErrorMsg = String.format("SDK sent [%s] calls and handler measured [%s] calls", aggregatedStats, mockCalls);

assertEquals(assertionErrorMsg, mockCalls, aggregatedStats);
}

/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/
Expand Down Expand Up @@ -263,6 +336,8 @@ public void maybeTrack(final String request, Headers requestHeaders) {
trackRequest("PutMultipartObject");
} else if (Regex.simpleMatch("PUT /*/*", request)) {
trackRequest("PutObject");
} else if (Regex.simpleMatch("POST /*?delete*", request)) {
trackRequest("DeleteObjects");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
? amazonS3Reference.get().priorityClient()
: amazonS3Reference.get().client();
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext);
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
Expand Down Expand Up @@ -384,7 +384,7 @@ private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOEx
assert outstanding.isEmpty();
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
private DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Expand All @@ -393,6 +393,7 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.opensearch.repositories.s3.async.AsyncTransferManager;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

Expand Down Expand Up @@ -180,6 +182,16 @@ public Map<String, Long> stats() {
return statsMetricPublisher.getStats().toMap();
}

@Override
public Map<Metric, Map<String, Long>> extendedStats() {
if (statsMetricPublisher.getExtendedStats() == null || statsMetricPublisher.getExtendedStats().isEmpty()) {
return Collections.emptyMap();
}
Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
return extendedStats;
}

public ObjectCannedACL getCannedACL() {
return cannedACL;
}
Expand Down
Loading

0 comments on commit 2e252e7

Please sign in to comment.