-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add timestamp pinning service and scheduler to update in-memory state
Signed-off-by: Sachin Kale <kalsac@amazon.com>
- Loading branch information
Sachin Kale
committed
Aug 9, 2024
1 parent
978d14e
commit 3fe94b0
Showing
7 changed files
with
594 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
132 changes: 132 additions & 0 deletions
132
server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.model; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.cluster.AbstractDiffable; | ||
import org.opensearch.cluster.block.ClusterBlocks; | ||
import org.opensearch.common.io.Streams; | ||
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; | ||
import org.opensearch.common.remote.BlobPathParameters; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.core.compress.Compressor; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest; | ||
import org.opensearch.index.remote.RemoteStoreUtils; | ||
import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; | ||
|
||
/** | ||
* Wrapper class for uploading/downloading {@link RemotePinnedTimestamps} to/from remote blob store | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class RemotePinnedTimestamps extends AbstractRemoteWritableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> { | ||
private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class); | ||
|
||
public static class PinnedTimestamps extends AbstractDiffable<ClusterBlocks> { | ||
private final Map<Long, List<String>> pinnedTimestampPinningEntityMap; | ||
|
||
public PinnedTimestamps(Map<Long, List<String>> pinnedTimestampPinningEntityMap) { | ||
this.pinnedTimestampPinningEntityMap = new ConcurrentHashMap<>(pinnedTimestampPinningEntityMap); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeMap(pinnedTimestampPinningEntityMap, StreamOutput::writeLong, StreamOutput::writeStringCollection); | ||
} | ||
|
||
public static PinnedTimestamps readFrom(StreamInput in) throws IOException { | ||
return new PinnedTimestamps(in.readMap(StreamInput::readLong, StreamInput::readStringList)); | ||
} | ||
|
||
public void pin(Long timestamp, String pinningEntity) { | ||
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity); | ||
pinnedTimestampPinningEntityMap.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(pinningEntity); | ||
} | ||
|
||
public void unpin(Long timestamp, String pinningEntity) { | ||
logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity); | ||
pinnedTimestampPinningEntityMap.computeIfPresent(timestamp, (k, v) -> { | ||
v.remove(pinningEntity); | ||
return v.isEmpty() ? null : v; | ||
}); | ||
} | ||
|
||
public Map<Long, List<String>> getPinnedTimestampPinningEntityMap() { | ||
return new HashMap<>(pinnedTimestampPinningEntityMap); | ||
} | ||
} | ||
|
||
public static final String PINNED_TIMESTAMPS = "pinned_timestamps"; | ||
public static final ChecksumWritableBlobStoreFormat<PinnedTimestamps> PINNED_TIMESTAMPS_FORMAT = new ChecksumWritableBlobStoreFormat<>( | ||
PINNED_TIMESTAMPS, | ||
PinnedTimestamps::readFrom | ||
); | ||
|
||
private PinnedTimestamps pinnedTimestamps; | ||
|
||
public RemotePinnedTimestamps(String clusterUUID, Compressor compressor, NamedXContentRegistry namedXContentRegistry) { | ||
super(clusterUUID, compressor, namedXContentRegistry); | ||
pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); | ||
} | ||
|
||
@Override | ||
public BlobPathParameters getBlobPathParameters() { | ||
return new BlobPathParameters(List.of(PINNED_TIMESTAMPS), PINNED_TIMESTAMPS); | ||
} | ||
|
||
@Override | ||
public String getType() { | ||
return PINNED_TIMESTAMPS; | ||
} | ||
|
||
@Override | ||
public String generateBlobFileName() { | ||
return this.blobFileName = String.join(DELIMITER, PINNED_TIMESTAMPS, RemoteStoreUtils.invertLong(System.currentTimeMillis())); | ||
} | ||
|
||
@Override | ||
public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { | ||
return null; | ||
} | ||
|
||
@Override | ||
public InputStream serialize() throws IOException { | ||
return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput(); | ||
} | ||
|
||
@Override | ||
public PinnedTimestamps deserialize(InputStream inputStream) throws IOException { | ||
return PINNED_TIMESTAMPS_FORMAT.deserialize(blobName, Streams.readFully(inputStream)); | ||
} | ||
|
||
public void setBlobFileName(String blobFileName) { | ||
this.blobFileName = blobFileName; | ||
} | ||
|
||
public void setPinnedTimestamps(PinnedTimestamps pinnedTimestamps) { | ||
this.pinnedTimestamps = pinnedTimestamps; | ||
} | ||
|
||
public PinnedTimestamps getPinnedTimestamps() { | ||
return pinnedTimestamps; | ||
} | ||
} |
41 changes: 41 additions & 0 deletions
41
...c/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.model; | ||
|
||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; | ||
import org.opensearch.index.translog.transfer.BlobStoreTransferService; | ||
import org.opensearch.repositories.blobstore.BlobStoreRepository; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
/** | ||
* Extends the RemoteClusterStateBlobStore to support {@link RemotePinnedTimestamps} | ||
*/ | ||
public class RemoteStorePinnedTimestampsBlobStore extends RemoteClusterStateBlobStore< | ||
RemotePinnedTimestamps.PinnedTimestamps, | ||
RemotePinnedTimestamps> { | ||
|
||
private final BlobStoreRepository blobStoreRepository; | ||
|
||
public RemoteStorePinnedTimestampsBlobStore( | ||
BlobStoreTransferService blobStoreTransferService, | ||
BlobStoreRepository blobStoreRepository, | ||
String clusterName, | ||
ThreadPool threadPool, | ||
String executor | ||
) { | ||
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor); | ||
this.blobStoreRepository = blobStoreRepository; | ||
} | ||
|
||
@Override | ||
public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> obj) { | ||
return blobStoreRepository.basePath(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.