Skip to content

Commit

Permalink
Fix issue of red index on close for remote enabled clusters (opensear…
Browse files Browse the repository at this point in the history
…ch-project#15990)

* Fix red index on close for remote translog

Signed-off-by: Ashish Singh <ssashish@amazon.com>

* Add UTs

Signed-off-by: Ashish Singh <ssashish@amazon.com>

---------

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 authored Sep 25, 2024
1 parent 0617d95 commit f1acc7a
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.client.Requests;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -202,7 +203,7 @@ public void testRemoteTranslogCleanup() throws Exception {

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
String dataNode = internalCluster().startNode();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
Expand Down Expand Up @@ -1011,4 +1012,70 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws
.get()
);
}

public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s"))
.get();
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
if (randomBoolean()) {
for (int i = 0; i < randomIntBetween(1, 5); i++) {
indexSingleDoc(INDEX_NAME);
}
flushAndRefresh(INDEX_NAME);
}
// Index single doc to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
// Index another doc and in this case the flush would have happened before the sync.
indexSingleDoc(INDEX_NAME);
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2
// gets indexed, then it goes into the happy case where the close index happens succefully.
Thread.sleep(1000);
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC)
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
// Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class RemoteFsTranslog extends Translog {
// min generation referred by last uploaded translog
protected volatile long minRemoteGenReferenced;

// the max global checkpoint that has been synced
protected volatile long globalCheckpointSynced;

// clean up translog folder uploaded by previous primaries once
protected final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

Expand Down Expand Up @@ -437,9 +440,10 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
config.getNodeId()
).build()
) {
Checkpoint checkpoint = current.getLastSyncedCheckpoint();
return translogTransferManager.transferSnapshot(
transferSnapshotProvider,
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo)
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo, checkpoint.globalCheckpoint)
);
} finally {
syncPermit.release(SYNC_PERMIT);
Expand Down Expand Up @@ -474,7 +478,10 @@ public void sync() throws IOException {
public boolean syncNeeded() {
try (ReleasableLock lock = readLock.acquire()) {
return current.syncNeeded()
|| (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0);
|| (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0)
// The below condition on GCP exists to handle global checkpoint updates during close index.
// Refer issue - https://github.com/opensearch-project/OpenSearch/issues/15989
|| (current.getLastSyncedCheckpoint().globalCheckpoint > globalCheckpointSynced);
}
}

Expand Down Expand Up @@ -674,17 +681,25 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen

private final long maxSeqNo;

RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo) {
private final long globalCheckpoint;

RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo, long globalCheckpoint) {
this.generation = generation;
this.primaryTerm = primaryTerm;
this.maxSeqNo = maxSeqNo;
this.globalCheckpoint = globalCheckpoint;
}

@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
maxRemoteTranslogGenerationUploaded = generation;
long previousMinRemoteGenReferenced = minRemoteGenReferenced;
minRemoteGenReferenced = getMinFileGeneration();
// Update the global checkpoint only if the supplied global checkpoint is greater than it
// When a new writer is created the
if (globalCheckpoint > globalCheckpointSynced) {
globalCheckpointSynced = globalCheckpoint;
}
if (previousMinRemoteGenReferenced != minRemoteGenReferenced) {
onMinRemoteGenReferencedChange();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,83 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload);
}

public void testSyncWithGlobalCheckpointUpdate() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 2 }));

// Set a global checkpoint
long initialGlobalCheckpoint = 1L;
globalCheckpoint.set(initialGlobalCheckpoint);

// Sync the translog
translog.sync();

// Verify that the globalCheckpointSynced is updated
assertEquals(initialGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);

// Update global checkpoint
long newGlobalCheckpoint = 2L;
globalCheckpoint.set(newGlobalCheckpoint);

// Add a new operation and sync
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 3 }));
translog.sync();

// Verify that the globalCheckpointSynced is updated to the new value
assertEquals(newGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);
}

public void testSyncNeededWithGlobalCheckpointUpdate() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));

// Set initial global checkpoint
long initialGlobalCheckpoint = 0L;
globalCheckpoint.set(initialGlobalCheckpoint);

// Sync the translog
translog.sync();

// Verify that sync is not needed
assertFalse(translog.syncNeeded());

// Update global checkpoint
long newGlobalCheckpoint = 1L;
globalCheckpoint.set(newGlobalCheckpoint);

// Verify that sync is now needed due to global checkpoint update
assertTrue(translog.syncNeeded());

// Sync again
translog.sync();

// Verify that sync is not needed after syncing
assertFalse(translog.syncNeeded());
}

public void testGlobalCheckpointUpdateDuringClose() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }));

// Set initial global checkpoint
long initialGlobalCheckpoint = 0L;
globalCheckpoint.set(initialGlobalCheckpoint);

// Sync the translog
translog.sync();

// Update global checkpoint
long newGlobalCheckpoint = 1L;
globalCheckpoint.set(newGlobalCheckpoint);

// Close the translog
translog.close();

// Verify that the last synced checkpoint includes the updated global checkpoint
assertEquals(newGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint);
}

public class ThrowingBlobRepository extends FsRepository {

private final Environment environment;
Expand Down

0 comments on commit f1acc7a

Please sign in to comment.