diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java index e6e3bd419ac6..6a64e7cc7e1d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java @@ -22,16 +22,25 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.auth.AuthException; import org.apache.iotdb.commons.auth.entity.PrivilegeType; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.consensus.ConfigRegionId; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.exception.VerifyMetadataException; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo; @@ -43,10 +52,12 @@ import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.utils.FileLoaderUtils; import org.apache.iotdb.db.utils.TimestampPrecisionUtils; +import org.apache.iotdb.db.utils.constant.SqlConstant; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; @@ -57,12 +68,14 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -78,6 +91,9 @@ public class LoadTsfileAnalyzer { private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final IClientManager CONFIG_NODE_CLIENT_MANAGER = + ConfigNodeClientManager.getInstance(); + private final LoadTsFileStatement loadTsFileStatement; private final MPPQueryContext context; @@ -253,13 +269,14 @@ public Pair next() { private final class SchemaAutoCreatorAndVerifier { private final Map tsfileDevice2IsAligned = new HashMap<>(); - private final Set alreadySetDatabases = new HashSet<>(); private final int maxTimeseriesNumberPerBatch = CONFIG.getMaxLoadingTimeseriesNumber(); private final Map> currentBatchDevice2TimeseriesSchemas = new HashMap<>(); private int currentBatchTimeseriesCount = 0; + private final Set alreadySetDatabases = new HashSet<>(); + private SchemaAutoCreatorAndVerifier() {} public void autoCreateAndVerify( @@ -394,7 +411,7 @@ private void makeSureNoDuplicatedMeasurementsInDevices() throws VerifyMetadataEx private void autoCreateDatabase() throws VerifyMetadataException, LoadFileException, IllegalPathException { final int databasePrefixNodesLength = loadTsFileStatement.getDatabaseLevel() + 1; - final Set databaseSet = new HashSet<>(); + final Set databasesNeededToBeSet = new HashSet<>(); for (final String device : currentBatchDevice2TimeseriesSchemas.keySet()) { final PartialPath devicePath = new PartialPath(device); @@ -410,28 +427,54 @@ private void autoCreateDatabase() final String[] databasePrefixNodes = new String[databasePrefixNodesLength]; System.arraycopy(devicePrefixNodes, 0, databasePrefixNodes, 0, databasePrefixNodesLength); - databaseSet.add(new PartialPath(databasePrefixNodes)); + databasesNeededToBeSet.add(new PartialPath(databasePrefixNodes)); + } + + // 1. filter out the databases that already exist + if (alreadySetDatabases.isEmpty()) { + try (final ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TGetDatabaseReq req = + new TGetDatabaseReq( + Arrays.asList( + new ShowDatabaseStatement(new PartialPath(SqlConstant.getSingleRootArray())) + .getPathPattern() + .getNodes()), + SchemaConstant.ALL_MATCH_SCOPE.serialize()); + final TShowDatabaseResp resp = configNodeClient.showDatabase(req); + + for (final String databaseName : resp.getDatabaseInfoMap().keySet()) { + alreadySetDatabases.add(new PartialPath(databaseName)); + } + } catch (IOException | TException | ClientManagerException e) { + throw new LoadFileException(e); + } } + databasesNeededToBeSet.removeAll(alreadySetDatabases); - databaseSet.removeAll(alreadySetDatabases); - for (final PartialPath databasePath : databaseSet) { + // 2. create the databases that do not exist + for (final PartialPath databasePath : databasesNeededToBeSet) { final DatabaseSchemaStatement statement = new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE); statement.setDatabasePath(databasePath); // do not print exception log because it is not an error statement.setEnablePrintExceptionLog(false); executeSetDatabaseStatement(statement); + + alreadySetDatabases.add(databasePath); } - alreadySetDatabases.addAll(databaseSet); } private void executeSetDatabaseStatement(Statement statement) throws LoadFileException { - final long queryId = SessionManager.getInstance().requestQueryId(); + // 1.check Authority TSStatus status = AuthorityChecker.checkAuthority(statement, context.getSession().getUserName()); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode())); } + + // 2.execute setDatabase statement + final long queryId = SessionManager.getInstance().requestQueryId(); final ExecutionResult result = Coordinator.getInstance() .execute(