add export retry #176

Merged · 9 commits · Jan 11, 2018
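
In short, this change makes ExportExportingTask retry each Coordinator up to RETRY_NUM times instead of failing the job on the first error. Between attempts it calls coord.clearExportStatus() so the re-run starts from clean per-attempt state, and the backend's gen_file_name() now appends a millisecond timestamp so each attempt writes distinct file names. A condensed sketch of the retry flow, simplified from the ExportExportingTask diff below (logging and progress tracking omitted):

    // One retry loop per fragment-instance Coordinator; names match the patch below.
    for (Coordinator coord : job.getCoordList()) {
        for (int attempt = 0; attempt < RETRY_NUM; ++attempt) {
            execOneCoord(coord);                    // run one export fragment instance
            if (coord.getExecStatus().ok()) {
                break;                              // success, move on to the next instance
            }
            if (attempt < RETRY_NUM - 1) {
                coord.clearExportStatus();          // reset exec states/status/file list before re-running
            }
        }
        if (!coord.getExecStatus().ok()) {
            onFailed(coord.getExecStatus());        // retries exhausted: record the failure
        }
    }
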
7 changes: 6 additions & 1 deletion be/src/runtime/export_sink.cpp
@@ -243,8 +243,13 @@ Status ExportSink::open_file_writer() {
// TODO(lingbin): add some other info to file name, like partition
std::string ExportSink::gen_file_name() {
const TUniqueId& id = _state->fragment_instance_id();

struct timeval tv;
gettimeofday(&tv, NULL);

std::stringstream file_name;
file_name << "export_data_" << id.hi << "_" << id.lo;
file_name << "export_data_" << id.hi << "_" << id.lo << "_"
<< (tv.tv_sec * 1000 + tv.tv_usec / 1000);
return file_name.str();
}

36 changes: 34 additions & 2 deletions fe/src/com/baidu/palo/load/ExportJob.java
@@ -38,6 +38,7 @@
import com.baidu.palo.common.io.Text;
import com.baidu.palo.common.io.Writable;
import com.baidu.palo.common.Pair;
import com.baidu.palo.common.Status;
import com.baidu.palo.common.util.TimeUtils;
import com.baidu.palo.planner.DataPartition;
import com.baidu.palo.planner.ExportSink;
@@ -49,7 +50,11 @@
import com.baidu.palo.planner.PlanNodeId;
import com.baidu.palo.planner.ScanNode;
import com.baidu.palo.qe.Coordinator;
import com.baidu.palo.system.Backend;
import com.baidu.palo.task.AgentClient;
import com.baidu.palo.thrift.TAgentResult;
import com.baidu.palo.thrift.TNetworkAddress;
import com.baidu.palo.thrift.TStatusCode;
import com.baidu.palo.thrift.TScanRangeLocation;
import com.baidu.palo.thrift.TScanRangeLocations;
import com.baidu.palo.thrift.TUniqueId;
@@ -489,8 +494,8 @@ public List<Pair<TNetworkAddress, String>> getSnapshotPaths() {
return this.snapshotPaths;
}

public void setSnapshotPaths(List<Pair<TNetworkAddress, String>> snapshotPaths) {
this.snapshotPaths = snapshotPaths;
public void addSnapshotPath(Pair<TNetworkAddress, String> snapshotPath) {
this.snapshotPaths.add(snapshotPath);
}

public String getSql() {
@@ -502,6 +507,7 @@ public void setSql(String sql) {
}

public synchronized void cancel(ExportFailMsg.CancelType type, String msg) {
releaseSnapshotPaths();
failMsg = new ExportFailMsg(type, msg);
updateState(ExportJob.JobState.CANCELLED, false);
}
@@ -534,6 +540,32 @@ public synchronized boolean updateState(ExportJob.JobState newState, boolean isR
return true;
}

public Status releaseSnapshotPaths() {
List<Pair<TNetworkAddress, String>> snapshotPaths = getSnapshotPaths();
LOG.debug("snapshotPaths:{}", snapshotPaths);
for (Pair<TNetworkAddress, String> snapshotPath : snapshotPaths) {
TNetworkAddress address = snapshotPath.first;
String host = address.getHostname();
int port = address.getPort();
Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(host, port);
if (backend == null) {
continue;
}
long backendId = backend.getId();
if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
continue;
}

AgentClient client = new AgentClient(host, port);
TAgentResult result = client.releaseSnapshot(snapshotPath.second);
if (result == null || result.getStatus().getStatus_code() != TStatusCode.OK) {
continue;
}
}
snapshotPaths.clear();
return Status.OK;
}

@Override
public String toString() {
return "ExportJob [jobId=" + id
15 changes: 15 additions & 0 deletions fe/src/com/baidu/palo/qe/Coordinator.java
@@ -235,6 +235,21 @@ public void setTimeout(int timeout) {
this.queryOptions.setQuery_timeout(timeout);
}

public void clearExportStatus() {
lock.lock();
try {
this.backendExecStates.clear();
this.backendExecStateMap.clear();
this.queryStatus.setStatus(new Status());
if (this.exportFiles == null) {
this.exportFiles = Lists.newArrayList();
}
this.exportFiles.clear();
} finally {
lock.unlock();
}
}

// Initiate
private void prepare() {
for (PlanFragment fragment : fragments) {
59 changes: 21 additions & 38 deletions fe/src/com/baidu/palo/task/ExportExportingTask.java
@@ -54,6 +54,7 @@

public class ExportExportingTask extends MasterTask {
private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class);
private static final int RETRY_NUM = 3;

protected final ExportJob job;

@@ -92,15 +93,26 @@ protected void exec() {
}

// if one instance finished, we send request to BE to exec next instance
// TODO(lingbin): add retry sending logic if send fail
List<Coordinator> coords = job.getCoordList();
int coordSize = coords.size();
for (int i = 0; i < coordSize; i++) {
if (isCancelled) {
break;
}
Coordinator coord = coords.get(i);
execOneCoord(coord);
for (int j = 0; j < RETRY_NUM; ++j) {
execOneCoord(coord);
if (coord.getExecStatus().ok()) {
break;
}
if (j < RETRY_NUM - 1) {
coord.clearExportStatus();
LOG.info("export exporting job fail. job: {}. Retry.", job);
}
}
if (!coord.getExecStatus().ok()) {
onFailed(coord.getExecStatus());
}
int progress = (int) (i + 1) * 100 / coordSize;
if (progress >= 100) {
progress = 99;
@@ -122,7 +134,7 @@ protected void exec() {
}

// release snapshot
Status releaseSnapshotStatus = releaseSnapshotPaths();
Status releaseSnapshotStatus = job.releaseSnapshotPaths();
if (!releaseSnapshotStatus.ok()) {
String failMsg = "release snapshot fail.";
failMsg += releaseSnapshotStatus.getErrorMsg();
@@ -161,7 +173,7 @@ private Status execOneCoord(Coordinator coord) {
needUnregister = true;
actualExecCoord(queryId, coord);
} catch (InternalException e) {
onFailed(new Status(TStatusCode.INTERNAL_ERROR, e.getMessage()));
LOG.warn("export exporting internal error. {}", e.getMessage());
} finally {
if (needUnregister) {
QeProcessor.unregisterQuery(queryId);
@@ -181,22 +193,20 @@ private void actualExecCoord(TUniqueId queryId, Coordinator coord) {
try {
coord.exec();
} catch (Exception e) {
onFailed(new Status(TStatusCode.INTERNAL_ERROR, "export Coordinator execute failed."));
LOG.warn("export Coordinator execute failed.");
}

if (coord.join(waitSecond)) {
Status status = coord.getExecStatus();
if (status.ok()) {
onFinished(coord.getExportFiles());
} else {
onFailed(status);
onSubTaskFinished(coord.getExportFiles());
}
} else {
onTimeout();
coord.cancel();
}
}

private synchronized void onFinished(List<String> exportFiles) {
private synchronized void onSubTaskFinished(List<String> exportFiles) {
job.addExportedFiles(exportFiles);
}

@@ -206,7 +216,7 @@ private synchronized void onFailed(Status failStatus) {
cancelType = ExportFailMsg.CancelType.RUN_FAIL;
String failMsg = "export exporting job fail. ";
failMsg += failStatus.getErrorMsg();
job.cancel(cancelType, failMsg);
job.setFailMsg(new ExportFailMsg(cancelType, failMsg));
LOG.warn("export exporting job fail. job: {}", job);
}

@@ -215,7 +225,6 @@ public synchronized void onTimeout() {
this.failStatus = new Status(TStatusCode.TIMEOUT, "timeout");
cancelType = ExportFailMsg.CancelType.TIMEOUT;
String failMsg = "export exporting job timeout";
job.cancel(cancelType, failMsg);
LOG.warn("export exporting job timeout. job: {}", job);
}

@@ -248,32 +257,6 @@ private void registerProfile() {
ProfileManager.getInstance().pushProfile(profile);
}

private Status releaseSnapshotPaths() {
List<Pair<TNetworkAddress, String>> snapshotPaths = job.getSnapshotPaths();
LOG.debug("snapshotPaths:{}", snapshotPaths);
for (Pair<TNetworkAddress, String> snapshotPath : snapshotPaths) {
TNetworkAddress address = snapshotPath.first;
String host = address.getHostname();
int port = address.getPort();
Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(host, port);
if (backend == null) {
return Status.CANCELLED;
}
long backendId = backend.getId();
if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
return Status.CANCELLED;
}

AgentClient client = new AgentClient(host, port);
TAgentResult result = client.releaseSnapshot(snapshotPath.second);
if (result.getStatus().getStatus_code() != TStatusCode.OK) {
return Status.CANCELLED;
}
}
snapshotPaths.clear();
return Status.OK;
}

private Status moveTmpFiles() {
BrokerMgr.BrokerAddress brokerAddress = null;
try {
5 changes: 1 addition & 4 deletions fe/src/com/baidu/palo/task/ExportPendingTask.java
@@ -74,7 +74,6 @@ protected void exec() {

// make snapshots
Status snapshotStatus = makeSnapshots();
// TODO(pengyubing): if export job fail, release snapshot
if (!snapshotStatus.ok()) {
String failMsg = "make snapshot failed.";
failMsg += snapshotStatus.getErrorMsg();
@@ -94,7 +93,6 @@ private Status makeSnapshots() {
if (tabletLocations == null) {
return Status.OK;
}
List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();
for (TScanRangeLocations tablet : tabletLocations) {
TScanRange scanRange = tablet.getScan_range();
if (!scanRange.isSetPalo_scan_range()) {
@@ -125,11 +123,10 @@ private Status makeSnapshots() {
if (result == null || result.getStatus().getStatus_code() != TStatusCode.OK) {
return Status.CANCELLED;
}
snapshotPaths.add(new Pair<TNetworkAddress, String>(address, result.getSnapshot_path()));
job.addSnapshotPath(new Pair<TNetworkAddress, String>(address, result.getSnapshot_path()));
LOG.debug("snapshot address:{}, path:{}", address, result.getSnapshot_path());
}
}
job.setSnapshotPaths(snapshotPaths);
return Status.OK;
}
}
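
One more behavioral note: snapshot cleanup moves into ExportJob. ExportPendingTask registers each snapshot via job.addSnapshotPath(...) as it is taken, and job.releaseSnapshotPaths() is now called both from cancel() and at the end of ExportExportingTask.exec(). Unlike the old private helper deleted from ExportExportingTask, which returned Status.CANCELLED on the first missing backend or failed release RPC, the new method is best-effort and keeps going. A minimal sketch of that loop, condensed from the ExportJob diff above:

    // Best-effort release: skip backends that are gone or unavailable, ignore failed RPCs.
    for (Pair<TNetworkAddress, String> snapshotPath : getSnapshotPaths()) {
        String host = snapshotPath.first.getHostname();
        int port = snapshotPath.first.getPort();
        Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(host, port);
        if (backend == null || !Catalog.getCurrentSystemInfo().checkBackendAvailable(backend.getId())) {
            continue;
        }
        new AgentClient(host, port).releaseSnapshot(snapshotPath.second);
    }
    getSnapshotPaths().clear();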