diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index ddb046139d5cd..4ee15acf3e6c4 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -801,7 +801,8 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
     public void testPressureServiceStats() throws Exception {
         final String primaryNode = internalCluster().startNode();
         createIndex(INDEX_NAME);
-        final String replicaNode = internalCluster().startNode();
+        ensureYellow(INDEX_NAME);
+        final String replicaNode = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
 
         int initialDocCount = scaledRandomIntBetween(100, 200);
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java
index 2b9e38157fb39..e0f052ae75b2e 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java
@@ -316,6 +316,6 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
             .get()
             .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
         Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata");
-        assertEquals(1, getFileCount(indexPath));
+        assertEquals(numberOfIterations, getFileCount(indexPath));
     }
 }
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index 93c0aad8c41c5..2396c7445ab60 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -30,6 +30,7 @@
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.store.RemoteSegmentStoreDirectory;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
+import org.opensearch.index.translog.Translog;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.threadpool.Scheduler;
@@ -350,12 +351,19 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
         userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
         segmentInfosSnapshot.setUserData(userData, false);
 
-        remoteDirectory.uploadMetadata(
-            localSegmentsPostRefresh,
-            segmentInfosSnapshot,
-            storeDirectory,
-            indexShard.getOperationPrimaryTerm()
-        );
+        Translog.TranslogGeneration translogGeneration = indexShard.getEngine().translogManager().getTranslogGeneration();
+        if (translogGeneration == null) {
+            throw new UnsupportedOperationException("Encountered null TranslogGeneration while uploading metadata to remote segment store");
+        } else {
+            long translogFileGeneration = translogGeneration.translogFileGeneration;
+            remoteDirectory.uploadMetadata(
+                localSegmentsPostRefresh,
+                segmentInfosSnapshot,
+                storeDirectory,
+                indexShard.getOperationPrimaryTerm(),
+                translogFileGeneration
+            );
+        }
     }
 
     private boolean uploadNewSegments(Collection<String> localSegmentsPostRefresh) throws IOException {
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
index be4b4e910bb4d..8782808c070ab 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java
@@ -13,6 +13,8 @@
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.Lock;
+import org.opensearch.action.ActionListener;
+import org.opensearch.action.LatchedActionListener;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
 
@@ -20,10 +22,15 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 /**
  * A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store.
@@ -61,6 +68,40 @@ public Collection<String> listFilesByPrefix(String filenamePrefix) throws IOExce
         return blobContainer.listBlobsByPrefix(filenamePrefix).keySet();
     }
 
+    public List<String> listFilesByPrefixInLexicographicOrder(String filenamePrefix, int limit) throws IOException {
+        List<String> sortedBlobList = new ArrayList<>();
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        LatchedActionListener<List<BlobMetadata>> actionListener = new LatchedActionListener<>(new ActionListener<>() {
+            @Override
+            public void onResponse(List<BlobMetadata> blobMetadata) {
+                sortedBlobList.addAll(blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()));
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                exception.set(e);
+            }
+        }, latch);
+
+        try {
+            blobContainer.listBlobsByPrefixInSortedOrder(
+                filenamePrefix,
+                limit,
+                BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC,
+                actionListener
+            );
+            latch.await();
+        } catch (InterruptedException e) {
+            throw new IOException("Exception in listFilesByPrefixInLexicographicOrder with prefix: " + filenamePrefix, e);
+        }
+        if (exception.get() != null) {
+            throw new IOException(exception.get());
+        } else {
+            return sortedBlobList;
+        }
+    }
+
     /**
      * Removes an existing file in the directory.
      *
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
index ac129aca8baf7..e7602203440d2 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -22,6 +22,7 @@
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.io.VersionedCodecStreamWrapper;
 import org.opensearch.common.lucene.store.ByteArrayIndexInput;
+import org.opensearch.index.remote.RemoteStoreUtils;
 import org.opensearch.index.store.lockmanager.FileLockInfo;
 import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
 import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
@@ -34,15 +35,14 @@
 import java.nio.file.NoSuchFileException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 /**
@@ -62,9 +62,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
      */
     public static final String SEGMENT_NAME_UUID_SEPARATOR = "__";
 
-    public static final MetadataFilenameUtils.MetadataFilenameComparator METADATA_FILENAME_COMPARATOR =
-        new MetadataFilenameUtils.MetadataFilenameComparator();
-
     /**
      * remoteDataDirectory is used to store segment files at path: cluster_UUID/index_UUID/shardId/segments/data
      */
@@ -78,12 +75,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
 
     private final ThreadPool threadPool;
 
-    /**
-     * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation
-     * This is achieved by uploading refresh metadata file with the same UUID suffix.
-     */
-    private String commonFilenameSuffix;
-
     /**
      * Keeps track of local segment filename to uploaded filename along with other attributes like checksum.
      * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time.
@@ -105,6 +96,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
      */
     protected final AtomicBoolean canDeleteStaleCommits = new AtomicBoolean(true);
 
+    private final AtomicLong metadataUploadCounter = new AtomicLong(0);
+
     public RemoteSegmentStoreDirectory(
         RemoteDirectory remoteDataDirectory,
         RemoteDirectory remoteMetadataDirectory,
@@ -127,7 +120,6 @@ public RemoteSegmentStoreDirectory(
      * @throws IOException if there were any failures in reading the metadata file
      */
     public RemoteSegmentMetadata init() throws IOException {
-        this.commonFilenameSuffix = UUIDs.base64UUID();
         RemoteSegmentMetadata remoteSegmentMetadata = readLatestMetadataFile();
         if (remoteSegmentMetadata != null) {
             this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata());
@@ -170,12 +162,15 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c
     public RemoteSegmentMetadata readLatestMetadataFile() throws IOException {
         RemoteSegmentMetadata remoteSegmentMetadata = null;
 
-        Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
-        Optional<String> latestMetadataFile = metadataFiles.stream().max(METADATA_FILENAME_COMPARATOR);
+        List<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+            MetadataFilenameUtils.METADATA_PREFIX,
+            1
+        );
 
-        if (latestMetadataFile.isPresent()) {
-            logger.info("Reading latest Metadata file {}", latestMetadataFile.get());
-            remoteSegmentMetadata = readMetadataFile(latestMetadataFile.get());
+        if (metadataFiles.isEmpty() == false) {
+            String latestMetadataFile = metadataFiles.get(0);
+            logger.info("Reading latest Metadata file {}", latestMetadataFile);
+            remoteSegmentMetadata = readMetadataFile(latestMetadataFile);
         } else {
             logger.info("No metadata file found, this can happen for new index with no data uploaded to remote segment store");
         }
@@ -187,8 +182,7 @@ private RemoteSegmentMetadata readMetadataFile(String metadataFilename) throws I
         try (IndexInput indexInput = remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)) {
             byte[] metadataBytes = new byte[(int) indexInput.length()];
             indexInput.readBytes(metadataBytes, 0, (int) indexInput.length());
-            RemoteSegmentMetadata metadata = metadataStreamWrapper.readStream(new ByteArrayIndexInput(metadataFilename, metadataBytes));
-            return metadata;
+            return metadataStreamWrapper.readStream(new ByteArrayIndexInput(metadataFilename, metadataBytes));
         }
     }
 
@@ -242,56 +236,43 @@ static class MetadataFilenameUtils {
         public static final String SEPARATOR = "__";
         public static final String METADATA_PREFIX = "metadata";
 
-        /**
-         * Comparator to sort the metadata filenames. The order of sorting is: Primary Term, Generation, UUID
-         * Even though UUID sort does not provide any info on recency, it provides a consistent way to sort the filenames.
-         */
-        static class MetadataFilenameComparator implements Comparator<String> {
-            @Override
-            public int compare(String first, String second) {
-                String[] firstTokens = first.split(SEPARATOR);
-                String[] secondTokens = second.split(SEPARATOR);
-                if (!firstTokens[0].equals(secondTokens[0])) {
-                    return firstTokens[0].compareTo(secondTokens[0]);
-                }
-                long firstPrimaryTerm = getPrimaryTerm(firstTokens);
-                long secondPrimaryTerm = getPrimaryTerm(secondTokens);
-                if (firstPrimaryTerm != secondPrimaryTerm) {
-                    return firstPrimaryTerm > secondPrimaryTerm ? 1 : -1;
-                } else {
-                    long firstGeneration = getGeneration(firstTokens);
-                    long secondGeneration = getGeneration(secondTokens);
-                    if (firstGeneration != secondGeneration) {
-                        return firstGeneration > secondGeneration ? 1 : -1;
-                    } else {
-                        return getUuid(firstTokens).compareTo(getUuid(secondTokens));
-                    }
-                }
-            }
-        }
-
         static String getMetadataFilePrefixForCommit(long primaryTerm, long generation) {
-            return String.join(SEPARATOR, METADATA_PREFIX, Long.toString(primaryTerm), Long.toString(generation, Character.MAX_RADIX));
+            return String.join(
+                SEPARATOR,
+                METADATA_PREFIX,
+                RemoteStoreUtils.invertLong(primaryTerm),
+                RemoteStoreUtils.invertLong(generation)
+            );
         }
 
         // Visible for testing
-        static String getMetadataFilename(long primaryTerm, long generation, String uuid) {
-            return String.join(SEPARATOR, getMetadataFilePrefixForCommit(primaryTerm, generation), uuid);
+        static String getMetadataFilename(
+            long primaryTerm,
+            long generation,
+            long translogGeneration,
+            long uploadCounter,
+            int metadataVersion
+        ) {
+            return String.join(
+                SEPARATOR,
+                METADATA_PREFIX,
+                RemoteStoreUtils.invertLong(primaryTerm),
+                RemoteStoreUtils.invertLong(generation),
+                RemoteStoreUtils.invertLong(translogGeneration),
+                RemoteStoreUtils.invertLong(uploadCounter),
+                RemoteStoreUtils.invertLong(System.currentTimeMillis()),
+                String.valueOf(metadataVersion)
+            );
         }
 
         // Visible for testing
         static long getPrimaryTerm(String[] filenameTokens) {
-            return Long.parseLong(filenameTokens[1]);
+            return RemoteStoreUtils.invertLong(filenameTokens[1]);
         }
 
         // Visible for testing
         static long getGeneration(String[] filenameTokens) {
-            return Long.parseLong(filenameTokens[2], Character.MAX_RADIX);
-        }
-
-        // Visible for testing
-        static String getUuid(String[] filenameTokens) {
-            return filenameTokens[3];
+            return RemoteStoreUtils.invertLong(filenameTokens[2]);
         }
     }
 
@@ -379,7 +360,6 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
     @Override
     public void acquireLock(long primaryTerm, long generation, String acquirerId) throws IOException {
         String metadataFile = getMetadataFileForCommit(primaryTerm, generation);
-
         mdLockManager.acquire(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).withAcquirerId(acquirerId).build());
     }
 
@@ -408,13 +388,19 @@ public void releaseLock(long primaryTerm, long generation, String acquirerId) th
     @Override
     public Boolean isLockAcquired(long primaryTerm, long generation) throws IOException {
         String metadataFile = getMetadataFileForCommit(primaryTerm, generation);
+        return isLockAcquired(metadataFile);
+    }
+
+    // Visible for testing
+    Boolean isLockAcquired(String metadataFile) throws IOException {
         return mdLockManager.isAcquired(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).build());
     }
 
     // Visible for testing
     String getMetadataFileForCommit(long primaryTerm, long generation) throws IOException {
-        Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(
-            MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, generation)
+        List<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+            MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, generation),
+            1
         );
 
         if (metadataFiles.isEmpty()) {
@@ -432,33 +418,24 @@ String getMetadataFileForCommit(long primaryTerm, long generation) throws IOExce
                     + metadataFiles.size()
             );
         }
-        return metadataFiles.iterator().next();
+        return metadataFiles.get(0);
     }
 
-    public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix, String checksum)
-        throws IOException {
+    public void copyFrom(Directory from, String src, String dest, IOContext context, String checksum) throws IOException {
         String remoteFilename;
-        if (useCommonSuffix) {
-            remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix;
-        } else {
-            remoteFilename = getNewRemoteSegmentFilename(dest);
-        }
+        remoteFilename = getNewRemoteSegmentFilename(dest);
         remoteDataDirectory.copyFrom(from, src, remoteFilename, context);
         UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src));
         segmentsUploadedToRemoteStore.put(src, segmentMetadata);
     }
 
-    public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException {
-        copyFrom(from, src, dest, context, useCommonSuffix, getChecksumOfLocalFile(from, src));
-    }
-
     /**
      * Copies an existing src file from directory from to a non-existent file dest in this directory.
      * Once the segment is uploaded to remote segment store, update the cache accordingly.
      */
     @Override
     public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
-        copyFrom(from, src, dest, context, false);
+        copyFrom(from, src, dest, context, getChecksumOfLocalFile(from, src));
     }
 
     /**
@@ -486,13 +463,16 @@ public void uploadMetadata(
         Collection<String> segmentFiles,
         SegmentInfos segmentInfosSnapshot,
         Directory storeDirectory,
-        long primaryTerm
+        long primaryTerm,
+        long translogGeneration
     ) throws IOException {
         synchronized (this) {
             String metadataFilename = MetadataFilenameUtils.getMetadataFilename(
                 primaryTerm,
                 segmentInfosSnapshot.getGeneration(),
-                this.commonFilenameSuffix
+                translogGeneration,
+                metadataUploadCounter.incrementAndGet(),
+                RemoteSegmentMetadata.CURRENT_VERSION
             );
             try {
                 IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
@@ -569,15 +549,6 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
         return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore);
     }
 
-    public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore(long primaryTerm, long generation) throws IOException {
-        String metadataFile = getMetadataFileForCommit(primaryTerm, generation);
-
-        Map<String, UploadedSegmentMetadata> segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(
-            readMetadataFile(metadataFile).getMetadata()
-        );
-        return Collections.unmodifiableMap(segmentsUploadedToRemoteStore);
-    }
-
     /**
      * Delete stale segment and metadata files
      * One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store,
@@ -585,9 +556,11 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore(lon
      * @param lastNMetadataFilesToKeep number of metadata files to keep
      * @throws IOException in case of I/O error while reading from / writing to remote segment store
      */
-    private void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException {
-        Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
-        List<String> sortedMetadataFileList = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList());
+    public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException {
+        List<String> sortedMetadataFileList = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+            MetadataFilenameUtils.METADATA_PREFIX,
+            Integer.MAX_VALUE
+        );
         if (sortedMetadataFileList.size() <= lastNMetadataFilesToKeep) {
             logger.info(
                 "Number of commits in remote segment store={}, lastNMetadataFilesToKeep={}",
@@ -598,21 +571,12 @@ private void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOExceptio
         }
 
         List<String> metadataFilesEligibleToDelete = sortedMetadataFileList.subList(
-            0,
-            sortedMetadataFileList.size() - lastNMetadataFilesToKeep
+            lastNMetadataFilesToKeep,
+            sortedMetadataFileList.size()
         );
         List<String> metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream().filter(metadataFile -> {
             try {
-                // TODO: add snapshot interop feature flag here as that will be the first feature to use lock
-                // manager.
-                boolean lockManagerEnabled = false;
-                if (!lockManagerEnabled) {
-                    return true;
-                }
-                return !isLockAcquired(
-                    MetadataFilenameUtils.getPrimaryTerm(metadataFile.split(MetadataFilenameUtils.SEPARATOR)),
-                    MetadataFilenameUtils.getGeneration(metadataFile.split(MetadataFilenameUtils.SEPARATOR))
-                );
+                return !isLockAcquired(metadataFile);
             } catch (IOException e) {
                 logger.error(
                     "skipping metadata file ("
@@ -699,7 +663,10 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
     Return true if it deleted it successfully
      */
     private boolean deleteIfEmpty() throws IOException {
-        Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
+        Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+            MetadataFilenameUtils.METADATA_PREFIX,
+            1
+        );
         if (metadataFiles.size() != 0) {
             logger.info("Remote directory still has files , not deleting the path");
             return false;
diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java
index 0105a54a9430d..c583e2245ff00 100644
--- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java
@@ -299,6 +299,11 @@ public void onDelete() {
         translog.onDelete();
     }
 
+    @Override
+    public Translog.TranslogGeneration getTranslogGeneration() {
+        return translog.getGeneration();
+    }
+
     /**
      * Reads operations from the translog
      * @param location location of translog
diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java
index 3be63113cc667..c274b8c9fcec8 100644
--- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java
@@ -117,4 +117,9 @@ public Translog.Location add(Translog.Operation operation) throws IOException {
     }
 
     public void onDelete() {}
+
+    @Override
+    public Translog.TranslogGeneration getTranslogGeneration() {
+        return null;
+    }
 }
diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java
index 5c91f6cb7b345..303e84dc2b228 100644
--- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java
@@ -126,4 +126,6 @@ public interface TranslogManager {
     Clean up if any needed on deletion of index
      */
     void onDelete();
+
+    Translog.TranslogGeneration getTranslogGeneration();
 }
diff --git a/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java
index f139a5d4e3bb1..4a2eeabeb7e58 100644
--- a/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java
+++ b/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java
@@ -131,7 +131,7 @@ private void testListBlobsByPrefixInSortedOrder(int limit, BlobContainer.BlobNam
 
         List<String> blobsInFileSystem = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            final String blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT);
+            final String blobName = randomAlphaOfLengthBetween(10, 20).toLowerCase(Locale.ROOT);
             final byte[] blobData = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
             Files.write(path.resolve(blobName), blobData);
             blobsInFileSystem.add(blobName);
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java
index 15f1585bd1477..8ee5fcf0da9d7 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java
@@ -12,6 +12,8 @@
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.junit.Before;
+import org.opensearch.action.ActionListener;
+import org.opensearch.action.LatchedActionListener;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
 import org.opensearch.common.blobstore.support.PlainBlobMetadata;
@@ -23,15 +25,19 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.mockito.Mockito.mock;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.doAnswer;
 
 public class RemoteDirectoryTests extends OpenSearchTestCase {
     private BlobContainer blobContainer;
@@ -146,6 +152,54 @@ public void testFileLengthIOException() throws IOException {
         assertThrows(IOException.class, () -> remoteDirectory.fileLength("segment_1"));
     }
 
+    public void testListFilesByPrefixInLexicographicOrder() throws IOException {
+        doAnswer(invocation -> {
+            LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
+            latchedActionListener.onResponse(List.of(new PlainBlobMetadata("metadata_1", 1)));
+            return null;
+        }).when(blobContainer)
+            .listBlobsByPrefixInSortedOrder(
+                eq("metadata"),
+                eq(1),
+                eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC),
+                any(ActionListener.class)
+            );
+
+        assertEquals(List.of("metadata_1"), remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1));
+    }
+
+    public void testListFilesByPrefixInLexicographicOrderEmpty() throws IOException {
+        doAnswer(invocation -> {
+            LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
+            latchedActionListener.onResponse(List.of());
+            return null;
+        }).when(blobContainer)
+            .listBlobsByPrefixInSortedOrder(
+                eq("metadata"),
+                eq(1),
+                eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC),
+                any(ActionListener.class)
+            );
+
+        assertEquals(List.of(), remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1));
+    }
+
+    public void testListFilesByPrefixInLexicographicOrderException() {
+        doAnswer(invocation -> {
+            LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
+            latchedActionListener.onFailure(new IOException("Error"));
+            return null;
+        }).when(blobContainer)
+            .listBlobsByPrefixInSortedOrder(
+                eq("metadata"),
+                eq(1),
+                eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC),
+                any(ActionListener.class)
+            );
+
+        assertThrows(IOException.class, () -> remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1));
+    }
+
     public void testGetPendingDeletions() {
         assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.getPendingDeletions());
     }
@@ -165,5 +219,4 @@ public void testRename() {
     public void testObtainLock() {
         assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.obtainLock("segment_1"));
     }
-
 }
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java
index 324315505987b..bf4b2a14f2567 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java
@@ -11,8 +11,11 @@
 import org.apache.lucene.store.Directory;
 import org.junit.Before;
 import org.mockito.ArgumentCaptor;
+import org.opensearch.action.ActionListener;
+import org.opensearch.action.LatchedActionListener;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.blobstore.BlobMetadata;
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.blobstore.BlobStore;
 import org.opensearch.common.settings.Settings;
@@ -28,15 +31,16 @@
 
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Collections;
 import java.util.List;
 import java.util.function.Supplier;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.doAnswer;
 
 public class RemoteSegmentStoreDirectoryFactoryTests extends OpenSearchTestCase {
 
@@ -68,7 +72,12 @@ public void testNewDirectory() throws IOException {
         when(repository.blobStore()).thenReturn(blobStore);
         when(repository.basePath()).thenReturn(new BlobPath().add("base_path"));
         when(blobStore.blobContainer(any())).thenReturn(blobContainer);
-        when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap());
+        doAnswer(invocation -> {
+            LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
+            latchedActionListener.onResponse(List.of());
+            return null;
+        }).when(blobContainer)
+            .listBlobsByPrefixInSortedOrder(any(), eq(1), eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), any(ActionListener.class));
 
         when(repositoriesService.repository("remote_store_repository")).thenReturn(repository);
 
@@ -81,7 +90,12 @@ public void testNewDirectory() throws IOException {
             assertEquals("base_path/uuid_1/0/segments/metadata/", blobPaths.get(1).buildAsString());
             assertEquals("base_path/uuid_1/0/segments/lock_files/", blobPaths.get(2).buildAsString());
 
-            verify(blobContainer).listBlobsByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX);
+            verify(blobContainer).listBlobsByPrefixInSortedOrder(
+                eq(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX),
+                eq(1),
+                eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC),
+                any()
+            );
             verify(repositoriesService, times(2)).repository("remote_store_repository");
         }
     }
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
index 66e4b9a357b85..c37893877253e 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
@@ -32,6 +32,7 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.concurrent.OpenSearchExecutors;
 import org.opensearch.index.engine.NRTReplicationEngineFactory;
+import org.opensearch.index.remote.RemoteStoreUtils;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardTestCase;
 import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
@@ -71,6 +72,10 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase {
     private SegmentInfos segmentInfos;
     private ThreadPool threadPool;
 
+    private final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 23, 34, 1, 1);
+    private final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 13, 34, 1, 1);
+    private final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(10, 38, 34, 1, 1);
+
     @Before
     public void setup() throws IOException {
         remoteDataDirectory = mock(RemoteDirectory.class);
@@ -119,50 +124,16 @@ public void testUploadedSegmentMetadataFromString() {
         assertEquals("_0.cfe::_0.cfe__uuidxyz::4567::372000", metadata.toString());
     }
 
-    public void testGetMetadataFilename() {
-        // Generation 23 is replaced by n due to radix 32
-        assertEquals(
-            RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX + "__12__n__uuid1",
-            RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 23, "uuid1")
-        );
-    }
-
     public void testGetPrimaryTermGenerationUuid() {
-        String[] filenameTokens = "abc__12__n__uuid_xyz".split(RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR);
+        String[] filenameTokens = "abc__9223372036854775795__9223372036854775784__uuid_xyz".split(
+            RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR
+        );
         assertEquals(12, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getPrimaryTerm(filenameTokens));
         assertEquals(23, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getGeneration(filenameTokens));
-        assertEquals("uuid_xyz", RemoteSegmentStoreDirectory.MetadataFilenameUtils.getUuid(filenameTokens));
-    }
-
-    public void testMetadataFilenameComparator() {
-        List<String> metadataFilenames = new ArrayList<>(
-            List.of(
-                "abc__10__20__uuid1",
-                "abc__12__2__uuid2",
-                "pqr__1__1__uuid0",
-                "abc__3__n__uuid3",
-                "abc__10__8__uuid8",
-                "abc__3__a__uuid4",
-                "abc__3__a__uuid5"
-            )
-        );
-        metadataFilenames.sort(RemoteSegmentStoreDirectory.METADATA_FILENAME_COMPARATOR);
-        assertEquals(
-            List.of(
-                "abc__3__a__uuid4",
-                "abc__3__a__uuid5",
-                "abc__3__n__uuid3",
-                "abc__10__8__uuid8",
-                "abc__10__20__uuid1",
-                "abc__12__2__uuid2",
-                "pqr__1__1__uuid0"
-            ),
-            metadataFilenames
-        );
     }
 
     public void testInitException() throws IOException {
-        when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow(
+        when(remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, 1)).thenThrow(
             new IOException("Error")
         );
 
@@ -262,29 +233,42 @@ private ByteArrayIndexInput createMetadataFileBytes(Map<String, String> segmentF
     }
 
     private Map<String, Map<String, String>> populateMetadata() throws IOException {
-        List<String> metadataFiles = List.of("metadata__1__5__abc", "metadata__1__6__pqr", "metadata__2__1__zxv");
-        when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn(
-            metadataFiles
-        );
+        List<String> metadataFiles = new ArrayList<>();
+
+        metadataFiles.add(metadataFilename);
+        metadataFiles.add(metadataFilename2);
+        metadataFiles.add(metadataFilename3);
+
+        when(
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+                1
+            )
+        ).thenReturn(List.of(metadataFilename));
+        when(
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+                Integer.MAX_VALUE
+            )
+        ).thenReturn(metadataFiles);
 
         Map<String, Map<String, String>> metadataFilenameContentMapping = Map.of(
-            "metadata__1__5__abc",
+            metadataFilename,
             getDummyMetadata("_0", 1),
-            "metadata__1__6__pqr",
+            metadataFilename2,
             getDummyMetadata("_0", 1),
-            "metadata__2__1__zxv",
+            metadataFilename3,
             getDummyMetadata("_0", 1)
         );
 
-        when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(
-            createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__5__abc"), 1, 5)
+        when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenAnswer(
+            I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename), 23, 12)
         );
-        when(remoteMetadataDirectory.openInput("metadata__1__6__pqr", IOContext.DEFAULT)).thenReturn(
-            createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__6__pqr"), 1, 6)
+        when(remoteMetadataDirectory.openInput(metadataFilename2, IOContext.DEFAULT)).thenAnswer(
+            I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename2), 13, 12)
         );
-        when(remoteMetadataDirectory.openInput("metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn(
-            createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__2__1__zxv"), 1, 2),
-            createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__2__1__zxv"), 1, 2)
+        when(remoteMetadataDirectory.openInput(metadataFilename3, IOContext.DEFAULT)).thenAnswer(
+            I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename3), 38, 10)
         );
 
         return metadataFilenameContentMapping;
@@ -293,9 +277,12 @@ private Map<String, Map<String, String>> populateMetadata() throws IOException {
     public void testInit() throws IOException {
         populateMetadata();
 
-        when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn(
-            List.of("metadata__1__5__abc", "metadata__1__6__pqr", "metadata__2__1__zxv")
-        );
+        when(
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+                1
+            )
+        ).thenReturn(List.of(metadataFilename));
 
         remoteSegmentStoreDirectory.init();
 
@@ -399,15 +386,15 @@ public void testOpenInputException() throws IOException {
     public void testAcquireLock() throws IOException {
         populateMetadata();
         remoteSegmentStoreDirectory.init();
-        String mdFile = "xyz";
         String acquirerId = "test-acquirer";
         long testPrimaryTerm = 1;
         long testGeneration = 5;
 
         List<String> metadataFiles = List.of("metadata__1__5__abc");
         when(
-            remoteMetadataDirectory.listFilesByPrefix(
-                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration)
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration),
+                1
             )
         ).thenReturn(metadataFiles);
 
@@ -437,8 +424,9 @@ public void testReleaseLock() throws IOException {
 
         List<String> metadataFiles = List.of("metadata__1__5__abc");
         when(
-            remoteMetadataDirectory.listFilesByPrefix(
-                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration)
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration),
+                1
             )
         ).thenReturn(metadataFiles);
 
@@ -454,8 +442,9 @@ public void testIsAcquired() throws IOException {
 
         List<String> metadataFiles = List.of("metadata__1__5__abc");
         when(
-            remoteMetadataDirectory.listFilesByPrefix(
-                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration)
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration),
+                1
             )
         ).thenReturn(metadataFiles);
 
@@ -471,8 +460,9 @@ public void testIsAcquiredException() throws IOException {
 
         List<String> metadataFiles = new ArrayList<>();
         when(
-            remoteMetadataDirectory.listFilesByPrefix(
-                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration)
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration),
+                1
             )
         ).thenReturn(metadataFiles);
 
@@ -482,14 +472,10 @@ public void testIsAcquiredException() throws IOException {
     public void testGetMetadataFileForCommit() throws IOException {
         long testPrimaryTerm = 2;
         long testGeneration = 3;
-        List<String> metadataFiles = List.of(
-            "metadata__1__5__abc",
-            "metadata__" + testPrimaryTerm + "__" + testGeneration + "__pqr",
-            "metadata__2__1__zxv"
-        );
         when(
-            remoteMetadataDirectory.listFilesByPrefix(
-                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration)
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration),
+                1
             )
         ).thenReturn(List.of("metadata__" + testPrimaryTerm + "__" + testGeneration + "__pqr"));
 
@@ -498,33 +484,6 @@ public void testGetMetadataFileForCommit() throws IOException {
 
     }
 
-    public void testGetSegmentsUploadedToRemoteStore() throws IOException {
-        long testPrimaryTerm = 1;
-        long testGeneration = 5;
-
-        List<String> metadataFiles = List.of("metadata__1__5__abc");
-        when(
-            remoteMetadataDirectory.listFilesByPrefix(
-                RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration)
-            )
-        ).thenReturn(metadataFiles);
-
-        Map<String, Map<String, String>> metadataFilenameContentMapping = Map.of(
-            "metadata__1__5__abc",
-            getDummyMetadata("_0", 5),
-            "metadata__1__6__pqr",
-            getDummyMetadata("_0", 6),
-            "metadata__2__1__zxv",
-            getDummyMetadata("_0", 1)
-        );
-
-        when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(
-            createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__5__abc"), 1, 5)
-        );
-
-        assert (remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore(testPrimaryTerm, testGeneration).containsKey("segments_5"));
-    }
-
     public void testCopyFrom() throws IOException {
         String filename = "_100.si";
         populateMetadata();
@@ -556,46 +515,20 @@ public void testCopyFromException() throws IOException {
         storeDirectory.close();
     }
 
-    public void testCopyFromOverride() throws IOException {
-        String filename = "_100.si";
-        populateMetadata();
-        remoteSegmentStoreDirectory.init();
-
-        Directory storeDirectory = LuceneTestCase.newDirectory();
-        IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT);
-        indexOutput.writeString("Hello World!");
-        CodecUtil.writeFooter(indexOutput);
-        indexOutput.close();
-        storeDirectory.sync(List.of(filename));
-
-        assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
-        remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT, true);
-        RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata = remoteSegmentStoreDirectory
-            .getSegmentsUploadedToRemoteStore()
-            .get(filename);
-        assertNotNull(uploadedSegmentMetadata);
-        remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT, true);
-        assertEquals(
-            uploadedSegmentMetadata.toString(),
-            remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().get(filename).toString()
-        );
-
-        storeDirectory.close();
-    }
-
     public void testContainsFile() throws IOException {
-        List<String> metadataFiles = List.of("metadata__1__5__abc");
-        when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn(
-            metadataFiles
-        );
+        List<String> metadataFiles = List.of(metadataFilename);
+        when(
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+                1
+            )
+        ).thenReturn(metadataFiles);
 
         Map<String, String> metadata = new HashMap<>();
         metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512");
         metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024");
 
-        when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(
-            createMetadataFileBytes(metadata, 1, 5)
-        );
+        when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(createMetadataFileBytes(metadata, 1, 5));
 
         remoteSegmentStoreDirectory.init();
 
@@ -625,7 +558,7 @@ public void testUploadMetadataEmpty() throws IOException {
         Collection<String> segmentFiles = List.of("s1", "s2", "s3");
         assertThrows(
             NoSuchFileException.class,
-            () -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L)
+            () -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, 34L)
         );
     }
 
@@ -637,16 +570,19 @@ public void testUploadMetadataNonEmpty() throws IOException {
         BytesStreamOutput output = new BytesStreamOutput();
         IndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096);
 
-        long generation = segmentInfos.getGeneration();
-        when(storeDirectory.createOutput(startsWith("metadata__12__" + generation), eq(IOContext.DEFAULT))).thenReturn(indexOutput);
+        String generation = RemoteStoreUtils.invertLong(segmentInfos.getGeneration());
+        String primaryTerm = RemoteStoreUtils.invertLong(12);
+        when(storeDirectory.createOutput(startsWith("metadata__" + primaryTerm + "__" + generation), eq(IOContext.DEFAULT))).thenReturn(
+            indexOutput
+        );
 
         Collection<String> segmentFiles = List.of("_0.si", "_0.cfe", "_0.cfs", "segments_1");
-        remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L);
+        remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, 34L);
 
         verify(remoteMetadataDirectory).copyFrom(
             eq(storeDirectory),
-            startsWith("metadata__12__" + generation),
-            startsWith("metadata__12__" + generation),
+            startsWith("metadata__" + primaryTerm + "__" + generation),
+            startsWith("metadata__" + primaryTerm + "__" + generation),
             eq(IOContext.DEFAULT)
         );
 
@@ -669,10 +605,13 @@ public void testUploadMetadataNonEmpty() throws IOException {
     }
 
     public void testNoMetadataHeaderCorruptIndexException() throws IOException {
-        List<String> metadataFiles = List.of("metadata__1__5__abc");
-        when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn(
-            metadataFiles
-        );
+        List<String> metadataFiles = List.of(metadataFilename);
+        when(
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+                1
+            )
+        ).thenReturn(metadataFiles);
 
         Map<String, String> metadata = new HashMap<>();
         metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234");
@@ -683,16 +622,19 @@ public void testNoMetadataHeaderCorruptIndexException() throws IOException {
         indexOutput.writeMapOfStrings(metadata);
         indexOutput.close();
         ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
-        when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
+        when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
 
         assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init());
     }
 
     public void testInvalidCodecHeaderCorruptIndexException() throws IOException {
-        List<String> metadataFiles = List.of("metadata__1__5__abc");
-        when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn(
-            metadataFiles
-        );
+        List<String> metadataFiles = List.of(metadataFilename);
+        when(
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+                1
+            )
+        ).thenReturn(metadataFiles);
 
         Map<String, String> metadata = new HashMap<>();
         metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234");
@@ -705,16 +647,19 @@ public void testInvalidCodecHeaderCorruptIndexException() throws IOException {
         CodecUtil.writeFooter(indexOutput);
         indexOutput.close();
         ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
-        when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
+        when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
 
         assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init());
     }
 
     public void testHeaderMinVersionCorruptIndexException() throws IOException {
-        List<String> metadataFiles = List.of("metadata__1__5__abc");
-        when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn(
-            metadataFiles
-        );
+        List<String> metadataFiles = List.of(metadataFilename);
+        when(
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+                1
+            )
+        ).thenReturn(metadataFiles);
 
         Map<String, String> metadata = new HashMap<>();
         metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234");
@@ -727,16 +672,19 @@ public void testHeaderMinVersionCorruptIndexException() throws IOException {
         CodecUtil.writeFooter(indexOutput);
         indexOutput.close();
         ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
-        when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
+        when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
 
         assertThrows(IndexFormatTooOldException.class, () -> remoteSegmentStoreDirectory.init());
     }
 
     public void testHeaderMaxVersionCorruptIndexException() throws IOException {
-        List<String> metadataFiles = List.of("metadata__1__5__abc");
-        when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn(
-            metadataFiles
-        );
+        List<String> metadataFiles = List.of(metadataFilename);
+        when(
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+                1
+            )
+        ).thenReturn(metadataFiles);
 
         Map<String, String> metadata = new HashMap<>();
         metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234");
@@ -749,16 +697,19 @@ public void testHeaderMaxVersionCorruptIndexException() throws IOException {
         CodecUtil.writeFooter(indexOutput);
         indexOutput.close();
         ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
-        when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
+        when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
 
         assertThrows(IndexFormatTooNewException.class, () -> remoteSegmentStoreDirectory.init());
     }
 
     public void testIncorrectChecksumCorruptIndexException() throws IOException {
-        List<String> metadataFiles = List.of("metadata__1__5__abc");
-        when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn(
-            metadataFiles
-        );
+        List<String> metadataFiles = List.of(metadataFilename);
+        when(
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+                1
+            )
+        ).thenReturn(metadataFiles);
 
         Map<String, String> metadata = new HashMap<>();
         metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512");
@@ -775,16 +726,19 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException {
         indexOutputSpy.close();
 
         ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
-        when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
+        when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
 
         assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init());
     }
 
     public void testDeleteStaleCommitsException() throws Exception {
         populateMetadata();
-        when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow(
-            new IOException("Error reading")
-        );
+        when(
+            remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+                RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+                Integer.MAX_VALUE
+            )
+        ).thenThrow(new IOException("Error reading"));
 
         // popluateMetadata() adds stub to return 3 metadata files
         // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not
@@ -840,20 +794,20 @@ public void testDeleteStaleCommitsActualDelete() throws Exception {
         // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
         remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);
 
-        for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) {
+        for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) {
             String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1];
             verify(remoteDataDirectory).deleteFile(uploadedFilename);
         }
         ;
         assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
-        verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc");
+        verify(remoteMetadataDirectory).deleteFile(metadataFilename3);
     }
 
     public void testDeleteStaleCommitsActualDeleteIOException() throws Exception {
         Map<String, Map<String, String>> metadataFilenameContentMapping = populateMetadata();
         remoteSegmentStoreDirectory.init();
 
-        String segmentFileWithException = metadataFilenameContentMapping.get("metadata__1__5__abc")
+        String segmentFileWithException = metadataFilenameContentMapping.get(metadataFilename3)
             .values()
             .stream()
             .findAny()
@@ -864,20 +818,19 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws Exception {
         // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
         remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);
 
-        for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) {
+        for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) {
             String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1];
             verify(remoteDataDirectory).deleteFile(uploadedFilename);
         }
-        ;
         assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
-        verify(remoteMetadataDirectory, times(0)).deleteFile("metadata__1__5__abc");
+        verify(remoteMetadataDirectory, times(0)).deleteFile(metadataFilename3);
     }
 
     public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws Exception {
         Map<String, Map<String, String>> metadataFilenameContentMapping = populateMetadata();
         remoteSegmentStoreDirectory.init();
 
-        String segmentFileWithException = metadataFilenameContentMapping.get("metadata__1__5__abc")
+        String segmentFileWithException = metadataFilenameContentMapping.get(metadataFilename)
             .values()
             .stream()
             .findAny()
@@ -888,13 +841,12 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws Excep
         // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
         remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);
 
-        for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) {
+        for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) {
             String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1];
             verify(remoteDataDirectory).deleteFile(uploadedFilename);
         }
-        ;
         assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
-        verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc");
+        verify(remoteMetadataDirectory).deleteFile(metadataFilename3);
     }
 
     public void testSegmentMetadataCurrentVersion() {
@@ -909,6 +861,20 @@ public void testSegmentMetadataCurrentVersion() {
         assertEquals(RemoteSegmentMetadata.CURRENT_VERSION, 1);
     }
 
+    public void testMetadataFileNameOrder() {
+        String file1 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 21, 23, 1, 1);
+        String file2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 38, 1, 1);
+        String file3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(18, 12, 26, 1, 1);
+        String file4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 10, 1);
+        String file5 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 1, 1);
+        String file6 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 5, 1);
+
+        List<String> actualList = new ArrayList<>(List.of(file1, file2, file3, file4, file5, file6));
+        actualList.sort(String::compareTo);
+
+        assertEquals(List.of(file3, file2, file4, file6, file5, file1), actualList);
+    }
+
     private static class WrapperIndexOutput extends IndexOutput {
         public IndexOutput indexOutput;