diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index f5a9ed8ed9362..6e9c62aeadb22 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -254,14 +254,67 @@ private static void downloadOnce( Files.createDirectories(location); } - // Delete translog files on local before downloading from remote + Map generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); + Map generationToChecksumMapper = translogMetadata.getGenerationToChecksumMapper(); + long maxGeneration = translogMetadata.getGeneration(); + long minGeneration = translogMetadata.getMinTranslogGeneration(); + + // Delete any translog and checkpoint file which is not part of the current generation range. for (Path file : FileSystemUtils.files(location)) { - Files.delete(file); + try { + long generation = parseIdFromFileName(file.getFileName().toString(), STRICT_TLOG_OR_CKP_PATTERN); + if (generation < minGeneration || generation > maxGeneration) { + // If the generation is outside the required range, then we delete the same. + Files.delete(file); + } + } catch (IllegalStateException | IllegalArgumentException e) { + // Delete any file which does not conform to Translog or Checkpoint filename patterns. + Files.delete(file); + } } - Map generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); - for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) { + // For incremental downloads, we will check if the local translog matches the one present in + // remote store. If so, we will skip its download. 
+ for (long i = maxGeneration; i >= minGeneration; i--) { String generation = Long.toString(i); + String translogFilename = Translog.getFilename(i); + Path targetTranslogPath = location.resolve(translogFilename); + + // If we have the translog available for the generation locally, then we need to + // compare the checksum with that in remote obtained via metadata. + // For backward compatibility, we consider the following cases here- + // - Remote metadata does not have the mapping for generation + // - Local translog file lacks the checksum value in footer + // In both these cases, we will download the files for the generation. + if (generationToChecksumMapper.containsKey(generation) && FileSystemUtils.exists(targetTranslogPath)) { + try { + final long expectedChecksum = Long.parseLong(generationToChecksumMapper.get(generation)); + final Long actualChecksum = TranslogFooter.readChecksum(targetTranslogPath); + + // If the local and remote checksums are the same, skip the download for this generation. + // Else fall through and download the translog. + if (actualChecksum != null && actualChecksum == expectedChecksum) { + logger.info( + "Download skipped for translog and checkpoint files for generation={} due to them being locally present", + generation + ); + + // Mark the translog and checkpoint file as available in the file tracker. + translogTransferManager.markFileAsDownloaded(translogFilename); + translogTransferManager.markFileAsDownloaded(Translog.getCommitCheckpointFileName(i)); + continue; + } + } catch (IOException e) { + // The exception can occur if the local translog file cannot be read or does not carry a valid footer. + logger.info( + "Exception occurred during reconciliation of translog state between local and remote. 
" + + "Reverting to downloading the translog and checksum files for generation={}", + generation + ); + } + } + + logger.info("Downloading translog and checkpoint files for generation={}", generation); translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location); } logger.info( diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 4b4ceb7444471..c1f7860cf5716 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -134,6 +134,8 @@ public abstract class Translog extends AbstractIndexShardComponent implements In public static final String CHECKPOINT_SUFFIX = ".ckp"; public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX; + // STRICT_TLOG_OR_CKP_PATTERN is the strict pattern for Translog or Checkpoint file. + static final Pattern STRICT_TLOG_OR_CKP_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.ckp|\\.tlog)$"); static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$"); public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogHeader.headerSizeInBytes(UUIDs.randomBase64UUID()); @@ -320,14 +322,18 @@ public static long parseIdFromFileName(Path translogFile) { return parseIdFromFileName(fileName); } - public static long parseIdFromFileName(String fileName) { - final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName); + public static long parseIdFromFileName(String translogFile) { + return parseIdFromFileName(translogFile, PARSE_STRICT_ID_PATTERN); + } + + public static long parseIdFromFileName(String fileName, Pattern pattern) { + final Matcher matcher = pattern.matcher(fileName); if (matcher.matches()) { try { return Long.parseLong(matcher.group(1)); } catch (NumberFormatException e) { throw new IllegalStateException( - 
"number formatting issue in a file that passed PARSE_STRICT_ID_PATTERN: " + fileName + "]", + "number formatting issue in a file that passed " + pattern.pattern() + ": " + fileName + "]", e ); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogFooter.java b/server/src/main/java/org/opensearch/index/translog/TranslogFooter.java new file mode 100644 index 0000000000000..c4d6af131af0b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/TranslogFooter.java @@ -0,0 +1,148 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.index.translog; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.OutputStreamDataOutput; +import org.opensearch.common.io.Channels; +import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +/** + * Handles the writing and reading of the translog footer, which contains the checksum of the translog data. + * + * Each translog file is structured as follows: + * + * 1. Translog header + * 2. Translog operations + * 3. Translog footer + * + * The footer contains the following information: + * + * - Magic number (int): A constant value that identifies the start of the footer. + * - Algorithm ID (int): The identifier of the checksum algorithm used. Currently, this is always 0. + * - Checksum (long): The checksum of the entire translog data, calculated using the specified algorithm. + */ +public class TranslogFooter { + + /** + * FOOTER_LENGTH is the length of the present footer added to translog files. + * We write 4 bytes for the magic number, 4 bytes for algorithm ID and 8 bytes for the checksum. + * Therefore, the footer length is 16. + * */ + private static final int FOOTER_LENGTH = 16; + + /** + * Returns the length of the translog footer in bytes. + * + * @return the length of the translog footer in bytes + */ + static int footerLength() { + return FOOTER_LENGTH; + } + + /** + * Writes the translog footer to the provided `FileChannel`. 
+ * + * @param channel the `FileChannel` to write the footer to + * @param checksum the checksum value to be written in the footer + * @param toSync whether to force a sync of the written data to the underlying storage + * @return the byte array containing the written footer data + * @throws IOException if an I/O error occurs while writing the footer + */ + static byte[] write(FileChannel channel, long checksum, boolean toSync) throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final OutputStreamDataOutput out = new OutputStreamDataOutput(new OutputStreamStreamOutput(byteArrayOutputStream)); + + CodecUtil.writeBEInt(out, CodecUtil.FOOTER_MAGIC); + CodecUtil.writeBEInt(out, 0); + CodecUtil.writeBELong(out, checksum); + + Channels.writeToChannel(byteArrayOutputStream.toByteArray(), channel); + if (toSync) { + channel.force(false); + } + + return byteArrayOutputStream.toByteArray(); + } + + /** + * Reads the checksum value from the footer of the translog file located at the provided `Path`. + * + * If the translog file is of an older version and does not have a footer, this method returns `null`. + * + * @param path the `Path` to the translog file + * @return the checksum value from the translog footer, or `null` if the footer is not present + * @throws IOException if an I/O error occurs while reading the footer + */ + static Long readChecksum(Path path) throws IOException { + try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) { + // Read the header and find out if the footer is supported. + final TranslogHeader header = TranslogHeader.read(path, channel); + if (header.getTranslogHeaderVersion() < TranslogHeader.VERSION_WITH_FOOTER) { + return null; + } + + // Read the footer. 
+ final long fileSize = channel.size(); + final long footerStart = fileSize - TranslogFooter.footerLength(); + ByteBuffer footer = ByteBuffer.allocate(TranslogFooter.footerLength()); + int bytesRead = Channels.readFromFileChannel(channel, footerStart, footer); + if (bytesRead != TranslogFooter.footerLength()) { + throw new IOException( + "Read " + bytesRead + " bytes from footer instead of expected " + TranslogFooter.footerLength() + " bytes" + ); + } + footer.flip(); + + // Validate the footer and return the checksum. + int magic = footer.getInt(); + if (magic != CodecUtil.FOOTER_MAGIC) { + throw new IOException("Invalid footer magic number: " + magic); + } + + int algorithmId = footer.getInt(); + if (algorithmId != 0) { + throw new IOException("Unsupported checksum algorithm ID: " + algorithmId); + } + + return footer.getLong(); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java index 66a9fe08d06b5..b2cae8f209f6f 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java @@ -65,11 +65,13 @@ public final class TranslogHeader { public static final int VERSION_CHECKSUMS = 1; // pre-2.0 - unsupported public static final int VERSION_CHECKPOINTS = 2; // added checkpoints public static final int VERSION_PRIMARY_TERM = 3; // added primary term - public static final int CURRENT_VERSION = VERSION_PRIMARY_TERM; + public static final int VERSION_WITH_FOOTER = 4; // added the footer for the translog + public static final int CURRENT_VERSION = VERSION_WITH_FOOTER; private final String translogUUID; private final long primaryTerm; private final int headerSizeInBytes; + private final int translogHeaderVersion; /** * Creates a new translog header with the given uuid and primary term. 
@@ -80,14 +82,15 @@ public final class TranslogHeader { * All operations' terms in this translog file are enforced to be at most this term. */ TranslogHeader(String translogUUID, long primaryTerm) { - this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID)); + this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID), CURRENT_VERSION); assert primaryTerm >= 0 : "Primary term must be non-negative; term [" + primaryTerm + "]"; } - private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes) { + private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes, int headerVersion) { this.translogUUID = translogUUID; this.primaryTerm = primaryTerm; this.headerSizeInBytes = headerSizeInBytes; + this.translogHeaderVersion = headerVersion; } public String getTranslogUUID() { @@ -110,6 +113,13 @@ public int sizeInBytes() { return headerSizeInBytes; } + /** + * Returns the version of the translog header. + * */ + public int getTranslogHeaderVersion() { + return translogHeaderVersion; + } + static int headerSizeInBytes(String translogUUID) { return headerSizeInBytes(CURRENT_VERSION, new BytesRef(translogUUID).length); } @@ -127,7 +137,7 @@ private static int headerSizeInBytes(int version, int uuidLength) { static int readHeaderVersion(final Path path, final FileChannel channel, final StreamInput in) throws IOException { final int version; try { - version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM); + version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, CURRENT_VERSION); } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { tryReportOldVersionError(path, channel); throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e); @@ -183,7 +193,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th in.read(uuid.bytes, 
uuid.offset, uuid.length); // Read the primary term final long primaryTerm; - if (version == VERSION_PRIMARY_TERM) { + if (version >= VERSION_PRIMARY_TERM) { primaryTerm = in.readLong(); } else { assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]"; @@ -202,7 +212,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th + channel.position() + "]"; - return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes); + return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes, version); } catch (EOFException e) { throw new TranslogCorruptedException(path.toString(), "translog header truncated", e); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java index d590663670b28..b50adc5bdc034 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java @@ -64,8 +64,13 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { @Nullable private final Long translogChecksum; + + // fullTranslogChecksum is the checksum for translog which includes header, content, and footer. + @Nullable + private final Long fullTranslogChecksum; @Nullable private final Long checkpointChecksum; + private final Boolean hasFooter; /** * Create a translog writer against the specified translog file channel. 
@@ -80,14 +85,18 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { final FileChannel channel, final Path path, final TranslogHeader header, - final Long translogChecksum + final Long translogChecksum, + final Long fullTranslogChecksum, + final Boolean hasFooter ) throws IOException { super(checkpoint.generation, channel, path, header); this.length = checkpoint.offset; this.totalOperations = checkpoint.numOps; this.checkpoint = checkpoint; this.translogChecksum = translogChecksum; + this.fullTranslogChecksum = fullTranslogChecksum; this.checkpointChecksum = (translogChecksum != null) ? calculateCheckpointChecksum(checkpoint, path) : null; + this.hasFooter = hasFooter; } private static Long calculateCheckpointChecksum(Checkpoint checkpoint, Path path) throws IOException { @@ -101,6 +110,14 @@ public Long getTranslogChecksum() { return translogChecksum; } + /** + * getFullTranslogChecksum returns the complete checksum of the translog which includes + * header, content and footer. + * */ + public Long getFullTranslogChecksum() { + return fullTranslogChecksum; + } + public Long getCheckpointChecksum() { return checkpointChecksum; } @@ -118,7 +135,18 @@ public Long getCheckpointChecksum() { public static TranslogReader open(final FileChannel channel, final Path path, final Checkpoint checkpoint, final String translogUUID) throws IOException { final TranslogHeader header = TranslogHeader.read(translogUUID, path, channel); - return new TranslogReader(checkpoint, channel, path, header, null); + + // When we open a reader to Translog from a path, we want to fetch the checksum + // as that would be needed later on while creating the metadata map for + // generation to checksum. 
+ Long translogChecksum = null; + try { + translogChecksum = TranslogFooter.readChecksum(path); + } catch (IOException ignored) {} + + boolean hasFooter = translogChecksum != null; + + return new TranslogReader(checkpoint, channel, path, header, translogChecksum, null, hasFooter); } /** @@ -146,9 +174,9 @@ TranslogReader closeIntoTrimmedReader(long aboveSeqNo, ChannelFactory channelFac IOUtils.fsync(checkpointFile.getParent(), true); - newReader = new TranslogReader(newCheckpoint, channel, path, header, translogChecksum); + newReader = new TranslogReader(newCheckpoint, channel, path, header, translogChecksum, fullTranslogChecksum, hasFooter); } else { - newReader = new TranslogReader(checkpoint, channel, path, header, translogChecksum); + newReader = new TranslogReader(checkpoint, channel, path, header, translogChecksum, fullTranslogChecksum, hasFooter); } toCloseOnFailure = null; return newReader; @@ -177,6 +205,23 @@ final public Checkpoint getCheckpoint() { * reads an operation at the given position into the given buffer. */ protected void readBytes(ByteBuffer buffer, long position) throws IOException { + if (hasFooter && header.getTranslogHeaderVersion() == TranslogHeader.VERSION_WITH_FOOTER) { + // Ensure that the read request does not overlap with footer. + long translogLengthWithoutFooter = length - TranslogFooter.footerLength(); + if (position >= translogLengthWithoutFooter && position < length) { + throw new EOFException( + "read requested past last ops into footer. pos [" + position + "] end: [" + translogLengthWithoutFooter + "]" + ); + } + // If we are trying to read beyond the last Ops, we need to return EOF error. + long lastPositionToRead = position + buffer.limit(); + if (lastPositionToRead > translogLengthWithoutFooter) { + throw new EOFException( + "trying to read past last ops into footer. 
pos [" + lastPositionToRead + "] end: [" + translogLengthWithoutFooter + "]" + ); + } + } + if (position >= length) { throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "]"); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java index b0c7d51c3e43b..8a9846f29c54d 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java @@ -189,12 +189,12 @@ public static TranslogWriter create( checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE); final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm); header.write(channel, !Boolean.TRUE.equals(remoteTranslogEnabled)); - TranslogCheckedContainer translogCheckedContainer = null; - if (Boolean.TRUE.equals(remoteTranslogEnabled)) { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - header.write(byteArrayOutputStream); - translogCheckedContainer = new TranslogCheckedContainer(byteArrayOutputStream.toByteArray()); - } + + // Enable translogCheckedContainer for remote as well as local translog. 
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + header.write(byteArrayOutputStream); + TranslogCheckedContainer translogCheckedContainer = new TranslogCheckedContainer(byteArrayOutputStream.toByteArray()); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint( header.sizeInBytes(), fileGeneration, @@ -438,8 +438,41 @@ public TranslogReader closeIntoReader() throws IOException { synchronized (syncLock) { try (ReleasableLock toClose = writeLock.acquire()) { synchronized (this) { + Long translogContentChecksum = null; + Long fullTranslogChecksum = null; try { + if (header.getTranslogHeaderVersion() >= TranslogHeader.VERSION_WITH_FOOTER) { + // If we are adding a footer, change the totalOffset. + // This will ensure that footer length is included in the checkpoint. + totalOffset += TranslogFooter.footerLength(); + } + sync(); // sync before we close.. + + if (header.getTranslogHeaderVersion() >= TranslogHeader.VERSION_WITH_FOOTER) { + // Post sync, we will add the footer to the translog. + assert translogCheckedContainer != null : "checksum has not been calculated for the translog"; + // add footer to the translog file. + // The checksum in the footer consists of header + body. + byte[] footer = TranslogFooter.write( + channel, + translogCheckedContainer.getChecksum(), + !Boolean.TRUE.equals(remoteTranslogEnabled) + ); + // Store the checksum without footer. + translogContentChecksum = translogCheckedContainer.getChecksum(); + + // update the checked container for translog to account for the footer. + // This is needed because the checksum from the container will be used for + // comparison during remote store upload. + translogCheckedContainer.updateFromBytes(footer, 0, footer.length); + fullTranslogChecksum = translogCheckedContainer.getChecksum(); + } else { + // If we reach here then it means we are using older header and therefore, no footer. + // So, both translogContentChecksum and fullTranslogChecksum are same. 
+ translogContentChecksum = (translogCheckedContainer != null) ? translogCheckedContainer.getChecksum() : null; + fullTranslogChecksum = translogContentChecksum; + } } catch (final Exception ex) { closeWithTragicEvent(ex); throw ex; @@ -460,7 +493,9 @@ public TranslogReader closeIntoReader() throws IOException { channel, path, header, - (translogCheckedContainer != null) ? translogCheckedContainer.getChecksum() : null + translogContentChecksum, + fullTranslogChecksum, + true ); } else { throw new AlreadyClosedException( diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 86f042af0584b..d64ff447c4b8e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -163,16 +163,30 @@ public boolean equals(Object o) { public static final class TranslogFileSnapshot extends TransferFileSnapshot { private final long generation; + // translogContentChecksum is the checksum of Translog which does not include footer. + // In contrast, the checksum class variable has the checksum of Translog which includes footer. 
+ private Long translogContentChecksum; public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum) throws IOException { super(path, primaryTerm, checksum); this.generation = generation; + this.translogContentChecksum = checksum; + } + + public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum, Long fullTranslogChecksum) + throws IOException { + this(primaryTerm, generation, path, fullTranslogChecksum); + this.translogContentChecksum = checksum; } public long getGeneration() { return generation; } + public Long getTranslogContentChecksum() { + return translogContentChecksum; + } + @Override public int hashCode() { return Objects.hash(generation, super.hashCode()); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index ae007c0c33e1e..62d435ec418ce 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -168,7 +168,13 @@ public TranslogCheckpointTransferSnapshot build() throws IOException { Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration)); generations.add(readerGeneration); translogTransferSnapshot.add( - new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath, reader.getTranslogChecksum()), + new TranslogFileSnapshot( + readerPrimaryTerm, + readerGeneration, + translogPath, + reader.getTranslogChecksum(), + reader.getFullTranslogChecksum() + ), new CheckpointFileSnapshot( readerPrimaryTerm, checkpointGeneration, diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 
291218ea47499..ecc3c17541918 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -269,6 +269,15 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca return true; } + /** + * markFileAsDownloaded marks a file as already downloaded in the file transfer tracker. + * This is needed because we want to ensure that even if the file is skipped from being downloaded, + * TranslogTransferManager knows about it and does not re-upload the same to remote store. + * */ + public void markFileAsDownloaded(String filename) { + fileTransferTracker.add(filename, true); + } + /** * Process the provided metadata and tries to recover translog.ckp file to the FS. */ @@ -293,8 +302,8 @@ private void recoverCkpFileUsingMetadata(Map metadata, Path loca private Map downloadToFS(String fileName, Path location, String primaryTerm, boolean withMetadata) throws IOException { Path filePath = location.resolve(fileName); - // Here, we always override the existing file if present. - // We need to change this logic when we introduce incremental download + // downloadToFS method will be called only when we want to download the file. + // Therefore, we delete the file if it exists. deleteFileIfExists(filePath); Map metadata = null; @@ -451,8 +460,23 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) snapshot -> String.valueOf(snapshot.getPrimaryTerm()) ) ); + + // generationChecksumMap is the mapping between the generation and the checksum of associated translog file. 
+ Map generationChecksumMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> { + assert s instanceof TranslogFileSnapshot; + return (TranslogFileSnapshot) s; + }) + .filter(snapshot -> snapshot.getTranslogContentChecksum() != null) + .collect( + Collectors.toMap( + snapshot -> String.valueOf(snapshot.getGeneration()), + snapshot -> String.valueOf(snapshot.getTranslogContentChecksum()) + ) + ); + TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); + translogTransferMetadata.setGenerationToChecksumMapper(new HashMap<>(generationChecksumMap)); return new TransferFileSnapshot( translogTransferMetadata.getFileName(), diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 7fe3305545085..6ecc731235cbc 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -42,6 +42,9 @@ public class TranslogTransferMetadata { private final SetOnce> generationToPrimaryTermMapper = new SetOnce<>(); + // generationToChecksumMapper is the mapping between the generation and the checksum of associated translog file. + private final SetOnce> generationToChecksumMapper = new SetOnce<>(); + public static final String METADATA_SEPARATOR = "__"; public static final String METADATA_PREFIX = "metadata"; @@ -96,6 +99,20 @@ public Map getGenerationToPrimaryTermMapper() { return generationToPrimaryTermMapper.get(); } + /* + * setGenerationToChecksumMapper sets the mapping between the generation and the checksum of associated translog file. 
+ * */ + public void setGenerationToChecksumMapper(Map generationToChecksumMap) { + generationToChecksumMapper.set(generationToChecksumMap); + } + + /* + * getGenerationToChecksumMapper gets the mapping between the generation and the checksum of associated translog file. + * */ + public Map getGenerationToChecksumMapper() { + return generationToChecksumMapper.get(); + } + /* This should be used only at the time of creation. */ diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java index cea7ef8a4e6dd..8df32a6edb819 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java @@ -12,6 +12,7 @@ import org.apache.lucene.store.IndexOutput; import org.opensearch.common.io.IndexIOStreamHandler; +import java.io.EOFException; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -40,6 +41,15 @@ public TranslogTransferMetadata readContent(IndexInput indexInput) throws IOExce TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + // We set the GenerationToChecksumMapper only if it is present in the file. + // Else we initialise it with an empty map. 
+ try { + Map generationToChecksumMapper = indexInput.readMapOfStrings(); + metadata.setGenerationToChecksumMapper(generationToChecksumMapper); + } catch (EOFException ignored) { + metadata.setGenerationToChecksumMapper(Map.of()); + } + return metadata; } @@ -59,5 +69,11 @@ public void writeContent(IndexOutput indexOutput, TranslogTransferMetadata conte } else { indexOutput.writeMapOfStrings(new HashMap<>()); } + // Write the generation to checksum mapping at the end. + if (content.getGenerationToChecksumMapper() != null) { + indexOutput.writeMapOfStrings(content.getGenerationToChecksumMapper()); + } else { + indexOutput.writeMapOfStrings(new HashMap<>()); + } } } diff --git a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java index cae27d5b259c4..985a8edca7bf4 100644 --- a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java @@ -501,9 +501,9 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(326L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(342L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(271L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(287L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } @@ -513,7 +513,7 @@ public void testStats() throws IOException { stats.writeTo(out); final TranslogStats copy = new TranslogStats(out.bytes().streamInput()); assertThat(copy.estimatedNumberOfOperations(), equalTo(4)); - assertThat(copy.getTranslogSizeInBytes(), equalTo(326L)); + assertThat(copy.getTranslogSizeInBytes(), equalTo(342L)); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { 
builder.startObject(); @@ -521,9 +521,9 @@ public void testStats() throws IOException { builder.endObject(); assertEquals( "{\"translog\":{\"operations\":4,\"size_in_bytes\":" - + 326 + + 342 + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" - + 271 + + 287 + ",\"earliest_last_modified_age\":" + stats.getEarliestLastModifiedAge() + ",\"remote_store\":{\"upload\":{" @@ -540,7 +540,7 @@ public void testStats() throws IOException { long lastModifiedAge = System.currentTimeMillis() - translog.getCurrent().getLastModifiedTime(); final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(326L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(342L)); assertThat(stats.getUncommittedOperations(), equalTo(0)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition)); assertThat(stats.getEarliestLastModifiedAge(), greaterThanOrEqualTo(lastModifiedAge)); @@ -1754,8 +1754,10 @@ public void testCloseIntoReader() throws IOException { writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); } writer.sync(); - final Checkpoint writerCheckpoint = writer.getCheckpoint(); TranslogReader reader = writer.closeIntoReader(); + // We need to find checkpoint only after the reader has been closed. + // This is so that the added footer is taken care of. 
+ final Checkpoint writerCheckpoint = writer.getCheckpoint(); try { if (randomBoolean()) { reader.close(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 03c77a9a83f57..195e9445a8b7e 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.store.DataOutput; @@ -19,12 +20,14 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.io.Channels; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.settings.ClusterSettings; @@ -75,6 +78,7 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -110,6 +114,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static 
org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("ExtrasFS") @@ -1663,8 +1669,10 @@ public void testCloseIntoReader() throws IOException { writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); } writer.sync(); - final Checkpoint writerCheckpoint = writer.getCheckpoint(); TranslogReader reader = writer.closeIntoReader(); + // We need to find checkpoint only after the reader has been closed. + // This is so that the added footer is taken care of. + final Checkpoint writerCheckpoint = writer.getCheckpoint(); try { if (randomBoolean()) { reader.close(); @@ -1677,6 +1685,24 @@ public void testCloseIntoReader() throws IOException { final int value = buffer.getInt(); assertEquals(i, value); } + + // Try to read into the footer which would lead into EOF exception. + assertThrowsReadingIntoFooter(reader, numOps, 4); + // Try to read beyond the footer which would lead into EOF exception. + assertThrowsReadingIntoFooter(reader, numOps, 18); + + // Read next 16 bytes directly from the file, which should be the footer. + // This is because for this test, we create a writer which would automatically + // create one with footer. + long translogLengthWithoutFooter = reader.length - TranslogFooter.footerLength(); + ByteBuffer footerBuffer = ByteBuffer.allocate(TranslogFooter.footerLength()); + Channels.readFromFileChannelWithEofException(reader.channel, translogLengthWithoutFooter, footerBuffer); + footerBuffer.flip(); + // Validate the footer. 
+ assertEquals(CodecUtil.FOOTER_MAGIC, footerBuffer.getInt()); + assertEquals(0, footerBuffer.getInt()); // Algorithm ID + assertEquals(reader.getTranslogChecksum().longValue(), footerBuffer.getLong()); + final Checkpoint readerCheckpoint = reader.getCheckpoint(); assertThat(readerCheckpoint, equalTo(writerCheckpoint)); } finally { @@ -1685,13 +1711,21 @@ public void testCloseIntoReader() throws IOException { } } + // assertThrowsReadingIntoFooter asserts EOF error when we try reading into the Translog footer via reader. + private void assertThrowsReadingIntoFooter(TranslogReader reader, int numOps, int bytesToRead) { + final ByteBuffer buffer = ByteBuffer.allocate(bytesToRead); + assertThrows(EOFException.class, () -> reader.readBytes(buffer, reader.getFirstOperationOffset() + numOps * 4)); + } + public void testDownloadWithRetries() throws IOException { long generation = 1, primaryTerm = 1; Path location = createTempDir(); TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1); Map generationToPrimaryTermMapper = new HashMap<>(); + Map generationToChecksumMapper = new HashMap<>(); generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm)); translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); @@ -1801,6 +1835,110 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException { assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload); } + /** + * createTranslogFile creates a translog file with the given generation and checksum at the provided location. 
+ * */ + private void createTranslogFile(Path location, long generation, long checksum) throws IOException { + Path translogPath = location.resolve(Translog.getFilename(generation)); + Path checkpointPath = location.resolve(Translog.getCommitCheckpointFileName(generation)); + Files.createFile(translogPath); + Files.createFile(checkpointPath); + try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) { + // Write a translog header + TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), generation); + header.write(channel, true); + + // Write some translog operations + byte[] operationBytes = new byte[] { 1, 2, 3, 4 }; + channel.write(ByteBuffer.wrap(operationBytes)); + + // Write the translog footer + TranslogFooter.write(channel, checksum, true); + } + } + + /** + * testIncrementalDownloadWithMatchingChecksum tests the scenario where we have the translog + * file present locally. We test if the download logic for the same skips the download of the file. + * */ + public void testIncrementalDownloadWithMatchingChecksum() throws IOException { + // Set up the test scenario + long generation = 1; + long primaryTerm = 1; + long checksum = 1234; + Path location = createTempDir(); + TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1); + Map generationToPrimaryTermMapper = new HashMap<>(); + Map generationToChecksumMapper = new HashMap<>(); + generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm)); + generationToChecksumMapper.put(String.valueOf(generation), String.valueOf(checksum)); + translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); + + // Mock the transfer manager. 
+ TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); + when(mockTransfer.readMetadata(0)).thenReturn(translogTransferMetadata); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + + // Create a local translog file with the same checksum as the remote + createTranslogFile(location, generation, checksum); + + // Verify that the download is skipped + RemoteFsTranslog.download(mockTransfer, location, logger, false, 0); + verify(mockTransfer, times(0)).downloadTranslog(any(), any(), any()); + } + + /** + * testIncrementalDownloadWithDifferentChecksum tests the case where we have 2 translog generations + * in remote but only 1 present locally. We will download only 1 generation in this case. + * */ + public void testIncrementalDownloadWithDifferentChecksum() throws IOException { + // Set up the test scenario + long generation1 = 1, generation2 = 2, primaryTerm = 1; + long checksum1 = 1234, checksum2 = 5678; + Path location = createTempDir(); + + TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation2, generation1, 2); + Map generationToPrimaryTermMapper = Map.of( + String.valueOf(generation1), + String.valueOf(primaryTerm), + String.valueOf(generation2), + String.valueOf(primaryTerm) + ); + Map generationToChecksumMapper = Map.of( + String.valueOf(generation1), + String.valueOf(checksum1), + String.valueOf(generation2), + String.valueOf(checksum2) + ); + translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); + + // Mock the transfer manager. 
+ TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); + when(mockTransfer.readMetadata(0)).thenReturn(translogTransferMetadata); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + + // Create a local translog file for 1 generation. + createTranslogFile(location, generation1, checksum1); + // Download counter to count the files which were downloaded. + AtomicLong downloadCounter = new AtomicLong(); + // Mock the download of second generation. + doAnswer(invocation -> { + downloadCounter.incrementAndGet(); + Files.createFile(location.resolve(Translog.getCommitCheckpointFileName(generation2))); + return true; + }).when(mockTransfer).downloadTranslog(String.valueOf(primaryTerm), String.valueOf(generation2), location); + + // Verify that only generation 2 is downloaded. + RemoteFsTranslog.download(mockTransfer, location, logger, false, 0); + assertEquals(1, downloadCounter.get()); + // verify that generation 1 is not downloaded. 
+ verify(mockTransfer, times(0)).downloadTranslog(String.valueOf(primaryTerm), String.valueOf(generation1), location); + } + public void testSyncWithGlobalCheckpointUpdate() throws IOException { ArrayList ops = new ArrayList<>(); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogFooterTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogFooterTests.java new file mode 100644 index 0000000000000..4d5459ab15c16 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/TranslogFooterTests.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.index.translog; + +import org.apache.lucene.codecs.CodecUtil; +import org.opensearch.common.UUIDs; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public class TranslogFooterTests extends OpenSearchTestCase { + + /** + * testTranslogFooterWrite verifies the functionality of TranslogFooter.write() method + * wherein we write the footer to the translog file. + * */ + public void testTranslogFooterWrite() throws IOException { + Path translogPath = createTempFile(); + try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) { + // Write a translog header + TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), randomNonNegativeLong()); + header.write(channel, true); + + // Write some translog operations + byte[] operationBytes = new byte[] { 1, 2, 3, 4 }; + channel.write(ByteBuffer.wrap(operationBytes)); + + // Write the translog footer + long expectedChecksum = 0x1234567890ABCDEFL; + byte[] footer = TranslogFooter.write(channel, expectedChecksum, true); + + // Verify the footer contents + ByteBuffer footerBuffer = ByteBuffer.wrap(footer); + assertEquals(CodecUtil.FOOTER_MAGIC, footerBuffer.getInt()); + assertEquals(0, footerBuffer.getInt()); + assertEquals(expectedChecksum, footerBuffer.getLong()); + + // Verify that the footer was written to the channel + assertEquals(footer.length, channel.size() - (header.sizeInBytes() + operationBytes.length)); + } + } + + /** + * testTranslogFooterReadChecksum verifies the behavior of the TranslogFooter.readChecksum() method, + * which reads the checksum from the footer of a translog file. 
+ * */ + public void testTranslogFooterReadChecksum() throws IOException { + long expectedChecksum = 0x1234567890ABCDEFL; + Path translogPath = createTempFile(); + try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) { + // Write a translog header + TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), randomNonNegativeLong()); + header.write(channel, true); + + // Write some translog operations + byte[] operationBytes = new byte[] { 1, 2, 3, 4 }; + channel.write(ByteBuffer.wrap(operationBytes)); + + // Write the translog footer. + TranslogFooter.write(channel, expectedChecksum, true); + } + + // Verify that the checksum can be read correctly + Long actualChecksum = TranslogFooter.readChecksum(translogPath); + assert actualChecksum != null; + assertEquals(expectedChecksum, actualChecksum.longValue()); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java index b99479df9c15e..038c77bf3e6fc 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java @@ -29,20 +29,69 @@ public void setUp() throws Exception { handler = new TranslogTransferMetadataHandler(); } + /** + * Tests the readContent method of the TranslogTransferMetadataHandler, which reads the TranslogTransferMetadata + * from the provided IndexInput. + * + * @throws IOException if there is an error reading the metadata from the IndexInput + */ public void testReadContent() throws IOException { TranslogTransferMetadata expectedMetadata = getTestMetadata(); // Operation: Read expected metadata from source input stream. 
- IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes()); + IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes(expectedMetadata)); TranslogTransferMetadata actualMetadata = handler.readContent(indexInput); // Verification: Compare actual metadata read from the source input stream. assertEquals(expectedMetadata, actualMetadata); } + /** + * Tests the readContent method of the TranslogTransferMetadataHandler, which reads the TranslogTransferMetadata + * that includes the generation-to-checksum map from the provided IndexInput. + * + * @throws IOException if there is an error reading the metadata from the IndexInput + */ + public void testReadContentForMetadataWithgenerationToChecksumMap() throws IOException { + TranslogTransferMetadata expectedMetadata = getTestMetadataWithGenerationToChecksumMap(); + + // Operation: Read expected metadata from source input stream. + IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes(expectedMetadata)); + TranslogTransferMetadata actualMetadata = handler.readContent(indexInput); + + // Verification: Compare actual metadata read from the source input stream. + assertEquals(expectedMetadata, actualMetadata); + } + + /** + * Tests the writeContent method of the TranslogTransferMetadataHandler, which writes the provided + * TranslogTransferMetadata to the OutputStreamIndexOutput. + * + * @throws IOException if there is an error writing the metadata to the OutputStreamIndexOutput + */ public void testWriteContent() throws IOException { - TranslogTransferMetadata expectedMetadata = getTestMetadata(); + verifyWriteContent(getTestMetadata()); + } + + /** + * Tests the writeContent method of the TranslogTransferMetadataHandler, which writes the provided + * TranslogTransferMetadata that includes the generation-to-checksum map to the OutputStreamIndexOutput. 
+ * + * @throws IOException if there is an error writing the metadata to the OutputStreamIndexOutput + */ + public void testWriteContentWithGeneratonToChecksumMap() throws IOException { + verifyWriteContent(getTestMetadataWithGenerationToChecksumMap()); + } + /** + * Verifies the writeContent method of the TranslogTransferMetadataHandler by writing the provided + * TranslogTransferMetadata to an OutputStreamIndexOutput, and then reading it back and comparing it + * to the original metadata. + * + * @param expectedMetadata the expected TranslogTransferMetadata to be written and verified + * @throws IOException if there is an error writing or reading the metadata + */ + private void verifyWriteContent(TranslogTransferMetadata expectedMetadata) throws IOException { // Operation: Write expected metadata to the target output stream. BytesStreamOutput output = new BytesStreamOutput(); OutputStreamIndexOutput actualMetadataStream = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); @@ -55,9 +104,11 @@ public void testWriteContent() throws IOException { long generation = indexInput.readLong(); long minTranslogGeneration = indexInput.readLong(); Map generationToPrimaryTermMapper = indexInput.readMapOfStrings(); + Map generationToChecksumMapper = indexInput.readMapOfStrings(); int count = generationToPrimaryTermMapper.size(); TranslogTransferMetadata actualMetadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); actualMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + actualMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); assertEquals(expectedMetadata, actualMetadata); } @@ -76,18 +127,36 @@ private TranslogTransferMetadata getTestMetadata() { return metadata; } - private byte[] getTestMetadataBytes() throws IOException { + private TranslogTransferMetadata getTestMetadataWithGenerationToChecksumMap() { TranslogTransferMetadata metadata = getTestMetadata(); + Map 
generationToChecksumMapper = Map.of( + String.valueOf(300), + String.valueOf(1234), + String.valueOf(400), + String.valueOf(4567) + ); + metadata.setGenerationToChecksumMapper(generationToChecksumMapper); + return metadata; + } + /** + * Creates a byte array representation of the provided TranslogTransferMetadata instance, which includes + * the primary term, generation, minimum translog generation, generation-to-primary term mapping, and + * generation-to-checksum mapping (if available). + * + * @param metadata the TranslogTransferMetadata instance to be converted to a byte array + * @return the byte array representation of the TranslogTransferMetadata + * @throws IOException if there is an error writing the metadata to the byte array + */ + private byte[] getTestMetadataBytes(TranslogTransferMetadata metadata) throws IOException { BytesStreamOutput output = new BytesStreamOutput(); - OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); - indexOutput.writeLong(metadata.getPrimaryTerm()); - indexOutput.writeLong(metadata.getGeneration()); - indexOutput.writeLong(metadata.getMinTranslogGeneration()); - Map generationToPrimaryTermMapper = metadata.getGenerationToPrimaryTermMapper(); - indexOutput.writeMapOfStrings(generationToPrimaryTermMapper); - indexOutput.close(); - + try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096)) { + indexOutput.writeLong(metadata.getPrimaryTerm()); + indexOutput.writeLong(metadata.getGeneration()); + indexOutput.writeLong(metadata.getMinTranslogGeneration()); + indexOutput.writeMapOfStrings(metadata.getGenerationToPrimaryTermMapper()); + if (metadata.getGenerationToChecksumMapper() != null) indexOutput.writeMapOfStrings(metadata.getGenerationToChecksumMapper()); + } return BytesReference.toBytes(output.bytes()); } }