Use Lucene soft-deletes in peer recovery #30522

Merged: 46 commits, Jun 21, 2018
Changes from 1 commit

Commits
b18eb06
Introduce soft-deletes retention policy for peer recovery
dnhatn May 8, 2018
4b2e385
Don’t check local_checkpoint in commit
dnhatn May 12, 2018
176b497
Prefer synchronized methods
dnhatn May 14, 2018
9ae627c
Remove force-merge
dnhatn May 14, 2018
ff2215c
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 14, 2018
188138d
Add +1 in 1 place
dnhatn May 15, 2018
6d901bf
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 15, 2018
8a78f65
acquireTranslogRetentionLock -> acquireRetentionLockForPeerRecovery
dnhatn May 15, 2018
6fe8847
Cut over the translog snapshot
dnhatn May 15, 2018
cc2b3f0
testRecoveryWithOutOfOrderDelete with Lucene history
dnhatn May 15, 2018
0612a05
Adapt PrimaryReplicaSyncerTests test
dnhatn May 15, 2018
a61d00b
Mute two more tests
dnhatn May 16, 2018
f3f1fa2
Add discuss
dnhatn May 16, 2018
65b8458
comment
dnhatn May 16, 2018
fc3d7d1
Update testRecoveryWithOutOfOrderDelete
dnhatn May 16, 2018
bd1b8ac
Minor feedback
dnhatn May 16, 2018
b1e73aa
Fix recovery tests
dnhatn May 16, 2018
f7ea71c
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 17, 2018
04112c6
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 17, 2018
e34154a
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 23, 2018
1531024
use retention policy to decide seq# or file-based
dnhatn May 23, 2018
86c3eba
Fix an engine test
dnhatn May 24, 2018
b3d0d5f
fix test - wait for flush
dnhatn May 24, 2018
8320647
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 24, 2018
6b95e21
bootstrap from max_seqno from an empty commit
dnhatn May 24, 2018
3be0e30
harden bootstrap
dnhatn May 24, 2018
ca3f781
naming
dnhatn May 24, 2018
65ede0b
getMaxExposedSeqNoToMergePolicy -> getMaxExposedSeqNo
dnhatn May 24, 2018
a0e58b9
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 25, 2018
c1e03d1
decouple min_retained seqno from merge policy
dnhatn May 28, 2018
c5ba76f
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 28, 2018
33be718
simplify tests
dnhatn May 28, 2018
c717bf7
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 31, 2018
df132e1
minSeqNo -> startingSeqNo
dnhatn May 31, 2018
1bcd443
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 31, 2018
f232c8a
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn May 31, 2018
591d521
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 6, 2018
76a035f
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 7, 2018
78c0d92
Adjust testForceMergeWithSoftDeletesRetentionAndRecoverySource
dnhatn Jun 7, 2018
c30de4a
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 11, 2018
5ff18f9
overriddenOperations -> skippedOperations
dnhatn Jun 11, 2018
8a37126
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 15, 2018
88950b3
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 15, 2018
dbe0472
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 19, 2018
f9eeb90
Naming
dnhatn Jun 19, 2018
ccb6e80
Merge branch 'ccr' into safecommit-mergepolicy
dnhatn Jun 21, 2018
comment
dnhatn committed May 16, 2018
commit 65b8458ddc79055593773c6eccbcb76014364acf
@@ -147,7 +147,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);
/*
* DISCUSS:
Contributor
Wait, if you acquire the snapshot before you check isSequenceNumberBasedRecovery, Lucene will not trim away anything, since you prevented the policy from going forward. It has point-in-time semantics, so I don't understand what you mean by stuff gets trimmed. Merges that are in-flight will have seen previous retention limits, so that's fine. Pending merges will pick up their retention limits right before they execute, which is also fine? I think the only issue I can see is that setCheckpointOfSafeCommit accepts a smaller value than the number of ops we keep around and then things go sideways. Otherwise we should be just fine? I must be missing something.
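For readers following along, a minimal sketch of the point-in-time behaviour referenced here, assuming Lucene's SoftDeletesRetentionMergePolicy; the field names, the seq# range query, and the AtomicLong source of the limit are illustrative, not the actual Elasticsearch wiring. The retention query is supplied lazily, so each merge evaluates the limit that is current when it actually runs:

    // Sketch only: the Supplier<Query> is invoked when a merge executes, so a
    // pending merge sees the retention limit current at that moment, not the
    // one from when it was scheduled.
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Supplier;
    import org.apache.lucene.document.LongPoint;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
    import org.apache.lucene.index.TieredMergePolicy;
    import org.apache.lucene.search.Query;

    class SoftDeletesRetentionSketch {
        static IndexWriterConfig configWithRetention(AtomicLong minRetainedSeqNo) {
            // Keep every soft-deleted doc whose seq# is at or above the
            // currently required minimum.
            Supplier<Query> retentionQuery =
                () -> LongPoint.newRangeQuery("seq_no", minRetainedSeqNo.get(), Long.MAX_VALUE);
            return new IndexWriterConfig()
                .setSoftDeletesField("__soft_deletes")
                .setMergePolicy(new SoftDeletesRetentionMergePolicy(
                    "__soft_deletes", retentionQuery, new TieredMergePolicy()));
        }
    }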

Member Author
@dnhatn dnhatn May 16, 2018

@s1monw, I should have explained this better.

The problem is that the method isTranslogReadyForSequenceNumberBasedRecovery may see operations that are not protected by the retention policy and then decide to go with seq#-based recovery. However, the actual snapshot used to send ops might no longer have those "unprotected" ops if the searcher has been refreshed onto the newly merged segment. I have a test to illustrate this scenario.

    public void testPeerRecoveryRetentionLock() throws Exception {
        AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
        try (Store store = createStore();
             InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) {
            MapperService mapper = createMapperService("test");

            engine.index(indexForDoc(createParsedDoc("1", null)));
            engine.delete(new Engine.Delete("test", "1", newUid("1"), primaryTerm.get()));
            engine.flush(); // The first segment

            engine.index(indexForDoc(createParsedDoc("2", null)));
            engine.flush(); // The second segment

            // Schedules a merge that claims the first segment.
            // This merge is scheduled before the retention lock and finished in between snapshot1 and snapshot2
            engine.blockMerges();
            globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
            engine.syncTranslog(); // Advances the safe commit

            Thread mergeThread = new Thread(() -> {
                try {
                    engine.forceMerge(false, 1, true, false, false);
                } catch (IOException e) {
                    throw new AssertionError(e);
                }
            });
            mergeThread.start();

            Closeable retentionLock = engine.acquireRetentionLockForPeerRecovery();
            Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit();

            // Assume the replica starts from seq# 0, isSequenceNumberBasedRecovery is true because the snapshot has seq# 0,1,2
            try (Translog.Snapshot snapshotToDecide = engine.newLuceneChangesSnapshot("test", mapper, 0, 2, false)) {
                assertThat(snapshotToDecide.next().seqNo(), equalTo(0L));
                assertThat(snapshotToDecide.next().seqNo(), equalTo(1L));
                assertThat(snapshotToDecide.next().seqNo(), equalTo(2L));
            }
            engine.unblockMerges();
            mergeThread.join();
            assertBusy(() -> assertThat(engine.getMergeStats().getCurrent(), equalTo(0L)));

            // This searcher opens a newly merged segment which does not have seq#0 and seq#1
            engine.refresh("test");
            try (Translog.Snapshot snapshotToSendOps = engine.newLuceneChangesSnapshot("test", mapper, 0, 2, false)) {
                assertThat(snapshotToSendOps.next().seqNo(), equalTo(0L)); // This fails
                assertThat(snapshotToSendOps.next().seqNo(), equalTo(1L));
                assertThat(snapshotToSendOps.next().seqNo(), equalTo(2L));
            }
            IOUtils.close(retentionLock, safeCommit);
        }
    }

Contributor

I agree with Simon that we should take a different approach than the one we had with the translog, now that we reason about trimming soft deletes in terms of seq# (the translog uses generation size + time). My suggestion is to use the following logic (see the sketch after this comment):

  1. Acquire a retention lock.
  2. Use the current min retained op from the retention policy to check whether the requested op range is above it. If so, we do an ops-based recovery; if not, we don't (even though, in theory, we can get lucky and still have some operations in Lucene below the min retained op).

I also think we should not pre-flight the requested range the way isTranslogReadyForSequenceNumberBasedRecovery does. If the numbers indicate that an ops-based recovery is possible, we go for it. We should then verify that we managed to bring the target shard to where it needs to be (local checkpoint >= the end of the ops range we send) and, if not, fail the recovery. IMO we can start to rely on retention semantics and simplify the logic.

Note that this brings up another interesting discussion: should we even support translog-based ops recovery? We should talk about it, but I tend to say that we should, giving it a whole different code path than the Lucene-based one. We would then use the soft-deletes setting to decide which path to follow and nothing else. Specifically: no fallback from Lucene ops to translog ops. It's one or the other.
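A rough sketch of that decision rule as a standalone helper; the class, method name, and parameters are hypothetical, not the actual RecoverySourceHandler code, and the after-the-fact verification is only noted in a comment:

    // Hypothetical helper: decide ops-based vs. file-based recovery purely
    // from the retention policy's minimum retained seq#, without
    // pre-flighting the requested range.
    final class RecoveryDecisionSketch {
        static final long UNASSIGNED_SEQ_NO = -2; // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO

        static boolean useOpsBasedRecovery(long requestStartingSeqNo,
                                           long minRetainedSeqNo,
                                           boolean targetSameHistory) {
            // Ops-based only when the target asks to start at or above the
            // smallest seq# the retention policy guarantees to keep. After
            // sending ops, the source would still verify that the target's
            // local checkpoint reached the end of the sent range, and fail
            // the recovery otherwise.
            return requestStartingSeqNo != UNASSIGNED_SEQ_NO
                && targetSameHistory
                && requestStartingSeqNo >= minRetainedSeqNo;
        }
    }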

Member Author

Yeah, I made this change as you suggested. However, we cannot use the min retained ops directly, as the number of retained operations can be changed - it's a dynamic setting. I used the max_seq_no that has been exposed to the merge policy as the baseline instead. This value is baked into the commit and bootstrapped when opening an engine.
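To illustrate what "baked into the commit" can look like mechanically, a minimal sketch using Lucene commit user data; this is not the Elasticsearch engine code, and the "max_exposed_seq_no" key name is an assumption for illustration:

    // Sketch only: write a value into the commit's user data and read it back
    // from the latest commit when the engine is opened.
    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexCommit;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.Directory;

    class CommitUserDataSketch {
        static void bakeIntoCommit(IndexWriter writer, long maxExposedSeqNo) throws IOException {
            writer.setLiveCommitData(
                Map.of("max_exposed_seq_no", Long.toString(maxExposedSeqNo)).entrySet());
            writer.commit(); // the value travels with this commit point
        }

        static long readAtBootstrap(Directory dir) throws IOException {
            List<IndexCommit> commits = DirectoryReader.listCommits(dir);
            Map<String, String> userData = commits.get(commits.size() - 1).getUserData();
            return Long.parseLong(userData.getOrDefault("max_exposed_seq_no", "-1"));
        }
    }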

* There is a major difference when acquiring translog and Lucene retention lock.
* There is a major difference between translog and Lucene retention lock.
* Once translog retention lock is acquired, no translog operations will be trimmed.
* However, this is not true for Lucene retention lock if there are already pending or scheduled merges before the lock is acquired.
* The actual problem is the method `isTranslogReadyForSequenceNumberBasedRecovery` can return true but then the snapshot in phase2,
@@ -157,7 +157,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
*
* I see two options for this:
*
* 1. We keep the snapshot which is used in isTranslogReadyForSequenceNumberBasedRecovery, then concat it with a new snapshot
* 1. We keep the snapshot which is used in `isTranslogReadyForSequenceNumberBasedRecovery`, then concat it with a new snapshot
* in phase2. The combined snapshot is guaranteed to have all required operations.
* This requires a new method "reset" in Translog#Snapshot. However, I feel this is not a clean solution.
*
@@ -166,8 +166,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
try (Closeable ignored = shard.acquireRetentionLockForPeerRecovery()) {
final long startingSeqNo;
final long requiredSeqNoRangeStart;
// DISCUSS: Most of cases, we will do sequence-based recovery even though file-based recovery will be better
// as we have to replay a large number of operations. Should we add a limit here when making the decision?
// DISCUSS: Most of the cases, we will do sequence-based recovery even though the file-based recovery will be better
// as we have to replay a large number of operations. Should we add a limit here when making this decision?
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
if (isSequenceNumberBasedRecovery) {
Expand All @@ -186,7 +186,7 @@ public RecoveryResponse recoverToTarget() throws IOException {

// DISCUSS: startingSeqNo = 0;
// Operations in the translog are limited by 12h or 512MB, but there is no limit on non-deleted documents in Lucene history
// `startingSeqNo` might replay many Lucene operations
// `startingSeqNo = 0` might replay a large number of operations.
startingSeqNo = 0;
Contributor

we might need to get the min-consecutive-seq-id from lucene here since we should have it? it's the min seqId in the policy, no?

Contributor

another option is to pass -1 and let the impl decide where to start.

Contributor

Here too I think we should just forgo the translog when we shift to Lucene-driven ops. That means we need to set this to the local checkpoint of the Lucene commit we copy (see the sketch below). All previous ops will be part of the commit. The only reason this is 0 is to create a "proper" history on the target in the form of a translog that is just as big as the local one. With a Lucene-backed history, this is not needed.
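A tiny illustration of that idea, assuming the copied commit's user data carries the local checkpoint under Elasticsearch's "local_checkpoint" key; the helper itself is hypothetical, not the actual recovery code:

    // Hypothetical helper: derive the first seq# to replay from the safe
    // commit being copied, instead of always starting from 0.
    import java.io.IOException;
    import org.apache.lucene.index.IndexCommit;

    class StartingSeqNoSketch {
        static long startingSeqNo(IndexCommit safeCommit) throws IOException {
            long localCheckpoint = Long.parseLong(safeCommit.getUserData().get("local_checkpoint"));
            // Everything up to and including the local checkpoint is in the
            // copied files; only operations above it need to be replayed.
            return localCheckpoint + 1;
        }
    }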

Member Author

I prefer to make this change in a follow-up.

// but we must have everything above the local checkpoint in the commit
requiredSeqNoRangeStart =