diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java index 8b22780f8d21..8e5e17b17bf2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java @@ -19,11 +19,19 @@ package org.apache.iotdb.confignode.it.cluster; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.EnvUtils; import org.apache.iotdb.it.env.cluster.config.MppBaseConfig; @@ -33,9 +41,11 @@ import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -46,7 +56,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS; @@ -59,6 +71,8 @@ public class IoTDBClusterRestartIT { private static final int testConfigNodeNum = 3, testDataNodeNum = 2; + private static final long testTimePartitionInterval = 604800000; + @Before public void setUp() { EnvFactory.getEnv() @@ -114,6 +128,45 @@ public void clusterRestartTest() throws InterruptedException { @Test public void clusterRestartAfterUpdateDataNodeTest() throws InterruptedException, ClientManagerException, IOException, TException { + // Create default Database + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + TSStatus status = client.setDatabase(new TDatabaseSchema("root.database")); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } + // Create some DataPartitions to extend 2 DataRegionGroups + Map> partitionSlotsMap = + ConfigNodeTestUtils.constructPartitionSlotsMap( + "root.database", 0, 10, 0, 10, testTimePartitionInterval); + TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap); + TDataPartitionTableResp dataPartitionTableResp = null; + for (int retry = 0; retry < 5; retry++) { + // Build new Client since it's unstable in Win8 environment + try (SyncConfigNodeIServiceClient configNodeClient = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq); + if (dataPartitionTableResp != null) { + break; + } + } catch (Exception e) { + // Retry sometimes in order to avoid request timeout + logger.error(e.getMessage()); + TimeUnit.SECONDS.sleep(1); + } + } + Assert.assertNotNull(dataPartitionTableResp); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionTableResp.getStatus().getCode()); + Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); + ConfigNodeTestUtils.checkDataPartitionTable( + "root.database", + 0, + 10, + 0, + 10, + testTimePartitionInterval, + dataPartitionTableResp.getDataPartitionTable()); + // Shutdown all DataNodes for (int i = 0; i < testDataNodeNum; i++) { EnvFactory.getEnv().shutdownDataNode(i); @@ -151,12 +204,29 @@ public void clusterRestartAfterUpdateDataNodeTest() // Check DataNode EndPoint try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + // Check update in NodeInfo TShowClusterResp showClusterResp = client.showCluster(); ConfigNodeTestUtils.checkNodeConfig( showClusterResp.getConfigNodeList(), showClusterResp.getDataNodeList(), EnvFactory.getEnv().getConfigNodeWrapperList(), dataNodeWrapperList); + + // Check update in PartitionInfo + TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq()); + showRegionResp + .getRegionInfoList() + .forEach( + regionInfo -> { + AtomicBoolean matched = new AtomicBoolean(false); + dataNodeWrapperList.forEach( + dataNodeWrapper -> { + if (regionInfo.getClientRpcPort() == dataNodeWrapper.getPort()) { + matched.set(true); + } + }); + Assert.assertTrue(matched.get()); + }); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index 3a9994848c39..a46a9e213c15 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -286,15 +286,20 @@ public DataSet executeQueryPlan(ConfigPhysicalPlan req) public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan) throws UnknownPhysicalPlanTypeException { + TSStatus status; switch (physicalPlan.getType()) { case RegisterDataNode: return nodeInfo.registerDataNode((RegisterDataNodePlan) physicalPlan); case RemoveDataNode: return nodeInfo.removeDataNode((RemoveDataNodePlan) physicalPlan); case UpdateDataNodeConfiguration: - return nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan); + status = nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + return partitionInfo.updateDataNode((UpdateDataNodePlan) physicalPlan); case CreateDatabase: - TSStatus status = clusterSchemaInfo.createDatabase((DatabaseSchemaPlan) physicalPlan); + status = clusterSchemaInfo.createDatabase((DatabaseSchemaPlan) physicalPlan); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return status; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index 8ba1eac0b078..5a4971b96998 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -89,6 +89,16 @@ public void setPreDeleted(boolean preDeleted) { this.preDeleted = preDeleted; } + /** + * Update the DataNodeLocation in cached RegionGroups. + * + * @param newDataNodeLocation The new DataNodeLocation. + */ + public void updateDataNode(TDataNodeLocation newDataNodeLocation) { + regionGroupMap.forEach( + (regionGroupId, regionGroup) -> regionGroup.updateDataNode(newDataNodeLocation)); + } + /** * Cache allocation result of new RegionGroups. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index 979ebd7875df..16317152deed 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -39,6 +39,7 @@ import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; @@ -139,6 +140,24 @@ public int generateNextRegionGroupId() { // Consensus read/write interfaces // ====================================================== + /** + * Thread-safely update DataNodeLocation in RegionGroup. + * + * @param updateDataNodePlan UpdateDataNodePlan + * @return {@link TSStatusCode#SUCCESS_STATUS} if the DataNodeLocations are updated successfully. + */ + public TSStatus updateDataNode(UpdateDataNodePlan updateDataNodePlan) { + TDataNodeLocation newDataNodeLocation = + updateDataNodePlan.getDataNodeConfiguration().getLocation(); + databasePartitionTables.forEach( + (database, databasePartitionTable) -> { + if (isDatabaseExisted(database)) { + databasePartitionTable.updateDataNode(newDataNodeLocation); + } + }); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + /** * Thread-safely create new DatabasePartitionTable. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java index 81fea013c218..f02b6624d4f3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java @@ -75,6 +75,21 @@ public TRegionReplicaSet getReplicaSet() { return replicaSet.deepCopy(); } + /** + * Update the DataNodeLocation in TRegionReplicaSet if necessary. + * + * @param newDataNodeLocation The new DataNodeLocation. + */ + public void updateDataNode(TDataNodeLocation newDataNodeLocation) { + for (int i = 0; i < replicaSet.getDataNodeLocationsSize(); i++) { + if (replicaSet.getDataNodeLocations().get(i).getDataNodeId() + == newDataNodeLocation.getDataNodeId()) { + replicaSet.getDataNodeLocations().set(i, newDataNodeLocation); + return; + } + } + } + public void addRegionLocation(TDataNodeLocation node) { replicaSet.addToDataNodeLocations(node); replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo);