Skip to content

Commit

Permalink
Add IT
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Aug 9, 2024
1 parent 3fe94b0 commit 9b2823c
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Set;
import org.opensearch.common.collect.Tuple;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
static final String INDEX_NAME = "remote-store-test-idx-1";

ActionListener<Void> noOpActionListener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {}

@Override
public void onFailure(Exception e) {}
};

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
}

public void testTimestampPinUnpin() throws Exception {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(RemoteStorePinnedTimestampService.class, primaryNodeName(INDEX_NAME));

Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp = RemoteStorePinnedTimestampService.getPinnedTimestamps();
long lastFetchTimestamp = pinnedTimestampWithFetchTimestamp.v1();
assertEquals(-1L, lastFetchTimestamp);
assertEquals(Set.of(), pinnedTimestampWithFetchTimestamp.v2());

assertThrows(IllegalArgumentException.class, () -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener));

long timestamp1 = System.currentTimeMillis() + 30000L;
long timestamp2 = System.currentTimeMillis() + 60000L;
long timestamp3 = System.currentTimeMillis() + 900000L;
remoteStorePinnedTimestampService.pinTimestamp(timestamp1, "ss2", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener);

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1));

assertBusy(() -> {
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_2 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
long lastFetchTimestamp_2 = pinnedTimestampWithFetchTimestamp_2.v1();
assertTrue(lastFetchTimestamp_2 != -1);
assertEquals(Set.of(timestamp1, timestamp2, timestamp3), pinnedTimestampWithFetchTimestamp_2.v2());
});

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3));

// This should be a no-op as pinning entity is different
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener);
// Unpinning already pinned entity
remoteStorePinnedTimestampService.unpinTimestamp(timestamp2, "ss3", noOpActionListener);
// Adding different entity to already pinned timestamp
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener);

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1));

assertBusy(() -> {
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_3 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
long lastFetchTimestamp_3 = pinnedTimestampWithFetchTimestamp_3.v1();
assertTrue(lastFetchTimestamp_3 != -1);
assertEquals(Set.of(timestamp1, timestamp3), pinnedTimestampWithFetchTimestamp_3.v2());
});

remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public RemoteStorePinnedTimestampsBlobStore(

@Override
public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> obj) {
return blobStoreRepository.basePath();
return blobStoreRepository.basePath().add("pinned_timestamps");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
*/
public class RemoteStorePinnedTimestampService implements Closeable {
private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class);
private Tuple<Long, Set<Long>> pinnedTimestampsSet = new Tuple<>(-1L, Set.of());
private static Tuple<Long, Set<Long>> pinnedTimestampsSet = new Tuple<>(-1L, Set.of());
public static final int PINNED_TIMESTAMP_FILES_TO_KEEP = 5;

private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -215,6 +215,7 @@ public void close() throws IOException {
asyncUpdatePinnedTimestampTask.close();
}

// Visible for testing
public void setPinnedTimestampsSchedulerInterval(TimeValue pinnedTimestampsSchedulerInterval) {
this.pinnedTimestampsSchedulerInterval = pinnedTimestampsSchedulerInterval;
rescheduleAsyncUpdatePinnedTimestampTask();
Expand All @@ -228,6 +229,10 @@ private void rescheduleAsyncUpdatePinnedTimestampTask() {
}
}

public static Tuple<Long, Set<Long>> getPinnedTimestamps() {
return pinnedTimestampsSet;
}

/**
* Inner class for asynchronously updating the pinned timestamp set.
*/
Expand Down

0 comments on commit 9b2823c

Please sign in to comment.