Skip to content

Commit

Permalink
Pipe: Support pipe's endpoint & global connector transfer rate limit …
Browse files Browse the repository at this point in the history
…& Fix RPC compression not enabled in stream batch mode (apache#12543)

Co-authored-by: Steve Yurong Su <rong@apache.org>
  • Loading branch information
2 people authored and SzyWilliam committed Nov 22, 2024
1 parent bc2d4f9 commit 515ba63
Show file tree
Hide file tree
Showing 29 changed files with 583 additions and 242 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.util.HashMap;
import java.util.Objects;

Expand All @@ -56,9 +55,8 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector {

@Override
protected byte[] generateHandShakeV1Payload() throws IOException {
return compressIfNeeded(
PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
return PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
}

@Override
Expand All @@ -71,7 +69,7 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());

return compressIfNeeded(PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params));
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}

@Override
Expand Down Expand Up @@ -107,7 +105,7 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc
@Override
public void transfer(final Event event) throws Exception {
final int socketIndex = nextSocketIndex();
final Socket socket = sockets.get(socketIndex);
final AirGapSocket socket = sockets.get(socketIndex);

try {
if (event instanceof PipeConfigRegionWritePlanEvent) {
Expand All @@ -131,7 +129,8 @@ public void transfer(final Event event) throws Exception {
}

private void doTransferWrapper(
final Socket socket, final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
final AirGapSocket socket,
final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
Expand All @@ -147,13 +146,14 @@ private void doTransferWrapper(
}

private void doTransfer(
final Socket socket, final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
final AirGapSocket socket,
final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
if (!send(
pipeConfigRegionWritePlanEvent.getPipeName(),
socket,
compressIfNeeded(
PipeTransferConfigPlanReq.toTPipeTransferBytes(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())))) {
PipeTransferConfigPlanReq.toTPipeTransferBytes(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
final String errorMessage =
String.format(
"Transfer config region write plan %s error. Socket: %s.",
Expand All @@ -170,7 +170,7 @@ private void doTransfer(
}

private void doTransferWrapper(
final Socket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
final AirGapSocket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
Expand All @@ -186,29 +186,30 @@ private void doTransferWrapper(
}

private void doTransfer(
final Socket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
final AirGapSocket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
final String pipeName = pipeConfigRegionSnapshotEvent.getPipeName();
final File snapshot = pipeConfigRegionSnapshotEvent.getSnapshotFile();
final File templateFile = pipeConfigRegionSnapshotEvent.getTemplateFile();

// 1. Transfer snapshotFile, and template file if exists
transferFilePieces(snapshot, socket, true);
transferFilePieces(pipeName, snapshot, socket, true);
if (Objects.nonNull(templateFile)) {
transferFilePieces(templateFile, socket, true);
transferFilePieces(pipeName, templateFile, socket, true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred completely
if (!send(
pipeName,
socket,
compressIfNeeded(
PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
// The pattern is surely Non-null
pipeConfigRegionSnapshotEvent.getPatternString(),
snapshot.getName(),
snapshot.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Objects.nonNull(templateFile) ? templateFile.length() : 0,
pipeConfigRegionSnapshotEvent.getFileType(),
pipeConfigRegionSnapshotEvent.toSealTypeString())))) {
PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
// The pattern is surely Non-null
pipeConfigRegionSnapshotEvent.getPatternString(),
snapshot.getName(),
snapshot.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Objects.nonNull(templateFile) ? templateFile.length() : 0,
pipeConfigRegionSnapshotEvent.getFileType(),
pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
final String errorMessage =
String.format("Seal config region snapshot %s error. Socket %s.", snapshot, socket);
// Send handshake because we don't know whether the receiver side configNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;

import org.apache.tsfile.utils.Pair;
Expand Down Expand Up @@ -123,13 +124,15 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri

final TPipeTransferResp resp;
try {
resp =
clientAndStatus
.getLeft()
.pipeTransfer(
compressIfNeeded(
PipeTransferConfigPlanReq.toTPipeTransferReq(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())));
final TPipeTransferReq req =
compressIfNeeded(
PipeTransferConfigPlanReq.toTPipeTransferReq(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));
rateLimitIfNeeded(
pipeConfigRegionWritePlanEvent.getPipeName(),
clientAndStatus.getLeft().getEndPoint(),
req.getBody().length);
resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
Expand Down Expand Up @@ -177,32 +180,35 @@ private void doTransferWrapper(final PipeConfigRegionSnapshotEvent pipeConfigReg

private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
throws PipeException, IOException {
final String pipeName = snapshotEvent.getPipeName();
final File snapshotFile = snapshotEvent.getSnapshotFile();
final File templateFile = snapshotEvent.getTemplateFile();
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();

// 1. Transfer snapshotFile, and template File if exists
transferFilePieces(snapshotFile, clientAndStatus, true);
transferFilePieces(pipeName, snapshotFile, clientAndStatus, true);
if (Objects.nonNull(templateFile)) {
transferFilePieces(templateFile, clientAndStatus, true);
transferFilePieces(pipeName, templateFile, clientAndStatus, true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred completely
final TPipeTransferResp resp;
try {
resp =
clientAndStatus
.getLeft()
.pipeTransfer(
compressIfNeeded(
PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
// The pattern is surely Non-null
snapshotEvent.getPatternString(),
snapshotFile.getName(),
snapshotFile.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Objects.nonNull(templateFile) ? templateFile.length() : 0,
snapshotEvent.getFileType(),
snapshotEvent.toSealTypeString())));
final TPipeTransferReq req =
compressIfNeeded(
PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
// The pattern is surely Non-null
snapshotEvent.getPatternString(),
snapshotFile.getName(),
snapshotFile.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Objects.nonNull(templateFile) ? templateFile.length() : 0,
snapshotEvent.getFileType(),
snapshotEvent.toSealTypeString()));
rateLimitIfNeeded(
snapshotEvent.getPipeName(),
clientAndStatus.getLeft().getEndPoint(),
req.getBody().length);
resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ public class IoTDBConfig {

private long loadCleanupTaskExecutionDelayTimeSeconds = 1800L; // 30 min

private double loadWriteThroughputBytesPerSecond = Double.MAX_VALUE; // Bytes/s
private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s

/** Pipe related */
/** initialized as empty, updated based on the latest `systemDir` during querying */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
conf.setLoadWriteThroughputBytesPerSecond(
Double.parseDouble(
properties.getProperty(
"load_write_throughput_bytes_per_sec",
"load_write_throughput_bytes_per_second",
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));

conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
Expand Down Expand Up @@ -1718,9 +1718,19 @@ public void loadHotModifiedProps(Properties properties) throws QueryProcessExcep
conf.setLoadWriteThroughputBytesPerSecond(
Double.parseDouble(
properties.getProperty(
"load_write_throughput_bytes_per_sec",
"load_write_throughput_bytes_per_second",
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));

// update pipe config
commonDescriptor
.getConfig()
.setPipeAllSinksRateLimitBytesPerSecond(
Double.parseDouble(
properties.getProperty(
"pipe_all_sinks_rate_limit_bytes_per_second",
String.valueOf(
commonDescriptor.getConfig().getPipeAllSinksRateLimitBytesPerSecond()))));

// update merge_threshold_of_explain_analyze
conf.setMergeThresholdOfExplainAnalyze(
Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PipeEventBatch implements AutoCloseable {
Expand All @@ -61,6 +63,9 @@ public class PipeEventBatch implements AutoCloseable {
private final PipeMemoryBlock allocatedMemoryBlock;
private long totalBufferSize = 0;

// Used to rate limit when transferring data
private final Map<String, Long> pipeName2BytesAccumulated = new HashMap<>();

public PipeEventBatch(int maxDelayInMs, long requestMaxBatchSizeInBytes) {
this.maxDelayInMs = maxDelayInMs;
this.allocatedMemoryBlock =
Expand Down Expand Up @@ -112,6 +117,10 @@ public synchronized boolean onEvent(final TabletInsertionEvent event)

final int bufferSize = buildTabletInsertionBuffer(event);
totalBufferSize += bufferSize;
pipeName2BytesAccumulated.compute(
((EnrichedEvent) event).getPipeName(),
(pipeName, bytesAccumulated) ->
bytesAccumulated == null ? bufferSize : bytesAccumulated + bufferSize);

if (firstEventProcessingTime == Long.MIN_VALUE) {
firstEventProcessingTime = System.currentTimeMillis();
Expand All @@ -137,6 +146,7 @@ public synchronized void onSuccess() {
firstEventProcessingTime = Long.MIN_VALUE;

totalBufferSize = 0;
pipeName2BytesAccumulated.clear();
}

public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
Expand All @@ -160,6 +170,14 @@ public List<Long> deepCopyRequestCommitIds() {
return new ArrayList<>(requestCommitIds);
}

public Map<String, Long> deepCopyPipeName2BytesAccumulated() {
return new HashMap<>(pipeName2BytesAccumulated);
}

public Map<String, Long> getPipeName2BytesAccumulated() {
return pipeName2BytesAccumulated;
}

private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
throws IOException, WALPipeException {
final ByteBuffer buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Set;
Expand Down Expand Up @@ -88,9 +87,8 @@ protected boolean mayNeedHandshakeWhenFail() {

@Override
protected byte[] generateHandShakeV1Payload() throws IOException {
return compressIfNeeded(
PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
return PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
}

@Override
Expand All @@ -103,11 +101,12 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());

return compressIfNeeded(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params));
return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}

protected void doTransferWrapper(
final Socket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
Expand All @@ -123,13 +122,14 @@ protected void doTransferWrapper(
}

private void doTransfer(
final Socket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
if (!send(
pipeSchemaRegionWritePlanEvent.getPipeName(),
socket,
compressIfNeeded(
PipeTransferPlanNodeReq.toTPipeTransferBytes(
pipeSchemaRegionWritePlanEvent.getPlanNode())))) {
PipeTransferPlanNodeReq.toTPipeTransferBytes(
pipeSchemaRegionWritePlanEvent.getPlanNode()))) {
final String errorMessage =
String.format(
"Transfer data node write plan %s error. Socket: %s.",
Expand Down
Loading

0 comments on commit 515ba63

Please sign in to comment.