From 72e2e7b15e7cc3d981f956c8b015e92ba7ccf19d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 30 Jan 2019 14:54:51 -0500 Subject: [PATCH] Feedback --- .../index/seqno/ReplicationTracker.java | 41 ++++++++++--------- .../elasticsearch/index/shard/IndexShard.java | 6 +-- ...ReplicationTrackerRetentionLeaseTests.java | 4 ++ .../seqno/ReplicationTrackerTestCase.java | 1 + .../index/seqno/ReplicationTrackerTests.java | 7 ++-- .../index/engine/EngineTestCase.java | 1 + 6 files changed, 35 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 43e909afc93ee..bc51bc7b67164 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -99,25 +99,6 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ private volatile long operationPrimaryTerm; - /** - * Returns the current operation primary term. - * - * @return the primary term - */ - public long getOperationPrimaryTerm() { - return operationPrimaryTerm; - } - - /** - * Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That - * is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance. - * - * @param operationPrimaryTerm the new operation primary term - */ - public void setOperationPrimaryTerm(final long operationPrimaryTerm) { - this.operationPrimaryTerm = operationPrimaryTerm; - } - /** * Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff} * and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the @@ -434,6 +415,25 @@ public boolean isPrimaryMode() { return primaryMode; } + /** + * Returns the current operation primary term. + * + * @return the primary term + */ + public long getOperationPrimaryTerm() { + return operationPrimaryTerm; + } + + /** + * Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That + * is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance. + * + * @param operationPrimaryTerm the new operation primary term + */ + public void setOperationPrimaryTerm(final long operationPrimaryTerm) { + this.operationPrimaryTerm = operationPrimaryTerm; + } + /** * Returns whether the replication tracker has relocated away to another shard copy. */ @@ -553,6 +553,7 @@ private static long inSyncCheckpointStates( * @param shardId the shard ID * @param allocationId the allocation ID * @param indexSettings the index settings + * @param operationPrimaryTerm the current primary term * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires */ @@ -560,6 +561,7 @@ public ReplicationTracker( final ShardId shardId, final String allocationId, final IndexSettings indexSettings, + final long operationPrimaryTerm, final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, @@ -568,6 +570,7 @@ public ReplicationTracker( assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; this.primaryMode = false; + this.operationPrimaryTerm = operationPrimaryTerm; this.handoffInProgress = false; this.appliedClusterStateVersion = -1L; this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 61b7917ff501e..42822942e3adf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -306,6 +306,8 @@ public IndexShard( this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); final String aId = shardRouting.allocationId().getId(); + final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); + this.pendingPrimaryTerm = primaryTerm; this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger); final ReplicationTracker replicationTracker = @@ -313,6 +315,7 @@ public IndexShard( shardId, aId, indexSettings, + primaryTerm, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated, threadPool::absoluteTimeInMillis, @@ -337,9 +340,6 @@ public boolean shouldCache(Query query) { } indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool); searcherWrapper = indexSearcherWrapper; - final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); - this.pendingPrimaryTerm = primaryTerm; - replicationTracker.setOperationPrimaryTerm(primaryTerm); refreshListeners = buildRefreshListeners(); lastSearcherAccess.set(threadPool.relativeTimeInMillis()); persistMetadata(path, indexSettings, shardRouting, null, logger); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 90eb162374469..d7f135ffe4816 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -53,6 +53,7 @@ public void testAddOrRenewRetentionLease() { new ShardId("test", "_na", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomNonNegativeLong(), UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, @@ -88,6 +89,7 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() { new ShardId("test", "_na", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomNonNegativeLong(), UNASSIGNED_SEQ_NO, value -> {}, () -> 0L, @@ -143,6 +145,7 @@ private void runExpirationTest(final boolean primaryMode) { new ShardId("test", "_na", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", settings), + randomNonNegativeLong(), UNASSIGNED_SEQ_NO, value -> {}, currentTimeMillis::get, @@ -215,6 +218,7 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { new ShardId("test", "_na", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", settings), + randomNonNegativeLong(), UNASSIGNED_SEQ_NO, value -> {}, currentTimeMillis::get, diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java index a36006a5fc4c1..5165f2e8dc9e4 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java @@ -45,6 +45,7 @@ ReplicationTracker newTracker( new ShardId("test", "_na_", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + randomNonNegativeLong(), UNASSIGNED_SEQ_NO, updatedGlobalCheckpoint, currentTimeMillisSupplier, diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index b61e3f647b9d2..7731f3cbf1d5f 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -683,15 +683,16 @@ public void testPrimaryContextHandoff() throws IOException { final ShardId shardId = new ShardId("test", "_na_", 0); FakeClusterState clusterState = initialState(); - final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId(); + final AllocationId aId = clusterState.routingTable.primaryShard().allocationId(); final LongConsumer onUpdate = updatedGlobalCheckpoint -> {}; + final long primaryTerm = randomNonNegativeLong(); final long globalCheckpoint = UNASSIGNED_SEQ_NO; final BiConsumer, ActionListener> onNewRetentionLease = (leases, listener) -> {}; ReplicationTracker oldPrimary = new ReplicationTracker( - shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); + shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); ReplicationTracker newPrimary = new ReplicationTracker( - shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); + shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 35667b0f87a1c..d893168b08205 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -655,6 +655,7 @@ public EngineConfig config( shardId, allocationId.getId(), indexSettings, + randomNonNegativeLong(), SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L,