Skip to content

Commit

Permalink
Require freshest state to have the freshest current term
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Nov 5, 2019
1 parent 79fc46b commit 85a7424
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ private OnDiskState loadBestOnDiskState() throws IOException {
long maxAcceptedTerm = bestOnDiskState.metaData.coordinationMetaData().term();
if (acceptedTerm > maxAcceptedTerm
|| (acceptedTerm == maxAcceptedTerm
&& onDiskState.lastAcceptedVersion > bestOnDiskState.lastAcceptedVersion)) {

&& (onDiskState.lastAcceptedVersion > bestOnDiskState.lastAcceptedVersion
|| (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion)
&& onDiskState.currentTerm > bestOnDiskState.currentTerm))) {
bestOnDiskState = onDiskState;
}
} catch (IndexNotFoundException e) {
Expand All @@ -213,7 +214,12 @@ private OnDiskState loadBestOnDiskState() throws IOException {
}
}

return new OnDiskState(bestOnDiskState.nodeId, maxCurrentTerm, bestOnDiskState.lastAcceptedVersion, bestOnDiskState.metaData);
if (bestOnDiskState.currentTerm != maxCurrentTerm) {
throw new IllegalStateException("inconsistent terms found: best state is in term [" + bestOnDiskState.currentTerm +
"] but there is a stale state with greater term [" + maxCurrentTerm + "]");
}

return bestOnDiskState;
}

private static Path getMetaDataIndexPath(Path path, int majorVersion) {
Expand All @@ -234,7 +240,7 @@ private OnDiskState loadOnDiskState(DirectoryReader reader) throws IOException {
{
final MetaData metaData = MetaData.fromXContent(XContentFactory.xContent(XContentType.SMILE)
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, bytes));
logger.trace("found global metadata");
logger.trace("found global metadata with last-accepted term [{}]", metaData.coordinationMetaData().term());
builderReference.set(MetaData.builder(metaData));
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,60 @@ public void testFailsOnMismatchedCommittedClusterUUIDs() throws IOException {
}
}

public void testFailsIfFreshestStateIsInStaleTerm() throws IOException {
final Path[] dataPaths1 = createDataPaths();
final Path[] dataPaths2 = createDataPaths();
final Path[] combinedPaths = Stream.concat(Arrays.stream(dataPaths1), Arrays.stream(dataPaths2)).toArray(Path[]::new);

final long staleCurrentTerm = randomLongBetween(1L, Long.MAX_VALUE - 1);
final long freshCurrentTerm = randomLongBetween(staleCurrentTerm + 1, Long.MAX_VALUE);

final long freshTerm = randomLongBetween(1L, Long.MAX_VALUE);
final long staleTerm = randomBoolean() ? freshTerm : randomLongBetween(1L, freshTerm);
final long freshVersion = randomLongBetween(2L, Long.MAX_VALUE);
final long staleVersion = staleTerm == freshTerm ? randomLongBetween(1L, freshVersion - 1) : randomLongBetween(1L, Long.MAX_VALUE);

try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) {
try (CoordinationState.PersistedState persistedState
= loadPersistedState(new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry()))) {
final ClusterState clusterState = persistedState.getLastAcceptedState();
persistedState.setCurrentTerm(staleCurrentTerm);
persistedState.setLastAcceptedState(ClusterState.builder(clusterState)
.metaData(MetaData.builder(clusterState.metaData()).version(1)
.coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(staleTerm).build()))
.version(staleVersion)
.build());
}
}

try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths1)) {
try (CoordinationState.PersistedState persistedState
= loadPersistedState(new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry()))) {
persistedState.setCurrentTerm(freshCurrentTerm);
}
}

try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths2)) {
try (CoordinationState.PersistedState persistedState
= loadPersistedState(new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry()))) {
final ClusterState clusterState = persistedState.getLastAcceptedState();
persistedState.setLastAcceptedState(ClusterState.builder(clusterState)
.metaData(MetaData.builder(clusterState.metaData()).version(2)
.coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(freshTerm).build()))
.version(freshVersion)
.build());
}
}

try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) {
assertThat(expectThrows(IllegalStateException.class,
() -> loadPersistedState(new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry()))).getMessage(), allOf(
containsString("inconsistent terms found"),
containsString(Long.toString(staleCurrentTerm)),
containsString(Long.toString(freshCurrentTerm))));
}
}

public void testFailsGracefullyOnExceptionDuringFlush() throws IOException {
final AtomicBoolean throwException = new AtomicBoolean();

Expand Down

0 comments on commit 85a7424

Please sign in to comment.