-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
…#15180) --------- Signed-off-by: Sachin Kale <kalsac@amazon.com> Co-authored-by: Sachin Kale <kalsac@amazon.com> (cherry picked from commit 1717b55) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.remotestore; | ||
|
||
import org.opensearch.common.collect.Tuple; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
import java.util.Set; | ||
|
||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) | ||
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase { | ||
static final String INDEX_NAME = "remote-store-test-idx-1"; | ||
|
||
ActionListener<Void> noOpActionListener = new ActionListener<>() { | ||
@Override | ||
public void onResponse(Void unused) {} | ||
|
||
@Override | ||
public void onFailure(Exception e) {} | ||
}; | ||
|
||
public void testTimestampPinUnpin() throws Exception { | ||
prepareCluster(1, 1, INDEX_NAME, 0, 2); | ||
ensureGreen(INDEX_NAME); | ||
|
||
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( | ||
RemoteStorePinnedTimestampService.class, | ||
primaryNodeName(INDEX_NAME) | ||
); | ||
|
||
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp = RemoteStorePinnedTimestampService.getPinnedTimestamps(); | ||
long lastFetchTimestamp = pinnedTimestampWithFetchTimestamp.v1(); | ||
assertEquals(-1L, lastFetchTimestamp); | ||
assertEquals(Set.of(), pinnedTimestampWithFetchTimestamp.v2()); | ||
|
||
assertThrows( | ||
IllegalArgumentException.class, | ||
() -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener) | ||
); | ||
|
||
long timestamp1 = System.currentTimeMillis() + 30000L; | ||
long timestamp2 = System.currentTimeMillis() + 60000L; | ||
long timestamp3 = System.currentTimeMillis() + 900000L; | ||
remoteStorePinnedTimestampService.pinTimestamp(timestamp1, "ss2", noOpActionListener); | ||
remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener); | ||
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener); | ||
|
||
remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1)); | ||
|
||
assertBusy(() -> { | ||
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_2 = RemoteStorePinnedTimestampService.getPinnedTimestamps(); | ||
long lastFetchTimestamp_2 = pinnedTimestampWithFetchTimestamp_2.v1(); | ||
assertTrue(lastFetchTimestamp_2 != -1); | ||
assertEquals(Set.of(timestamp1, timestamp2, timestamp3), pinnedTimestampWithFetchTimestamp_2.v2()); | ||
}); | ||
|
||
remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3)); | ||
|
||
// This should be a no-op as pinning entity is different | ||
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener); | ||
// Unpinning already pinned entity | ||
remoteStorePinnedTimestampService.unpinTimestamp(timestamp2, "ss3", noOpActionListener); | ||
// Adding different entity to already pinned timestamp | ||
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener); | ||
|
||
remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1)); | ||
|
||
assertBusy(() -> { | ||
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_3 = RemoteStorePinnedTimestampService.getPinnedTimestamps(); | ||
long lastFetchTimestamp_3 = pinnedTimestampWithFetchTimestamp_3.v1(); | ||
assertTrue(lastFetchTimestamp_3 != -1); | ||
assertEquals(Set.of(timestamp1, timestamp3), pinnedTimestampWithFetchTimestamp_3.v2()); | ||
}); | ||
|
||
remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.model; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.common.io.Streams; | ||
import org.opensearch.common.remote.BlobPathParameters; | ||
import org.opensearch.common.remote.RemoteWriteableBlobEntity; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.core.common.io.stream.Writeable; | ||
import org.opensearch.core.compress.Compressor; | ||
import org.opensearch.index.remote.RemoteStoreUtils; | ||
import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; | ||
|
||
/** | ||
* Wrapper class for uploading/downloading {@link RemotePinnedTimestamps} to/from remote blob store | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class RemotePinnedTimestamps extends RemoteWriteableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> { | ||
private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class); | ||
|
||
/** | ||
* Represents a collection of pinned timestamps and their associated pinning entities. | ||
* This class is thread-safe and implements the Writeable interface for serialization. | ||
*/ | ||
public static class PinnedTimestamps implements Writeable { | ||
private final Map<Long, List<String>> pinnedTimestampPinningEntityMap; | ||
|
||
public PinnedTimestamps(Map<Long, List<String>> pinnedTimestampPinningEntityMap) { | ||
this.pinnedTimestampPinningEntityMap = new ConcurrentHashMap<>(pinnedTimestampPinningEntityMap); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeMap(pinnedTimestampPinningEntityMap, StreamOutput::writeLong, StreamOutput::writeStringCollection); | ||
} | ||
|
||
public static PinnedTimestamps readFrom(StreamInput in) throws IOException { | ||
return new PinnedTimestamps(in.readMap(StreamInput::readLong, StreamInput::readStringList)); | ||
} | ||
|
||
/** | ||
* Pins a timestamp against a pinning entity. | ||
* | ||
* @param timestamp The timestamp to pin. | ||
* @param pinningEntity The entity pinning the timestamp. | ||
*/ | ||
public void pin(Long timestamp, String pinningEntity) { | ||
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity); | ||
pinnedTimestampPinningEntityMap.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(pinningEntity); | ||
} | ||
|
||
/** | ||
* Unpins a timestamp for a specific pinning entity. | ||
* | ||
* @param timestamp The timestamp to unpin. | ||
* @param pinningEntity The entity unpinning the timestamp. | ||
*/ | ||
public void unpin(Long timestamp, String pinningEntity) { | ||
logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity); | ||
if (pinnedTimestampPinningEntityMap.containsKey(timestamp) == false | ||
|| pinnedTimestampPinningEntityMap.get(timestamp).contains(pinningEntity) == false) { | ||
logger.warn("Timestamp: {} is not pinned by entity: {}", timestamp, pinningEntity); | ||
Check warning on line 82 in server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java Codecov / codecov/patchserver/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java#L82
|
||
} | ||
pinnedTimestampPinningEntityMap.compute(timestamp, (k, v) -> { | ||
v.remove(pinningEntity); | ||
return v.isEmpty() ? null : v; | ||
}); | ||
} | ||
|
||
public Map<Long, List<String>> getPinnedTimestampPinningEntityMap() { | ||
return new HashMap<>(pinnedTimestampPinningEntityMap); | ||
} | ||
} | ||
|
||
public static final String PINNED_TIMESTAMPS = "pinned_timestamps"; | ||
public static final ChecksumWritableBlobStoreFormat<PinnedTimestamps> PINNED_TIMESTAMPS_FORMAT = new ChecksumWritableBlobStoreFormat<>( | ||
PINNED_TIMESTAMPS, | ||
PinnedTimestamps::readFrom | ||
); | ||
|
||
private PinnedTimestamps pinnedTimestamps; | ||
|
||
public RemotePinnedTimestamps(String clusterUUID, Compressor compressor) { | ||
super(clusterUUID, compressor); | ||
pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); | ||
} | ||
|
||
@Override | ||
public BlobPathParameters getBlobPathParameters() { | ||
return new BlobPathParameters(List.of(PINNED_TIMESTAMPS), PINNED_TIMESTAMPS); | ||
Check warning on line 110 in server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java Codecov / codecov/patchserver/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java#L110
|
||
} | ||
|
||
@Override | ||
public String getType() { | ||
return PINNED_TIMESTAMPS; | ||
Check warning on line 115 in server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java Codecov / codecov/patchserver/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java#L115
|
||
} | ||
|
||
@Override | ||
public String generateBlobFileName() { | ||
return this.blobFileName = String.join(DELIMITER, PINNED_TIMESTAMPS, RemoteStoreUtils.invertLong(System.currentTimeMillis())); | ||
} | ||
|
||
@Override | ||
public InputStream serialize() throws IOException { | ||
return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput(); | ||
} | ||
|
||
@Override | ||
public PinnedTimestamps deserialize(InputStream inputStream) throws IOException { | ||
return PINNED_TIMESTAMPS_FORMAT.deserialize(blobName, Streams.readFully(inputStream)); | ||
} | ||
|
||
public void setBlobFileName(String blobFileName) { | ||
this.blobFileName = blobFileName; | ||
} | ||
Check warning on line 135 in server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java Codecov / codecov/patchserver/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java#L134-L135
|
||
|
||
public void setPinnedTimestamps(PinnedTimestamps pinnedTimestamps) { | ||
this.pinnedTimestamps = pinnedTimestamps; | ||
} | ||
|
||
public PinnedTimestamps getPinnedTimestamps() { | ||
return pinnedTimestamps; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.model; | ||
|
||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.common.remote.RemoteWriteableBlobEntity; | ||
import org.opensearch.common.remote.RemoteWriteableEntityBlobStore; | ||
import org.opensearch.index.translog.transfer.BlobStoreTransferService; | ||
import org.opensearch.repositories.blobstore.BlobStoreRepository; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
/** | ||
* Extends the RemoteClusterStateBlobStore to support {@link RemotePinnedTimestamps} | ||
*/ | ||
public class RemoteStorePinnedTimestampsBlobStore extends RemoteWriteableEntityBlobStore< | ||
RemotePinnedTimestamps.PinnedTimestamps, | ||
RemotePinnedTimestamps> { | ||
|
||
public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps"; | ||
private final BlobStoreRepository blobStoreRepository; | ||
|
||
public RemoteStorePinnedTimestampsBlobStore( | ||
BlobStoreTransferService blobStoreTransferService, | ||
BlobStoreRepository blobStoreRepository, | ||
String clusterName, | ||
ThreadPool threadPool, | ||
String executor | ||
) { | ||
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor, PINNED_TIMESTAMPS_PATH_TOKEN); | ||
this.blobStoreRepository = blobStoreRepository; | ||
} | ||
|
||
@Override | ||
public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> obj) { | ||
return blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN); | ||
Check warning on line 41 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java Codecov / codecov/patchserver/src/main/java/org/opensearch/gateway/remote/model/RemoteStorePinnedTimestampsBlobStore.java#L41
|
||
} | ||
} |