Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Support pipe's endpoint & global connector transfer rate limit & Fix RPC compression not enabled in stream batch mode #12543

Merged
merged 30 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8b385a6
init
MiniSho May 17, 2024
c459966
Merge branch 'master' into connector-rate-limiter
MiniSho May 22, 2024
0db94ca
resolve conflicts
MiniSho May 22, 2024
794781d
fix
MiniSho May 22, 2024
3d0eb2d
Merge branch 'master' into connector-rate-limiter
MiniSho May 22, 2024
b15a71c
add allConnectorsRateLimiter
MiniSho May 23, 2024
3fa71d9
fix
MiniSho May 23, 2024
b0da2ae
improve
MiniSho May 23, 2024
d454742
fix
MiniSho May 23, 2024
785d625
Merge branch 'master' into connector-rate-limiter
MiniSho May 24, 2024
c4c388e
fix
MiniSho May 24, 2024
7afbe4e
refactor
SteveYurongSu May 24, 2024
366c754
Merge branch 'master' of https://github.com/apache/iotdb into pr/12543
SteveYurongSu May 24, 2024
981e231
refactor
SteveYurongSu May 24, 2024
ab97e2d
fix compression
SteveYurongSu May 24, 2024
ff61b62
refactor
SteveYurongSu May 24, 2024
863d12d
refactor
SteveYurongSu May 24, 2024
a7b49b7
Update IoTDBDataRegionSyncConnector.java
SteveYurongSu May 24, 2024
ea2ac8d
Update LoadTsFileRateLimiter.java
SteveYurongSu May 24, 2024
22d2af1
refactor
SteveYurongSu May 24, 2024
8484c92
Update PipeEndPointRateLimiter.java
SteveYurongSu May 24, 2024
6780147
refactor
SteveYurongSu May 24, 2024
4092025
...
SteveYurongSu May 24, 2024
e14ea37
fix
SteveYurongSu May 24, 2024
75f4d2c
fix
SteveYurongSu May 24, 2024
d9ed381
fix
SteveYurongSu May 24, 2024
8c8228d
fix
SteveYurongSu May 24, 2024
4460e4a
fix
SteveYurongSu May 27, 2024
1a7e955
Merge branch 'master' of https://github.com/apache/iotdb into pr/12543
SteveYurongSu May 27, 2024
8107cfb
refactor
SteveYurongSu May 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.util.HashMap;
import java.util.Objects;

Expand All @@ -56,9 +55,8 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector {

@Override
protected byte[] generateHandShakeV1Payload() throws IOException {
return compressIfNeeded(
PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
return PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
}

@Override
Expand All @@ -71,7 +69,7 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());

return compressIfNeeded(PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params));
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}

@Override
Expand Down Expand Up @@ -107,7 +105,7 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc
@Override
public void transfer(final Event event) throws Exception {
final int socketIndex = nextSocketIndex();
final Socket socket = sockets.get(socketIndex);
final AirGapSocket socket = sockets.get(socketIndex);

try {
if (event instanceof PipeConfigRegionWritePlanEvent) {
Expand All @@ -131,7 +129,8 @@ public void transfer(final Event event) throws Exception {
}

private void doTransferWrapper(
final Socket socket, final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
final AirGapSocket socket,
final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
Expand All @@ -147,13 +146,14 @@ private void doTransferWrapper(
}

private void doTransfer(
final Socket socket, final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
final AirGapSocket socket,
final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
if (!send(
pipeConfigRegionWritePlanEvent.getPipeName(),
socket,
compressIfNeeded(
PipeTransferConfigPlanReq.toTPipeTransferBytes(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())))) {
PipeTransferConfigPlanReq.toTPipeTransferBytes(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
final String errorMessage =
String.format(
"Transfer config region write plan %s error. Socket: %s.",
Expand All @@ -170,7 +170,7 @@ private void doTransfer(
}

private void doTransferWrapper(
final Socket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
final AirGapSocket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
Expand All @@ -186,29 +186,30 @@ private void doTransferWrapper(
}

private void doTransfer(
final Socket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
final AirGapSocket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
final String pipeName = pipeConfigRegionSnapshotEvent.getPipeName();
final File snapshot = pipeConfigRegionSnapshotEvent.getSnapshotFile();
final File templateFile = pipeConfigRegionSnapshotEvent.getTemplateFile();

// 1. Transfer snapshotFile, and template file if exists
transferFilePieces(snapshot, socket, true);
transferFilePieces(pipeName, snapshot, socket, true);
if (Objects.nonNull(templateFile)) {
transferFilePieces(templateFile, socket, true);
transferFilePieces(pipeName, templateFile, socket, true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred completely
if (!send(
pipeName,
socket,
compressIfNeeded(
PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
// The pattern is surely Non-null
pipeConfigRegionSnapshotEvent.getPatternString(),
snapshot.getName(),
snapshot.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Objects.nonNull(templateFile) ? templateFile.length() : 0,
pipeConfigRegionSnapshotEvent.getFileType(),
pipeConfigRegionSnapshotEvent.toSealTypeString())))) {
PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
// The pattern is surely Non-null
pipeConfigRegionSnapshotEvent.getPatternString(),
snapshot.getName(),
snapshot.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Objects.nonNull(templateFile) ? templateFile.length() : 0,
pipeConfigRegionSnapshotEvent.getFileType(),
pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
final String errorMessage =
String.format("Seal config region snapshot %s error. Socket %s.", snapshot, socket);
// Send handshake because we don't know whether the receiver side configNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;

import org.apache.tsfile.utils.Pair;
Expand Down Expand Up @@ -123,13 +124,15 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri

final TPipeTransferResp resp;
try {
resp =
clientAndStatus
.getLeft()
.pipeTransfer(
compressIfNeeded(
PipeTransferConfigPlanReq.toTPipeTransferReq(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())));
final TPipeTransferReq req =
compressIfNeeded(
PipeTransferConfigPlanReq.toTPipeTransferReq(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));
rateLimitIfNeeded(
pipeConfigRegionWritePlanEvent.getPipeName(),
clientAndStatus.getLeft().getEndPoint(),
req.getBody().length);
resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
Expand Down Expand Up @@ -177,32 +180,35 @@ private void doTransferWrapper(final PipeConfigRegionSnapshotEvent pipeConfigReg

private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
throws PipeException, IOException {
final String pipeName = snapshotEvent.getPipeName();
final File snapshotFile = snapshotEvent.getSnapshotFile();
final File templateFile = snapshotEvent.getTemplateFile();
final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient();

// 1. Transfer snapshotFile, and template File if exists
transferFilePieces(snapshotFile, clientAndStatus, true);
transferFilePieces(pipeName, snapshotFile, clientAndStatus, true);
if (Objects.nonNull(templateFile)) {
transferFilePieces(templateFile, clientAndStatus, true);
transferFilePieces(pipeName, templateFile, clientAndStatus, true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred completely
final TPipeTransferResp resp;
try {
resp =
clientAndStatus
.getLeft()
.pipeTransfer(
compressIfNeeded(
PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
// The pattern is surely Non-null
snapshotEvent.getPatternString(),
snapshotFile.getName(),
snapshotFile.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Objects.nonNull(templateFile) ? templateFile.length() : 0,
snapshotEvent.getFileType(),
snapshotEvent.toSealTypeString())));
final TPipeTransferReq req =
compressIfNeeded(
PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
// The pattern is surely Non-null
snapshotEvent.getPatternString(),
snapshotFile.getName(),
snapshotFile.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Objects.nonNull(templateFile) ? templateFile.length() : 0,
snapshotEvent.getFileType(),
snapshotEvent.toSealTypeString()));
rateLimitIfNeeded(
snapshotEvent.getPipeName(),
clientAndStatus.getLeft().getEndPoint(),
req.getBody().length);
resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ public class IoTDBConfig {

private long loadCleanupTaskExecutionDelayTimeSeconds = 1800L; // 30 min

private double loadWriteThroughputBytesPerSecond = Double.MAX_VALUE; // Bytes/s
private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s

/** Pipe related */
/** initialized as empty, updated based on the latest `systemDir` during querying */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
conf.setLoadWriteThroughputBytesPerSecond(
Double.parseDouble(
properties.getProperty(
"load_write_throughput_bytes_per_sec",
"load_write_throughput_bytes_per_second",
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));

conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
Expand Down Expand Up @@ -1718,9 +1718,19 @@ public void loadHotModifiedProps(Properties properties) throws QueryProcessExcep
conf.setLoadWriteThroughputBytesPerSecond(
Double.parseDouble(
properties.getProperty(
"load_write_throughput_bytes_per_sec",
"load_write_throughput_bytes_per_second",
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));

// update pipe config
commonDescriptor
.getConfig()
.setPipeAllSinksRateLimitBytesPerSecond(
Double.parseDouble(
properties.getProperty(
"pipe_all_sinks_rate_limit_bytes_per_second",
String.valueOf(
commonDescriptor.getConfig().getPipeAllSinksRateLimitBytesPerSecond()))));

// update merge_threshold_of_explain_analyze
conf.setMergeThresholdOfExplainAnalyze(
Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PipeEventBatch implements AutoCloseable {
Expand All @@ -61,6 +63,9 @@ public class PipeEventBatch implements AutoCloseable {
private final PipeMemoryBlock allocatedMemoryBlock;
private long totalBufferSize = 0;

// Used to rate limit when transferring data
private final Map<String, Long> pipeName2BytesAccumulated = new HashMap<>();

public PipeEventBatch(int maxDelayInMs, long requestMaxBatchSizeInBytes) {
this.maxDelayInMs = maxDelayInMs;
this.allocatedMemoryBlock =
Expand Down Expand Up @@ -112,6 +117,10 @@ public synchronized boolean onEvent(final TabletInsertionEvent event)

final int bufferSize = buildTabletInsertionBuffer(event);
totalBufferSize += bufferSize;
pipeName2BytesAccumulated.compute(
((EnrichedEvent) event).getPipeName(),
(pipeName, bytesAccumulated) ->
bytesAccumulated == null ? bufferSize : bytesAccumulated + bufferSize);

if (firstEventProcessingTime == Long.MIN_VALUE) {
firstEventProcessingTime = System.currentTimeMillis();
Expand All @@ -137,6 +146,7 @@ public synchronized void onSuccess() {
firstEventProcessingTime = Long.MIN_VALUE;

totalBufferSize = 0;
pipeName2BytesAccumulated.clear();
}

public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
Expand All @@ -160,6 +170,14 @@ public List<Long> deepCopyRequestCommitIds() {
return new ArrayList<>(requestCommitIds);
}

public Map<String, Long> deepCopyPipeName2BytesAccumulated() {
return new HashMap<>(pipeName2BytesAccumulated);
}

public Map<String, Long> getPipeName2BytesAccumulated() {
return pipeName2BytesAccumulated;
}

private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
throws IOException, WALPipeException {
final ByteBuffer buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Set;
Expand Down Expand Up @@ -88,9 +87,8 @@ protected boolean mayNeedHandshakeWhenFail() {

@Override
protected byte[] generateHandShakeV1Payload() throws IOException {
return compressIfNeeded(
PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
return PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
}

@Override
Expand All @@ -103,11 +101,12 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());

return compressIfNeeded(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params));
return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}

protected void doTransferWrapper(
final Socket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
Expand All @@ -123,13 +122,14 @@ protected void doTransferWrapper(
}

private void doTransfer(
final Socket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
if (!send(
pipeSchemaRegionWritePlanEvent.getPipeName(),
socket,
compressIfNeeded(
PipeTransferPlanNodeReq.toTPipeTransferBytes(
pipeSchemaRegionWritePlanEvent.getPlanNode())))) {
PipeTransferPlanNodeReq.toTPipeTransferBytes(
pipeSchemaRegionWritePlanEvent.getPlanNode()))) {
final String errorMessage =
String.format(
"Transfer data node write plan %s error. Socket: %s.",
Expand Down
Loading
Loading