Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make exec_mem_limit valid for broker load #37

Merged
merged 1 commit into from
Aug 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ class RuntimeState {

// Early-return helper: if the runtime state reports cancellation, bail out of
// the *calling* function immediately with a cancelled status. Wrapped in
// do/while(false) so the macro behaves as a single statement after `if`.
#define RETURN_IF_CANCELLED(state) \
    do { \
        if (UNLIKELY((state)->is_cancelled())) return Status::CANCELLED; \
    } while (false)

}
Expand Down
6 changes: 3 additions & 3 deletions docs/user_guide/sql_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ load_label是当前导入批次的标签,由用户指定,需要保证在一

- file_path,broker中的文件路径,可以指定到一个文件,也可以用/*通配符指定某个目录下的所有文件。

- NEGATIVE:如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。该参数仅适用于存在value列,并且value列的聚合类型为SUM的情况。
- NEGATIVE:如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。该参数仅适用于存在value列,并且value列的聚合类型为SUM的情况。不支持Broker方式导入。

- PARTITION:如果指定此参数,则只会导入指定的分区,导入分区以外的数据会被过滤掉。如果不指定,默认导入table的所有分区。

Expand Down Expand Up @@ -1291,9 +1291,9 @@ load_label是当前导入批次的标签,由用户指定,需要保证在一

- max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。

- load_delete_flag:指定该导入是否通过导入key列的方式删除数据,仅适用于UNIQUE KEY,导入时可不指定value列。默认为false

- load_delete_flag:指定该导入是否通过导入key列的方式删除数据,仅适用于UNIQUE KEY,导入时可不指定value列。默认为false (不支持Broker方式导入)

- exec_mem_limit:在Broker Load方式时生效,指定导入执行时,后端可使用的最大内存。

举例:

Expand Down
2 changes: 2 additions & 0 deletions fe/src/com/baidu/palo/analysis/LoadStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class LoadStmt extends DdlStmt {
public static final String TIMEOUT_PROPERTY = "timeout";
public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
public static final String LOAD_DELETE_FLAG_PROPERTY = "load_delete_flag";
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String CLUSTER_PROPERTY = "cluster";

// for load data from Baidu Object Store(BOS)
Expand Down Expand Up @@ -127,6 +128,7 @@ public static void checkProperties(Map<String, String> properties) throws DdlExc
propertySet.add(LoadStmt.TIMEOUT_PROPERTY);
propertySet.add(LoadStmt.MAX_FILTER_RATIO_PROPERTY);
propertySet.add(LoadStmt.LOAD_DELETE_FLAG_PROPERTY);
propertySet.add(LoadStmt.EXEC_MEM_LIMIT);
propertySet.add(LoadStmt.CLUSTER_PROPERTY);

for (Entry<String, String> entry : properties.entrySet()) {
Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/common/FeConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ public class FeConstants {

// general model
// Current meta data version. Use this version to write journals and image
public static int meta_version = FeMetaVersion.VERSION_33;
public static int meta_version = FeMetaVersion.VERSION_34;
}
3 changes: 3 additions & 0 deletions fe/src/com/baidu/palo/common/FeMetaVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ public final class FeMetaVersion {

// persist decommission type
public static final int VERSION_33 = 33;

// persist LoadJob's execMemLimit
public static final int VERSION_34 = 34;
}
10 changes: 10 additions & 0 deletions fe/src/com/baidu/palo/load/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,11 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType,
job.setDbId(db.getId());
job.setTimestamp(timestamp);
job.setBrokerDesc(stmt.getBrokerDesc());

// resource info
if (ConnectContext.get() != null) {
job.setResourceInfo(ConnectContext.get().toResourceCtx());
job.setExecMemLimit(ConnectContext.get().getSessionVariable().getMaxExecMemByte());
}

// job properties
Expand Down Expand Up @@ -459,6 +461,14 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType,
throw new DdlException("Value of delete flag is invalid");
}
}

if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
try {
job.setExecMemLimit(Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT)));
} catch (NumberFormatException e) {
throw new DdlException("Execute memory limit is not Long", e);
}
}
}

// job table load info
Expand Down
16 changes: 15 additions & 1 deletion fe/src/com/baidu/palo/load/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public enum EtlJobType {

private static final int DEFAULT_TIMEOUT_S = 0;
private static final double DEFAULT_MAX_FILTER_RATIO = 0;

private static final long DEFAULT_EXEC_MEM_LIMIT = 2147483648L; // 2GB

private long id;
private long dbId;
private String label;
Expand Down Expand Up @@ -103,6 +104,8 @@ public enum EtlJobType {

private TPriority priority;

private long execMemLimit;

public LoadJob() {
this("");
}
Expand Down Expand Up @@ -137,6 +140,7 @@ public LoadJob(String label, int timeoutSecond, double maxFilterRatio) {
this.replicaPersistInfos = Maps.newHashMap();
this.resourceInfo = null;
this.priority = TPriority.NORMAL;
this.execMemLimit = DEFAULT_EXEC_MEM_LIMIT;
}

public long getId() {
Expand Down Expand Up @@ -275,6 +279,10 @@ public PullLoadSourceInfo getPullLoadSourceInfo() {
return pullLoadSourceInfo;
}

/**
 * Sets the maximum memory (in bytes) a backend may use while executing this
 * load job. Overrides the default of 2GB (see DEFAULT_EXEC_MEM_LIMIT).
 */
public void setExecMemLimit(long execMemLimit) {
    this.execMemLimit = execMemLimit;
}

/**
 * Returns the per-backend execution memory limit (in bytes) for this load job.
 */
public long getExecMemLimit() {
    return execMemLimit;
}

public void setEtlJobType(EtlJobType etlJobType) {
this.etlJobType = etlJobType;
switch (etlJobType) {
Expand Down Expand Up @@ -605,6 +613,8 @@ public void write(DataOutput out) throws IOException {
out.writeBoolean(true);
pullLoadSourceInfo.write(out);
}

out.writeLong(execMemLimit);
}

public void readFields(DataInput in) throws IOException {
Expand Down Expand Up @@ -713,6 +723,10 @@ public void readFields(DataInput in) throws IOException {
this.pullLoadSourceInfo = PullLoadSourceInfo.read(in);
}
}

if (version >= FeMetaVersion.VERSION_34) {
this.execMemLimit = in.readLong();
}
}

@Override
Expand Down
8 changes: 8 additions & 0 deletions fe/src/com/baidu/palo/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ public String getTrackingUrl() {
return trackingUrl;
}

// Caps the memory each backend may use for this coordinated execution by
// forwarding the limit into the thrift query options sent to backends.
// Presumably bytes, matching LoadJob's exec mem limit — TODO confirm against
// TQueryOptions.mem_limit.
public void setExecMemoryLimit(long execMemoryLimit) {
this.queryOptions.setMem_limit(execMemoryLimit);
}

// Sets the query timeout forwarded to backends via the thrift query options.
// NOTE(review): unit looks like seconds — confirm against TQueryOptions.
public void setTimeout(int timeout) {
this.queryOptions.setQuery_timeout(timeout);
}

// Initiate
private void prepare() {
for (PlanFragment fragment : fragments) {
Expand Down
36 changes: 20 additions & 16 deletions fe/src/com/baidu/palo/task/LoadEtlTask.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.baidu.palo.task;

import com.baidu.palo.catalog.Catalog;
Expand Down Expand Up @@ -64,6 +64,10 @@ public LoadEtlTask(LoadJob job) {
this.load = Catalog.getInstance().getLoadInstance();
}

// Default failure message used when the ETL stage is cancelled. Subclasses
// (e.g. PullLoadEtlTask for broker load) override this to report a more
// specific cause.
protected String getErrorMsg() {
return "etl job fail";
}

@Override
protected void exec() {
// check job state
Expand Down Expand Up @@ -117,7 +121,7 @@ private void updateEtlStatus() throws LoadException {
processEtlFinished();
break;
case CANCELLED:
throw new LoadException("etl job fail");
throw new LoadException(getErrorMsg());
case RUNNING:
processEtlRunning();
break;
Expand Down
15 changes: 15 additions & 0 deletions fe/src/com/baidu/palo/task/PullLoadEtlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ public PullLoadEtlTask(LoadJob job) {
mgr = Catalog.getInstance().getPullLoadJobMgr();
}

@Override
protected String getErrorMsg() {
    // Prefer the concrete failure reported by the broker task; fall back to
    // the generic ETL failure message when no detailed status is available.
    PullLoadJob pullLoadJob = mgr.getJob(job.getId());
    if (pullLoadJob == null) {
        return super.getErrorMsg();
    }
    PullLoadTask failureTask = pullLoadJob.getFailureTask();
    if (failureTask == null || failureTask.getExecuteStatus() == null) {
        return super.getErrorMsg();
    }
    return "Broker etl failed: " + failureTask.getExecuteStatus().getErrorMsg();
}

@Override
protected boolean updateJobEtlStatus() {
PullLoadJob pullLoadJob = mgr.getJob(job.getId());
Expand Down
4 changes: 4 additions & 0 deletions fe/src/com/baidu/palo/task/PullLoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public synchronized void cancel() {
}
}

// Returns the task that failed; may be null (callers null-check the result).
// NOTE(review): unlike cancel()/onTaskFinished() this accessor is not
// synchronized — confirm failureTask's visibility guarantees are acceptable.
public PullLoadTask getFailureTask() {
return failureTask;
}

public synchronized void onTaskFinished(PullLoadTask task) {
int taskId = task.taskId;
if (!state.isRunning()) {
Expand Down
2 changes: 1 addition & 1 deletion fe/src/com/baidu/palo/task/PullLoadPendingTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected void createEtlRequest() throws Exception {
// Generate pull load task, one
PullLoadTask task = new PullLoadTask(
job.getId(), nextTaskId, db, table,
job.getBrokerDesc(), entry.getValue(), jobDeadlineMs);
job.getBrokerDesc(), entry.getValue(), jobDeadlineMs, job.getExecMemLimit());
task.init();
pullLoadTaskList.add(task);
nextTaskId++;
Expand Down
8 changes: 6 additions & 2 deletions fe/src/com/baidu/palo/task/PullLoadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.baidu.palo.common.InternalException;
import com.baidu.palo.common.Status;
import com.baidu.palo.load.BrokerFileGroup;
import com.baidu.palo.load.LoadJob;
import com.baidu.palo.qe.Coordinator;
import com.baidu.palo.qe.QeProcessor;
import com.baidu.palo.thrift.TQueryType;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class PullLoadTask {
private Map<String, Long> fileMap;
private String trackingUrl;
private Map<String, String> counters;
private final long execMemLimit;

// Runtime variables
private enum State {
Expand All @@ -74,14 +76,15 @@ public PullLoadTask(
long jobId, int taskId,
Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs) {
long jobDeadlineMs, long execMemLimit) {
this.jobId = jobId;
this.taskId = taskId;
this.db = db;
this.table = table;
this.brokerDesc = brokerDesc;
this.fileGroups = fileGroups;
this.jobDeadlineMs = jobDeadlineMs;
this.execMemLimit = execMemLimit;
}

public void init() throws InternalException {
Expand Down Expand Up @@ -117,7 +120,7 @@ public synchronized boolean isFinished() {
}

/**
 * Returns the recorded execution status of this pull-load task; may be null
 * when no status has been recorded yet (callers null-check the result).
 */
public Status getExecuteStatus() {
    return executeStatus;
}

public synchronized void onCancelled() {
Expand Down Expand Up @@ -201,6 +204,7 @@ public void executeOnce() throws InternalException {
curCoordinator = new Coordinator(executeId, planner.getDescTable(),
planner.getFragments(), planner.getScanNodes());
curCoordinator.setQueryType(TQueryType.LOAD);
curCoordinator.setExecMemoryLimit(execMemLimit);
}

boolean needUnregister = false;
Expand Down