Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fold node metadata into new node storage #50741

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public ElasticsearchNodeCommand(String description) {
}

public static PersistedClusterStateService createPersistedClusterStateService(Path[] dataPaths) throws IOException {
final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, dataPaths);
final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(dataPaths);
if (nodeMetaData == null) {
throw new ElasticsearchException(NO_NODE_METADATA_FOUND_MSG);
}
Expand Down
55 changes: 28 additions & 27 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.gateway.PersistedClusterStateService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -301,7 +302,7 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
ensureNoShardData(nodePaths);
}

this.nodeMetaData = loadOrCreateNodeMetaData(settings, logger, nodePaths);
this.nodeMetaData = loadNodeMetaData(settings, logger, nodePaths);

success = true;
} finally {
Expand Down Expand Up @@ -428,7 +429,7 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings
}
// now do the actual upgrade. start by upgrading the node metadata file before moving anything, since a downgrade in an
// intermediate state would be pretty disastrous
loadOrCreateNodeMetaData(settings, logger, legacyNodeLock.getNodePaths());
loadNodeMetaData(settings, logger, legacyNodeLock.getNodePaths());
for (CheckedRunnable<IOException> upgradeAction : upgradeActions) {
upgradeAction.run();
}
Expand Down Expand Up @@ -497,36 +498,36 @@ private void maybeLogHeapDetails() {

/**
* scans the node paths and loads existing metaData file. If not found a new meta data will be generated
* and persisted into the nodePaths
*/
private static NodeMetaData loadOrCreateNodeMetaData(Settings settings, Logger logger,
NodePath... nodePaths) throws IOException {
private static NodeMetaData loadNodeMetaData(Settings settings, Logger logger,
NodePath... nodePaths) throws IOException {
final Path[] paths = Arrays.stream(nodePaths).map(np -> np.path).toArray(Path[]::new);

final Set<String> nodeIds = new HashSet<>();
for (final Path path : paths) {
final NodeMetaData metaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
if (metaData != null) {
nodeIds.add(metaData.nodeId());
}
}
if (nodeIds.size() > 1) {
throw new IllegalStateException(
"data paths " + Arrays.toString(paths) + " belong to multiple nodes with IDs " + nodeIds);
}

NodeMetaData metaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths);
NodeMetaData metaData = PersistedClusterStateService.nodeMetaData(paths);
if (metaData == null) {
assert nodeIds.isEmpty() : nodeIds;
metaData = new NodeMetaData(generateNodeId(settings), Version.CURRENT);
} else {
assert nodeIds.equals(Collections.singleton(metaData.nodeId())) : nodeIds + " doesn't match " + metaData;
metaData = metaData.upgradeToCurrentVersion();
// load legacy metadata
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we have to keep the node metadata around to prevent invalid downgrades, we may as well validate that it's consistent with the new-format metadata whether metaData is set or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would mean reading the node metadata even though it might not be required.
I would prefer to keep as is (but can change it if you feel strongly about this), as I would like for the Lucene index to be the only authoritative source. If for example we were to have a tool that changes the node id, it would have to update both Lucene and the old-style node meta here, and doing so atomically would be difficult.

final Set<String> nodeIds = new HashSet<>();
for (final Path path : paths) {
final NodeMetaData oldStyleMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
if (oldStyleMetaData != null) {
nodeIds.add(oldStyleMetaData.nodeId());
}
}
if (nodeIds.size() > 1) {
throw new IllegalStateException(
"data paths " + Arrays.toString(paths) + " belong to multiple nodes with IDs " + nodeIds);
}
// load legacy metadata
final NodeMetaData legacyMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths);
if (legacyMetaData == null) {
assert nodeIds.isEmpty() : nodeIds;
metaData = new NodeMetaData(generateNodeId(settings), Version.CURRENT);
} else {
assert nodeIds.equals(Collections.singleton(legacyMetaData.nodeId())) : nodeIds + " doesn't match " + legacyMetaData;
metaData = legacyMetaData;
}
}

// we write again to make sure all paths have the latest state file
metaData = metaData.upgradeToCurrentVersion();
assert metaData.nodeVersion().equals(Version.CURRENT) : metaData.nodeVersion() + " != " + Version.CURRENT;
NodeMetaData.FORMAT.writeAndCleanup(metaData, paths);

return metaData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,17 @@

import joptsimple.OptionParser;
import joptsimple.OptionSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.gateway.PersistedClusterStateService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;

public class OverrideNodeVersionCommand extends ElasticsearchNodeCommand {
private static final Logger logger = LogManager.getLogger(OverrideNodeVersionCommand.class);

private static final String TOO_NEW_MESSAGE =
DELIMITER +
"\n" +
Expand Down Expand Up @@ -75,8 +71,7 @@ public OverrideNodeVersionCommand() {
@Override
protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException {
final Path[] nodePaths = Arrays.stream(toNodePaths(dataPaths)).map(p -> p.path).toArray(Path[]::new);
final NodeMetaData nodeMetaData
= new NodeMetaData.NodeMetaDataStateFormat(true).loadLatestState(logger, NamedXContentRegistry.EMPTY, nodePaths);
final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(nodePaths);
if (nodeMetaData == null) {
throw new ElasticsearchException(NO_METADATA_MESSAGE);
}
Expand All @@ -94,7 +89,7 @@ protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet o
.replace("V_NEW", nodeMetaData.nodeVersion().toString())
.replace("V_CUR", Version.CURRENT.toString()));

NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeMetaData.nodeId(), Version.CURRENT), nodePaths);
PersistedClusterStateService.overrideVersion(Version.CURRENT, dataPaths);

terminal.println(SUCCESS_MESSAGE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeMetaData;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -114,6 +115,9 @@ public void start(Settings settings, TransportService transportService, ClusterS
} else {
metaStateService.deleteAll(); // delete legacy files
}
// write legacy node metadata to prevent accidental downgrades from spawning empty cluster state
NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(persistedClusterStateService.getNodeId(), Version.CURRENT),
persistedClusterStateService.getDataPaths());
success = true;
} finally {
if (success == false) {
Expand All @@ -126,8 +130,27 @@ public void start(Settings settings, TransportService transportService, ClusterS
throw new ElasticsearchException("failed to load metadata", e);
}
} else {
persistedState.set(
new InMemoryPersistedState(0L, ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).build()));
final long currentTerm = 0L;
final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).build();
if (persistedClusterStateService.getDataPaths().length > 0) {
// write empty cluster state just so that we have a persistent node id. There is no need to write out global metadata with
// cluster uuid as coordinating-only nodes do not snap into a cluster as they carry no state
try (PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter()) {
persistenceWriter.writeFullStateAndCommit(currentTerm, clusterState);
} catch (IOException e) {
throw new ElasticsearchException("failed to load metadata", e);
}
try {
// delete legacy cluster state files
metaStateService.deleteAll();
// write legacy node metadata to prevent downgrades from spawning empty cluster state
NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(persistedClusterStateService.getNodeId(), Version.CURRENT),
persistedClusterStateService.getDataPaths());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
persistedState.set(new InMemoryPersistedState(currentTerm, clusterState));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
Expand All @@ -64,6 +65,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.NodeMetaData;
import org.elasticsearch.index.Index;

import java.io.Closeable;
Expand Down Expand Up @@ -155,17 +157,7 @@ public Writer createWriter() throws IOException {
final Directory directory = createDirectory(path.resolve(METADATA_DIRECTORY_NAME));
closeables.add(directory);

final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer());
// start empty since we re-write the whole cluster state to ensure it is all using the same format version
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
// only commit when specifically instructed, we must not write any intermediate states
indexWriterConfig.setCommitOnClose(false);
// most of the data goes into stored fields which are not buffered, so we only really need a tiny buffer
indexWriterConfig.setRAMBufferSizeMB(1.0);
// merge on the write thread (e.g. while flushing)
indexWriterConfig.setMergeScheduler(new SerialMergeScheduler());

final IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig);
final IndexWriter indexWriter = createIndexWriter(directory, false);
closeables.add(indexWriter);
metaDataIndexWriters.add(new MetaDataIndexWriter(directory, indexWriter));
}
Expand All @@ -178,6 +170,20 @@ public Writer createWriter() throws IOException {
return new Writer(metaDataIndexWriters, nodeId, bigArrays);
}

/**
 * Builds an {@link IndexWriter} configured for the cluster-state metadata index: commits happen
 * only on explicit request, the RAM buffer is kept tiny, and merges run on the writing thread.
 *
 * @param directory    the Lucene directory to write to
 * @param openExisting {@code true} to append to an already-existing index,
 *                     {@code false} to start from an empty one
 */
private static IndexWriter createIndexWriter(Directory directory, boolean openExisting) throws IOException {
    final IndexWriterConfig config = new IndexWriterConfig(new KeywordAnalyzer());
    // merge on the write thread (e.g. while flushing)
    config.setMergeScheduler(new SerialMergeScheduler());
    // most of the data goes into stored fields which are not buffered, so we only really need a tiny buffer
    config.setRAMBufferSizeMB(1.0);
    // only commit when specifically instructed, we must not write any intermediate states
    config.setCommitOnClose(false);
    // when not appending, start empty since we re-write the whole cluster state to ensure it is all
    // using the same format version
    config.setOpenMode(openExisting ? IndexWriterConfig.OpenMode.APPEND : IndexWriterConfig.OpenMode.CREATE);
    return new IndexWriter(directory, config);
}

/**
* Remove all persisted cluster states from the given data paths, for use in tests. Should only be called when there is no open
* {@link Writer} on these paths.
Expand All @@ -196,6 +202,10 @@ Directory createDirectory(Path path) throws IOException {
return new SimpleFSDirectory(path);
}

/**
 * Returns the data paths this service operates on.
 * NOTE(review): this hands out the internal array itself, not a copy — callers must not mutate it;
 * consider returning a defensive copy if external callers are expected.
 */
public Path[] getDataPaths() {
    return dataPaths;
}

public static class OnDiskState {
private static final OnDiskState NO_ON_DISK_STATE = new OnDiskState(null, null, 0L, 0L, MetaData.EMPTY_META_DATA);

Expand All @@ -218,6 +228,66 @@ public boolean empty() {
}
}

/**
* Returns the node metadata for the given data paths, and checks if the node ids are unique
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
* @param dataPaths the data paths to scan
*/
@Nullable
public static NodeMetaData nodeMetaData(Path... dataPaths) throws IOException {
String nodeId = null;
Version version = null;
for (final Path dataPath : dataPaths) {
final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
if (Files.exists(indexPath)) {
try (DirectoryReader reader = DirectoryReader.open(new SimpleFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)))) {
final Map<String, String> userData = reader.getIndexCommit().getUserData();
assert userData.get(NODE_VERSION_KEY) != null;

final String thisNodeId = userData.get(NODE_ID_KEY);
assert thisNodeId != null;
if (nodeId != null && nodeId.equals(thisNodeId) == false) {
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalStateException("unexpected node ID in metadata, found [" + thisNodeId +
"] in [" + dataPath + "] but expected [" + nodeId + "]");
} else if (nodeId == null) {
nodeId = thisNodeId;
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
version = Version.fromId(Integer.parseInt(userData.get(NODE_VERSION_KEY)));
}
} catch (IndexNotFoundException e) {
logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
}
}
}
if (nodeId == null) {
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
return null;
}
return new NodeMetaData(nodeId, version);
}

/**
 * Overrides the node-version entry in the commit user data of the metadata index under each of the
 * given data paths. Paths without a metadata index are skipped.
 *
 * @param newVersion the version to record in place of the existing one
 * @param dataPaths  the data paths whose metadata should be rewritten
 * @throws IOException if reading or rewriting a metadata index fails
 */
public static void overrideVersion(Version newVersion, Path... dataPaths) throws IOException {
    for (final Path dataPath : dataPaths) {
        final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
        if (Files.exists(indexPath)) {
            try (DirectoryReader reader = DirectoryReader.open(new SimpleFSDirectory(indexPath))) {
                final Map<String, String> userData = reader.getIndexCommit().getUserData();
                assert userData.get(NODE_VERSION_KEY) != null;

                // re-open the existing index in append mode and commit with the overridden version
                try (IndexWriter indexWriter = createIndexWriter(new SimpleFSDirectory(indexPath), true)) {
                    final Map<String, String> commitData = new HashMap<>(userData);
                    commitData.put(NODE_VERSION_KEY, Integer.toString(newVersion.id));
                    indexWriter.setLiveCommitData(commitData.entrySet());
                    indexWriter.commit();
                }
            } catch (IndexNotFoundException e) {
                // directory exists but holds no Lucene index yet — nothing to override on this path
                logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
            }
        }
    }
}

/**
* Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.NodeMetaData;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.gateway.PersistedClusterStateService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
Expand Down Expand Up @@ -439,8 +439,7 @@ private void newAllocationId(ShardPath shardPath, Terminal terminal) throws IOEx
private void printRerouteCommand(ShardPath shardPath, Terminal terminal, boolean allocateStale)
throws IOException {
final Path nodePath = getNodePath(shardPath);
final NodeMetaData nodeMetaData =
NodeMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, nodePath);
final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(nodePath);

if (nodeMetaData == null) {
throw new ElasticsearchException("No node meta data at " + nodePath);
Expand All @@ -463,7 +462,8 @@ private void printRerouteCommand(ShardPath shardPath, Terminal terminal, boolean

private Path getNodePath(ShardPath shardPath) {
final Path nodePath = shardPath.getDataPath().getParent().getParent().getParent();
if (Files.exists(nodePath) == false || Files.exists(nodePath.resolve(MetaDataStateFormat.STATE_DIR_NAME)) == false) {
if (Files.exists(nodePath) == false ||
Files.exists(nodePath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME)) == false) {
throw new ElasticsearchException("Unable to resolve node path for " + shardPath);
}
return nodePath;
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.NodeMetaData;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.gateway.GatewayModule;
Expand Down Expand Up @@ -698,6 +700,11 @@ public Node start() throws NodeValidationException {
if (Assertions.ENABLED) {
try {
assert injector.getInstance(MetaStateService.class).loadFullState().v1().isEmpty();
final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
nodeEnvironment.nodeDataPaths());
assert nodeMetaData != null;
assert nodeMetaData.nodeVersion().equals(Version.CURRENT);
assert nodeMetaData.nodeId().equals(localNodeFactory.getNode().getId());
} catch (IOException e) {
assert false : e;
}
Expand Down
Loading