Skip to content

Commit

Permalink
add export retry (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
yubingpeng authored Jan 11, 2018
1 parent 42cb31e commit 4972923
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 45 deletions.
7 changes: 6 additions & 1 deletion be/src/runtime/export_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,13 @@ Status ExportSink::open_file_writer() {
// TODO(lingbin): add some other info to file name, like partition
// Generates a unique name for an export data file.
//
// Combines the fragment instance id with a millisecond wall-clock timestamp
// so that a retried export of the same fragment does not overwrite the file
// written by a previous attempt.
// TODO(lingbin): add some other info to file name, like partition
std::string ExportSink::gen_file_name() {
    const TUniqueId& id = _state->fragment_instance_id();

    struct timeval tv;
    gettimeofday(&tv, NULL);

    std::stringstream file_name;
    // tv_sec is seconds, tv_usec is microseconds; fold both into one
    // millisecond value for the suffix.
    file_name << "export_data_" << id.hi << "_" << id.lo << "_"
              << (tv.tv_sec * 1000 + tv.tv_usec / 1000);
    return file_name.str();
}

Expand Down
36 changes: 34 additions & 2 deletions fe/src/com/baidu/palo/load/ExportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.baidu.palo.common.io.Text;
import com.baidu.palo.common.io.Writable;
import com.baidu.palo.common.Pair;
import com.baidu.palo.common.Status;
import com.baidu.palo.common.util.TimeUtils;
import com.baidu.palo.planner.DataPartition;
import com.baidu.palo.planner.ExportSink;
Expand All @@ -49,7 +50,11 @@
import com.baidu.palo.planner.PlanNodeId;
import com.baidu.palo.planner.ScanNode;
import com.baidu.palo.qe.Coordinator;
import com.baidu.palo.system.Backend;
import com.baidu.palo.task.AgentClient;
import com.baidu.palo.thrift.TAgentResult;
import com.baidu.palo.thrift.TNetworkAddress;
import com.baidu.palo.thrift.TStatusCode;
import com.baidu.palo.thrift.TScanRangeLocation;
import com.baidu.palo.thrift.TScanRangeLocations;
import com.baidu.palo.thrift.TUniqueId;
Expand Down Expand Up @@ -489,8 +494,8 @@ public List<Pair<TNetworkAddress, String>> getSnapshotPaths() {
return this.snapshotPaths;
}

public void setSnapshotPaths(List<Pair<TNetworkAddress, String>> snapshotPaths) {
this.snapshotPaths = snapshotPaths;
/**
 * Appends one (backend address, snapshot path) entry to this job's
 * snapshot-path list.
 *
 * @param snapshotPath pair of the backend's network address and the
 *                     snapshot path it reported
 */
public void addSnapshotPath(Pair<TNetworkAddress, String> snapshotPath) {
    snapshotPaths.add(snapshotPath);
}

public String getSql() {
Expand All @@ -502,6 +507,7 @@ public void setSql(String sql) {
}

/**
 * Cancels this export job: releases any backend snapshots taken for it,
 * records the failure reason, and moves the job to the CANCELLED state.
 *
 * @param type category of the cancellation (e.g. RUN_FAIL, TIMEOUT)
 * @param msg  human-readable failure message stored in the fail msg
 */
public synchronized void cancel(ExportFailMsg.CancelType type, String msg) {
    // Best-effort snapshot cleanup happens before the state transition.
    releaseSnapshotPaths();
    failMsg = new ExportFailMsg(type, msg);
    updateState(ExportJob.JobState.CANCELLED, false);
}
Expand Down Expand Up @@ -534,6 +540,32 @@ public synchronized boolean updateState(ExportJob.JobState newState, boolean isR
return true;
}

/**
 * Best-effort release of every backend snapshot recorded for this job.
 *
 * Unreachable backends and failed release RPCs are logged and skipped so
 * that the remaining snapshots are still attempted; the path list is then
 * cleared. Always returns {@code Status.OK} — callers must not treat a
 * partially failed release as fatal.
 *
 * @return always Status.OK
 */
public Status releaseSnapshotPaths() {
    List<Pair<TNetworkAddress, String>> snapshotPaths = getSnapshotPaths();
    LOG.debug("snapshotPaths:{}", snapshotPaths);
    for (Pair<TNetworkAddress, String> snapshotPath : snapshotPaths) {
        TNetworkAddress address = snapshotPath.first;
        String host = address.getHostname();
        int port = address.getPort();
        Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(host, port);
        if (backend == null) {
            // Backend no longer registered; nothing to release there.
            LOG.warn("backend [{}:{}] not found, skip releasing snapshot. path: {}",
                     host, port, snapshotPath.second);
            continue;
        }
        long backendId = backend.getId();
        if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
            LOG.warn("backend [{}] unavailable, skip releasing snapshot. path: {}",
                     backendId, snapshotPath.second);
            continue;
        }

        AgentClient client = new AgentClient(host, port);
        TAgentResult result = client.releaseSnapshot(snapshotPath.second);
        // result is null when the RPC itself failed.
        if (result == null || result.getStatus().getStatus_code() != TStatusCode.OK) {
            LOG.warn("failed to release snapshot on backend [{}:{}]. path: {}",
                     host, port, snapshotPath.second);
        }
    }
    snapshotPaths.clear();
    return Status.OK;
}

@Override
public String toString() {
return "ExportJob [jobId=" + id
Expand Down
15 changes: 15 additions & 0 deletions fe/src/com/baidu/palo/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,21 @@ public void setTimeout(int timeout) {
this.queryOptions.setQuery_timeout(timeout);
}

/**
 * Resets this coordinator's per-execution state so the export fragment can
 * be re-executed (retry): drops all backend exec states, resets the query
 * status to OK, and empties the exported-file list.
 */
public void clearExportStatus() {
    lock.lock();
    try {
        backendExecStates.clear();
        backendExecStateMap.clear();
        queryStatus.setStatus(new Status());
        // Reuse the existing list when present; lazily create it otherwise.
        if (exportFiles != null) {
            exportFiles.clear();
        } else {
            exportFiles = Lists.newArrayList();
        }
    } finally {
        lock.unlock();
    }
}

// Initiate
private void prepare() {
for (PlanFragment fragment : fragments) {
Expand Down
59 changes: 21 additions & 38 deletions fe/src/com/baidu/palo/task/ExportExportingTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

public class ExportExportingTask extends MasterTask {
private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class);
private static final int RETRY_NUM = 3;

protected final ExportJob job;

Expand Down Expand Up @@ -92,15 +93,26 @@ protected void exec() {
}

// if one instance finished, we send request to BE to exec next instance
// TODO(lingbin): add retry sending logic if send fail
List<Coordinator> coords = job.getCoordList();
int coordSize = coords.size();
for (int i = 0; i < coordSize; i++) {
if (isCancelled) {
break;
}
Coordinator coord = coords.get(i);
execOneCoord(coord);
for (int j = 0; j < RETRY_NUM; ++j) {
execOneCoord(coord);
if (coord.getExecStatus().ok()) {
break;
}
if (j < RETRY_NUM - 1) {
coord.clearExportStatus();
LOG.info("export exporting job fail. job: {}. Retry.", job);
}
}
if (!coord.getExecStatus().ok()) {
onFailed(coord.getExecStatus());
}
int progress = (int) (i + 1) * 100 / coordSize;
if (progress >= 100) {
progress = 99;
Expand All @@ -122,7 +134,7 @@ protected void exec() {
}

// release snapshot
Status releaseSnapshotStatus = releaseSnapshotPaths();
Status releaseSnapshotStatus = job.releaseSnapshotPaths();
if (!releaseSnapshotStatus.ok()) {
String failMsg = "release snapshot fail.";
failMsg += releaseSnapshotStatus.getErrorMsg();
Expand Down Expand Up @@ -161,7 +173,7 @@ private Status execOneCoord(Coordinator coord) {
needUnregister = true;
actualExecCoord(queryId, coord);
} catch (InternalException e) {
onFailed(new Status(TStatusCode.INTERNAL_ERROR, e.getMessage()));
LOG.warn("export exporting internal error. {}", e.getMessage());
} finally {
if (needUnregister) {
QeProcessor.unregisterQuery(queryId);
Expand All @@ -181,22 +193,20 @@ private void actualExecCoord(TUniqueId queryId, Coordinator coord) {
try {
coord.exec();
} catch (Exception e) {
onFailed(new Status(TStatusCode.INTERNAL_ERROR, "export Coordinator execute failed."));
LOG.warn("export Coordinator execute failed.");
}

if (coord.join(waitSecond)) {
Status status = coord.getExecStatus();
if (status.ok()) {
onFinished(coord.getExportFiles());
} else {
onFailed(status);
onSubTaskFinished(coord.getExportFiles());
}
} else {
onTimeout();
coord.cancel();
}
}

private synchronized void onFinished(List<String> exportFiles) {
private synchronized void onSubTaskFinished(List<String> exportFiles) {
job.addExportedFiles(exportFiles);
}

Expand All @@ -206,7 +216,7 @@ private synchronized void onFailed(Status failStatus) {
cancelType = ExportFailMsg.CancelType.RUN_FAIL;
String failMsg = "export exporting job fail. ";
failMsg += failStatus.getErrorMsg();
job.cancel(cancelType, failMsg);
job.setFailMsg(new ExportFailMsg(cancelType, failMsg));
LOG.warn("export exporting job fail. job: {}", job);
}

Expand All @@ -215,7 +225,6 @@ public synchronized void onTimeout() {
this.failStatus = new Status(TStatusCode.TIMEOUT, "timeout");
cancelType = ExportFailMsg.CancelType.TIMEOUT;
String failMsg = "export exporting job timeout";
job.cancel(cancelType, failMsg);
LOG.warn("export exporting job timeout. job: {}", job);
}

Expand Down Expand Up @@ -248,32 +257,6 @@ private void registerProfile() {
ProfileManager.getInstance().pushProfile(profile);
}

/**
 * Releases the snapshots taken on the backends for this export job.
 *
 * Returns {@code Status.CANCELLED} as soon as a backend is missing or
 * unavailable, or a release RPC fails; returns {@code Status.OK} only once
 * every snapshot has been released and the job's path list cleared.
 *
 * @return Status.OK on full success, Status.CANCELLED on the first failure
 */
private Status releaseSnapshotPaths() {
    List<Pair<TNetworkAddress, String>> snapshotPaths = job.getSnapshotPaths();
    LOG.debug("snapshotPaths:{}", snapshotPaths);
    for (Pair<TNetworkAddress, String> snapshotPath : snapshotPaths) {
        TNetworkAddress address = snapshotPath.first;
        String host = address.getHostname();
        int port = address.getPort();
        Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(host, port);
        if (backend == null) {
            return Status.CANCELLED;
        }
        long backendId = backend.getId();
        if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
            return Status.CANCELLED;
        }

        AgentClient client = new AgentClient(host, port);
        TAgentResult result = client.releaseSnapshot(snapshotPath.second);
        // result is null when the RPC itself failed; guard before
        // dereferencing so the failure surfaces as CANCELLED, not an NPE.
        if (result == null || result.getStatus().getStatus_code() != TStatusCode.OK) {
            return Status.CANCELLED;
        }
    }
    snapshotPaths.clear();
    return Status.OK;
}

private Status moveTmpFiles() {
BrokerMgr.BrokerAddress brokerAddress = null;
try {
Expand Down
5 changes: 1 addition & 4 deletions fe/src/com/baidu/palo/task/ExportPendingTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ protected void exec() {

// make snapshots
Status snapshotStatus = makeSnapshots();
// TODO(pengyubing): if export job fail, release snapshot
if (!snapshotStatus.ok()) {
String failMsg = "make snapshot failed.";
failMsg += snapshotStatus.getErrorMsg();
Expand All @@ -94,7 +93,6 @@ private Status makeSnapshots() {
if (tabletLocations == null) {
return Status.OK;
}
List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();
for (TScanRangeLocations tablet : tabletLocations) {
TScanRange scanRange = tablet.getScan_range();
if (!scanRange.isSetPalo_scan_range()) {
Expand Down Expand Up @@ -125,11 +123,10 @@ private Status makeSnapshots() {
if (result == null || result.getStatus().getStatus_code() != TStatusCode.OK) {
return Status.CANCELLED;
}
snapshotPaths.add(new Pair<TNetworkAddress, String>(address, result.getSnapshot_path()));
job.addSnapshotPath(new Pair<TNetworkAddress, String>(address, result.getSnapshot_path()));
LOG.debug("snapshot address:{}, path:{}", address, result.getSnapshot_path());
}
}
job.setSnapshotPaths(snapshotPaths);
return Status.OK;
}
}

0 comments on commit 4972923

Please sign in to comment.