Skip to content

Commit

Permalink
Address PR review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Sep 1, 2024
1 parent bdda0cb commit 3a31765
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.index.translog.transfer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.common.SetOnce;
import org.opensearch.common.collect.Tuple;
import org.opensearch.index.remote.RemoteStoreUtils;
Expand All @@ -25,6 +29,8 @@
*/
public class TranslogTransferMetadata {

public static final Logger logger = LogManager.getLogger(TranslogTransferMetadata.class);

private final long primaryTerm;

private final long generation;
Expand Down Expand Up @@ -130,12 +136,20 @@ public static Tuple<String, String> getNodeIdByPrimaryTermAndGen(String filename

public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length != 7) {
if (tokens.length < 7) {
// For versions < 2.17, we don't have min translog generation.
return null;
}

return new Tuple<>(RemoteStoreUtils.invertLong(tokens[5]), RemoteStoreUtils.invertLong(tokens[2]));
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
try {
// instead of direct index, we go backwards to avoid running into same separator in nodeId
String minGeneration = tokens[tokens.length - 2];
String maxGeneration = tokens[2];
return new Tuple<>(RemoteStoreUtils.invertLong(minGeneration), RemoteStoreUtils.invertLong(maxGeneration));
} catch (NumberFormatException e) {
logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e);
return null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobMetadata;
Expand Down Expand Up @@ -56,6 +57,7 @@

import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
Expand Down Expand Up @@ -121,6 +123,59 @@ public void setUp() throws Exception {
remoteStorePinnedTimestampServiceSpy.start();
}

public void testGetMinMaxTranslogGenerationFromFilename() throws Exception {
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
ArrayList<Translog.Operation> ops = new ArrayList<>();

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("0", 0, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 }));

CountDownLatch latch = new CountDownLatch(1);
blobStoreTransferService.listAllInSortedOrder(
getTranslogDirectory().add(METADATA_DIR),
"metadata",
Integer.MAX_VALUE,
new LatchedActionListener<>(new ActionListener<List<BlobMetadata>>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadataList) {
Long minGen = 1L;
Long maxGen = 6L;
for (BlobMetadata blobMetadata : blobMetadataList) {
Tuple<Long, Long> minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(blobMetadata.name());
assertEquals(minGen, minMaxGen.v1());
assertEquals(maxGen, minMaxGen.v2());
maxGen -= 1;
}
}

@Override
public void onFailure(Exception e) {
// This means test failure
fail();
}
}, latch)
);
latch.await();

// Old format metadata file
String oldFormatMdFilename = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__1";
assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(oldFormatMdFilename));

// Node id containing separator
String nodeIdWithSeparator =
"metadata__9223372036438563903__9223372036854774799__9223370311919910393__node__1__9223372036438563958__1";
Tuple<Long, Long> minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(nodeIdWithSeparator);
Long minGen = Long.MAX_VALUE - 9223372036438563958L;
assertEquals(minGen, minMaxGen.v1());

// Malformed md filename
String malformedMdFileName = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node1__xyz__1";
assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(malformedMdFileName));
}

public void testIndexDeletionWithNoPinnedTimestampNoRecentMdFiles() throws Exception {
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
ArrayList<Translog.Operation> ops = new ArrayList<>();
Expand Down

0 comments on commit 3a31765

Please sign in to comment.