Skip to content

Commit

Permalink
[IOTDB-6179] Load: remove uncessary create schema logic when loading …
Browse files Browse the repository at this point in the history
…tsfiles (which allows users without create schema permission to execute load) (#11256)

Co-authored-by: Steve Yurong Su <rong@apache.org>
  • Loading branch information
MiniSho and SteveYurongSu authored Oct 10, 2023
1 parent 7c976f1 commit 6f83353
Showing 1 changed file with 50 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,25 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.VerifyMetadataException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
Expand All @@ -43,10 +52,12 @@
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
Expand All @@ -57,12 +68,14 @@
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -78,6 +91,9 @@ public class LoadTsfileAnalyzer {

private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();

private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();

private final LoadTsFileStatement loadTsFileStatement;
private final MPPQueryContext context;

Expand Down Expand Up @@ -253,13 +269,14 @@ public Pair<String, TimeseriesMetadata> next() {
private final class SchemaAutoCreatorAndVerifier {

private final Map<String, Boolean> tsfileDevice2IsAligned = new HashMap<>();
private final Set<PartialPath> alreadySetDatabases = new HashSet<>();

private final int maxTimeseriesNumberPerBatch = CONFIG.getMaxLoadingTimeseriesNumber();
private final Map<String, Set<MeasurementSchema>> currentBatchDevice2TimeseriesSchemas =
new HashMap<>();
private int currentBatchTimeseriesCount = 0;

private final Set<PartialPath> alreadySetDatabases = new HashSet<>();

private SchemaAutoCreatorAndVerifier() {}

public void autoCreateAndVerify(
Expand Down Expand Up @@ -394,7 +411,7 @@ private void makeSureNoDuplicatedMeasurementsInDevices() throws VerifyMetadataEx
private void autoCreateDatabase()
throws VerifyMetadataException, LoadFileException, IllegalPathException {
final int databasePrefixNodesLength = loadTsFileStatement.getDatabaseLevel() + 1;
final Set<PartialPath> databaseSet = new HashSet<>();
final Set<PartialPath> databasesNeededToBeSet = new HashSet<>();

for (final String device : currentBatchDevice2TimeseriesSchemas.keySet()) {
final PartialPath devicePath = new PartialPath(device);
Expand All @@ -410,28 +427,54 @@ private void autoCreateDatabase()
final String[] databasePrefixNodes = new String[databasePrefixNodesLength];
System.arraycopy(devicePrefixNodes, 0, databasePrefixNodes, 0, databasePrefixNodesLength);

databaseSet.add(new PartialPath(databasePrefixNodes));
databasesNeededToBeSet.add(new PartialPath(databasePrefixNodes));
}

// 1. filter out the databases that already exist
if (alreadySetDatabases.isEmpty()) {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TGetDatabaseReq req =
new TGetDatabaseReq(
Arrays.asList(
new ShowDatabaseStatement(new PartialPath(SqlConstant.getSingleRootArray()))
.getPathPattern()
.getNodes()),
SchemaConstant.ALL_MATCH_SCOPE.serialize());
final TShowDatabaseResp resp = configNodeClient.showDatabase(req);

for (final String databaseName : resp.getDatabaseInfoMap().keySet()) {
alreadySetDatabases.add(new PartialPath(databaseName));
}
} catch (IOException | TException | ClientManagerException e) {
throw new LoadFileException(e);
}
}
databasesNeededToBeSet.removeAll(alreadySetDatabases);

databaseSet.removeAll(alreadySetDatabases);
for (final PartialPath databasePath : databaseSet) {
// 2. create the databases that do not exist
for (final PartialPath databasePath : databasesNeededToBeSet) {
final DatabaseSchemaStatement statement =
new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
statement.setDatabasePath(databasePath);
// do not print exception log because it is not an error
statement.setEnablePrintExceptionLog(false);
executeSetDatabaseStatement(statement);

alreadySetDatabases.add(databasePath);
}
alreadySetDatabases.addAll(databaseSet);
}

private void executeSetDatabaseStatement(Statement statement) throws LoadFileException {
final long queryId = SessionManager.getInstance().requestQueryId();
// 1.check Authority
TSStatus status =
AuthorityChecker.checkAuthority(statement, context.getSession().getUserName());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode()));
}

// 2.execute setDatabase statement
final long queryId = SessionManager.getInstance().requestQueryId();
final ExecutionResult result =
Coordinator.getInstance()
.execute(
Expand Down

0 comments on commit 6f83353

Please sign in to comment.