Skip to content

Commit

Permalink
Pipe: support restarting pipes on datanodes when they are stuck (apac…
Browse files Browse the repository at this point in the history
…he#11955)

Co-authored-by: Caideyipi <87789683+Caideyipi@users.noreply.github.com>
  • Loading branch information
SteveYurongSu and Caideyipi authored Jan 23, 2024
1 parent 827735d commit 15f6330
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public synchronized void preparePipeResources(
public synchronized void start() throws StartupException {
PipeConfig.getInstance().printAllConfigs();
PipeAgentLauncher.launchPipeTaskAgent();

registerPeriodicalJob(
"PipeTaskAgent#restartAllStuckPipes",
PipeAgent.task()::restartAllStuckPipes,
PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
pipePeriodicalJobExecutor.start();

isShutdown.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
Expand All @@ -33,11 +34,15 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskDataRegionBuilder;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskSchemaRegionBuilder;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
Expand All @@ -52,8 +57,11 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.ConfigRegion;

Expand Down Expand Up @@ -309,4 +317,73 @@ private void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatRe

PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
}

///////////////////////// Restart Logic /////////////////////////

/**
 * Entry point for the periodic stuck-pipe check.
 *
 * <p>Tries to take the write lock via {@code tryWriteLockWithTimeOut(5)}; when the lock cannot be
 * obtained the check is skipped for this round. Otherwise the stuck-pipe scan runs and the write
 * lock is always released afterwards, even if the scan throws.
 */
public void restartAllStuckPipes() {
  final boolean writeLockAcquired = tryWriteLockWithTimeOut(5);
  if (writeLockAcquired) {
    try {
      restartAllStuckPipesInternal();
    } finally {
      // Release in finally so a failing restart never leaves the write lock held.
      releaseWriteLock();
    }
  }
}

/**
 * Finds every pipe that may be stuck and restarts it.
 *
 * <p>A pipe is considered stuck when (1) at least one of the node-wide resource checks (linked
 * tsfile count, pinned memtable count, WAL disk usage) reports a dangerous level, and (2) the
 * pipe has at least one registered extractor, its first extractor is in stream mode, and at least
 * one of its extractors has consumed all historical tsfiles.
 *
 * <p>The only caller visible here, {@link #restartAllStuckPipes()}, invokes this method while
 * holding the write lock.
 */
private void restartAllStuckPipesInternal() {
  // The resource checks do not depend on any particular pipe, so evaluate them once up front
  // rather than once per pipe, and skip the extractor scan entirely when resources look fine.
  if (!mayLinkedTsFileCountReachDangerousThreshold()
      && !mayMemTablePinnedCountReachDangerousThreshold()
      && !mayWalSizeReachThrottleThreshold()) {
    return;
  }

  final Map<String, IoTDBDataRegionExtractor> taskId2ExtractorMap =
      PipeExtractorMetrics.getInstance().getExtractorMap();

  // Collect first, restart afterwards: restarting drops and re-creates pipes, so it must not
  // happen while the pipe meta list is still being iterated.
  final Set<PipeMeta> stuckPipes = new HashSet<>();
  for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
    final String pipeName = pipeMeta.getStaticMeta().getPipeName();
    final List<IoTDBDataRegionExtractor> extractors =
        taskId2ExtractorMap.values().stream()
            .filter(e -> e.getPipeName().equals(pipeName))
            .collect(Collectors.toList());
    if (extractors.isEmpty()
        || !extractors.get(0).isStreamMode()
        || extractors.stream()
            .noneMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)) {
      continue;
    }

    LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta());
    stuckPipes.add(pipeMeta);
  }

  // Restart all stuck pipes
  stuckPipes.parallelStream().forEach(this::restartStuckPipe);
}

/**
 * Checks whether the number of linked tsfiles is at least twice the configured maximum allowed
 * linked tsfile count.
 *
 * @return {@code true} when the linked tsfile count has reached that level
 */
private boolean mayLinkedTsFileCountReachDangerousThreshold() {
  final long dangerousLinkedTsFileCount =
      2 * PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
  return PipeResourceManager.tsfile().getLinkedTsfileCount() >= dangerousLinkedTsFileCount;
}

/**
 * Checks whether the number of pinned WAL entries is at least ten times the configured maximum
 * allowed pinned memtable count.
 *
 * @return {@code true} when the pinned count has reached that level
 */
private boolean mayMemTablePinnedCountReachDangerousThreshold() {
  // The configured limit is an int; multiply as long so a large configured value cannot
  // overflow and silently turn the threshold negative.
  return PipeResourceManager.wal().getPinnedWalCount()
      >= 10L * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
}

/**
 * Checks whether the total WAL disk usage exceeds two thirds of the configured throttle
 * threshold.
 *
 * @return {@code true} when {@code 3 * usage > 2 * threshold}
 */
private boolean mayWalSizeReachThrottleThreshold() {
  // Cross-multiplied form of "usage > 2/3 * threshold", which avoids a division.
  return 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold()
      < 3 * WALManager.getInstance().getTotalDiskUsage();
}

/**
 * Restarts a single stuck pipe by dropping it and then re-applying its meta, logging the outcome
 * and the elapsed time.
 *
 * <p>NOTE(review): both handlers return a {@code TPushPipeMetaRespExceptionMessage}; a non-null
 * result is assumed to describe a failure (they return {@code null} when nothing was done) —
 * confirm against {@code PipeTaskAgent}.
 *
 * @param pipeMeta meta of the pipe to restart
 */
private void restartStuckPipe(PipeMeta pipeMeta) {
  LOGGER.warn("Pipe {} will be restarted because of stuck.", pipeMeta.getStaticMeta());
  final long startTime = System.currentTimeMillis();

  // Keep the handlers' results instead of discarding them, so a failed restart is visible in
  // the log rather than being reported as a success. The re-create step is still attempted
  // after a failed drop, exactly as before.
  final Object dropResult = handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName());
  final Object recreateResult = handleSinglePipeMetaChangesInternal(pipeMeta);

  final long timeCostInMs = System.currentTimeMillis() - startTime;
  if (dropResult != null || recreateResult != null) {
    LOGGER.warn(
        "Pipe {} may not have been restarted properly, drop result: {}, re-create result: {}, "
            + "time cost: {} ms.",
        pipeMeta.getStaticMeta(),
        dropResult,
        recreateResult,
        timeCostInMs);
    return;
  }

  LOGGER.warn(
      "Pipe {} was restarted because of stuck, time cost: {} ms.",
      pipeMeta.getStaticMeta(),
      timeCostInMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,17 @@ public void close() throws Exception {
}
}

//////////////////////////// APIs provided for detecting stuck ////////////////////////////

/**
 * Tells whether this extractor runs in stream mode, i.e. whether its realtime extractor is a
 * hybrid extractor or a log extractor.
 *
 * @return {@code true} for a hybrid or log realtime extractor, {@code false} otherwise
 */
public boolean isStreamMode() {
  if (realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor) {
    return true;
  }
  return realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor;
}

/**
 * Reports whether the historical extractor has consumed everything it had to supply.
 *
 * @return the result of {@code historicalExtractor.hasConsumedAll()}
 */
public boolean hasConsumedAllHistoricalTsFiles() {
  final boolean historicalDataConsumed = historicalExtractor.hasConsumedAll();
  return historicalDataConsumed;
}

//////////////////////////// APIs provided for metric framework ////////////////////////////

public String getTaskID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,12 @@ private boolean canNotUseTabletAnyMore() {
// the write operation will be throttled, so we should not extract any more tablet events.
// 3. The number of pinned memtables has reached the dangerous threshold.
// 4. The number of tsfile events in the pending queue has exceeded the limit.
// 5. The number of linked tsfiles has reached the dangerous threshold.
return !isStartedToSupply
|| mayWalSizeReachThrottleThreshold()
|| mayMemTablePinnedCountReachDangerousThreshold()
|| isTsFileEventCountInQueueExceededLimit();
|| isTsFileEventCountInQueueExceededLimit()
|| mayTsFileLinkedCountReachDangerousThreshold();
}

private boolean mayWalSizeReachThrottleThreshold() {
Expand All @@ -247,6 +249,11 @@ private boolean isTsFileEventCountInQueueExceededLimit() {
>= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
}

/**
 * Checks whether the number of linked tsfiles has reached the configured maximum allowed linked
 * tsfile count.
 *
 * @return {@code true} when the linked tsfile count is at or above the configured maximum
 */
private boolean mayTsFileLinkedCountReachDangerousThreshold() {
  final long maxAllowedLinkedTsFileCount =
      PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
  return PipeResourceManager.tsfile().getLinkedTsfileCount() >= maxAllowedLinkedTsFileCount;
}

/**
 * Records the latest tsfile queue size reported for the processor event collector.
 *
 * @param queueSize the value to store; it overwrites the previously recorded size
 */
public void informProcessorEventCollectorQueueTsFileSize(int queueSize) {
  this.processorEventCollectorQueueTsFileSize.set(queueSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -46,7 +45,7 @@ public class PipeExtractorMetrics implements IMetricSet {

private AbstractMetricService metricService;

private final Map<String, IoTDBDataRegionExtractor> extractorMap = new HashMap<>();
private final Map<String, IoTDBDataRegionExtractor> extractorMap = new ConcurrentHashMap<>();

private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();

Expand All @@ -56,6 +55,10 @@ public class PipeExtractorMetrics implements IMetricSet {

private final Map<String, Gauge> recentProcessedTsFileEpochStateMap = new ConcurrentHashMap<>();

/**
 * Returns a read-only view of the registered extractors.
 *
 * <p>The view is live: it reflects later registrations and removals, but callers cannot modify
 * the metric set's internal state through it (mutating calls throw
 * {@link UnsupportedOperationException}).
 *
 * @return an unmodifiable view of the extractor map (keys are presumably task IDs — confirm
 *     against the registration code)
 */
public Map<String, IoTDBDataRegionExtractor> getExtractorMap() {
  // Fully qualified so that no additional import is required in this file.
  return java.util.Collections.unmodifiableMap(extractorMap);
}

//////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,19 +355,19 @@ public int getConnectorIndex() {
return connectorIndex;
}

public Integer getTsFileInsertionEventCount() {
public int getTsFileInsertionEventCount() {
return inputPendingQueue.getTsFileInsertionEventCount();
}

public Integer getTabletInsertionEventCount() {
public int getTabletInsertionEventCount() {
return inputPendingQueue.getTabletInsertionEventCount();
}

public Integer getPipeHeartbeatEventCount() {
public int getPipeHeartbeatEventCount() {
return inputPendingQueue.getPipeHeartbeatEventCount();
}

public Integer getAsyncConnectorRetryEventQueueSize() {
public int getAsyncConnectorRetryEventQueueSize() {
return outputPipeConnector instanceof IoTDBThriftAsyncConnector
? ((IoTDBThriftAsyncConnector) outputPipeConnector).getRetryEventQueueSize()
: 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,16 @@ public class CommonConfig {

private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
private int pipeMaxAllowedPinnedMemTableCount = 50;
private long pipeMaxAllowedLinkedTsFileCount = 100;
private long pipeStuckRestartIntervalSeconds = 120;

private boolean pipeMemoryManagementEnabled = true;
private long pipeMemoryAllocateRetryIntervalMs = 1000;
private int pipeMemoryAllocateMaxRetries = 10;
private long pipeMemoryAllocateMinSizeInBytes = 32;
private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = 2 * 1024 * 1024; // 2MB
private long pipeMemoryExpanderIntervalSeconds = 3 * 60; // 3Min
private float PipeLeaderCacheMemoryUsagePercentage = 0.1F;
private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 * 1024 * 1024; // 2MB
private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;

/** Whether to use persistent schema mode. */
private String schemaEngineMode = "Memory";
Expand Down Expand Up @@ -754,6 +756,22 @@ public void setPipeMaxAllowedPinnedMemTableCount(int pipeMaxAllowedPinnedMemTabl
this.pipeMaxAllowedPinnedMemTableCount = pipeMaxAllowedPinnedMemTableCount;
}

/**
 * Returns the configured maximum allowed number of linked tsfiles for pipes.
 *
 * @return the current value of {@code pipeMaxAllowedLinkedTsFileCount}
 */
public long getPipeMaxAllowedLinkedTsFileCount() {
  return this.pipeMaxAllowedLinkedTsFileCount;
}

/**
 * Overrides the maximum allowed number of linked tsfiles for pipes.
 *
 * @param pipeMaxAllowedLinkedTsFileCount the new limit; stored as-is without validation
 */
public void setPipeMaxAllowedLinkedTsFileCount(final long pipeMaxAllowedLinkedTsFileCount) {
  this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount;
}

/**
 * Returns the configured interval, in seconds, of the stuck-pipe restart check.
 *
 * @return the current value of {@code pipeStuckRestartIntervalSeconds}
 */
public long getPipeStuckRestartIntervalSeconds() {
  return this.pipeStuckRestartIntervalSeconds;
}

/**
 * Overrides the interval, in seconds, of the stuck-pipe restart check.
 *
 * @param pipeStuckRestartIntervalSeconds the new interval; stored as-is without validation
 */
public void setPipeStuckRestartIntervalSeconds(final long pipeStuckRestartIntervalSeconds) {
  this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
}

/**
 * Returns whether pipe memory management is enabled.
 *
 * @return the current value of {@code pipeMemoryManagementEnabled}
 */
public boolean getPipeMemoryManagementEnabled() {
  return this.pipeMemoryManagementEnabled;
}
Expand Down Expand Up @@ -805,11 +823,11 @@ public void setPipeMemoryAllocateMinSizeInBytes(long pipeMemoryAllocateMinSizeIn
}

public float getPipeLeaderCacheMemoryUsagePercentage() {
return PipeLeaderCacheMemoryUsagePercentage;
return pipeLeaderCacheMemoryUsagePercentage;
}

public void setPipeLeaderCacheMemoryUsagePercentage(float pipeLeaderCacheMemoryUsagePercentage) {
this.PipeLeaderCacheMemoryUsagePercentage = pipeLeaderCacheMemoryUsagePercentage;
this.pipeLeaderCacheMemoryUsagePercentage = pipeLeaderCacheMemoryUsagePercentage;
}

public String getSchemaEngineMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,16 @@ private void loadPipeProps(Properties properties) {
properties.getProperty(
"pipe_max_allowed_pinned_memtable_count",
String.valueOf(config.getPipeMaxAllowedPinnedMemTableCount()))));
config.setPipeMaxAllowedLinkedTsFileCount(
Long.parseLong(
properties.getProperty(
"pipe_max_allowed_linked_tsfile_count",
String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount()))));
config.setPipeStuckRestartIntervalSeconds(
Long.parseLong(
properties.getProperty(
"pipe_stuck_restart_interval_seconds",
String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));

config.setPipeMemoryManagementEnabled(
Boolean.parseBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(
}
}

private TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(
protected TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(
PipeMeta pipeMetaFromCoordinator) {
// Do nothing if node is removing or removed
if (isShutdown()) {
Expand Down Expand Up @@ -291,7 +291,7 @@ public TPushPipeMetaRespExceptionMessage handleDropPipe(String pipeName) {
}
}

private TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String pipeName) {
protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String pipeName) {
// Do nothing if node is removing or removed
if (isShutdown()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ public int getPipeMaxAllowedPinnedMemTableCount() {
return COMMON_CONFIG.getPipeMaxAllowedPinnedMemTableCount();
}

/**
 * Returns the maximum allowed number of linked tsfiles, as held by the common configuration.
 *
 * @return the value read from {@code COMMON_CONFIG}
 */
public long getPipeMaxAllowedLinkedTsFileCount() {
  final long maxAllowedLinkedTsFileCount = COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount();
  return maxAllowedLinkedTsFileCount;
}

/**
 * Returns the interval, in seconds, of the stuck-pipe restart check, as held by the common
 * configuration.
 *
 * @return the value read from {@code COMMON_CONFIG}
 */
public long getPipeStuckRestartIntervalSeconds() {
  final long stuckRestartIntervalSeconds = COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
  return stuckRestartIntervalSeconds;
}

/////////////////////////////// Memory ///////////////////////////////

public boolean getPipeMemoryManagementEnabled() {
Expand Down Expand Up @@ -241,6 +249,8 @@ public void printAllConfigs() {
LOGGER.info(
"PipeConnectorRPCThriftCompressionEnabled: {}",
isPipeConnectorRPCThriftCompressionEnabled());
LOGGER.info(
"PipeLeaderCacheMemoryUsagePercentage: {}", getPipeLeaderCacheMemoryUsagePercentage());

LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", getPipeAsyncConnectorMaxClientNumber());
Expand All @@ -264,6 +274,8 @@ public void printAllConfigs() {
"PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
LOGGER.info("PipeMaxAllowedPinnedMemTableCount: {}", getPipeMaxAllowedPinnedMemTableCount());
LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", getPipeMaxAllowedLinkedTsFileCount());
LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds());

LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled());
LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries());
Expand All @@ -273,9 +285,6 @@ public void printAllConfigs() {
LOGGER.info(
"PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
getPipeMemoryAllocateForTsFileSequenceReaderInBytes());

LOGGER.info(
"PipeLeaderCacheMemoryUsagePercentage: {}", getPipeLeaderCacheMemoryUsagePercentage());
}

/////////////////////////////// Singleton ///////////////////////////////
Expand Down

0 comments on commit 15f6330

Please sign in to comment.