Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Lucene-based metadata persistence #48733

Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
17ec60e
Introduce Lucene-based metadata persistence
DaveCTurner Oct 31, 2019
669d3b2
Revert whitespace
DaveCTurner Oct 31, 2019
9ac94cb
Use IOUtils.close
DaveCTurner Oct 31, 2019
7d65358
Support persistent storage in coordinator tests as long as there is n…
DaveCTurner Nov 1, 2019
dea94dd
Proper awaits fixes
DaveCTurner Nov 1, 2019
b431a1d
Fix up GatewayIndexStateIT
DaveCTurner Nov 1, 2019
6526b82
Precommit
DaveCTurner Nov 1, 2019
ce19ee0
Merge branch 'reduce-metadata-writes-master' into 2019-10-31-lucene-p…
DaveCTurner Nov 1, 2019
79fc46b
Merge branch 'master' into 2019-10-31-lucene-persisted-state
DaveCTurner Nov 5, 2019
85a7424
Require freshest state to have the freshest current term
DaveCTurner Nov 5, 2019
2a736d5
Merge branch 'master' into 2019-10-31-lucene-persisted-state
DaveCTurner Nov 6, 2019
39b0358
Fix up test for previous commit
DaveCTurner Nov 6, 2019
fa22424
Check that updating the state also works
DaveCTurner Nov 6, 2019
fc80816
Use stored fields instead of docvalues
DaveCTurner Nov 6, 2019
9fbf557
SerialMergeScheduler
DaveCTurner Nov 8, 2019
aa02a22
No query cache
DaveCTurner Nov 8, 2019
3d34c6d
Reduce scope
DaveCTurner Nov 8, 2019
1027c8c
Fix doc ID handling
DaveCTurner Nov 8, 2019
8767e91
Add comment about SimpleFSDirectory
DaveCTurner Nov 8, 2019
02761e6
Start with an empty index
DaveCTurner Nov 8, 2019
2bf23b6
Write each doc to all indices at once
DaveCTurner Nov 8, 2019
49bf7b4
Merge branch 'master' into 2019-10-31-lucene-persisted-state
DaveCTurner Nov 11, 2019
bb562f8
1MB buffer (and comments)
DaveCTurner Nov 11, 2019
bc2545c
FSDirectory.open instead of SimpleFSDirectory
DaveCTurner Nov 11, 2019
983825d
Use BigArrays etc to avoid excessive allocation/copying
DaveCTurner Nov 11, 2019
e2cd38d
Keep releasable bytes alive after closing the XContentBuilder
DaveCTurner Nov 11, 2019
263f0bd
Allow for sliced BytesRefs
DaveCTurner Nov 11, 2019
7afabca
Detect bad closing behaviour using MockBigArrays
DaveCTurner Nov 11, 2019
79e36ed
Unused GatewayPersistedState
DaveCTurner Nov 12, 2019
0639b33
IOUtils.close not null check
DaveCTurner Nov 12, 2019
7c1831d
MetaDataIndexWriter
DaveCTurner Nov 12, 2019
4aa99f9
Use proper settings not just their names
DaveCTurner Nov 12, 2019
314cadb
Capitalize D in MetaData in identifiers
DaveCTurner Nov 12, 2019
455fd10
Comment
DaveCTurner Nov 12, 2019
3c5d079
Add test showing behaviour across multiple indices
DaveCTurner Nov 12, 2019
492b3a7
No need for a versioned folder (yet?)
DaveCTurner Nov 12, 2019
d2e4b70
Record paths in ISE messages
DaveCTurner Nov 12, 2019
773b9f1
Reject duplicate/missing bits even if assertions disabled
DaveCTurner Nov 12, 2019
3578297
Right size for hashmap
DaveCTurner Nov 12, 2019
e6e4d32
Use SimpleFSDirectory again
DaveCTurner Nov 12, 2019
7fa29f9
Use updateDocument instead of delete/add
DaveCTurner Nov 12, 2019
21e154f
super
DaveCTurner Nov 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/

boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/issues/48701" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -432,15 +434,14 @@ public void invariant() {
assert publishVotes.isEmpty() || electionWon();
}

public void close() {
public void close() throws IOException {
persistedState.close();
}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
*/
public interface PersistedState {
public interface PersistedState extends Closeable {

/**
* Returns the current term
Expand Down Expand Up @@ -497,7 +498,8 @@ default void markLastAcceptedStateAsCommitted() {
}
}

default void close() {}
default void close() throws IOException {
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -702,7 +703,7 @@ protected void doStop() {
}

@Override
protected void doClose() {
protected void doClose() throws IOException {
final CoordinationState coordinationState = this.coordinationState.get();
if (coordinationState != null) {
// This looks like a race that might leak an unclosed CoordinationState if it's created while execution is here, but this method
Expand Down
16 changes: 13 additions & 3 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.gateway.LucenePersistedStateFactory;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -380,15 +381,24 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings

// determine folders to move and check that there are no extra files/folders
final Set<String> folderNames = new HashSet<>();
final Set<String> expectedFolderNames = new HashSet<>(Arrays.asList(

// node state directory, also containing MetaDataStateFormat-based global metadata
MetaDataStateFormat.STATE_DIR_NAME,

// Lucene-based metadata folder
LucenePersistedStateFactory.getMetaDataIndexDirectoryName(Version.CURRENT.major),
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved

// indices
INDICES_FOLDER));

try (DirectoryStream<Path> stream = Files.newDirectoryStream(legacyNodePath.path)) {
for (Path subFolderPath : stream) {
final String fileName = subFolderPath.getFileName().toString();
if (FileSystemUtils.isDesktopServicesStore(subFolderPath)) {
// ignore
} else if (FileSystemUtils.isAccessibleDirectory(subFolderPath, logger)) {
if (fileName.equals(INDICES_FOLDER) == false && // indices folder
fileName.equals(MetaDataStateFormat.STATE_DIR_NAME) == false) { // global metadata & node state folder
if (expectedFolderNames.contains(fileName) == false) {
throw new IllegalStateException("unexpected folder encountered during data folder upgrade: " +
subFolderPath);
}
Expand All @@ -406,7 +416,7 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings
}
}

assert Sets.difference(folderNames, Sets.newHashSet(INDICES_FOLDER, MetaDataStateFormat.STATE_DIR_NAME)).isEmpty() :
assert Sets.difference(folderNames, expectedFolderNames).isEmpty() :
"expected indices and/or state dir folder but was " + folderNames;

upgradeActions.add(() -> {
Expand Down
127 changes: 46 additions & 81 deletions server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
Expand All @@ -43,10 +42,12 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -63,7 +64,7 @@
* ClusterState#metaData()} because it might be stale or incomplete. Master-eligible nodes must perform an election to find a complete and
* non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster.
*/
public class GatewayMetaState {
public class GatewayMetaState implements Closeable {
private static final Logger logger = LogManager.getLogger(GatewayMetaState.class);

// Set by calling start()
Expand All @@ -81,49 +82,46 @@ public MetaData getMetaData() {

public void start(Settings settings, TransportService transportService, ClusterService clusterService,
MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataUpgrader metaDataUpgrader) {
MetaDataUpgrader metaDataUpgrader, LucenePersistedStateFactory lucenePersistedStateFactory) {
assert persistedState.get() == null : "should only start once, but already have " + persistedState.get();

final Tuple<Manifest, ClusterState> manifestClusterStateTuple;
try {
upgradeMetaData(settings, metaStateService, metaDataIndexUpgradeService, metaDataUpgrader);
manifestClusterStateTuple = loadStateAndManifest(ClusterName.CLUSTER_NAME_SETTING.get(settings), metaStateService);
} catch (IOException e) {
throw new ElasticsearchException("failed to load metadata", e);
if (DiscoveryNode.isMasterNode(settings)) {
try {
persistedState.set(lucenePersistedStateFactory.loadPersistedState((version, metadata) ->
prepareInitialClusterState(transportService, clusterService,
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.version(version)
.metaData(upgradeMetaDataForMasterEligibleNode(metadata, metaDataIndexUpgradeService, metaDataUpgrader))
.build())));
} catch (IOException e) {
throw new ElasticsearchException("failed to load metadata", e);
}
}

final IncrementalClusterStateWriter incrementalClusterStateWriter
= new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService,
if (DiscoveryNode.isDataNode(settings)) {
final Tuple<Manifest, ClusterState> manifestClusterStateTuple;
try {
upgradeMetaData(settings, metaStateService, metaDataIndexUpgradeService, metaDataUpgrader);
manifestClusterStateTuple = loadStateAndManifest(ClusterName.CLUSTER_NAME_SETTING.get(settings), metaStateService);
} catch (IOException e) {
throw new ElasticsearchException("failed to load metadata", e);
}

final IncrementalClusterStateWriter incrementalClusterStateWriter
= new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService,
manifestClusterStateTuple.v1(),
prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()),
transportService.getThreadPool()::relativeTimeInMillis);
if (DiscoveryNode.isMasterNode(settings) == false) {
if (DiscoveryNode.isDataNode(settings)) {
// Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's
// vitally important to the safety of the cluster coordination system that master-eligible nodes persist this metadata when
// _accepting_ the cluster state (i.e. before it is committed). This persistence happens on the generic threadpool.
//
// In contrast, master-ineligible data nodes only persist the index metadata for shards that they hold. When all shards of
// an index are moved off such a node the IndicesStore is responsible for removing the corresponding index directory,
// including the metadata, and does so on the cluster applier thread.
//
// This presents a problem: if a shard is unassigned from a node and then reassigned back to it again then there is a race
// between the IndicesStore deleting the index folder and the CoordinationState concurrently trying to write the updated
// metadata into it. We could probably solve this with careful synchronization, but in fact there is no need. The persisted
// state on master-ineligible data nodes is mostly ignored - it's only there to support dangling index imports, which is
// inherently unsafe anyway. Thus we can safely delay metadata writes on master-ineligible data nodes until applying the
// cluster state, which is what this does:
clusterService.addLowPriorityApplier(new GatewayClusterApplier(incrementalClusterStateWriter));
}

// Master-ineligible nodes do not need to persist the cluster state when accepting it because they are not in the voting
// configuration, so it's ok if they have a stale or incomplete cluster state when restarted. We track the latest cluster state
// in memory instead.
persistedState.set(new InMemoryPersistedState(manifestClusterStateTuple.v1().getCurrentTerm(), manifestClusterStateTuple.v2()));
} else {
// Master-ineligible nodes must persist the cluster state when accepting it because they must reload the (complete, fresh)
// last-accepted cluster state when restarted.
persistedState.set(new GatewayPersistedState(incrementalClusterStateWriter));
clusterService.addLowPriorityApplier(new GatewayClusterApplier(incrementalClusterStateWriter));

if (DiscoveryNode.isMasterNode(settings) == false) {
persistedState.set(
new InMemoryPersistedState(manifestClusterStateTuple.v1().getCurrentTerm(), manifestClusterStateTuple.v2()));
}
} else if (DiscoveryNode.isMasterNode(settings) == false) {
persistedState.set(
new InMemoryPersistedState(0L, ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).build()));
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -139,6 +137,13 @@ ClusterState prepareInitialClusterState(TransportService transportService, Clust
.apply(clusterState);
}

// exposed so it can be overridden by tests
MetaData upgradeMetaDataForMasterEligibleNode(MetaData metaData,
MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataUpgrader metaDataUpgrader) {
return upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader);
}

// exposed so it can be overridden by tests
void upgradeMetaData(Settings settings, MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataUpgrader metaDataUpgrader) throws IOException {
Expand Down Expand Up @@ -252,6 +257,10 @@ private static boolean applyPluginUpgraders(ImmutableOpenMap<String, IndexTempla
return false;
}

@Override
public void close() throws IOException {
IOUtils.close(persistedState.get());
}

private static class GatewayClusterApplier implements ClusterStateApplier {

Expand Down Expand Up @@ -285,48 +294,4 @@ public void applyClusterState(ClusterChangedEvent event) {

}

private static class GatewayPersistedState implements PersistedState {

private final IncrementalClusterStateWriter incrementalClusterStateWriter;

GatewayPersistedState(IncrementalClusterStateWriter incrementalClusterStateWriter) {
this.incrementalClusterStateWriter = incrementalClusterStateWriter;
}

@Override
public long getCurrentTerm() {
return incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm();
}

@Override
public ClusterState getLastAcceptedState() {
final ClusterState previousClusterState = incrementalClusterStateWriter.getPreviousClusterState();
assert previousClusterState.nodes().getLocalNode() != null : "Cluster state is not fully built yet";
return previousClusterState;
}

@Override
public void setCurrentTerm(long currentTerm) {
try {
incrementalClusterStateWriter.setCurrentTerm(currentTerm);
} catch (WriteStateException e) {
logger.error(new ParameterizedMessage("Failed to set current term to {}", currentTerm), e);
e.rethrowAsErrorOrUncheckedException();
}
}

@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
incrementalClusterStateWriter.setIncrementalWrite(
incrementalClusterStateWriter.getPreviousClusterState().term() == clusterState.term());
incrementalClusterStateWriter.updateClusterState(clusterState);
} catch (WriteStateException e) {
logger.error(new ParameterizedMessage("Failed to set last accepted state with version {}", clusterState.version()), e);
e.rethrowAsErrorOrUncheckedException();
}
}

}

}
Loading