Skip to content

Commit

Permalink
check the label state when label already exists
Browse files Browse the repository at this point in the history
  • Loading branch information
hffariel committed Jan 24, 2022
1 parent 83242f1 commit f5cedbe
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class StarRocksWriterOptions implements Serializable {
private static final long serialVersionUID = 1l;
private static final long KILO_BYTES_SCALE = 1024l;
private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE;
private static final int MAX_RETRIES = 1;
private static final int MAX_RETRIES = 3;
private static final int BATCH_ROWS = 500000;
private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE;
private static final long FLUSH_INTERVAL = 300000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public StarRocksFlushTuple(String label, Long bytes, List<byte[]> rows) {
}

public String getLabel() { return label; }
public void setLabel(String label) { this.label = label; }
public Long getBytes() { return bytes; }
public List<byte[]> getRows() { return rows; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;

import java.io.IOException;
import java.util.Map;


public class StarRocksStreamLoadFailedException extends IOException {

static final long serialVersionUID = 1L;

private final Map<String, Object> response;
private boolean reCreateLabel;

public StarRocksStreamLoadFailedException(String message, Map<String, Object> response) {
super(message);
this.response = response;
}

public StarRocksStreamLoadFailedException(String message, Map<String, Object> response, boolean reCreateLabel) {
super(message);
this.response = response;
this.reCreateLabel = reCreateLabel;
}

public Map<String, Object> getFailedResponse() {
return response;
}

public boolean needReCreateLabel() {
return reCreateLabel;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
Expand All @@ -26,6 +27,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;


Expand All @@ -35,6 +37,13 @@ public class StarRocksStreamLoadVisitor {

private final StarRocksWriterOptions writerOptions;
private long pos;
private static final String RESULT_FAILED = "Fail";
private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
private static final String RESULT_LABEL_PREPARE = "PREPARE";
private static final String RESULT_LABEL_ABORTED = "ABORTED";
private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";

public StarRocksStreamLoadVisitor(StarRocksWriterOptions writerOptions) {
this.writerOptions = writerOptions;
Expand All @@ -59,10 +68,14 @@ public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException {
throw new IOException("Unable to flush data to StarRocks: unknown result status.");
}
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
if (loadResult.get(keyStatus).equals("Fail")) {
if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
throw new IOException(
new StringBuilder("Failed to flush data to StarRocks.\n").append(JSON.toJSONString(loadResult)).toString()
);
} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
// has to block-checking the state to get the final result
checkLabelState(host, flushData.getLabel());
}
}

Expand Down Expand Up @@ -122,6 +135,52 @@ private byte[] joinRows(List<byte[]> rows, int totalBytes) {
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
}

@SuppressWarnings("unchecked")
private void checkLabelState(String host, String label) throws IOException {
int idx = 0;
while(true) {
try {
TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
} catch (InterruptedException ex) {
break;
}
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(writerOptions.getDatabase()).append("/get_load_state?label=").append(label).toString());
httpGet.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword()));
httpGet.setHeader("Connection", "close");

try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
HttpEntity respEntity = getHttpEntity(resp);
if (respEntity == null) {
throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
"could not get the final state of label[%s].\n", label), null);
}
Map<String, Object> result = (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
String labelState = (String)result.get("state");
if (null == labelState) {
throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
}
LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
switch(labelState) {
case LAEBL_STATE_VISIBLE:
case LAEBL_STATE_COMMITTED:
return;
case RESULT_LABEL_PREPARE:
continue;
case RESULT_LABEL_ABORTED:
throw new StarRocksStreamLoadFailedException(String.format("Failed to flush data to StarRocks, Error " +
"label[%s] state[%s]\n", label, labelState), null, true);
case RESULT_LABEL_UNKNOWN:
default:
throw new IOException(String.format("Failed to flush data to StarRocks, Error " +
"label[%s] state[%s]\n", label, labelState), null);
}
}
}
}
}

@SuppressWarnings("unchecked")
private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
Expand Down Expand Up @@ -150,16 +209,9 @@ protected boolean isRedirectable(String method) {
httpPut.setEntity(new ByteArrayEntity(data));
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
int code = resp.getStatusLine().getStatusCode();
if (200 != code) {
LOG.warn("Request failed with code:{}", code);
HttpEntity respEntity = getHttpEntity(resp);
if (respEntity == null)
return null;
}
HttpEntity respEntity = resp.getEntity();
if (null == respEntity) {
LOG.warn("Request failed with empty response.");
return null;
}
return (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
}
}
Expand All @@ -171,4 +223,18 @@ private String getBasicAuthHeader(String username, String password) {
return new StringBuilder("Basic ").append(new String(encodedAuth)).toString();
}

private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
int code = resp.getStatusLine().getStatusCode();
if (200 != code) {
LOG.warn("Request failed with code:{}", code);
return null;
}
HttpEntity respEntity = resp.getEntity();
if (null == respEntity) {
LOG.warn("Request failed with empty response.");
return null;
}
return respEntity;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void run() {
flushException = e;
}
}
}
}
});
flushThread.setDaemon(true);
flushThread.start();
Expand Down Expand Up @@ -167,8 +167,13 @@ private void asyncFlush() throws Exception {
if (i >= writerOptions.getMaxRetries()) {
throw new IOException(e);
}
if (e instanceof StarRocksStreamLoadFailedException && ((StarRocksStreamLoadFailedException)e).needReCreateLabel()) {
String newLabel = createBatchLabel();
LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
flushData.setLabel(newLabel);
}
try {
Thread.sleep(1000l * (i + 1));
Thread.sleep(1000l * Math.min(i + 1, 10));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("Unable to flush, interrupted while doing another attempt", e);
Expand Down

0 comments on commit f5cedbe

Please sign in to comment.