Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor remote writeable entity and store to make it more reusable #15210

Merged
merged 2 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.RemoteWriteableEntityBlobStore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.Compressor;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff;
Expand Down Expand Up @@ -262,12 +263,13 @@ protected void doStart() {
clusterSettings
);

this.remoteRoutingTableDiffStore = new RemoteClusterStateBlobStore<>(
this.remoteRoutingTableDiffStore = new RemoteWriteableEntityBlobStore<>(
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository,
clusterName,
threadPool,
ThreadPool.Names.REMOTE_STATE_READ
ThreadPool.Names.REMOTE_STATE_READ,
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;

/**
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage
*
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
*/
public abstract class AbstractClusterMetadataWriteableBlobEntity<T> extends RemoteWriteableBlobEntity<T> {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

protected final NamedXContentRegistry namedXContentRegistry;

public AbstractClusterMetadataWriteableBlobEntity(
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
super(clusterUUID, compressor);
this.namedXContentRegistry = namedXContentRegistry;
}

public AbstractClusterMetadataWriteableBlobEntity(final String clusterUUID, final Compressor compressor) {
super(clusterUUID, compressor);
this.namedXContentRegistry = null;
}

public abstract UploadedMetadata getUploadedMetadata();
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

public NamedXContentRegistry getNamedXContentRegistry() {
return namedXContentRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract class AbstractRemoteWritableEntityManager implements RemoteWrita
* @return the remote writable entity store for the given entity
* @throws IllegalArgumentException if the entity type is unknown
*/
protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
protected RemoteWritableEntityStore getStore(AbstractClusterMetadataWriteableBlobEntity entity) {
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
if (remoteStore == null) {
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
Expand All @@ -49,7 +49,7 @@ protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity en
*/
protected abstract ActionListener<Void> getWrappedWriteListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
);

Expand All @@ -64,21 +64,21 @@ protected abstract ActionListener<Void> getWrappedWriteListener(
*/
protected abstract ActionListener<Object> getWrappedReadListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
);

@Override
public void writeAsync(
String component,
AbstractRemoteWritableBlobEntity entity,
AbstractClusterMetadataWriteableBlobEntity entity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
getStore(entity).writeAsync(entity, getWrappedWriteListener(component, entity, listener));
}

@Override
public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
public void readAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface RemoteWritableEntityManager {
* {@link ActionListener#onFailure(Exception)} method is called with
* an exception if the read operation fails.
*/
void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener);
void readAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener<RemoteReadResult> listener);

/**
* Performs an asynchronous write operation for the specified component and entity.
Expand All @@ -43,5 +43,5 @@ public interface RemoteWritableEntityManager {
* {@link ActionListener#onFailure(Exception)} method is called with
* an exception if the write operation fails.
*/
void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<UploadedMetadata> listener);
void writeAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener<UploadedMetadata> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,25 @@

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;

/**
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage
*
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
* The abstract class which represents a {@link RemoteWriteableEntity} that can be written to a store
* @param <T> the entity to be written
*/
public abstract class AbstractRemoteWritableBlobEntity<T> implements RemoteWriteableEntity<T> {
public abstract class RemoteWriteableBlobEntity<T> implements RemoteWriteableEntity<T> {

protected String blobFileName;

protected String blobName;
private final String clusterUUID;
private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;
private String[] pathTokens;

public AbstractRemoteWritableBlobEntity(
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
public RemoteWriteableBlobEntity(final String clusterUUID, final Compressor compressor) {
this.clusterUUID = clusterUUID;
this.compressor = compressor;
this.namedXContentRegistry = namedXContentRegistry;
}

public AbstractRemoteWritableBlobEntity(final String clusterUUID, final Compressor compressor) {
this(clusterUUID, compressor, null);
}

public abstract BlobPathParameters getBlobPathParameters();
Expand Down Expand Up @@ -80,16 +67,10 @@ public String clusterUUID() {
return clusterUUID;
}

public abstract UploadedMetadata getUploadedMetadata();

public void setFullBlobName(BlobPath blobPath) {
this.blobName = blobPath.buildAsString() + blobFileName;
}

public NamedXContentRegistry getNamedXContentRegistry() {
return namedXContentRegistry;
}

protected Compressor getCompressor() {
return compressor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,48 @@
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;
package org.opensearch.common.remote;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.RemoteWriteableEntity;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.ExecutorService;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN;

/**
* Abstract class for a blob type storage
*
* @param <T> The entity which can be uploaded to / downloaded from blob store
* @param <U> The concrete class implementing {@link RemoteWriteableEntity} which is used as a wrapper for T entity.
*/
public class RemoteClusterStateBlobStore<T, U extends AbstractRemoteWritableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> {
public class RemoteWriteableEntityBlobStore<T, U extends RemoteWriteableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> {

private final BlobStoreTransferService transferService;
private final BlobStoreRepository blobStoreRepository;
private final String clusterName;
private final ExecutorService executorService;
private final String pathToken;

public RemoteClusterStateBlobStore(
public RemoteWriteableEntityBlobStore(
final BlobStoreTransferService blobStoreTransferService,
final BlobStoreRepository blobStoreRepository,
final String clusterName,
final ThreadPool threadPool,
final String executor
final String executor,
final String pathToken
) {
this.transferService = blobStoreTransferService;
this.blobStoreRepository = blobStoreRepository;
this.clusterName = clusterName;
this.executorService = threadPool.executor(executor);
this.pathToken = pathToken;
}

@Override
Expand Down Expand Up @@ -95,21 +94,18 @@ public String getClusterName() {
}

public BlobPath getBlobPathPrefix(String clusterUUID) {
return blobStoreRepository.basePath()
.add(RemoteClusterStateUtils.encodeString(getClusterName()))
.add(CLUSTER_STATE_PATH_TOKEN)
.add(clusterUUID);
return blobStoreRepository.basePath().add(encodeString(getClusterName())).add(pathToken).add(clusterUUID);
}

public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<T> obj) {
public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<T> obj) {
BlobPath blobPath = getBlobPathPrefix(obj.clusterUUID());
for (String token : obj.getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;
}

public BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T> obj) {
public BlobPath getBlobPathForDownload(final RemoteWriteableBlobEntity<T> obj) {
String[] pathTokens = obj.getBlobPathTokens();
BlobPath blobPath = new BlobPath();
if (pathTokens == null || pathTokens.length < 1) {
Expand All @@ -122,4 +118,8 @@ public BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T>
return blobPath;
}

private static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
import org.opensearch.common.remote.AbstractRemoteWritableEntityManager;
import org.opensearch.common.remote.RemoteWriteableEntityBlobStore;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
import org.opensearch.gateway.remote.model.RemoteReadResult;
Expand All @@ -28,7 +28,7 @@
import java.util.Map;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteWriteableEntityBlobStore}
*
* @opensearch.internal
*/
Expand All @@ -47,40 +47,43 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableE
) {
this.remoteWritableEntityStores.put(
RemoteDiscoveryNodes.DISCOVERY_NODES,
new RemoteClusterStateBlobStore<>(
new RemoteWriteableEntityBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
ThreadPool.Names.REMOTE_STATE_READ,
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
)
);
this.remoteWritableEntityStores.put(
RemoteClusterBlocks.CLUSTER_BLOCKS,
new RemoteClusterStateBlobStore<>(
new RemoteWriteableEntityBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
ThreadPool.Names.REMOTE_STATE_READ,
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
)
);
this.remoteWritableEntityStores.put(
RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM,
new RemoteClusterStateBlobStore<>(
new RemoteWriteableEntityBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
threadpool,
ThreadPool.Names.REMOTE_STATE_READ
ThreadPool.Names.REMOTE_STATE_READ,
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
)
);
}

@Override
protected ActionListener<Void> getWrappedWriteListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
return ActionListener.wrap(
Expand All @@ -92,7 +95,7 @@ protected ActionListener<Void> getWrappedWriteListener(
@Override
protected ActionListener<Object> getWrappedReadListener(
String component,
AbstractRemoteWritableBlobEntity remoteEntity,
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
) {
return ActionListener.wrap(
Expand Down
Loading
Loading