From 1e81559cd53be169a2f57f409fa83490407284b8 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 6 Jul 2023 19:46:36 +0530 Subject: [PATCH] [Remote Segment Store] Make metadata file immutable (#8363) Authored-by: Sachin Kale --- .../replication/SegmentReplicationIT.java | 3 +- .../opensearch/remotestore/RemoteStoreIT.java | 2 +- .../shard/RemoteStoreRefreshListener.java | 20 +- .../index/store/RemoteDirectory.java | 41 +++ .../store/RemoteSegmentStoreDirectory.java | 167 ++++----- .../translog/InternalTranslogManager.java | 5 + .../index/translog/NoOpTranslogManager.java | 5 + .../index/translog/TranslogManager.java | 2 + .../blobstore/fs/FsBlobContainerTests.java | 2 +- .../index/store/RemoteDirectoryTests.java | 57 +++- ...moteSegmentStoreDirectoryFactoryTests.java | 20 +- .../RemoteSegmentStoreDirectoryTests.java | 318 ++++++++---------- 12 files changed, 352 insertions(+), 290 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index ddb046139d5cd..4ee15acf3e6c4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -801,7 +801,8 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { public void testPressureServiceStats() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); - final String replicaNode = internalCluster().startNode(); + ensureYellow(INDEX_NAME); + final String replicaNode = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); int initialDocCount = scaledRandomIntBetween(100, 200); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 2b9e38157fb39..e0f052ae75b2e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -316,6 +316,6 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { .get() .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); - assertEquals(1, getFileCount(indexPath)); + assertEquals(numberOfIterations, getFileCount(indexPath)); } } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 93c0aad8c41c5..2396c7445ab60 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -30,6 +30,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; +import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.threadpool.Scheduler; @@ -350,12 +351,19 @@ void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos se userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); segmentInfosSnapshot.setUserData(userData, false); - remoteDirectory.uploadMetadata( - localSegmentsPostRefresh, - segmentInfosSnapshot, - storeDirectory, - indexShard.getOperationPrimaryTerm() - ); + Translog.TranslogGeneration translogGeneration = indexShard.getEngine().translogManager().getTranslogGeneration(); + if (translogGeneration == null) { + throw new UnsupportedOperationException("Encountered null TranslogGeneration while uploading metadata to remote segment store"); + } else { + long translogFileGeneration = translogGeneration.translogFileGeneration; + remoteDirectory.uploadMetadata( + localSegmentsPostRefresh, + segmentInfosSnapshot, + storeDirectory, + indexShard.getOperationPrimaryTerm(), + translogFileGeneration + ); + } } private boolean uploadNewSegments(Collection localSegmentsPostRefresh) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index be4b4e910bb4d..8782808c070ab 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -13,6 +13,8 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; @@ -20,10 +22,15 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** * A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store. @@ -61,6 +68,40 @@ public Collection listFilesByPrefix(String filenamePrefix) throws IOExce return blobContainer.listBlobsByPrefix(filenamePrefix).keySet(); } + public List listFilesByPrefixInLexicographicOrder(String filenamePrefix, int limit) throws IOException { + List sortedBlobList = new ArrayList<>(); + AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener> actionListener = new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + sortedBlobList.addAll(blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList())); + } + + @Override + public void onFailure(Exception e) { + exception.set(e); + } + }, latch); + + try { + blobContainer.listBlobsByPrefixInSortedOrder( + filenamePrefix, + limit, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC, + actionListener + ); + latch.await(); + } catch (InterruptedException e) { + throw new IOException("Exception in listFilesByPrefixInLexicographicOrder with prefix: " + filenamePrefix, e); + } + if (exception.get() != null) { + throw new IOException(exception.get()); + } else { + return sortedBlobList; + } + } + /** * Removes an existing file in the directory. * diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index ac129aca8baf7..e7602203440d2 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -22,6 +22,7 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; @@ -34,15 +35,14 @@ import java.nio.file.NoSuchFileException; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; /** @@ -62,9 +62,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement */ public static final String SEGMENT_NAME_UUID_SEPARATOR = "__"; - public static final MetadataFilenameUtils.MetadataFilenameComparator METADATA_FILENAME_COMPARATOR = - new MetadataFilenameUtils.MetadataFilenameComparator(); - /** * remoteDataDirectory is used to store segment files at path: cluster_UUID/index_UUID/shardId/segments/data */ @@ -78,12 +75,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final ThreadPool threadPool; - /** - * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation - * This is achieved by uploading refresh metadata file with the same UUID suffix. - */ - private String commonFilenameSuffix; - /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. @@ -105,6 +96,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement */ protected final AtomicBoolean canDeleteStaleCommits = new AtomicBoolean(true); + private final AtomicLong metadataUploadCounter = new AtomicLong(0); + public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory, @@ -127,7 +120,6 @@ public RemoteSegmentStoreDirectory( * @throws IOException if there were any failures in reading the metadata file */ public RemoteSegmentMetadata init() throws IOException { - this.commonFilenameSuffix = UUIDs.base64UUID(); RemoteSegmentMetadata remoteSegmentMetadata = readLatestMetadataFile(); if (remoteSegmentMetadata != null) { this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata()); @@ -170,12 +162,15 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c public RemoteSegmentMetadata readLatestMetadataFile() throws IOException { RemoteSegmentMetadata remoteSegmentMetadata = null; - Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); - Optional latestMetadataFile = metadataFiles.stream().max(METADATA_FILENAME_COMPARATOR); + List metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + MetadataFilenameUtils.METADATA_PREFIX, + 1 + ); - if (latestMetadataFile.isPresent()) { - logger.info("Reading latest Metadata file {}", latestMetadataFile.get()); - remoteSegmentMetadata = readMetadataFile(latestMetadataFile.get()); + if (metadataFiles.isEmpty() == false) { + String latestMetadataFile = metadataFiles.get(0); + logger.info("Reading latest Metadata file {}", latestMetadataFile); + remoteSegmentMetadata = readMetadataFile(latestMetadataFile); } else { logger.info("No metadata file found, this can happen for new index with no data uploaded to remote segment store"); } @@ -187,8 +182,7 @@ private RemoteSegmentMetadata readMetadataFile(String metadataFilename) throws I try (IndexInput indexInput = remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)) { byte[] metadataBytes = new byte[(int) indexInput.length()]; indexInput.readBytes(metadataBytes, 0, (int) indexInput.length()); - RemoteSegmentMetadata metadata = metadataStreamWrapper.readStream(new ByteArrayIndexInput(metadataFilename, metadataBytes)); - return metadata; + return metadataStreamWrapper.readStream(new ByteArrayIndexInput(metadataFilename, metadataBytes)); } } @@ -242,56 +236,43 @@ static class MetadataFilenameUtils { public static final String SEPARATOR = "__"; public static final String METADATA_PREFIX = "metadata"; - /** - * Comparator to sort the metadata filenames. The order of sorting is: Primary Term, Generation, UUID - * Even though UUID sort does not provide any info on recency, it provides a consistent way to sort the filenames. - */ - static class MetadataFilenameComparator implements Comparator { - @Override - public int compare(String first, String second) { - String[] firstTokens = first.split(SEPARATOR); - String[] secondTokens = second.split(SEPARATOR); - if (!firstTokens[0].equals(secondTokens[0])) { - return firstTokens[0].compareTo(secondTokens[0]); - } - long firstPrimaryTerm = getPrimaryTerm(firstTokens); - long secondPrimaryTerm = getPrimaryTerm(secondTokens); - if (firstPrimaryTerm != secondPrimaryTerm) { - return firstPrimaryTerm > secondPrimaryTerm ? 1 : -1; - } else { - long firstGeneration = getGeneration(firstTokens); - long secondGeneration = getGeneration(secondTokens); - if (firstGeneration != secondGeneration) { - return firstGeneration > secondGeneration ? 1 : -1; - } else { - return getUuid(firstTokens).compareTo(getUuid(secondTokens)); - } - } - } - } - static String getMetadataFilePrefixForCommit(long primaryTerm, long generation) { - return String.join(SEPARATOR, METADATA_PREFIX, Long.toString(primaryTerm), Long.toString(generation, Character.MAX_RADIX)); + return String.join( + SEPARATOR, + METADATA_PREFIX, + RemoteStoreUtils.invertLong(primaryTerm), + RemoteStoreUtils.invertLong(generation) + ); } // Visible for testing - static String getMetadataFilename(long primaryTerm, long generation, String uuid) { - return String.join(SEPARATOR, getMetadataFilePrefixForCommit(primaryTerm, generation), uuid); + static String getMetadataFilename( + long primaryTerm, + long generation, + long translogGeneration, + long uploadCounter, + int metadataVersion + ) { + return String.join( + SEPARATOR, + METADATA_PREFIX, + RemoteStoreUtils.invertLong(primaryTerm), + RemoteStoreUtils.invertLong(generation), + RemoteStoreUtils.invertLong(translogGeneration), + RemoteStoreUtils.invertLong(uploadCounter), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(metadataVersion) + ); } // Visible for testing static long getPrimaryTerm(String[] filenameTokens) { - return Long.parseLong(filenameTokens[1]); + return RemoteStoreUtils.invertLong(filenameTokens[1]); } // Visible for testing static long getGeneration(String[] filenameTokens) { - return Long.parseLong(filenameTokens[2], Character.MAX_RADIX); - } - - // Visible for testing - static String getUuid(String[] filenameTokens) { - return filenameTokens[3]; + return RemoteStoreUtils.invertLong(filenameTokens[2]); } } @@ -379,7 +360,6 @@ public IndexInput openInput(String name, IOContext context) throws IOException { @Override public void acquireLock(long primaryTerm, long generation, String acquirerId) throws IOException { String metadataFile = getMetadataFileForCommit(primaryTerm, generation); - mdLockManager.acquire(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).withAcquirerId(acquirerId).build()); } @@ -408,13 +388,19 @@ public void releaseLock(long primaryTerm, long generation, String acquirerId) th @Override public Boolean isLockAcquired(long primaryTerm, long generation) throws IOException { String metadataFile = getMetadataFileForCommit(primaryTerm, generation); + return isLockAcquired(metadataFile); + } + + // Visible for testing + Boolean isLockAcquired(String metadataFile) throws IOException { return mdLockManager.isAcquired(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).build()); } // Visible for testing String getMetadataFileForCommit(long primaryTerm, long generation) throws IOException { - Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix( - MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, generation) + List metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, generation), + 1 ); if (metadataFiles.isEmpty()) { @@ -432,33 +418,24 @@ String getMetadataFileForCommit(long primaryTerm, long generation) throws IOExce + metadataFiles.size() ); } - return metadataFiles.iterator().next(); + return metadataFiles.get(0); } - public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix, String checksum) - throws IOException { + public void copyFrom(Directory from, String src, String dest, IOContext context, String checksum) throws IOException { String remoteFilename; - if (useCommonSuffix) { - remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix; - } else { - remoteFilename = getNewRemoteSegmentFilename(dest); - } + remoteFilename = getNewRemoteSegmentFilename(dest); remoteDataDirectory.copyFrom(from, src, remoteFilename, context); UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); segmentsUploadedToRemoteStore.put(src, segmentMetadata); } - public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException { - copyFrom(from, src, dest, context, useCommonSuffix, getChecksumOfLocalFile(from, src)); - } - /** * Copies an existing src file from directory from to a non-existent file dest in this directory. * Once the segment is uploaded to remote segment store, update the cache accordingly. */ @Override public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { - copyFrom(from, src, dest, context, false); + copyFrom(from, src, dest, context, getChecksumOfLocalFile(from, src)); } /** @@ -486,13 +463,16 @@ public void uploadMetadata( Collection segmentFiles, SegmentInfos segmentInfosSnapshot, Directory storeDirectory, - long primaryTerm + long primaryTerm, + long translogGeneration ) throws IOException { synchronized (this) { String metadataFilename = MetadataFilenameUtils.getMetadataFilename( primaryTerm, segmentInfosSnapshot.getGeneration(), - this.commonFilenameSuffix + translogGeneration, + metadataUploadCounter.incrementAndGet(), + RemoteSegmentMetadata.CURRENT_VERSION ); try { IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT); @@ -569,15 +549,6 @@ public Map getSegmentsUploadedToRemoteStore() { return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore); } - public Map getSegmentsUploadedToRemoteStore(long primaryTerm, long generation) throws IOException { - String metadataFile = getMetadataFileForCommit(primaryTerm, generation); - - Map segmentsUploadedToRemoteStore = new ConcurrentHashMap<>( - readMetadataFile(metadataFile).getMetadata() - ); - return Collections.unmodifiableMap(segmentsUploadedToRemoteStore); - } - /** * Delete stale segment and metadata files * One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store, @@ -585,9 +556,11 @@ public Map getSegmentsUploadedToRemoteStore(lon * @param lastNMetadataFilesToKeep number of metadata files to keep * @throws IOException in case of I/O error while reading from / writing to remote segment store */ - private void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { - Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); - List sortedMetadataFileList = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList()); + public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { + List sortedMetadataFileList = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + MetadataFilenameUtils.METADATA_PREFIX, + Integer.MAX_VALUE + ); if (sortedMetadataFileList.size() <= lastNMetadataFilesToKeep) { logger.info( "Number of commits in remote segment store={}, lastNMetadataFilesToKeep={}", @@ -598,21 +571,12 @@ private void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOExceptio } List metadataFilesEligibleToDelete = sortedMetadataFileList.subList( - 0, - sortedMetadataFileList.size() - lastNMetadataFilesToKeep + lastNMetadataFilesToKeep, + sortedMetadataFileList.size() ); List metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream().filter(metadataFile -> { try { - // TODO: add snapshot interop feature flag here as that will be the first feature to use lock - // manager. - boolean lockManagerEnabled = false; - if (!lockManagerEnabled) { - return true; - } - return !isLockAcquired( - MetadataFilenameUtils.getPrimaryTerm(metadataFile.split(MetadataFilenameUtils.SEPARATOR)), - MetadataFilenameUtils.getGeneration(metadataFile.split(MetadataFilenameUtils.SEPARATOR)) - ); + return !isLockAcquired(metadataFile); } catch (IOException e) { logger.error( "skipping metadata file (" @@ -699,7 +663,10 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) { Return true if it deleted it successfully */ private boolean deleteIfEmpty() throws IOException { - Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); + Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + MetadataFilenameUtils.METADATA_PREFIX, + 1 + ); if (metadataFiles.size() != 0) { logger.info("Remote directory still has files , not deleting the path"); return false; diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 0105a54a9430d..c583e2245ff00 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -299,6 +299,11 @@ public void onDelete() { translog.onDelete(); } + @Override + public Translog.TranslogGeneration getTranslogGeneration() { + return translog.getGeneration(); + } + /** * Reads operations from the translog * @param location location of translog diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 3be63113cc667..c274b8c9fcec8 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -117,4 +117,9 @@ public Translog.Location add(Translog.Operation operation) throws IOException { } public void onDelete() {} + + @Override + public Translog.TranslogGeneration getTranslogGeneration() { + return null; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 5c91f6cb7b345..303e84dc2b228 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -126,4 +126,6 @@ public interface TranslogManager { Clean up if any needed on deletion of index */ void onDelete(); + + Translog.TranslogGeneration getTranslogGeneration(); } diff --git a/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java index f139a5d4e3bb1..4a2eeabeb7e58 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java @@ -131,7 +131,7 @@ private void testListBlobsByPrefixInSortedOrder(int limit, BlobContainer.BlobNam List blobsInFileSystem = new ArrayList<>(); for (int i = 0; i < 10; i++) { - final String blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT); + final String blobName = randomAlphaOfLengthBetween(10, 20).toLowerCase(Locale.ROOT); final byte[] blobData = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb Files.write(path.resolve(blobName), blobData); blobsInFileSystem.add(blobName); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index 15f1585bd1477..8ee5fcf0da9d7 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -12,6 +12,8 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.junit.Before; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.support.PlainBlobMetadata; @@ -23,15 +25,19 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.mockito.Mockito.mock; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.doAnswer; public class RemoteDirectoryTests extends OpenSearchTestCase { private BlobContainer blobContainer; @@ -146,6 +152,54 @@ public void testFileLengthIOException() throws IOException { assertThrows(IOException.class, () -> remoteDirectory.fileLength("segment_1")); } + public void testListFilesByPrefixInLexicographicOrder() throws IOException { + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(3); + latchedActionListener.onResponse(List.of(new PlainBlobMetadata("metadata_1", 1))); + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + eq("metadata"), + eq(1), + eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), + any(ActionListener.class) + ); + + assertEquals(List.of("metadata_1"), remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1)); + } + + public void testListFilesByPrefixInLexicographicOrderEmpty() throws IOException { + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(3); + latchedActionListener.onResponse(List.of()); + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + eq("metadata"), + eq(1), + eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), + any(ActionListener.class) + ); + + assertEquals(List.of(), remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1)); + } + + public void testListFilesByPrefixInLexicographicOrderException() { + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(3); + latchedActionListener.onFailure(new IOException("Error")); + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + eq("metadata"), + eq(1), + eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), + any(ActionListener.class) + ); + + assertThrows(IOException.class, () -> remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1)); + } + public void testGetPendingDeletions() { assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.getPendingDeletions()); } @@ -165,5 +219,4 @@ public void testRename() { public void testObtainLock() { assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.obtainLock("segment_1")); } - } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index 324315505987b..bf4b2a14f2567 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -11,8 +11,11 @@ import org.apache.lucene.store.Directory; import org.junit.Before; import org.mockito.ArgumentCaptor; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.Settings; @@ -28,15 +31,16 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.Collections; import java.util.List; import java.util.function.Supplier; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.doAnswer; public class RemoteSegmentStoreDirectoryFactoryTests extends OpenSearchTestCase { @@ -68,7 +72,12 @@ public void testNewDirectory() throws IOException { when(repository.blobStore()).thenReturn(blobStore); when(repository.basePath()).thenReturn(new BlobPath().add("base_path")); when(blobStore.blobContainer(any())).thenReturn(blobContainer); - when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(3); + latchedActionListener.onResponse(List.of()); + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder(any(), eq(1), eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), any(ActionListener.class)); when(repositoriesService.repository("remote_store_repository")).thenReturn(repository); @@ -81,7 +90,12 @@ public void testNewDirectory() throws IOException { assertEquals("base_path/uuid_1/0/segments/metadata/", blobPaths.get(1).buildAsString()); assertEquals("base_path/uuid_1/0/segments/lock_files/", blobPaths.get(2).buildAsString()); - verify(blobContainer).listBlobsByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX); + verify(blobContainer).listBlobsByPrefixInSortedOrder( + eq(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX), + eq(1), + eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), + any() + ); verify(repositoriesService, times(2)).repository("remote_store_repository"); } } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 66e4b9a357b85..c37893877253e 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -32,6 +32,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; @@ -71,6 +72,10 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private SegmentInfos segmentInfos; private ThreadPool threadPool; + private final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 23, 34, 1, 1); + private final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 13, 34, 1, 1); + private final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(10, 38, 34, 1, 1); + @Before public void setup() throws IOException { remoteDataDirectory = mock(RemoteDirectory.class); @@ -119,50 +124,16 @@ public void testUploadedSegmentMetadataFromString() { assertEquals("_0.cfe::_0.cfe__uuidxyz::4567::372000", metadata.toString()); } - public void testGetMetadataFilename() { - // Generation 23 is replaced by n due to radix 32 - assertEquals( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX + "__12__n__uuid1", - RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 23, "uuid1") - ); - } - public void testGetPrimaryTermGenerationUuid() { - String[] filenameTokens = "abc__12__n__uuid_xyz".split(RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR); + String[] filenameTokens = "abc__9223372036854775795__9223372036854775784__uuid_xyz".split( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR + ); assertEquals(12, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getPrimaryTerm(filenameTokens)); assertEquals(23, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getGeneration(filenameTokens)); - assertEquals("uuid_xyz", RemoteSegmentStoreDirectory.MetadataFilenameUtils.getUuid(filenameTokens)); - } - - public void testMetadataFilenameComparator() { - List metadataFilenames = new ArrayList<>( - List.of( - "abc__10__20__uuid1", - "abc__12__2__uuid2", - "pqr__1__1__uuid0", - "abc__3__n__uuid3", - "abc__10__8__uuid8", - "abc__3__a__uuid4", - "abc__3__a__uuid5" - ) - ); - metadataFilenames.sort(RemoteSegmentStoreDirectory.METADATA_FILENAME_COMPARATOR); - assertEquals( - List.of( - "abc__3__a__uuid4", - "abc__3__a__uuid5", - "abc__3__n__uuid3", - "abc__10__8__uuid8", - "abc__10__20__uuid1", - "abc__12__2__uuid2", - "pqr__1__1__uuid0" - ), - metadataFilenames - ); } public void testInitException() throws IOException { - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow( + when(remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, 1)).thenThrow( new IOException("Error") ); @@ -262,29 +233,42 @@ private ByteArrayIndexInput createMetadataFileBytes(Map segmentF } private Map> populateMetadata() throws IOException { - List metadataFiles = List.of("metadata__1__5__abc", "metadata__1__6__pqr", "metadata__2__1__zxv"); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( - metadataFiles - ); + List metadataFiles = new ArrayList<>(); + + metadataFiles.add(metadataFilename); + metadataFiles.add(metadataFilename2); + metadataFiles.add(metadataFilename3); + + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + 1 + ) + ).thenReturn(List.of(metadataFilename)); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + Integer.MAX_VALUE + ) + ).thenReturn(metadataFiles); Map> metadataFilenameContentMapping = Map.of( - "metadata__1__5__abc", + metadataFilename, getDummyMetadata("_0", 1), - "metadata__1__6__pqr", + metadataFilename2, getDummyMetadata("_0", 1), - "metadata__2__1__zxv", + metadataFilename3, getDummyMetadata("_0", 1) ); - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__5__abc"), 1, 5) + when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenAnswer( + I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename), 23, 12) ); - when(remoteMetadataDirectory.openInput("metadata__1__6__pqr", IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__6__pqr"), 1, 6) + when(remoteMetadataDirectory.openInput(metadataFilename2, IOContext.DEFAULT)).thenAnswer( + I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename2), 13, 12) ); - when(remoteMetadataDirectory.openInput("metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__2__1__zxv"), 1, 2), - createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__2__1__zxv"), 1, 2) + when(remoteMetadataDirectory.openInput(metadataFilename3, IOContext.DEFAULT)).thenAnswer( + I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename3), 38, 10) ); return metadataFilenameContentMapping; @@ -293,9 +277,12 @@ private Map> populateMetadata() throws IOException { public void testInit() throws IOException { populateMetadata(); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( - List.of("metadata__1__5__abc", "metadata__1__6__pqr", "metadata__2__1__zxv") - ); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + 1 + ) + ).thenReturn(List.of(metadataFilename)); remoteSegmentStoreDirectory.init(); @@ -399,15 +386,15 @@ public void testOpenInputException() throws IOException { public void testAcquireLock() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); - String mdFile = "xyz"; String acquirerId = "test-acquirer"; long testPrimaryTerm = 1; long testGeneration = 5; List metadataFiles = List.of("metadata__1__5__abc"); when( - remoteMetadataDirectory.listFilesByPrefix( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration), + 1 ) ).thenReturn(metadataFiles); @@ -437,8 +424,9 @@ public void testReleaseLock() throws IOException { List metadataFiles = List.of("metadata__1__5__abc"); when( - remoteMetadataDirectory.listFilesByPrefix( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration), + 1 ) ).thenReturn(metadataFiles); @@ -454,8 +442,9 @@ public void testIsAcquired() throws IOException { List metadataFiles = List.of("metadata__1__5__abc"); when( - remoteMetadataDirectory.listFilesByPrefix( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration), + 1 ) ).thenReturn(metadataFiles); @@ -471,8 +460,9 @@ public void testIsAcquiredException() throws IOException { List metadataFiles = new ArrayList<>(); when( - remoteMetadataDirectory.listFilesByPrefix( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration), + 1 ) ).thenReturn(metadataFiles); @@ -482,14 +472,10 @@ public void testIsAcquiredException() throws IOException { public void testGetMetadataFileForCommit() throws IOException { long testPrimaryTerm = 2; long testGeneration = 3; - List metadataFiles = List.of( - "metadata__1__5__abc", - "metadata__" + testPrimaryTerm + "__" + testGeneration + "__pqr", - "metadata__2__1__zxv" - ); when( - remoteMetadataDirectory.listFilesByPrefix( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration), + 1 ) ).thenReturn(List.of("metadata__" + testPrimaryTerm + "__" + testGeneration + "__pqr")); @@ -498,33 +484,6 @@ public void testGetMetadataFileForCommit() throws IOException { } - public void testGetSegmentsUploadedToRemoteStore() throws IOException { - long testPrimaryTerm = 1; - long testGeneration = 5; - - List metadataFiles = List.of("metadata__1__5__abc"); - when( - remoteMetadataDirectory.listFilesByPrefix( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) - ) - ).thenReturn(metadataFiles); - - Map> metadataFilenameContentMapping = Map.of( - "metadata__1__5__abc", - getDummyMetadata("_0", 5), - "metadata__1__6__pqr", - getDummyMetadata("_0", 6), - "metadata__2__1__zxv", - getDummyMetadata("_0", 1) - ); - - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadataFilenameContentMapping.get("metadata__1__5__abc"), 1, 5) - ); - - assert (remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore(testPrimaryTerm, testGeneration).containsKey("segments_5")); - } - public void testCopyFrom() throws IOException { String filename = "_100.si"; populateMetadata(); @@ -556,46 +515,20 @@ public void testCopyFromException() throws IOException { storeDirectory.close(); } - public void testCopyFromOverride() throws IOException { - String filename = "_100.si"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = LuceneTestCase.newDirectory(); - IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); - indexOutput.writeString("Hello World!"); - CodecUtil.writeFooter(indexOutput); - indexOutput.close(); - storeDirectory.sync(List.of(filename)); - - assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); - remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT, true); - RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata = remoteSegmentStoreDirectory - .getSegmentsUploadedToRemoteStore() - .get(filename); - assertNotNull(uploadedSegmentMetadata); - remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT, true); - assertEquals( - uploadedSegmentMetadata.toString(), - remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().get(filename).toString() - ); - - storeDirectory.close(); - } - public void testContainsFile() throws IOException { - List metadataFiles = List.of("metadata__1__5__abc"); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( - metadataFiles - ); + List metadataFiles = List.of(metadataFilename); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + 1 + ) + ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512"); metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024"); - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadata, 1, 5) - ); + when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(createMetadataFileBytes(metadata, 1, 5)); remoteSegmentStoreDirectory.init(); @@ -625,7 +558,7 @@ public void testUploadMetadataEmpty() throws IOException { Collection segmentFiles = List.of("s1", "s2", "s3"); assertThrows( NoSuchFileException.class, - () -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L) + () -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, 34L) ); } @@ -637,16 +570,19 @@ public void testUploadMetadataNonEmpty() throws IOException { BytesStreamOutput output = new BytesStreamOutput(); IndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); - long generation = segmentInfos.getGeneration(); - when(storeDirectory.createOutput(startsWith("metadata__12__" + generation), eq(IOContext.DEFAULT))).thenReturn(indexOutput); + String generation = RemoteStoreUtils.invertLong(segmentInfos.getGeneration()); + String primaryTerm = RemoteStoreUtils.invertLong(12); + when(storeDirectory.createOutput(startsWith("metadata__" + primaryTerm + "__" + generation), eq(IOContext.DEFAULT))).thenReturn( + indexOutput + ); Collection segmentFiles = List.of("_0.si", "_0.cfe", "_0.cfs", "segments_1"); - remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L); + remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, 34L); verify(remoteMetadataDirectory).copyFrom( eq(storeDirectory), - startsWith("metadata__12__" + generation), - startsWith("metadata__12__" + generation), + startsWith("metadata__" + primaryTerm + "__" + generation), + startsWith("metadata__" + primaryTerm + "__" + generation), eq(IOContext.DEFAULT) ); @@ -669,10 +605,13 @@ public void testUploadMetadataNonEmpty() throws IOException { } public void testNoMetadataHeaderCorruptIndexException() throws IOException { - List metadataFiles = List.of("metadata__1__5__abc"); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( - metadataFiles - ); + List metadataFiles = List.of(metadataFilename); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + 1 + ) + ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); @@ -683,16 +622,19 @@ public void testNoMetadataHeaderCorruptIndexException() throws IOException { indexOutput.writeMapOfStrings(metadata); indexOutput.close(); ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); + when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init()); } public void testInvalidCodecHeaderCorruptIndexException() throws IOException { - List metadataFiles = List.of("metadata__1__5__abc"); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( - metadataFiles - ); + List metadataFiles = List.of(metadataFilename); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + 1 + ) + ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); @@ -705,16 +647,19 @@ public void testInvalidCodecHeaderCorruptIndexException() throws IOException { CodecUtil.writeFooter(indexOutput); indexOutput.close(); ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); + when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init()); } public void testHeaderMinVersionCorruptIndexException() throws IOException { - List metadataFiles = List.of("metadata__1__5__abc"); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( - metadataFiles - ); + List metadataFiles = List.of(metadataFilename); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + 1 + ) + ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); @@ -727,16 +672,19 @@ public void testHeaderMinVersionCorruptIndexException() throws IOException { CodecUtil.writeFooter(indexOutput); indexOutput.close(); ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); + when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); assertThrows(IndexFormatTooOldException.class, () -> remoteSegmentStoreDirectory.init()); } public void testHeaderMaxVersionCorruptIndexException() throws IOException { - List metadataFiles = List.of("metadata__1__5__abc"); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( - metadataFiles - ); + List metadataFiles = List.of(metadataFilename); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + 1 + ) + ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); @@ -749,16 +697,19 @@ public void testHeaderMaxVersionCorruptIndexException() throws IOException { CodecUtil.writeFooter(indexOutput); indexOutput.close(); ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); + when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); assertThrows(IndexFormatTooNewException.class, () -> remoteSegmentStoreDirectory.init()); } public void testIncorrectChecksumCorruptIndexException() throws IOException { - List metadataFiles = List.of("metadata__1__5__abc"); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( - metadataFiles - ); + List metadataFiles = List.of(metadataFilename); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + 1 + ) + ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512"); @@ -775,16 +726,19 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException { indexOutputSpy.close(); ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes())); - when(remoteMetadataDirectory.openInput("metadata__1__5__abc", IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); + when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput); assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init()); } public void testDeleteStaleCommitsException() throws Exception { populateMetadata(); - when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow( - new IOException("Error reading") - ); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + Integer.MAX_VALUE + ) + ).thenThrow(new IOException("Error reading")); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not @@ -840,20 +794,20 @@ public void testDeleteStaleCommitsActualDelete() throws Exception { // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); - for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { + for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); - verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); + verify(remoteMetadataDirectory).deleteFile(metadataFilename3); } public void testDeleteStaleCommitsActualDeleteIOException() throws Exception { Map> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); - String segmentFileWithException = metadataFilenameContentMapping.get("metadata__1__5__abc") + String segmentFileWithException = metadataFilenameContentMapping.get(metadataFilename3) .values() .stream() .findAny() @@ -864,20 +818,19 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws Exception { // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); - for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { + for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } - ; assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); - verify(remoteMetadataDirectory, times(0)).deleteFile("metadata__1__5__abc"); + verify(remoteMetadataDirectory, times(0)).deleteFile(metadataFilename3); } public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws Exception { Map> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); - String segmentFileWithException = metadataFilenameContentMapping.get("metadata__1__5__abc") + String segmentFileWithException = metadataFilenameContentMapping.get(metadataFilename) .values() .stream() .findAny() @@ -888,13 +841,12 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws Excep // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); - for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { + for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } - ; assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); - verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); + verify(remoteMetadataDirectory).deleteFile(metadataFilename3); } public void testSegmentMetadataCurrentVersion() { @@ -909,6 +861,20 @@ public void testSegmentMetadataCurrentVersion() { assertEquals(RemoteSegmentMetadata.CURRENT_VERSION, 1); } + public void testMetadataFileNameOrder() { + String file1 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 21, 23, 1, 1); + String file2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 38, 1, 1); + String file3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(18, 12, 26, 1, 1); + String file4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 10, 1); + String file5 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 1, 1); + String file6 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 32, 5, 1); + + List actualList = new ArrayList<>(List.of(file1, file2, file3, file4, file5, file6)); + actualList.sort(String::compareTo); + + assertEquals(List.of(file3, file2, file4, file6, file5, file1), actualList); + } + private static class WrapperIndexOutput extends IndexOutput { public IndexOutput indexOutput;