Skip to content

Commit

Permalink
[IOTDB-6291] Update RegionGroup configuration when update DataNode co…
Browse files Browse the repository at this point in the history
…nfiguration (apache#11914)
  • Loading branch information
CRZbulabula authored Jan 17, 2024
1 parent c147683 commit 6d46ceb
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@

package org.apache.iotdb.confignode.it.cluster;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.env.cluster.config.MppBaseConfig;
Expand All @@ -33,9 +41,11 @@
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -46,7 +56,9 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;

Expand All @@ -59,6 +71,8 @@ public class IoTDBClusterRestartIT {

private static final int testConfigNodeNum = 3, testDataNodeNum = 2;

private static final long testTimePartitionInterval = 604800000;

@Before
public void setUp() {
EnvFactory.getEnv()
Expand Down Expand Up @@ -114,6 +128,45 @@ public void clusterRestartTest() throws InterruptedException {
@Test
public void clusterRestartAfterUpdateDataNodeTest()
throws InterruptedException, ClientManagerException, IOException, TException {
// Create default Database
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
TSStatus status = client.setDatabase(new TDatabaseSchema("root.database"));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
}
// Create some DataPartitions to extend 2 DataRegionGroups
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
"root.database", 0, 10, 0, 10, testTimePartitionInterval);
TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
TDataPartitionTableResp dataPartitionTableResp = null;
for (int retry = 0; retry < 5; retry++) {
// Build new Client since it's unstable in Win8 environment
try (SyncConfigNodeIServiceClient configNodeClient =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
if (dataPartitionTableResp != null) {
break;
}
} catch (Exception e) {
// Retry sometimes in order to avoid request timeout
logger.error(e.getMessage());
TimeUnit.SECONDS.sleep(1);
}
}
Assert.assertNotNull(dataPartitionTableResp);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionTableResp.getStatus().getCode());
Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
ConfigNodeTestUtils.checkDataPartitionTable(
"root.database",
0,
10,
0,
10,
testTimePartitionInterval,
dataPartitionTableResp.getDataPartitionTable());

// Shutdown all DataNodes
for (int i = 0; i < testDataNodeNum; i++) {
EnvFactory.getEnv().shutdownDataNode(i);
Expand Down Expand Up @@ -151,12 +204,29 @@ public void clusterRestartAfterUpdateDataNodeTest()
// Check DataNode EndPoint
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
// Check update in NodeInfo
TShowClusterResp showClusterResp = client.showCluster();
ConfigNodeTestUtils.checkNodeConfig(
showClusterResp.getConfigNodeList(),
showClusterResp.getDataNodeList(),
EnvFactory.getEnv().getConfigNodeWrapperList(),
dataNodeWrapperList);

// Check update in PartitionInfo
TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
showRegionResp
.getRegionInfoList()
.forEach(
regionInfo -> {
AtomicBoolean matched = new AtomicBoolean(false);
dataNodeWrapperList.forEach(
dataNodeWrapper -> {
if (regionInfo.getClientRpcPort() == dataNodeWrapper.getPort()) {
matched.set(true);
}
});
Assert.assertTrue(matched.get());
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,20 @@ public DataSet executeQueryPlan(ConfigPhysicalPlan req)

public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
throws UnknownPhysicalPlanTypeException {
TSStatus status;
switch (physicalPlan.getType()) {
case RegisterDataNode:
return nodeInfo.registerDataNode((RegisterDataNodePlan) physicalPlan);
case RemoveDataNode:
return nodeInfo.removeDataNode((RemoveDataNodePlan) physicalPlan);
case UpdateDataNodeConfiguration:
return nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
status = nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
return partitionInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
case CreateDatabase:
TSStatus status = clusterSchemaInfo.createDatabase((DatabaseSchemaPlan) physicalPlan);
status = clusterSchemaInfo.createDatabase((DatabaseSchemaPlan) physicalPlan);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ public void setPreDeleted(boolean preDeleted) {
this.preDeleted = preDeleted;
}

/**
* Update the DataNodeLocation in cached RegionGroups.
*
* @param newDataNodeLocation The new DataNodeLocation.
*/
public void updateDataNode(TDataNodeLocation newDataNodeLocation) {
regionGroupMap.forEach(
(regionGroupId, regionGroup) -> regionGroup.updateDataNode(newDataNodeLocation));
}

/**
* Cache allocation result of new RegionGroups.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
Expand Down Expand Up @@ -139,6 +140,24 @@ public int generateNextRegionGroupId() {
// Consensus read/write interfaces
// ======================================================

/**
* Thread-safely update DataNodeLocation in RegionGroup.
*
* @param updateDataNodePlan UpdateDataNodePlan
* @return {@link TSStatusCode#SUCCESS_STATUS} if the DataNodeLocations are updated successfully.
*/
public TSStatus updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
TDataNodeLocation newDataNodeLocation =
updateDataNodePlan.getDataNodeConfiguration().getLocation();
databasePartitionTables.forEach(
(database, databasePartitionTable) -> {
if (isDatabaseExisted(database)) {
databasePartitionTable.updateDataNode(newDataNodeLocation);
}
});
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

/**
* Thread-safely create new DatabasePartitionTable.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@ public TRegionReplicaSet getReplicaSet() {
return replicaSet.deepCopy();
}

/**
* Update the DataNodeLocation in TRegionReplicaSet if necessary.
*
* @param newDataNodeLocation The new DataNodeLocation.
*/
public void updateDataNode(TDataNodeLocation newDataNodeLocation) {
for (int i = 0; i < replicaSet.getDataNodeLocationsSize(); i++) {
if (replicaSet.getDataNodeLocations().get(i).getDataNodeId()
== newDataNodeLocation.getDataNodeId()) {
replicaSet.getDataNodeLocations().set(i, newDataNodeLocation);
return;
}
}
}

public void addRegionLocation(TDataNodeLocation node) {
replicaSet.addToDataNodeLocations(node);
replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo);
Expand Down

0 comments on commit 6d46ceb

Please sign in to comment.