diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java index 119edf32f1ddb..7734a61d40ad6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java @@ -72,7 +72,7 @@ public ElasticsearchNodeCommand(String description) { } public static PersistedClusterStateService createPersistedClusterStateService(Path[] dataPaths) throws IOException { - final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, dataPaths); + final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(dataPaths); if (nodeMetaData == null) { throw new ElasticsearchException(NO_NODE_METADATA_FOUND_MSG); } diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index cbc6247eff785..9164f6f23a8cd 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -51,6 +51,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.gateway.MetaDataStateFormat; +import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -301,7 +302,7 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce ensureNoShardData(nodePaths); } - this.nodeMetaData = loadOrCreateNodeMetaData(settings, logger, nodePaths); + this.nodeMetaData = loadNodeMetaData(settings, logger, nodePaths); success = true; } finally { @@ -428,7 +429,7 @@ private static boolean 
upgradeLegacyNodeFolders(Logger logger, Settings settings
             }
             // now do the actual upgrade. start by upgrading the node metadata file before moving anything, since a downgrade in an
             // intermediate state would be pretty disastrous
-            loadOrCreateNodeMetaData(settings, logger, legacyNodeLock.getNodePaths());
+            loadNodeMetaData(settings, logger, legacyNodeLock.getNodePaths());
             for (CheckedRunnable upgradeAction : upgradeActions) {
                 upgradeAction.run();
             }
@@ -497,36 +498,36 @@ private void maybeLogHeapDetails() {

     /**
      * scans the node paths and loads existing metaData file. If not found a new meta data will be generated
-     * and persisted into the nodePaths
      */
-    private static NodeMetaData loadOrCreateNodeMetaData(Settings settings, Logger logger,
-                                                         NodePath... nodePaths) throws IOException {
+    private static NodeMetaData loadNodeMetaData(Settings settings, Logger logger,
+                                                 NodePath... nodePaths) throws IOException {
         final Path[] paths = Arrays.stream(nodePaths).map(np -> np.path).toArray(Path[]::new);
-
-        final Set nodeIds = new HashSet<>();
-        for (final Path path : paths) {
-            final NodeMetaData metaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
-            if (metaData != null) {
-                nodeIds.add(metaData.nodeId());
-            }
-        }
-        if (nodeIds.size() > 1) {
-            throw new IllegalStateException(
-                "data paths " + Arrays.toString(paths) + " belong to multiple nodes with IDs " + nodeIds);
-        }
-
-        NodeMetaData metaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths);
+        NodeMetaData metaData = PersistedClusterStateService.nodeMetaData(paths);
         if (metaData == null) {
-            assert nodeIds.isEmpty() : nodeIds;
-            metaData = new NodeMetaData(generateNodeId(settings), Version.CURRENT);
-        } else {
-            assert nodeIds.equals(Collections.singleton(metaData.nodeId())) : nodeIds + " doesn't match " + metaData;
-            metaData = metaData.upgradeToCurrentVersion();
+            // no metadata in the new format: fall back to the legacy files, first checking that all paths agree on one node ID
+            final Set nodeIds = new HashSet<>();
+            for (final Path path : paths) {
+                final NodeMetaData oldStyleMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
+                if (oldStyleMetaData != null) {
+                    nodeIds.add(oldStyleMetaData.nodeId());
+                }
+            }
+            if (nodeIds.size() > 1) {
+                throw new IllegalStateException(
+                    "data paths " + Arrays.toString(paths) + " belong to multiple nodes with IDs " + nodeIds);
+            }
+            // load the latest legacy state across all paths; if there is none either, a new node ID is generated below
+            final NodeMetaData legacyMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths);
+            if (legacyMetaData == null) {
+                assert nodeIds.isEmpty() : nodeIds;
+                metaData = new NodeMetaData(generateNodeId(settings), Version.CURRENT);
+            } else {
+                assert nodeIds.equals(Collections.singleton(legacyMetaData.nodeId())) : nodeIds + " doesn't match " + legacyMetaData;
+                metaData = legacyMetaData;
+            }
         }
-
-        // we write again to make sure all paths have the latest state file
+        metaData = metaData.upgradeToCurrentVersion();
         assert metaData.nodeVersion().equals(Version.CURRENT) : metaData.nodeVersion() + " != " + Version.CURRENT;
-        NodeMetaData.FORMAT.writeAndCleanup(metaData, paths);
         return metaData;
     }

diff --git a/server/src/main/java/org/elasticsearch/env/OverrideNodeVersionCommand.java b/server/src/main/java/org/elasticsearch/env/OverrideNodeVersionCommand.java
index aa2235e2a252b..52c2a9cb366d5 100644
--- a/server/src/main/java/org/elasticsearch/env/OverrideNodeVersionCommand.java
+++ b/server/src/main/java/org/elasticsearch/env/OverrideNodeVersionCommand.java
@@ -20,21 +20,17 @@

 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cli.Terminal;
 import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.gateway.PersistedClusterStateService;
import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Arrays;

 public class OverrideNodeVersionCommand extends ElasticsearchNodeCommand {
-    private static final Logger logger = LogManager.getLogger(OverrideNodeVersionCommand.class);
-
     private static final String TOO_NEW_MESSAGE =
         DELIMITER +
             "\n" +
@@ -75,8 +71,7 @@ public OverrideNodeVersionCommand() {
     @Override
     protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException {
         final Path[] nodePaths = Arrays.stream(toNodePaths(dataPaths)).map(p -> p.path).toArray(Path[]::new);
-        final NodeMetaData nodeMetaData
-            = new NodeMetaData.NodeMetaDataStateFormat(true).loadLatestState(logger, NamedXContentRegistry.EMPTY, nodePaths);
+        final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(nodePaths);
         if (nodeMetaData == null) {
             throw new ElasticsearchException(NO_METADATA_MESSAGE);
         }
@@ -94,7 +89,8 @@ protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet o
                 .replace("V_NEW", nodeMetaData.nodeVersion().toString())
                 .replace("V_CUR", Version.CURRENT.toString()));

-        NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeMetaData.nodeId(), Version.CURRENT), nodePaths);
+        // update the same node paths that the metadata was read from, as the replaced legacy write did
+        PersistedClusterStateService.overrideVersion(Version.CURRENT, nodePaths);
         terminal.println(SUCCESS_MESSAGE);
     }

diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
index 9daf26a9d3af9..fa1ddbea7a8ff 100644
--- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
+++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -38,6 +38,7 @@
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.env.NodeMetaData;
 import org.elasticsearch.plugins.MetaDataUpgrader;
 import
org.elasticsearch.transport.TransportService;

@@ -114,6 +115,9 @@ public void start(Settings settings, TransportService transportService, ClusterS
                     } else {
                         metaStateService.deleteAll(); // delete legacy files
                     }
+                    // write legacy node metadata to prevent accidental downgrades from spawning empty cluster state
+                    NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(persistedClusterStateService.getNodeId(), Version.CURRENT),
+                        persistedClusterStateService.getDataPaths());
                     success = true;
                 } finally {
                     if (success == false) {
@@ -126,8 +130,27 @@ public void start(Settings settings, TransportService transportService, ClusterS
                 throw new ElasticsearchException("failed to load metadata", e);
             }
         } else {
-            persistedState.set(
-                new InMemoryPersistedState(0L, ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).build()));
+            final long currentTerm = 0L;
+            final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).build();
+            if (persistedClusterStateService.getDataPaths().length > 0) {
+                // write empty cluster state just so that we have a persistent node id. There is no need to write out global metadata with
+                // cluster uuid as coordinating-only nodes do not snap into a cluster as they carry no state
+                try (PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter()) {
+                    persistenceWriter.writeFullStateAndCommit(currentTerm, clusterState);
+                } catch (IOException e) {
+                    throw new ElasticsearchException("failed to write empty cluster state", e);
+                }
+                try {
+                    // delete legacy cluster state files
+                    metaStateService.deleteAll();
+                    // write legacy node metadata to prevent downgrades from spawning empty cluster state
+                    NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(persistedClusterStateService.getNodeId(), Version.CURRENT),
+                        persistedClusterStateService.getDataPaths());
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            }
+            persistedState.set(new InMemoryPersistedState(currentTerm, clusterState));
         }
     }

diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
index ab7b55056d337..b0f44632d0657 100644
--- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
+++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java
@@ -51,6 +51,7 @@
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.logging.Loggers;
@@ -64,6 +65,7 @@
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.env.NodeMetaData;
 import org.elasticsearch.index.Index;

 import java.io.Closeable;
@@ -155,17 +157,7 @@ public
Writer createWriter() throws IOException {
                 final Directory directory = createDirectory(path.resolve(METADATA_DIRECTORY_NAME));
                 closeables.add(directory);

-                final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer());
-                // start empty since we re-write the whole cluster state to ensure it is all using the same format version
-                indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
-                // only commit when specifically instructed, we must not write any intermediate states
-                indexWriterConfig.setCommitOnClose(false);
-                // most of the data goes into stored fields which are not buffered, so we only really need a tiny buffer
-                indexWriterConfig.setRAMBufferSizeMB(1.0);
-                // merge on the write thread (e.g. while flushing)
-                indexWriterConfig.setMergeScheduler(new SerialMergeScheduler());
-
-                final IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig);
+                final IndexWriter indexWriter = createIndexWriter(directory, false);
                 closeables.add(indexWriter);
                 metaDataIndexWriters.add(new MetaDataIndexWriter(directory, indexWriter));
             }
@@ -178,6 +170,26 @@ public Writer createWriter() throws IOException {
         return new Writer(metaDataIndexWriters, nodeId, bigArrays);
     }

+    /**
+     * Creates an {@link IndexWriter} on the given directory that merges on the calling thread and commits only when told to.
+     * @param directory the directory that holds, or is about to hold, the metadata index
+     * @param openExisting if {@code true} the existing index is opened in {@code APPEND} mode; otherwise the writer uses
+     *                     {@code CREATE} mode
+     */
+    private static IndexWriter createIndexWriter(Directory directory, boolean openExisting) throws IOException {
+        final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer());
+        // CREATE mode starts empty since we re-write the whole cluster state in a single format version; APPEND keeps what is there
+        indexWriterConfig.setOpenMode(openExisting ? IndexWriterConfig.OpenMode.APPEND : IndexWriterConfig.OpenMode.CREATE);
+        // only commit when specifically instructed, we must not write any intermediate states
+        indexWriterConfig.setCommitOnClose(false);
+        // most of the data goes into stored fields which are not buffered, so we only really need a tiny buffer
+        indexWriterConfig.setRAMBufferSizeMB(1.0);
+        // merge on the write thread (e.g. while flushing)
+        indexWriterConfig.setMergeScheduler(new SerialMergeScheduler());
+
+        return new IndexWriter(directory, indexWriterConfig);
+    }
+
     /**
      * Remove all persisted cluster states from the given data paths, for use in tests. Should only be called when there is no open
      * {@link Writer} on these paths.
@@ -196,6 +208,10 @@ Directory createDirectory(Path path) throws IOException {
         return new SimpleFSDirectory(path);
     }

+    public Path[] getDataPaths() {
+        return dataPaths;
+    }
+
     public static class OnDiskState {
         private static final OnDiskState NO_ON_DISK_STATE = new OnDiskState(null, null, 0L, 0L, MetaData.EMPTY_META_DATA);

@@ -218,6 +234,74 @@ public boolean empty() {
         }
     }

+    /**
+     * Returns the node metadata for the given data paths, and checks if the node ids are unique
+     * @param dataPaths the data paths to scan
+     * @return the node ID and version recorded in the commit user data of the first data path that has a metadata index, or
+     *         {@code null} if no data path has one
+     * @throws IllegalStateException if two data paths carry different node IDs
+     */
+    @Nullable
+    public static NodeMetaData nodeMetaData(Path... dataPaths) throws IOException {
+        String nodeId = null;
+        Version version = null;
+        for (final Path dataPath : dataPaths) {
+            final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
+            if (Files.exists(indexPath)) {
+                // closing the reader does not close the directory it was opened on, so the directory is managed explicitly
+                try (Directory directory = new SimpleFSDirectory(indexPath);
+                     DirectoryReader reader = DirectoryReader.open(directory)) {
+                    final Map<String, String> userData = reader.getIndexCommit().getUserData();
+                    assert userData.get(NODE_VERSION_KEY) != null;
+
+                    final String thisNodeId = userData.get(NODE_ID_KEY);
+                    assert thisNodeId != null;
+                    if (nodeId != null && nodeId.equals(thisNodeId) == false) {
+                        throw new IllegalStateException("unexpected node ID in metadata, found [" + thisNodeId +
+                            "] in [" + dataPath + "] but expected [" + nodeId + "]");
+                    } else if (nodeId == null) {
+                        nodeId = thisNodeId;
+                        version = Version.fromId(Integer.parseInt(userData.get(NODE_VERSION_KEY)));
+                    }
+                } catch (IndexNotFoundException e) {
+                    logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
+                }
+            }
+        }
+        if (nodeId == null) {
+            return null;
+        }
+        return new NodeMetaData(nodeId, version);
+    }
+
+    /**
+     * Overrides the version field for the metadata in the given data path
+     * @param newVersion the version to record in the commit user data
+     * @param dataPaths the data paths to update; paths without a metadata index are skipped
+     */
+    public static void overrideVersion(Version newVersion, Path... dataPaths) throws IOException {
+        for (final Path dataPath : dataPaths) {
+            final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
+            if (Files.exists(indexPath)) {
+                // one directory instance serves both the reader and the writer and is closed once both are done
+                try (Directory directory = new SimpleFSDirectory(indexPath);
+                     DirectoryReader reader = DirectoryReader.open(directory)) {
+                    final Map<String, String> userData = reader.getIndexCommit().getUserData();
+                    assert userData.get(NODE_VERSION_KEY) != null;
+
+                    try (IndexWriter indexWriter = createIndexWriter(directory, true)) {
+                        final Map<String, String> commitData = new HashMap<>(userData);
+                        commitData.put(NODE_VERSION_KEY, Integer.toString(newVersion.id));
+                        indexWriter.setLiveCommitData(commitData.entrySet());
+                        indexWriter.commit();
+                    }
+                } catch (IndexNotFoundException e) {
+                    logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
+                }
+            }
+        }
+    }
+
     /**
      * Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
*/ diff --git a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java index 045c1197dc5a8..32e1b84c43131 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java @@ -51,7 +51,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeMetaData; -import org.elasticsearch.gateway.MetaDataStateFormat; +import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; @@ -439,8 +439,7 @@ private void newAllocationId(ShardPath shardPath, Terminal terminal) throws IOEx private void printRerouteCommand(ShardPath shardPath, Terminal terminal, boolean allocateStale) throws IOException { final Path nodePath = getNodePath(shardPath); - final NodeMetaData nodeMetaData = - NodeMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, nodePath); + final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(nodePath); if (nodeMetaData == null) { throw new ElasticsearchException("No node meta data at " + nodePath); @@ -463,7 +462,8 @@ private void printRerouteCommand(ShardPath shardPath, Terminal terminal, boolean private Path getNodePath(ShardPath shardPath) { final Path nodePath = shardPath.getDataPath().getParent().getParent().getParent(); - if (Files.exists(nodePath) == false || Files.exists(nodePath.resolve(MetaDataStateFormat.STATE_DIR_NAME)) == false) { + if (Files.exists(nodePath) == false || + Files.exists(nodePath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME)) == false) { throw new ElasticsearchException("Unable to resolve node path for " + shardPath); } return nodePath; diff --git 
a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index acd8f3f6cfdf3..b4ac9a83db65f 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -27,6 +27,7 @@ import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.search.SearchExecutionStatsCollector; @@ -90,6 +91,7 @@ import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.NodeMetaData; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.gateway.GatewayMetaState; import org.elasticsearch.gateway.GatewayModule; @@ -698,6 +700,11 @@ public Node start() throws NodeValidationException { if (Assertions.ENABLED) { try { assert injector.getInstance(MetaStateService.class).loadFullState().v1().isEmpty(); + final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, + nodeEnvironment.nodeDataPaths()); + assert nodeMetaData != null; + assert nodeMetaData.nodeVersion().equals(Version.CURRENT); + assert nodeMetaData.nodeId().equals(localNodeFactory.getNode().getId()); } catch (IOException e) { assert false : e; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java index 539d57400ecc0..f6dbb9f1510f6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java @@ -27,7 
+27,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.env.NodeMetaData; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.indices.IndicesService; @@ -134,13 +133,9 @@ public void testDetachNodeLocked() throws IOException { } } - public void testBootstrapNoNodeMetaData() throws IOException { + public void testBootstrapNoNodeMetaData() { Settings envSettings = buildEnvSettings(Settings.EMPTY); Environment environment = TestEnvironment.newEnvironment(envSettings); - try (NodeEnvironment nodeEnvironment = new NodeEnvironment(envSettings, environment)) { - NodeMetaData.FORMAT.cleanupOldFiles(-1, nodeEnvironment.nodeDataPaths()); - } - expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.NO_NODE_METADATA_FOUND_MSG); } @@ -175,7 +170,7 @@ public void testBootstrapNoClusterState() throws IOException { Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build()); PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPaths()); - expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.CS_MISSING_MSG); + expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.NO_NODE_METADATA_FOUND_MSG); } public void testDetachNoClusterState() throws IOException { @@ -189,7 +184,7 @@ public void testDetachNoClusterState() throws IOException { Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build()); PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPaths()); - expectThrows(() -> detachCluster(environment), ElasticsearchNodeCommand.CS_MISSING_MSG); + expectThrows(() -> detachCluster(environment), ElasticsearchNodeCommand.NO_NODE_METADATA_FOUND_MSG); } public void testBootstrapAbortedByUser() throws IOException { diff --git 
a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentIT.java b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentIT.java index baebd32dffcb0..40ef3a48c048f 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentIT.java +++ b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESIntegTestCase; @@ -137,14 +138,14 @@ public Settings onNodeStopped(String nodeName) { public void testFailsToStartIfDowngraded() { final IllegalStateException illegalStateException = expectThrowsOnRestart(dataPaths -> - NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(randomAlphaOfLength(10), NodeMetaDataTests.tooNewVersion()), dataPaths)); + PersistedClusterStateService.overrideVersion(NodeMetaDataTests.tooNewVersion(), dataPaths)); assertThat(illegalStateException.getMessage(), allOf(startsWith("cannot downgrade a node from version ["), endsWith("] to version [" + Version.CURRENT + "]"))); } public void testFailsToStartIfUpgradedTooFar() { final IllegalStateException illegalStateException = expectThrowsOnRestart(dataPaths -> - NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(randomAlphaOfLength(10), NodeMetaDataTests.tooOldVersion()), dataPaths)); + PersistedClusterStateService.overrideVersion(NodeMetaDataTests.tooOldVersion(), dataPaths)); assertThat(illegalStateException.getMessage(), allOf(startsWith("cannot upgrade a node from version ["), endsWith("] directly to version [" + Version.CURRENT + "]"))); } @@ -238,10 +239,15 @@ public void testFailsToStartOnDataPathsFromMultipleNodes() throws IOException { internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes.get(1))); 
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes.get(0))); - final IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, + IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, + () -> PersistedClusterStateService.nodeMetaData(allDataPaths.stream().map(PathUtils::get).toArray(Path[]::new))); + + assertThat(illegalStateException.getMessage(), containsString("unexpected node ID in metadata")); + + illegalStateException = expectThrows(IllegalStateException.class, () -> internalCluster().startNode(Settings.builder().putList(Environment.PATH_DATA_SETTING.getKey(), allDataPaths))); - assertThat(illegalStateException.getMessage(), containsString("belong to multiple nodes with IDs")); + assertThat(illegalStateException.getMessage(), containsString("unexpected node ID in metadata")); final List node0DataPathsPlusOne = new ArrayList<>(node0DataPaths); node0DataPathsPlusOne.add(createTempDir().toString()); diff --git a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java index 19100343d2b15..e273acfafc812 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java @@ -384,7 +384,7 @@ public void testCustomDataPaths() throws Exception { env.close(); } - public void testPersistentNodeId() throws IOException { + public void testNodeIdNotPersistedAtInitialization() throws IOException { NodeEnvironment env = newNodeEnvironment(new String[0], Settings.builder() .put("node.local_storage", false) .put("node.master", false) @@ -398,7 +398,7 @@ public void testPersistentNodeId() throws IOException { nodeID = env.nodeId(); env.close(); env = newNodeEnvironment(paths, Settings.EMPTY); - assertThat(env.nodeId(), equalTo(nodeID)); + assertThat(env.nodeId(), not(equalTo(nodeID))); env.close(); env = 
newNodeEnvironment(Settings.EMPTY); assertThat(env.nodeId(), not(equalTo(nodeID))); diff --git a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java index d54fb051cd606..c88a3f36da7a8 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java +++ b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.index.Index; import org.elasticsearch.node.Node; @@ -67,6 +68,11 @@ public void createNodePaths() throws IOException { environment = TestEnvironment.newEnvironment(dataMasterSettings); try (NodeEnvironment nodeEnvironment = new NodeEnvironment(dataMasterSettings, environment)) { nodePaths = nodeEnvironment.nodeDataPaths(); + final String nodeId = randomAlphaOfLength(10); + try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(nodePaths, nodeId, + xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).createWriter()) { + writer.writeFullStateAndCommit(1L, ClusterState.EMPTY_STATE); + } } dataNoMasterSettings = Settings.builder() .put(dataMasterSettings) diff --git a/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java b/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java index 08256620e5d57..620a8a365ec05 100644 --- a/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java +++ b/server/src/test/java/org/elasticsearch/env/OverrideNodeVersionCommandTests.java @@ -21,22 +21,21 @@ import joptsimple.OptionParser; import joptsimple.OptionSet; import org.elasticsearch.ElasticsearchException; -import 
org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cli.MockTerminal; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.gateway.MetaDataStateFormat; -import org.elasticsearch.gateway.WriteStateException; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.test.ESTestCase; +import org.junit.After; import org.junit.Before; import java.io.IOException; import java.nio.file.Path; -import static org.elasticsearch.env.NodeMetaData.NODE_ID_KEY; -import static org.elasticsearch.env.NodeMetaData.NODE_VERSION_KEY; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -45,6 +44,7 @@ public class OverrideNodeVersionCommandTests extends ESTestCase { private Environment environment; private Path[] nodePaths; + private String nodeId; private final OptionSet noOptions = new OptionParser().parse(); @Before @@ -53,9 +53,23 @@ public void createNodePaths() throws IOException { environment = TestEnvironment.newEnvironment(settings); try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment)) { nodePaths = nodeEnvironment.nodeDataPaths(); + nodeId = nodeEnvironment.nodeId(); + + try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(nodePaths, nodeId, + xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).createWriter()) { + writer.writeFullStateAndCommit(1L, ClusterState.builder(ClusterName.DEFAULT).metaData(MetaData.builder() + .persistentSettings(Settings.builder().put(MetaData.SETTING_READ_ONLY_SETTING.getKey(), 
true).build()).build()) + .build()); + } } } + @After + public void checkClusterStateIntact() throws IOException { + assertTrue(MetaData.SETTING_READ_ONLY_SETTING.get(new PersistedClusterStateService(nodePaths, nodeId, + xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).loadBestOnDiskState().metaData.persistentSettings())); + } + public void testFailsOnEmptyPath() { final Path emptyPath = createTempDir(); final MockTerminal mockTerminal = new MockTerminal(); @@ -65,9 +79,9 @@ public void testFailsOnEmptyPath() { expectThrows(IllegalStateException.class, () -> mockTerminal.readText("")); } - public void testFailsIfUnnecessary() throws WriteStateException { + public void testFailsIfUnnecessary() throws IOException { final Version nodeVersion = Version.fromId(between(Version.CURRENT.minimumIndexCompatibilityVersion().id, Version.CURRENT.id)); - NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(randomAlphaOfLength(10), nodeVersion), nodePaths); + PersistedClusterStateService.overrideVersion(nodeVersion, nodePaths); final MockTerminal mockTerminal = new MockTerminal(); final ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class, () -> new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, noOptions, environment)); @@ -79,9 +93,8 @@ public void testFailsIfUnnecessary() throws WriteStateException { } public void testWarnsIfTooOld() throws Exception { - final String nodeId = randomAlphaOfLength(10); final Version nodeVersion = NodeMetaDataTests.tooOldVersion(); - NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeId, nodeVersion), nodePaths); + PersistedClusterStateService.overrideVersion(nodeVersion, nodePaths); final MockTerminal mockTerminal = new MockTerminal(); mockTerminal.addTextInput("n\n"); final ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class, () -> @@ -95,15 +108,13 @@ public void testWarnsIfTooOld() throws Exception { 
containsString(nodeVersion.toString()))); expectThrows(IllegalStateException.class, () -> mockTerminal.readText("")); - final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, xContentRegistry(), nodePaths); - assertThat(nodeMetaData.nodeId(), equalTo(nodeId)); + final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(nodePaths); assertThat(nodeMetaData.nodeVersion(), equalTo(nodeVersion)); } public void testWarnsIfTooNew() throws Exception { - final String nodeId = randomAlphaOfLength(10); final Version nodeVersion = NodeMetaDataTests.tooNewVersion(); - NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeId, nodeVersion), nodePaths); + PersistedClusterStateService.overrideVersion(nodeVersion, nodePaths); final MockTerminal mockTerminal = new MockTerminal(); mockTerminal.addTextInput(randomFrom("yy", "Yy", "n", "yes", "true", "N", "no")); final ElasticsearchException elasticsearchException = expectThrows(ElasticsearchException.class, () -> @@ -116,15 +127,13 @@ public void testWarnsIfTooNew() throws Exception { containsString(nodeVersion.toString()))); expectThrows(IllegalStateException.class, () -> mockTerminal.readText("")); - final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, xContentRegistry(), nodePaths); - assertThat(nodeMetaData.nodeId(), equalTo(nodeId)); + final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(nodePaths); assertThat(nodeMetaData.nodeVersion(), equalTo(nodeVersion)); } public void testOverwritesIfTooOld() throws Exception { - final String nodeId = randomAlphaOfLength(10); final Version nodeVersion = NodeMetaDataTests.tooOldVersion(); - NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeId, nodeVersion), nodePaths); + PersistedClusterStateService.overrideVersion(nodeVersion, nodePaths); final MockTerminal mockTerminal = new MockTerminal(); mockTerminal.addTextInput(randomFrom("y", "Y")); new 
OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, noOptions, environment); @@ -137,15 +146,13 @@ public void testOverwritesIfTooOld() throws Exception { containsString(OverrideNodeVersionCommand.SUCCESS_MESSAGE))); expectThrows(IllegalStateException.class, () -> mockTerminal.readText("")); - final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, xContentRegistry(), nodePaths); - assertThat(nodeMetaData.nodeId(), equalTo(nodeId)); + final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(nodePaths); assertThat(nodeMetaData.nodeVersion(), equalTo(Version.CURRENT)); } public void testOverwritesIfTooNew() throws Exception { - final String nodeId = randomAlphaOfLength(10); final Version nodeVersion = NodeMetaDataTests.tooNewVersion(); - NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeId, nodeVersion), nodePaths); + PersistedClusterStateService.overrideVersion(nodeVersion, nodePaths); final MockTerminal mockTerminal = new MockTerminal(); mockTerminal.addTextInput(randomFrom("y", "Y")); new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, noOptions, environment); @@ -157,62 +164,7 @@ public void testOverwritesIfTooNew() throws Exception { containsString(OverrideNodeVersionCommand.SUCCESS_MESSAGE))); expectThrows(IllegalStateException.class, () -> mockTerminal.readText("")); - final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, xContentRegistry(), nodePaths); - assertThat(nodeMetaData.nodeId(), equalTo(nodeId)); + final NodeMetaData nodeMetaData = PersistedClusterStateService.nodeMetaData(nodePaths); assertThat(nodeMetaData.nodeVersion(), equalTo(Version.CURRENT)); } - - public void testLenientlyIgnoresExtraFields() throws Exception { - final String nodeId = randomAlphaOfLength(10); - final Version nodeVersion = NodeMetaDataTests.tooNewVersion(); - FutureNodeMetaData.FORMAT.writeAndCleanup(new FutureNodeMetaData(nodeId, nodeVersion, randomLong()), 
nodePaths); - try { - NodeMetaData.FORMAT.loadLatestState(logger, xContentRegistry(), nodePaths); - fail("An exception should have been thrown"); - } catch (ElasticsearchException e) { - assertThat(ExceptionsHelper.stackTrace(e), containsString("unknown field [future_field]")); - } - - final MockTerminal mockTerminal = new MockTerminal(); - mockTerminal.addTextInput(randomFrom("y", "Y")); - new OverrideNodeVersionCommand().processNodePaths(mockTerminal, nodePaths, noOptions, environment); - assertThat(mockTerminal.getOutput(), allOf( - containsString("data loss"), - containsString("You should not use this tool"), - containsString(Version.CURRENT.toString()), - containsString(nodeVersion.toString()), - containsString(OverrideNodeVersionCommand.SUCCESS_MESSAGE))); - expectThrows(IllegalStateException.class, () -> mockTerminal.readText("")); - - final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, xContentRegistry(), nodePaths); - assertThat(nodeMetaData.nodeId(), equalTo(nodeId)); - assertThat(nodeMetaData.nodeVersion(), equalTo(Version.CURRENT)); - } - - private static class FutureNodeMetaData { - private final String nodeId; - private final Version nodeVersion; - private final long futureValue; - - FutureNodeMetaData(String nodeId, Version nodeVersion, long futureValue) { - this.nodeId = nodeId; - this.nodeVersion = nodeVersion; - this.futureValue = futureValue; - } - - static final MetaDataStateFormat FORMAT - = new MetaDataStateFormat(NodeMetaData.FORMAT.getPrefix()) { - @Override - public void toXContent(XContentBuilder builder, FutureNodeMetaData state) throws IOException { - builder.field(NODE_ID_KEY, state.nodeId); - builder.field(NODE_VERSION_KEY, state.nodeVersion.id); - builder.field("future_field", state.futureValue); - } - - @Override - public FutureNodeMetaData fromXContent(XContentParser parser) { - throw new AssertionError("shouldn't be loading a FutureNodeMetaData"); - } - }; - } } diff --git 
a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index f113657ebbb94..cf16f21f288db 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -206,14 +206,20 @@ public void testFailsOnMismatchedNodeIds() throws IOException { final Path[] combinedPaths = Stream.concat(Arrays.stream(dataPaths1), Arrays.stream(dataPaths2)).toArray(Path[]::new); - try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { - final String message = expectThrows(IllegalStateException.class, - () -> newPersistedClusterStateService(nodeEnvironment).loadBestOnDiskState()).getMessage(); - assertThat(message, - allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1]))); - assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths2), - Arrays.stream(dataPaths2).anyMatch(p -> message.contains(p.toString()))); - } + final String failure = expectThrows(IllegalStateException.class, () -> newNodeEnvironment(combinedPaths)).getMessage(); + assertThat(failure, + allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1]))); + assertTrue("[" + failure + "] should match " + Arrays.toString(dataPaths2), + Arrays.stream(dataPaths2).anyMatch(p -> failure.contains(p.toString()))); + + // verify that loadBestOnDiskState has same check + final String message = expectThrows(IllegalStateException.class, + () -> new PersistedClusterStateService(combinedPaths, nodeIds[0], xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + randomBoolean()).loadBestOnDiskState()).getMessage(); + assertThat(message, + allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1]))); + 
assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths2), + Arrays.stream(dataPaths2).anyMatch(p -> message.contains(p.toString()))); } public void testFailsOnMismatchedCommittedClusterUUIDs() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java index f84a41131717d..4911ff934377a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -32,7 +32,6 @@ import org.elasticsearch.cli.Terminal; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.coordination.ElasticsearchNodeCommand; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -45,9 +44,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.env.NodeMetaData; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.index.IndexSettings; @@ -150,12 +149,9 @@ public void setup() throws IOException { clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(MetaData.builder().put(indexMetaData, false).build()).build(); try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, environment, Files::exists)) { - final Path[] dataPaths = - Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new); - 
NodeMetaData.FORMAT.writeAndCleanup(new NodeMetaData(nodeId, Version.CURRENT), dataPaths); - - try (PersistedClusterStateService.Writer writer = - ElasticsearchNodeCommand.createPersistedClusterStateService(dataPaths).createWriter()) { + final Path[] dataPaths = Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new); + try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(dataPaths, nodeId, + xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, true).createWriter()) { writer.writeFullStateAndCommit(1L, clusterState); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index fe9cc9f8449f6..98911fe3f4cb1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -69,6 +69,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.internal.io.IOUtils; @@ -1548,7 +1549,9 @@ private synchronized void startAndPublishNodesAndClients(List nod } catch (InterruptedException e) { throw new AssertionError("interrupted while starting nodes", e); } catch (ExecutionException e) { - throw new RuntimeException("failed to start nodes", e); + RuntimeException re = FutureUtils.rethrowExecutionException(e); + re.addSuppressed(new RuntimeException("failed to start nodes")); + throw re; } nodeAndClients.forEach(this::publishNode);