diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java index b3187c1bd2ec2..030491cf8b7b9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java @@ -77,7 +77,11 @@ public void pin(Long timestamp, String pinningEntity) { */ public void unpin(Long timestamp, String pinningEntity) { logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity); - pinnedTimestampPinningEntityMap.computeIfPresent(timestamp, (k, v) -> { + if (pinnedTimestampPinningEntityMap.containsKey(timestamp) == false + || pinnedTimestampPinningEntityMap.get(timestamp).contains(pinningEntity) == false) { + logger.warn("Timestamp: {} is not pinned by entity: {}", timestamp, pinningEntity); + } + pinnedTimestampPinningEntityMap.compute(timestamp, (k, v) -> { v.remove(pinningEntity); return v.isEmpty() ? null : v; }); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 8723e55a6e52f..35730a75a8142 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -35,7 +35,9 @@ import java.util.HashMap; import java.util.List; import java.util.Set; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -50,7 +52,6 @@ public class RemoteStorePinnedTimestampService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class); private static Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); public static final int PINNED_TIMESTAMP_FILES_TO_KEEP = 5; - public static final int TIMESTAMP_PINNING_PAST_BUFFER_IN_MILLIS = 10000; private final Supplier repositoriesService; private final Settings settings; @@ -61,6 +62,7 @@ public class RemoteStorePinnedTimestampService implements Closeable { private RemoteStorePinnedTimestampsBlobStore pinnedTimestampsBlobStore; private AsyncUpdatePinnedTimestampTask asyncUpdatePinnedTimestampTask; private volatile TimeValue pinnedTimestampsSchedulerInterval; + private final Semaphore updateTimetampPinningSemaphore = new Semaphore(1); /** * Controls pinned timestamp scheduler interval @@ -72,6 +74,17 @@ public class RemoteStorePinnedTimestampService implements Closeable { Setting.Property.NodeScope ); + /** + * Controls allowed timestamp values to be pinned from past + */ + public static final Setting CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL = Setting.timeSetting( + "cluster.remote_store.pinned_timestamps.lookback_interval", + TimeValue.timeValueMinutes(1), + TimeValue.timeValueMinutes(1), + TimeValue.timeValueMinutes(5), + Setting.Property.NodeScope + ); + public RemoteStorePinnedTimestampService( Supplier repositoriesService, Settings settings, @@ -101,7 +114,7 @@ private void validateRemoteStoreConfiguration() { final String remoteStoreRepo = settings.get( Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY ); - assert remoteStoreRepo != null : "Remote Cluster State repository is not configured"; + assert remoteStoreRepo != null : "Remote Segment Store repository is not configured"; final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; @@ -134,8 +147,11 @@ private void startAsyncUpdateTask() { public void pinTimestamp(long timestamp, String pinningEntity, ActionListener listener) { // If a caller uses current system time to pin the timestamp, following check will almost always fail. // So, we allow pinning timestamp in the past upto some buffer - if (timestamp < TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - TIMESTAMP_PINNING_PAST_BUFFER_IN_MILLIS) { - throw new IllegalArgumentException("Timestamp to be pinned is less than current timestamp"); + long lookbackIntervalInMills = CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL.get(settings).millis(); + if (timestamp < TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - lookbackIntervalInMills) { + throw new IllegalArgumentException( + "Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval" + ); } updatePinning(pinnedTimestamps -> pinnedTimestamps.pin(timestamp, pinningEntity), listener); } @@ -157,39 +173,72 @@ private void updatePinning(Consumer updateConsumer, ActionList blobStoreRepository.getCompressor() ); BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps); - blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), Integer.MAX_VALUE, new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - PinnedTimestamps pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); - if (blobMetadata.isEmpty() == false) { - pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps); - } - updateConsumer.accept(pinnedTimestamps); - remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps); - pinnedTimestampsBlobStore.writeAsync(remotePinnedTimestamps, listener); + try { + if (updateTimetampPinningSemaphore.tryAcquire(10, TimeUnit.MINUTES)) { + ActionListener semaphoreAwareListener = ActionListener.runBefore(listener, updateTimetampPinningSemaphore::release); + ActionListener> listCallResponseListener = getListenerForListCallResponse( + remotePinnedTimestamps, + updateConsumer, + semaphoreAwareListener + ); + blobStoreTransferService.listAllInSortedOrder( + path, + remotePinnedTimestamps.getType(), + Integer.MAX_VALUE, + listCallResponseListener + ); + } else { + throw new TimeoutException("Timed out while waiting to acquire lock in updatePinning"); + } + } catch (InterruptedException | TimeoutException e) { + listener.onFailure(e); + } + } - // Delete older pinnedTimestamp files - if (blobMetadata.size() > PINNED_TIMESTAMP_FILES_TO_KEEP) { - List oldFilesToBeDeleted = blobMetadata.subList(PINNED_TIMESTAMP_FILES_TO_KEEP, blobMetadata.size()) - .stream() - .map(BlobMetadata::name) - .collect(Collectors.toList()); - try { - blobStoreTransferService.deleteBlobs( - pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps), - oldFilesToBeDeleted - ); - } catch (IOException e) { - logger.error("Exception while deleting stale pinned timestamps", e); - } - } + private ActionListener> getListenerForListCallResponse( + RemotePinnedTimestamps remotePinnedTimestamps, + Consumer updateConsumer, + ActionListener listener + ) { + return ActionListener.wrap(blobMetadata -> { + PinnedTimestamps pinnedTimestamps = new PinnedTimestamps(new HashMap<>()); + if (blobMetadata.isEmpty() == false) { + pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps); } + updateConsumer.accept(pinnedTimestamps); + remotePinnedTimestamps.setPinnedTimestamps(pinnedTimestamps); + ActionListener writeCallResponseListener = getListenerForWriteCallResponse( + remotePinnedTimestamps, + blobMetadata, + listener + ); + pinnedTimestampsBlobStore.writeAsync(remotePinnedTimestamps, writeCallResponseListener); + }, listener::onFailure); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + private ActionListener getListenerForWriteCallResponse( + RemotePinnedTimestamps remotePinnedTimestamps, + List blobMetadata, + ActionListener listener + ) { + return ActionListener.wrap(unused -> { + // Delete older pinnedTimestamp files + if (blobMetadata.size() > PINNED_TIMESTAMP_FILES_TO_KEEP) { + List oldFilesToBeDeleted = blobMetadata.subList(PINNED_TIMESTAMP_FILES_TO_KEEP, blobMetadata.size()) + .stream() + .map(BlobMetadata::name) + .collect(Collectors.toList()); + try { + blobStoreTransferService.deleteBlobs( + pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps), + oldFilesToBeDeleted + ); + } catch (IOException e) { + logger.error("Exception while deleting stale pinned timestamps", e); + } } - }); + listener.onResponse(null); + }, listener::onFailure); } private PinnedTimestamps readExistingPinnedTimestamps(String blobFilename, RemotePinnedTimestamps remotePinnedTimestamps) {