Skip to content

Commit

Permalink
Address further comments on PR
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Jun 11, 2024
1 parent a11a5af commit 3ed4d14
Show file tree
Hide file tree
Showing 19 changed files with 159 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public AbstractRemoteWritableBlobEntity(

public abstract BlobPathParameters getBlobPathParameters();

public abstract String getType();

public String getFullBlobName() {
return blobName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ private UploadedMetadataResults writeMetadataInParallel(
blobStoreRepository.getCompressor(),
blobStoreRepository.getNamedXContentRegistry()
),
remoteGlobalMetadataManager.getPersistentSettingsBlobStore(),
listener
)
);
Expand All @@ -424,7 +423,6 @@ private UploadedMetadataResults writeMetadataInParallel(
blobStoreRepository.getCompressor(),
blobStoreRepository.getNamedXContentRegistry()
),
remoteGlobalMetadataManager.getCoordinationMetadataBlobStore(),
listener
)
);
Expand All @@ -440,7 +438,6 @@ private UploadedMetadataResults writeMetadataInParallel(
blobStoreRepository.getCompressor(),
blobStoreRepository.getNamedXContentRegistry()
),
remoteGlobalMetadataManager.getTemplatesMetadataBlobStore(),
listener
)
);
Expand All @@ -458,7 +455,6 @@ private UploadedMetadataResults writeMetadataInParallel(
blobStoreRepository.getCompressor(),
blobStoreRepository.getNamedXContentRegistry()
),
remoteGlobalMetadataManager.getCustomMetadataBlobStore(),
listener
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@ public class RemoteGlobalMetadataManager {
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1;

private volatile TimeValue globalMetadataUploadTimeout;
private final RemoteWritableEntityStore<Metadata, RemoteGlobalMetadata> globalMetadataBlobStore;
private final RemoteClusterStateBlobStore<CoordinationMetadata, RemoteCoordinationMetadata> coordinationMetadataBlobStore;
private final RemoteClusterStateBlobStore<Settings, RemotePersistentSettingsMetadata> persistentSettingsBlobStore;
private final RemoteClusterStateBlobStore<TemplatesMetadata, RemoteTemplatesMetadata> templatesMetadataBlobStore;
private final RemoteClusterStateBlobStore<Custom, RemoteCustomMetadata> customMetadataBlobStore;
private Map<String, RemoteWritableEntityStore> remoteWritableEntityStores;
private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;

Expand All @@ -80,40 +76,56 @@ public class RemoteGlobalMetadataManager {
this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
this.compressor = blobStoreRepository.getCompressor();
this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry();
globalMetadataBlobStore = new RemoteClusterStateBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
this.remoteWritableEntityStores = new HashMap<>();
this.remoteWritableEntityStores.put(
RemoteGlobalMetadata.GLOBAL_METADATA,
new RemoteClusterStateBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
)
);
coordinationMetadataBlobStore = new RemoteClusterStateBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
this.remoteWritableEntityStores.put(
RemoteCoordinationMetadata.COORDINATION_METADATA,
new RemoteClusterStateBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
)
);
persistentSettingsBlobStore = new RemoteClusterStateBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
this.remoteWritableEntityStores.put(
RemotePersistentSettingsMetadata.SETTING_METADATA,
new RemoteClusterStateBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
)
);
templatesMetadataBlobStore = new RemoteClusterStateBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
this.remoteWritableEntityStores.put(
RemoteTemplatesMetadata.TEMPLATES_METADATA,
new RemoteClusterStateBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
)
);
customMetadataBlobStore = new RemoteClusterStateBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
this.remoteWritableEntityStores.put(
RemoteCustomMetadata.CUSTOM_METADATA,
new RemoteClusterStateBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
)
);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
}
Expand All @@ -123,10 +135,20 @@ public class RemoteGlobalMetadataManager {
*/
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
AbstractRemoteWritableBlobEntity writeEntity,
RemoteWritableEntityStore remoteStore,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return () -> remoteStore.writeAsync(writeEntity, getActionListener(writeEntity, latchedActionListener));
return (() -> getRemoteWriteableEntityStoreForObject(writeEntity).writeAsync(
writeEntity,
getActionListener(writeEntity, latchedActionListener)
));
}

private RemoteWritableEntityStore getRemoteWriteableEntityStoreForObject(AbstractRemoteWritableBlobEntity entity) {
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
if (remoteStore == null) {
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
}
return remoteStore;
}

private ActionListener<Void> getActionListener(
Expand All @@ -150,7 +172,7 @@ Metadata getGlobalMetadata(String clusterUUID, ClusterMetadataManifest clusterMe
compressor,
namedXContentRegistry
);
return globalMetadataBlobStore.read(remoteGlobalMetadata);
return (Metadata) getRemoteWriteableEntityStoreForObject(remoteGlobalMetadata).read(remoteGlobalMetadata);
} else if (clusterMetadataManifest.hasMetadataAttributesFiles()) {
// from CODEC_V2, we have started uploading all the metadata in granular files instead of a single entity
Metadata.Builder builder = new Metadata.Builder();
Expand All @@ -161,7 +183,11 @@ Metadata getGlobalMetadata(String clusterUUID, ClusterMetadataManifest clusterMe
compressor,
namedXContentRegistry
);
builder.coordinationMetadata(coordinationMetadataBlobStore.read(remoteCoordinationMetadata));
builder.coordinationMetadata(
(CoordinationMetadata) getRemoteWriteableEntityStoreForObject(remoteCoordinationMetadata).read(
remoteCoordinationMetadata
)
);
}
if (clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename() != null) {
RemoteTemplatesMetadata remoteTemplatesMetadata = new RemoteTemplatesMetadata(
Expand All @@ -170,7 +196,9 @@ Metadata getGlobalMetadata(String clusterUUID, ClusterMetadataManifest clusterMe
compressor,
namedXContentRegistry
);
builder.templates(templatesMetadataBlobStore.read(remoteTemplatesMetadata));
builder.templates(
(TemplatesMetadata) getRemoteWriteableEntityStoreForObject(remoteTemplatesMetadata).read(remoteTemplatesMetadata)
);
}
if (clusterMetadataManifest.getSettingsMetadata().getUploadedFilename() != null) {
RemotePersistentSettingsMetadata remotePersistentSettingsMetadata = new RemotePersistentSettingsMetadata(
Expand All @@ -179,17 +207,26 @@ Metadata getGlobalMetadata(String clusterUUID, ClusterMetadataManifest clusterMe
compressor,
namedXContentRegistry
);
builder.persistentSettings(persistentSettingsBlobStore.read(remotePersistentSettingsMetadata));
builder.persistentSettings(
(Settings) getRemoteWriteableEntityStoreForObject(remotePersistentSettingsMetadata).read(
remotePersistentSettingsMetadata
)
);
}
builder.clusterUUID(clusterMetadataManifest.getClusterUUID());
builder.clusterUUIDCommitted(clusterMetadataManifest.isClusterUUIDCommitted());
clusterMetadataManifest.getCustomMetadataMap().forEach((key, value) -> {
try {
RemoteCustomMetadata remoteCustomMetadata = new RemoteCustomMetadata(
value.getUploadedFilename(),
key,
clusterUUID,
compressor,
namedXContentRegistry
);
builder.putCustom(
key,
customMetadataBlobStore.read(
new RemoteCustomMetadata(value.getUploadedFilename(), key, clusterUUID, compressor, namedXContentRegistry)
)
(Custom) getRemoteWriteableEntityStoreForObject(remoteCustomMetadata).read(remoteCustomMetadata)
);
} catch (IOException e) {
throw new IllegalStateException(
Expand Down Expand Up @@ -248,20 +285,4 @@ private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTim
public TimeValue getGlobalMetadataUploadTimeout() {
return this.globalMetadataUploadTimeout;
}

public RemoteClusterStateBlobStore<CoordinationMetadata, RemoteCoordinationMetadata> getCoordinationMetadataBlobStore() {
return coordinationMetadataBlobStore;
}

public RemoteClusterStateBlobStore<Settings, RemotePersistentSettingsMetadata> getPersistentSettingsBlobStore() {
return persistentSettingsBlobStore;
}

public RemoteClusterStateBlobStore<TemplatesMetadata, RemoteTemplatesMetadata> getTemplatesMetadataBlobStore() {
return templatesMetadataBlobStore;
}

public RemoteClusterStateBlobStore<Custom, RemoteCustomMetadata> getCustomMetadataBlobStore() {
return customMetadataBlobStore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN), CLUSTER_BLOCKS);
}

@Override
public String getType() {
return CLUSTER_BLOCKS;
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/transient/<componentPrefix>__<inverted_state_version>__<inverted__timestamp>__<codec_version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(MANIFEST), MANIFEST);
}

@Override
public String getType() {
return MANIFEST;
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__
Expand Down Expand Up @@ -150,4 +155,5 @@ private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManif
}
throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN), prefix);
}

@Override
public String getType() {
return CLUSTER_STATE_CUSTOM;
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/ephemeral/<componentPrefix>__<inverted_state_version>__<inverted__timestamp>__<codec_version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of("global-metadata"), COORDINATION_METADATA);
}

@Override
public String getType() {
return COORDINATION_METADATA;
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/<componentPrefix>__<inverted_metadata_version>__<inverted__timestamp>__<codec_version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(GLOBAL_METADATA_PATH_TOKEN), prefix);
}

@Override
public String getType() {
return CUSTOM_METADATA;
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/<componentPrefix>__<inverted_metadata_version>__<inverted__timestamp>__
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN), DISCOVERY_NODES);
}

@Override
public String getType() {
return DISCOVERY_NODES;
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/ephemeral/<componentPrefix>__<inverted_state_version>__<inverted__timestamp>__<codec_version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* Wrapper class for uploading/downloading global metadata ({@link Metadata}) to/from remote blob store
*/
public class RemoteGlobalMetadata extends AbstractRemoteWritableBlobEntity<Metadata> {
public static final String GLOBAL_METADATA = "global_metadata";

public static final ChecksumBlobStoreFormat<Metadata> GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"metadata",
Expand All @@ -48,6 +49,11 @@ public BlobPathParameters getBlobPathParameters() {
throw new UnsupportedOperationException();
}

@Override
public String getType() {
return GLOBAL_METADATA;
}

@Override
public String generateBlobFileName() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(GLOBAL_METADATA_PATH_TOKEN), HASHES_OF_CONSISTENT_SETTINGS);
}

@Override
public String getType() {
return HASHES_OF_CONSISTENT_SETTINGS;
}

@Override
public String generateBlobFileName() {
String blobFileName = String.join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class RemoteIndexMetadata extends AbstractRemoteWritableBlobEntity<IndexM
METADATA_NAME_PLAIN_FORMAT,
IndexMetadata::fromXContent
);
public static final String INDEX_PATH_TOKEN = "index";
public static final String INDEX = "index";

private IndexMetadata indexMetadata;

Expand All @@ -64,7 +64,12 @@ public RemoteIndexMetadata(

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(INDEX_PATH_TOKEN, indexMetadata.getIndexUUID()), "metadata");
return new BlobPathParameters(List.of(INDEX, indexMetadata.getIndexUUID()), "metadata");
}

@Override
public String getType() {
return INDEX;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of("global-metadata"), SETTING_METADATA);
}

@Override
public String getType() {
return SETTING_METADATA;
}

@Override
public String generateBlobFileName() {
String blobFileName = String.join(
Expand Down
Loading

0 comments on commit 3ed4d14

Please sign in to comment.