[Enhancement](Export) support export with outfile syntax (apache#18325)
The `Export` syntax provides an asynchronous export function, but its implementation is not vectorized.
The `Outfile` syntax provides a synchronous, vectorized export function.
So we can reimplement the `Export` syntax on top of the `Outfile` syntax.
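To make the relationship concrete, here is a minimal sketch of the rewrite, assuming an s3 target (table, path, and format are illustrative, not taken from this commit):

```sql
-- What the user submits: an asynchronous export job.
EXPORT TABLE testTbl TO "s3://bucket/export/"
PROPERTIES ("format" = "csv");

-- Roughly what the job can now run internally: a synchronous,
-- vectorized outfile query that produces the same files.
SELECT * FROM testTbl
INTO OUTFILE "s3://bucket/export/"
FORMAT AS CSV;
```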
BePPPower authored and gnehil committed Apr 21, 2023
1 parent f9f9f10 commit 0fb45c9
Showing 22 changed files with 638 additions and 1,008 deletions.
9 changes: 9 additions & 0 deletions be/src/vec/runtime/vfile_result_writer.cpp
@@ -500,6 +500,15 @@ Status VFileResultWriter::_send_result() {
std::unique_ptr<TFetchDataResult> result = std::make_unique<TFetchDataResult>();
result->result_batch.rows.resize(1);
result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());

std::map<std::string, std::string> attach_infos;
attach_infos.insert(std::make_pair("FileNumber", std::to_string(_file_idx)));
attach_infos.insert(
std::make_pair("TotalRows", std::to_string(_written_rows_counter->value())));
attach_infos.insert(std::make_pair("FileSize", std::to_string(_written_data_bytes->value())));
attach_infos.insert(std::make_pair("URL", file_url));

result->result_batch.__set_attached_infos(attach_infos);
RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result");
return Status::OK();
}
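For context, an outfile query reports this summary back to the client as a single result row; a sketch of what that row might look like, with illustrative values:

```sql
SELECT * FROM testTbl INTO OUTFILE "s3://bucket/export/data_";
-- Hypothetical one-row result assembled from the attached infos above:
-- FileNumber | TotalRows | FileSize | URL
-- 1          | 100000    | 2453891  | s3://bucket/export/data_0.csv
```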
@@ -176,7 +176,7 @@ EXPORT TABLE testTbl TO "file:///home/data/a" PROPERTIES ("columns" = "k1,v1");
8. Export all data from table testTbl to s3, using the invisible character "\x07" as the column or row separator.

```sql
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c"
EXPORT TABLE testTbl TO "s3://hdfs_host:port/a/b/c"
PROPERTIES (
"column_separator"="\\x07",
"line_delimiter" = "\\x07"
@@ -59,8 +59,10 @@ public final class FeMetaVersion {
// TablePropertyInfo add db id
public static final int VERSION_119 = 119;

public static final int VERSION_120 = 120;

// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_119;
public static final int VERSION_CURRENT = VERSION_120;

// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
81 changes: 58 additions & 23 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -35,6 +35,7 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -75,6 +76,17 @@ public class ExportStmt extends StatementBase {

private TableRef tableRef;

private String format;

private String label;

private String maxFileSize;
private SessionVariable sessionVariables;

private String qualifiedUser;

private UserIdentity userIdentity;

public ExportStmt(TableRef tableRef, Expr whereExpr, String path,
Map<String, String> properties, BrokerDesc brokerDesc) {
this.tableRef = tableRef;
@@ -87,6 +99,7 @@ public ExportStmt(TableRef tableRef, Expr whereExpr, String path,
this.columnSeparator = DEFAULT_COLUMN_SEPARATOR;
this.lineDelimiter = DEFAULT_LINE_DELIMITER;
this.columns = DEFAULT_COLUMNS;
this.sessionVariables = ConnectContext.get().getSessionVariable();
}

public String getColumns() {
@@ -113,10 +126,6 @@ public BrokerDesc getBrokerDesc() {
return brokerDesc;
}

public Map<String, String> getProperties() {
return properties;
}

public String getColumnSeparator() {
return this.columnSeparator;
}
@@ -125,6 +134,30 @@ public String getLineDelimiter() {
return this.lineDelimiter;
}

public TableRef getTableRef() {
return this.tableRef;
}

public String getFormat() {
return format;
}

public String getLabel() {
return label;
}

public SessionVariable getSessionVariables() {
return sessionVariables;
}

public String getQualifiedUser() {
return qualifiedUser;
}

public UserIdentity getUserIdentity() {
return this.userIdentity;
}

@Override
public boolean needAuditEncryption() {
if (brokerDesc != null) {
@@ -162,6 +195,8 @@ public void analyze(Analyzer analyzer) throws UserException {
ConnectContext.get().getRemoteIP(),
tblName.getDb() + ": " + tblName.getTbl());
}
qualifiedUser = ConnectContext.get().getQualifiedUser();
userIdentity = ConnectContext.get().getCurrentUserIdentity();

// check table && partitions whether exist
checkTable(analyzer.getEnv());
@@ -171,8 +206,6 @@ public void analyze(Analyzer analyzer) throws UserException {
brokerDesc = new BrokerDesc("local", StorageBackend.StorageType.LOCAL, null);
}

// where expr will be checked in export job

// check path is valid
path = checkPath(path, brokerDesc.getStorageType());
if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
@@ -261,7 +294,6 @@ public static String checkPath(String path, StorageBackend.StorageType type) throws AnalysisException {
throw new AnalysisException(
"Invalid export path. please use valid '" + OutFileClause.LOCAL_FILE_PREFIX + "' path.");
}
path = path.substring(OutFileClause.LOCAL_FILE_PREFIX.length() - 1);
}
return path;
}
@@ -271,31 +303,26 @@ private void checkProperties(Map<String, String> properties) throws UserException {
properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR));
this.lineDelimiter = Separator.convertSeparator(PropertyAnalyzer.analyzeLineDelimiter(
properties, ExportStmt.DEFAULT_LINE_DELIMITER));
this.columns = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
// exec_mem_limit
if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
try {
Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT));
} catch (NumberFormatException e) {
throw new DdlException("Invalid exec_mem_limit value: " + e.getMessage());
}
} else {
// use session variables
properties.put(LoadStmt.EXEC_MEM_LIMIT,
String.valueOf(ConnectContext.get().getSessionVariable().getMaxExecMemByte()));
}
this.columns = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_COLUMNS, DEFAULT_COLUMNS);

// timeout
if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
try {
Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY));
Integer.parseInt(properties.get(LoadStmt.TIMEOUT_PROPERTY));
} catch (NumberFormatException e) {
throw new DdlException("Invalid timeout value: " + e.getMessage());
}
} else {
// use session variables
properties.put(LoadStmt.TIMEOUT_PROPERTY, String.valueOf(Config.export_task_default_timeout_second));
}

// format
if (properties.containsKey(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE)) {
this.format = properties.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE).toLowerCase();
} else {
this.format = "csv";
}

// tablet num per task
if (properties.containsKey(TABLET_NUMBER_PER_TASK_PROP)) {
try {
@@ -308,13 +335,17 @@ private void checkProperties(Map<String, String> properties) throws UserException {
properties.put(TABLET_NUMBER_PER_TASK_PROP, String.valueOf(Config.export_tablet_num_per_task));
}

// max_file_size
this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, "");

if (properties.containsKey(LABEL)) {
FeNameFormat.checkLabel(properties.get(LABEL));
} else {
// generate a random label
String label = "export_" + UUID.randomUUID().toString();
String label = "export_" + UUID.randomUUID();
properties.put(LABEL, label);
}
label = properties.get(LABEL);
}

@Override
@@ -361,4 +392,8 @@ public RedirectStatus getRedirectStatus() {
public String toString() {
return toSql();
}

public String getMaxFileSize() {
return maxFileSize;
}
}
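Taken together, the property handling above means an EXPORT statement can now carry outfile-style options. A hedged sketch (label, values, and the s3 target are illustrative, and a real object-store export would also need its access properties):

```sql
EXPORT TABLE testTbl TO "s3://bucket/export/"
PROPERTIES (
    "label" = "my_export",          -- optional; "export_<uuid>" is generated when omitted
    "format" = "parquet",           -- falls back to "csv" when unset
    "timeout" = "3600",             -- validated with Integer.parseInt above
    "max_file_size" = "256MB",      -- forwarded to the outfile writer; empty when unset
    "tablet_num_per_task" = "5"     -- key name assumed from TABLET_NUMBER_PER_TASK_PROP
);
```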
@@ -68,12 +68,16 @@ public class OutFileClause {
public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
public static final String FILE_NUMBER = "FileNumber";
public static final String TOTAL_ROWS = "TotalRows";
public static final String FILE_SIZE = "FileSize";
public static final String URL = "URL";

static {
RESULT_COL_NAMES.add("FileNumber");
RESULT_COL_NAMES.add("TotalRows");
RESULT_COL_NAMES.add("FileSize");
RESULT_COL_NAMES.add("URL");
RESULT_COL_NAMES.add(FILE_NUMBER);
RESULT_COL_NAMES.add(TOTAL_ROWS);
RESULT_COL_NAMES.add(FILE_SIZE);
RESULT_COL_NAMES.add(URL);

RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.INT));
RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT));
@@ -122,9 +126,9 @@ public class OutFileClause {
private static final String HADOOP_PROP_PREFIX = "hadoop.";
private static final String BROKER_PROP_PREFIX = "broker.";
private static final String PROP_BROKER_NAME = "broker.name";
private static final String PROP_COLUMN_SEPARATOR = "column_separator";
private static final String PROP_LINE_DELIMITER = "line_delimiter";
private static final String PROP_MAX_FILE_SIZE = "max_file_size";
public static final String PROP_COLUMN_SEPARATOR = "column_separator";
public static final String PROP_LINE_DELIMITER = "line_delimiter";
public static final String PROP_MAX_FILE_SIZE = "max_file_size";
private static final String PROP_SUCCESS_FILE_NAME = "success_file_name";
private static final String PARQUET_PROP_PREFIX = "parquet.";
private static final String ORC_PROP_PREFIX = "orc.";
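Making these three property names public lets the export path reuse exactly what outfile already accepts. For reference, a sketch of the corresponding outfile query (the HDFS path is illustrative, and a real HDFS target would also need broker or Hadoop properties):

```sql
SELECT * FROM testTbl
INTO OUTFILE "hdfs://host:port/export/prefix_"
FORMAT AS CSV
PROPERTIES (
    "column_separator" = ",",
    "line_delimiter" = "\n",
    "max_file_size" = "100MB"
);
```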
@@ -143,7 +143,7 @@ public SelectStmt(ValueList valueList, ArrayList<OrderByElement> orderByElement,
this.colLabels = Lists.newArrayList();
}

SelectStmt(
public SelectStmt(
SelectList selectList,
FromClause fromClause,
Expr wherePredicate,
6 changes: 2 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -149,7 +149,6 @@
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.journal.bdbje.Timestamp;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportChecker;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.Load;
@@ -1402,9 +1401,8 @@ private void startMasterOnlyDaemonThreads() {
loadJobScheduler.start();
loadEtlChecker.start();
loadLoadingChecker.start();
// Export checker
ExportChecker.init(Config.export_checker_interval_second * 1000L);
ExportChecker.startAll();
// export task
exportMgr.start();
// Tablet checker and scheduler
tabletChecker.start();
tabletScheduler.start();
@@ -32,7 +32,7 @@ public class ExportProcNode implements ProcNodeInterface {
.add("JobId").add("Label").add("State").add("Progress")
.add("TaskInfo").add("Path")
.add("CreateTime").add("StartTime").add("FinishTime")
.add("Timeout").add("ErrorMsg")
.add("Timeout").add("ErrorMsg").add("OutfileInfo")
.build();

// label and state column index of result
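With the extra column, checking a job's outfile summary might look like this (the label is illustrative; the exact OutfileInfo payload is not shown in this hunk, though it presumably carries the FileNumber/TotalRows/FileSize/URL infos attached by the writer):

```sql
SHOW EXPORT WHERE LABEL = "my_export";
-- The result row now ends with an OutfileInfo column after Timeout and ErrorMsg.
```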
@@ -312,8 +312,7 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
case OperationType.OP_EXPORT_UPDATE_STATE:
data = new ExportJob.StateTransfer();
((ExportJob.StateTransfer) data).readFields(in);
data = ExportJob.StateTransfer.read(in);
isRead = true;
break;
case OperationType.OP_FINISH_DELETE: {