Skip to content

Commit

Permalink
Hashed prefix for index metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Aug 28, 2024
1 parent 46a269e commit 0d7845f
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public String[] getBlobPathTokens() {

public abstract String generateBlobFileName();

/**
 * Hook that lets an entity adjust the blob path it is stored under.
 * This default implementation returns the given path unchanged.
 *
 * @param blobPath the path assembled by the caller
 * @return the path to use for this entity; here, {@code blobPath} itself
 */
public BlobPath getPrefixedPath(BlobPath blobPath) {
return blobPath;
}

/**
 * Returns the cluster UUID held by this entity.
 *
 * @return the value of the {@code clusterUUID} field
 */
public String clusterUUID() {
return clusterUUID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,14 @@ public BlobPath getBlobPathPrefix(String clusterUUID) {
return blobStoreRepository.basePath().add(encodeString(getClusterName())).add(pathToken).add(clusterUUID);
}



/**
 * Builds the blob path that the given entity should be uploaded to.
 * <p>
 * The path starts from {@link #getBlobPathPrefix(String)} for the entity's cluster UUID, then appends
 * every path token from the entity's blob path parameters, and finally hands the result to
 * {@code obj.getPrefixedPath(...)} so the entity can apply its own prefixing scheme.
 *
 * @param obj the entity about to be uploaded
 * @return the final upload path for {@code obj}
 */
public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<T> obj) {
    BlobPath blobPath = getBlobPathPrefix(obj.clusterUUID());
    for (String token : obj.getBlobPathParameters().getPathTokens()) {
        blobPath = blobPath.add(token);
    }
    // A stale early `return blobPath;` used to sit here, which made the call below unreachable
    // and skipped the entity's prefixing hook entirely.
    return obj.getPrefixedPath(blobPath);
}

public BlobPath getBlobPathForDownload(final RemoteWriteableBlobEntity<T> obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteIndexMetadataManager;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -730,6 +731,8 @@ public void apply(Settings value, Settings current, Settings previous) {
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.REMOTE_STATE_READ_TIMEOUT_SETTING,
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING,
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,9 @@ UploadedMetadataResults writeMetadataInParallel(
indexMetadata,
clusterState.metadata().clusterUUID(),
blobStoreRepository.getCompressor(),
blobStoreRepository.getNamedXContentRegistry()
blobStoreRepository.getNamedXContentRegistry(),
remoteIndexMetadataManager.getPathTypeSetting(),
remoteIndexMetadataManager.getPathHashAlgoSetting()
),
listener
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.model.RemoteIndexMetadata;
import org.opensearch.gateway.remote.model.RemoteReadResult;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -44,11 +45,38 @@ public class RemoteIndexMetadataManager extends AbstractRemoteWritableEntityMana
Setting.Property.Deprecated
);

/**
 * Selects the path type strategy used for the remote index metadata blob store path.
 * Registered under the key {@code cluster.remote_store.index_metadata.path_type} as a
 * node-scoped, dynamic setting; the default is {@code HASHED_PREFIX}.
 */
public static final Setting<RemoteStoreEnums.PathType> REMOTE_INDEX_METADATA_PATH_TYPE_SETTING = new Setting<>(
"cluster.remote_store.index_metadata.path_type",
RemoteStoreEnums.PathType.HASHED_PREFIX.toString(),
RemoteStoreEnums.PathType::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
 * Selects the hash algorithm used for the remote index metadata blob store path.
 * Registered under the key {@code cluster.remote_store.index_metadata.path_hash_algo} as a
 * node-scoped, dynamic setting; the default is {@code FNV_1A_BASE64}.
 * This setting takes effect only when {@link #REMOTE_INDEX_METADATA_PATH_TYPE_SETTING}
 * is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
 */
public static final Setting<RemoteStoreEnums.PathHashAlgorithm> REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING = new Setting<>(
"cluster.remote_store.index_metadata.path_hash_algo",
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(),
RemoteStoreEnums.PathHashAlgorithm::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;

private volatile TimeValue indexMetadataUploadTimeout;

// Both fields are reassigned by dynamic cluster-settings update consumers and read through the
// getters when index metadata is uploaded, so they are volatile (like indexMetadataUploadTimeout)
// to make an updated value visible to reader threads.
private volatile RemoteStoreEnums.PathType pathType;
private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;

public RemoteIndexMetadataManager(
ClusterSettings clusterSettings,
String clusterName,
Expand All @@ -70,7 +98,11 @@ public RemoteIndexMetadataManager(
this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry();
this.compressor = blobStoreRepository.getCompressor();
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
this.pathType = clusterSettings.get(REMOTE_INDEX_METADATA_PATH_TYPE_SETTING);
this.pathHashAlgo = clusterSettings.get(REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING);
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(REMOTE_INDEX_METADATA_PATH_TYPE_SETTING, this::setPathTypeSetting);
clusterSettings.addSettingsUpdateConsumer(REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting);
}

/**
Expand Down Expand Up @@ -127,4 +159,20 @@ protected ActionListener<Object> getWrappedReadListener(
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex))
);
}

/**
 * Stores a new path type; registered in the constructor as the update consumer for
 * {@code REMOTE_INDEX_METADATA_PATH_TYPE_SETTING}.
 *
 * @param pathType the new path type value
 */
private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) {
this.pathType = pathType;
}

/**
 * Stores a new path hash algorithm; registered in the constructor as the update consumer for
 * {@code REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING}.
 *
 * @param pathHashAlgo the new hash algorithm value
 */
private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) {
this.pathHashAlgo = pathHashAlgo;
}

/**
 * Returns the path type currently held by this manager.
 *
 * @return the current value of the {@code pathType} field
 */
protected RemoteStoreEnums.PathType getPathTypeSetting() {
return pathType;
}

/**
 * Returns the path hash algorithm currently held by this manager.
 *
 * @return the current value of the {@code pathHashAlgo} field
 */
protected RemoteStoreEnums.PathHashAlgorithm getPathHashAlgoSetting() {
return pathHashAlgo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.gateway.remote.model;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
Expand All @@ -17,6 +18,9 @@
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

Expand All @@ -41,6 +45,21 @@ public class RemoteIndexMetadata extends AbstractClusterMetadataWriteableBlobEnt
public static final String INDEX = "index";

private IndexMetadata indexMetadata;
private RemoteStoreEnums.PathType pathType;
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;

/**
 * Creates a remote index metadata entity that also carries the path strategy used by
 * {@code getPrefixedPath}. Delegates to the four-argument constructor and then records
 * the path type and hash algorithm.
 *
 * @param indexMetadata the index metadata to wrap
 * @param clusterUUID the cluster UUID passed through to the delegate constructor
 * @param compressor the compressor passed through to the delegate constructor
 * @param namedXContentRegistry the registry passed through to the delegate constructor
 * @param pathType the path type strategy; when {@code null}, {@code getPrefixedPath} returns its input unchanged
 * @param pathHashAlgo the hash algorithm handed to {@code pathType.path(...)}; asserted non-null there when {@code pathType} is set
 */
public RemoteIndexMetadata(
final IndexMetadata indexMetadata,
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry,
final RemoteStoreEnums.PathType pathType,
final RemoteStoreEnums.PathHashAlgorithm pathHashAlgo
) {
this(indexMetadata, clusterUUID, compressor, namedXContentRegistry);
this.pathType = pathType;
this.pathHashAlgo = pathHashAlgo;
}

public RemoteIndexMetadata(
final IndexMetadata indexMetadata,
Expand Down Expand Up @@ -86,6 +105,18 @@ public String generateBlobFileName() {
return blobFileName;
}

/**
 * Applies this entity's path strategy to the supplied path.
 * When no path type was configured the input is handed back untouched; otherwise the path is
 * derived via {@code pathType.path(...)} from the supplied base path, this index's UUID and the
 * configured hash algorithm.
 *
 * @param blobPath the base path to transform
 * @return the path produced by the configured path type, or {@code blobPath} if none is set
 */
@Override
public BlobPath getPrefixedPath(BlobPath blobPath) {
    if (pathType != null) {
        assert pathHashAlgo != null;
        final RemoteStorePathStrategy.PathInput pathInput = RemoteStorePathStrategy.PathInput.builder()
            .basePath(blobPath)
            .indexUUID(indexMetadata.getIndexUUID())
            .build();
        return pathType.path(pathInput, pathHashAlgo);
    }
    return blobPath;
}

@Override
public UploadedMetadata getUploadedMetadata() {
assert blobName != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.core.compress.NoneCompressor;
import org.opensearch.gateway.remote.model.RemoteIndexMetadata;
import org.opensearch.gateway.remote.model.RemoteReadResult;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -51,18 +52,21 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64;
import static org.opensearch.index.remote.RemoteStoreEnums.PathType.HASHED_PREFIX;

public class RemoteIndexMetadataManagerTests extends OpenSearchTestCase {

private RemoteIndexMetadataManager remoteIndexMetadataManager;
private BlobStoreRepository blobStoreRepository;
private BlobStoreTransferService blobStoreTransferService;
private Compressor compressor;
private ClusterSettings clusterSettings;
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());

@Before
public void setup() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
blobStoreRepository = mock(BlobStoreRepository.class);
BlobPath blobPath = new BlobPath().add("random-path");
when((blobStoreRepository.basePath())).thenReturn(blobPath);
Expand Down Expand Up @@ -182,6 +186,31 @@ public void testGetAsyncReadRunnable_IOFailure() throws Exception {
assertTrue(listener.getFailure() instanceof RemoteStateTransferException);
}

/**
 * Checks the default of the index metadata path type setting and that a dynamic update is
 * picked up by the manager.
 * NOTE(review): the method name says "RoutingTable" but the setting under test is
 * {@code cluster.remote_store.index_metadata.path_type} - consider renaming.
 */
public void testRemoteRoutingTablePathTypeSetting() {
    // Default must be HASHED_PREFIX.
    final String expectedDefault = HASHED_PREFIX.toString();
    assertEquals(expectedDefault, remoteIndexMetadataManager.getPathTypeSetting().toString());

    // Switch to FIXED through the cluster settings and expect the manager to follow.
    final Settings.Builder updated = Settings.builder();
    updated.put("cluster.remote_store.index_metadata.path_type", RemoteStoreEnums.PathType.FIXED.toString());
    clusterSettings.applySettings(updated.build());

    final String actualAfterUpdate = remoteIndexMetadataManager.getPathTypeSetting().toString();
    assertEquals(RemoteStoreEnums.PathType.FIXED.toString(), actualAfterUpdate);
}

/**
 * Checks the default of the index metadata path hash algorithm setting and that a dynamic
 * update is picked up by the manager.
 * NOTE(review): the method name says "RoutingTable" but the setting under test is
 * {@code cluster.remote_store.index_metadata.path_hash_algo} - consider renaming.
 */
public void testRemoteRoutingTableHashAlgoSetting() {
    // Default must be FNV_1A_BASE64.
    final String expectedDefault = FNV_1A_BASE64.toString();
    assertEquals(expectedDefault, remoteIndexMetadataManager.getPathHashAlgoSetting().toString());

    // Switch to FNV_1A_COMPOSITE_1 through the cluster settings and expect the manager to follow.
    final Settings.Builder updated = Settings.builder();
    updated.put("cluster.remote_store.index_metadata.path_hash_algo", RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString());
    clusterSettings.applySettings(updated.build());

    final String actualAfterUpdate = remoteIndexMetadataManager.getPathHashAlgoSetting().toString();
    assertEquals(RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString(), actualAfterUpdate);
}

private IndexMetadata getIndexMetadata(String name, @Nullable Boolean writeIndex, String... aliases) {
IndexMetadata.Builder builder = IndexMetadata.builder(name)
.settings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.indices.IndicesModule;
Expand Down Expand Up @@ -171,6 +174,18 @@ public void testSerDe() throws IOException {
}
}

/**
 * Verifies the path produced by {@code getPrefixedPath} for an entity built with
 * {@code HASHED_PREFIX} and {@code FNV_1A_COMPOSITE_1} against a fixed expected string.
 */
public void testPrefixedPath() {
    final RemoteIndexMetadata entity = new RemoteIndexMetadata(
        getIndexMetadata(),
        clusterUUID,
        compressor,
        namedXContentRegistry,
        PathType.HASHED_PREFIX,
        PathHashAlgorithm.FNV_1A_COMPOSITE_1
    );
    final BlobPath basePath = BlobPath.cleanPath().add("test-path");

    final String actual = entity.getPrefixedPath(basePath).buildAsString();

    assertThat(actual, is("410100110100101/test-path/index-uuid/"));
}

private IndexMetadata getIndexMetadata() {
final Index index = new Index("test-index", "index-uuid");
final Settings idxSettings = Settings.builder()
Expand Down

0 comments on commit 0d7845f

Please sign in to comment.