diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java index 2414613ad96f..75997e483e26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java @@ -73,6 +73,11 @@ public synchronized void preparePipeResources( public synchronized void start() throws StartupException { PipeConfig.getInstance().printAllConfigs(); PipeAgentLauncher.launchPipeTaskAgent(); + + registerPeriodicalJob( + "PipeTaskAgent#restartAllStuckPipes", + PipeAgent.task()::restartAllStuckPipes, + PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds()); pipePeriodicalJobExecutor.start(); isShutdown.set(false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java index 15741629389a..a42adb7cdc8a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.task.PipeTask; import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta; @@ -33,11 +34,15 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeAgent; +import
org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener; +import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics; +import org.apache.iotdb.db.pipe.resource.PipeResourceManager; import org.apache.iotdb.db.pipe.task.PipeDataNodeTask; import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder; import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskDataRegionBuilder; import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskSchemaRegionBuilder; +import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; @@ -52,8 +57,11 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.ConfigRegion; @@ -309,4 +317,83 @@ private void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatRe PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true); } + + ///////////////////////// Restart Logic ///////////////////////// + + /** Restarts all pipes judged stuck; does nothing if the write lock cannot be acquired. */ + public void restartAllStuckPipes() { + if (!tryWriteLockWithTimeOut(5)) { + return; + } + try { + restartAllStuckPipesInternal(); + } finally { + releaseWriteLock(); + } + } + + /** + * Marks a pipe as stuck when the first extractor found for it is in stream mode, at least one + * of its extractors has consumed all historical tsfiles, and any of the threshold checks below + * (which take no pipe argument) is true. Stuck pipes are then restarted on the calling thread. + */ + private void restartAllStuckPipesInternal() { + final Map<String, IoTDBDataRegionExtractor> taskId2ExtractorMap = + PipeExtractorMetrics.getInstance().getExtractorMap(); + + final Set<PipeMeta> stuckPipes = new HashSet<>(); + for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { + final String pipeName = pipeMeta.getStaticMeta().getPipeName(); + final List<IoTDBDataRegionExtractor> extractors = + taskId2ExtractorMap.values().stream() + .filter(e -> e.getPipeName().equals(pipeName)) +
.collect(Collectors.toList()); + if (extractors.isEmpty() + || !extractors.get(0).isStreamMode() + || extractors.stream() + .noneMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)) { + continue; + } + + if (mayLinkedTsFileCountReachDangerousThreshold() + || mayMemTablePinnedCountReachDangerousThreshold() + || mayWalSizeReachThrottleThreshold()) { + LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta()); + stuckPipes.add(pipeMeta); + } + } + + // Restart one by one on this thread, which is the one holding the write lock + stuckPipes.forEach(this::restartStuckPipe); + } + + /** Returns true if the linked tsfile count is at least twice the configured maximum. */ + private boolean mayLinkedTsFileCountReachDangerousThreshold() { + return PipeResourceManager.tsfile().getLinkedTsfileCount() + >= 2 * PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); + } + + /** Returns true if the pinned WAL count is at least 10 times the max pinned memtable count. */ + private boolean mayMemTablePinnedCountReachDangerousThreshold() { + return PipeResourceManager.wal().getPinnedWalCount() + >= 10L * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount(); + } + + /** Returns true if the total WAL disk usage exceeds two thirds of the throttle threshold. */ + private boolean mayWalSizeReachThrottleThreshold() { + return 3 * WALManager.getInstance().getTotalDiskUsage() + > 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold(); + } + + /** Drops the given pipe, then re-applies its meta, logging the elapsed time. */ + private void restartStuckPipe(PipeMeta pipeMeta) { + LOGGER.warn("Pipe {} will be restarted because of stuck.", pipeMeta.getStaticMeta()); + final long startTime = System.currentTimeMillis(); + handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName()); + handleSinglePipeMetaChangesInternal(pipeMeta); + LOGGER.warn( + "Pipe {} was restarted because of stuck, time cost: {} ms.", + pipeMeta.getStaticMeta(), + System.currentTimeMillis() - startTime); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java index 58d274b82b11..c13f4071a67c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java +++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java @@ -372,6 +372,19 @@ public void close() throws Exception { } } + //////////////////////////// APIs provided for detecting stuck //////////////////////////// + + /** Returns true if the realtime extractor is the hybrid or the log extractor. */ + public boolean isStreamMode() { + return realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor + || realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor; + } + + /** Delegates to the historical extractor's hasConsumedAll(). */ + public boolean hasConsumedAllHistoricalTsFiles() { + return historicalExtractor.hasConsumedAll(); + } + //////////////////////////// APIs provided for metric framework //////////////////////////// public String getTaskID() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index 91b786bfffa3..568c96b52636 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -224,10 +224,12 @@ private boolean canNotUseTabletAnyMore() { // the write operation will be throttled, so we should not extract any more tablet events. // 3. The number of pinned memtables has reached the dangerous threshold. // 4. The number of tsfile events in the pending queue has exceeded the limit. + // 5. The number of linked tsfiles has reached the dangerous threshold. 
return !isStartedToSupply || mayWalSizeReachThrottleThreshold() || mayMemTablePinnedCountReachDangerousThreshold() - || isTsFileEventCountInQueueExceededLimit(); + || isTsFileEventCountInQueueExceededLimit() + || mayTsFileLinkedCountReachDangerousThreshold(); } private boolean mayWalSizeReachThrottleThreshold() { @@ -247,6 +249,11 @@ private boolean isTsFileEventCountInQueueExceededLimit() { >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion(); } + private boolean mayTsFileLinkedCountReachDangerousThreshold() { + return PipeResourceManager.tsfile().getLinkedTsfileCount() + >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); + } + public void informProcessorEventCollectorQueueTsFileSize(int queueSize) { processorEventCollectorQueueTsFileSize.set(queueSize); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java index f56672c7d6d5..b9dfb0c87738 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java @@ -35,7 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -46,7 +45,7 @@ public class PipeExtractorMetrics implements IMetricSet { private AbstractMetricService metricService; - private final Map<String, IoTDBDataRegionExtractor> extractorMap = new HashMap<>(); + private final Map<String, IoTDBDataRegionExtractor> extractorMap = new ConcurrentHashMap<>(); private final Map tabletRateMap = new ConcurrentHashMap<>(); @@ -56,6 +55,11 @@ public class PipeExtractorMetrics implements IMetricSet { private final Map recentProcessedTsFileEpochStateMap = new ConcurrentHashMap<>(); + /** Returns the live extractor map itself (not a copy). */ + public Map<String, IoTDBDataRegionExtractor> getExtractorMap() { + return extractorMap; + } + //////////////////////////// bindTo & 
unbindFrom (metric framework) //////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index 061944af1a08..967ca2d6b86e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -355,19 +355,19 @@ public int getConnectorIndex() { return connectorIndex; } - public Integer getTsFileInsertionEventCount() { + public int getTsFileInsertionEventCount() { return inputPendingQueue.getTsFileInsertionEventCount(); } - public Integer getTabletInsertionEventCount() { + public int getTabletInsertionEventCount() { return inputPendingQueue.getTabletInsertionEventCount(); } - public Integer getPipeHeartbeatEventCount() { + public int getPipeHeartbeatEventCount() { return inputPendingQueue.getPipeHeartbeatEventCount(); } - public Integer getAsyncConnectorRetryEventQueueSize() { + public int getAsyncConnectorRetryEventQueueSize() { return outputPipeConnector instanceof IoTDBThriftAsyncConnector ? 
((IoTDBThriftAsyncConnector) outputPipeConnector).getRetryEventQueueSize() : 0; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index a461452d8076..11fd755a8ca6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -185,14 +185,16 @@ public class CommonConfig { private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2; private int pipeMaxAllowedPinnedMemTableCount = 50; + private long pipeMaxAllowedLinkedTsFileCount = 100; + private long pipeStuckRestartIntervalSeconds = 120; private boolean pipeMemoryManagementEnabled = true; private long pipeMemoryAllocateRetryIntervalMs = 1000; private int pipeMemoryAllocateMaxRetries = 10; private long pipeMemoryAllocateMinSizeInBytes = 32; - private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = 2 * 1024 * 1024; // 2MB - private long pipeMemoryExpanderIntervalSeconds = 3 * 60; // 3Min - private float PipeLeaderCacheMemoryUsagePercentage = 0.1F; + private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 * 1024 * 1024; // 2MB + private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min + private float pipeLeaderCacheMemoryUsagePercentage = 0.1F; /** Whether to use persistent schema mode. 
*/ private String schemaEngineMode = "Memory"; @@ -754,6 +756,22 @@ public void setPipeMaxAllowedPinnedMemTableCount(int pipeMaxAllowedPinnedMemTabl this.pipeMaxAllowedPinnedMemTableCount = pipeMaxAllowedPinnedMemTableCount; } + public long getPipeMaxAllowedLinkedTsFileCount() { + return pipeMaxAllowedLinkedTsFileCount; + } + + public void setPipeMaxAllowedLinkedTsFileCount(long pipeMaxAllowedLinkedTsFileCount) { + this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount; + } + + public long getPipeStuckRestartIntervalSeconds() { + return pipeStuckRestartIntervalSeconds; + } + + public void setPipeStuckRestartIntervalSeconds(long pipeStuckRestartIntervalSeconds) { + this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds; + } + public boolean getPipeMemoryManagementEnabled() { return pipeMemoryManagementEnabled; } @@ -805,11 +823,11 @@ public void setPipeMemoryAllocateMinSizeInBytes(long pipeMemoryAllocateMinSizeIn } public float getPipeLeaderCacheMemoryUsagePercentage() { - return PipeLeaderCacheMemoryUsagePercentage; + return pipeLeaderCacheMemoryUsagePercentage; } public void setPipeLeaderCacheMemoryUsagePercentage(float pipeLeaderCacheMemoryUsagePercentage) { - this.PipeLeaderCacheMemoryUsagePercentage = pipeLeaderCacheMemoryUsagePercentage; + this.pipeLeaderCacheMemoryUsagePercentage = pipeLeaderCacheMemoryUsagePercentage; } public String getSchemaEngineMode() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index ab51e7b6428f..43ec8f911b44 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -427,6 +427,16 @@ private void loadPipeProps(Properties properties) { properties.getProperty( "pipe_max_allowed_pinned_memtable_count", 
String.valueOf(config.getPipeMaxAllowedPinnedMemTableCount())))); + config.setPipeMaxAllowedLinkedTsFileCount( + Long.parseLong( + properties.getProperty( + "pipe_max_allowed_linked_tsfile_count", + String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount())))); + config.setPipeStuckRestartIntervalSeconds( + Long.parseLong( + properties.getProperty( + "pipe_stuck_restart_interval_seconds", + String.valueOf(config.getPipeStuckRestartIntervalSeconds())))); config.setPipeMemoryManagementEnabled( Boolean.parseBoolean( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 0a32cd85dfca..3b7bf7712f7d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -120,7 +120,7 @@ public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges( } } - private TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal( + protected TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal( PipeMeta pipeMetaFromCoordinator) { // Do nothing if node is removing or removed if (isShutdown()) { @@ -291,7 +291,7 @@ public TPushPipeMetaRespExceptionMessage handleDropPipe(String pipeName) { } } - private TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String pipeName) { + protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String pipeName) { // Do nothing if node is removing or removed if (isShutdown()) { return null; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 32fb5b0fdbbb..74a6711a96b4 100644 --- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -173,6 +173,14 @@ public int getPipeMaxAllowedPinnedMemTableCount() { return COMMON_CONFIG.getPipeMaxAllowedPinnedMemTableCount(); } + public long getPipeMaxAllowedLinkedTsFileCount() { + return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount(); + } + + public long getPipeStuckRestartIntervalSeconds() { + return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds(); + } + /////////////////////////////// Memory /////////////////////////////// public boolean getPipeMemoryManagementEnabled() { @@ -241,6 +249,8 @@ public void printAllConfigs() { LOGGER.info( "PipeConnectorRPCThriftCompressionEnabled: {}", isPipeConnectorRPCThriftCompressionEnabled()); + LOGGER.info( + "PipeLeaderCacheMemoryUsagePercentage: {}", getPipeLeaderCacheMemoryUsagePercentage()); LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", getPipeAsyncConnectorSelectorNumber()); LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", getPipeAsyncConnectorMaxClientNumber()); @@ -264,6 +274,8 @@ public void printAllConfigs() { "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}", getPipeMaxAllowedPendingTsFileEpochPerDataRegion()); LOGGER.info("PipeMaxAllowedPinnedMemTableCount: {}", getPipeMaxAllowedPinnedMemTableCount()); + LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", getPipeMaxAllowedLinkedTsFileCount()); + LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds()); LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled()); LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries()); @@ -273,9 +285,6 @@ public void printAllConfigs() { LOGGER.info( "PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}", getPipeMemoryAllocateForTsFileSequenceReaderInBytes()); - - LOGGER.info( - "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage()); } /////////////////////////////// Singleton ///////////////////////////////