From e9e01f3bdba9c388eb73a94a24b4c3706f3ef133 Mon Sep 17 00:00:00 2001
From: Steve Yurong Su
Date: Wed, 17 Jan 2024 18:41:45 +0800
Subject: [PATCH] Pipe: make PipeHistoricalDataRegionExtractor &
 PipeRealtimeDataRegionExtractor log info much clear with pipe name and
 region id added (#11920)

---
 .../extractor/IoTDBDataRegionExtractor.java   | 23 +++++---
 ...peHistoricalDataRegionTsFileExtractor.java | 54 +++++++++++++++----
 2 files changed, 62 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index dfa8483ca34b..58d274b82b11 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -220,14 +220,21 @@ private void constructRealtimeExtractor(PipeParameters parameters) {
         EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
       realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
       LOGGER.info(
-          "'{}' is set to false, use fake realtime extractor.", EXTRACTOR_REALTIME_ENABLE_KEY);
+          "Pipe {}@{}: '{}' is set to false, use fake realtime extractor.",
+          pipeName,
+          dataRegionId,
+          EXTRACTOR_REALTIME_ENABLE_KEY);
       return;
     }
 
     // Use hybrid mode by default
     if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
       realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
-      LOGGER.info("'{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE_KEY);
+      LOGGER.info(
+          "Pipe {}@{}: '{}' is not set, use hybrid mode by default.",
+          pipeName,
+          dataRegionId,
+          EXTRACTOR_REALTIME_MODE_KEY);
       return;
     }
 
@@ -248,7 +255,9 @@ private void constructRealtimeExtractor(PipeParameters parameters) {
       realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
       if (LOGGER.isWarnEnabled()) {
         LOGGER.warn(
-            "Unsupported extractor realtime mode: {}, create a hybrid extractor.",
+            "Pipe {}@{}: Unsupported extractor realtime mode: {}, create a hybrid extractor.",
+            pipeName,
+            dataRegionId,
             parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY));
       }
     }
@@ -321,9 +330,11 @@ private void startHistoricalExtractorAndRealtimeExtractor(
     } catch (Exception e) {
       exceptionHolder.set(e);
       LOGGER.warn(
-          String.format(
-              "Start historical extractor %s and realtime extractor %s error.",
-              historicalExtractor, realtimeExtractor),
+          "Pipe {}@{}: Start historical extractor {} and realtime extractor {} error.",
+          pipeName,
+          dataRegionId,
+          historicalExtractor,
+          realtimeExtractor,
           e);
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 917c4cd33081..a64d2a4ae7ff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -251,7 +251,9 @@ public void customize(
             .contains("time");
 
     LOGGER.info(
-        "historical data extraction time range, start time {}({}), end time {}({}), sloppy time range {}",
+        "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy time range {}",
+        pipeName,
+        dataRegionId,
         DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
         historicalDataExtractionStartTime,
         DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
@@ -286,20 +288,42 @@ public synchronized void start() {
     dataRegion.writeLock("Pipe: start to extract historical TsFile");
     final long startHistoricalExtractionTime = System.currentTimeMillis();
     try {
+      LOGGER.info("Pipe {}@{}: start to flush data region", pipeName, dataRegionId);
       synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
         final long lastFlushedByPipeTime =
             DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
         if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
           dataRegion.syncCloseAllWorkingTsFileProcessors();
           DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis());
+          LOGGER.info(
+              "Pipe {}@{}: finish to flush data region, took {} ms",
+              pipeName,
+              dataRegionId,
+              System.currentTimeMillis() - startHistoricalExtractionTime);
+        } else {
+          LOGGER.info(
+              "Pipe {}@{}: skip to flush data region, last flushed time {} ms ago",
+              pipeName,
+              dataRegionId,
+              System.currentTimeMillis() - lastFlushedByPipeTime);
         }
       }
 
       final TsFileManager tsFileManager = dataRegion.getTsFileManager();
       tsFileManager.readLock();
       try {
+        final int originalSequenceTsFileCount = tsFileManager.size(true);
+        final int originalUnsequenceTsFileCount = tsFileManager.size(false);
         final List<TsFileResource> resourceList =
-            new ArrayList<>(tsFileManager.size(true) + tsFileManager.size(false));
+            new ArrayList<>(originalSequenceTsFileCount + originalUnsequenceTsFileCount);
+        LOGGER.info(
+            "Pipe {}@{}: start to extract historical TsFile, original sequence file count {}, "
+                + "original unsequence file count {}, start progress index {}",
+            pipeName,
+            dataRegionId,
+            originalSequenceTsFileCount,
+            originalUnsequenceTsFileCount,
+            startIndex);
 
         final Collection<TsFileResource> sequenceTsFileResources =
             tsFileManager.getTsFileList(true).stream()
@@ -343,11 +367,16 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
         pendingQueue = new ArrayDeque<>(resourceList);
 
         LOGGER.info(
-            "Pipe: start to extract historical TsFile, data region {}, "
-                + "sequence file count {}, unsequence file count {}, historical extraction time {} ms",
+            "Pipe {}@{}: finish to extract historical TsFile, extracted sequence file count {}/{}, "
+                + "extracted unsequence file count {}/{}, extracted file count {}/{}, took {} ms",
+            pipeName,
             dataRegionId,
             sequenceTsFileResources.size(),
+            originalSequenceTsFileCount,
             unsequenceTsFileResources.size(),
+            originalUnsequenceTsFileCount,
+            resourceList.size(),
+            originalSequenceTsFileCount + originalUnsequenceTsFileCount,
             System.currentTimeMillis() - startHistoricalExtractionTime);
       } finally {
         tsFileManager.readUnlock();
@@ -373,9 +402,12 @@ private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(TsFileResource re
           <= TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
     } catch (IOException e) {
       LOGGER.warn(
-          String.format(
-              "failed to get the generation time of TsFile %s, extract it anyway",
-              resource.getTsFilePath()),
+          "Pipe {}@{}: failed to get the generation time of TsFile {}, extract it anyway"
+              + " (historical data extraction time lower bound: {})",
+          pipeName,
+          dataRegionId,
+          resource.getTsFilePath(),
+          historicalDataExtractionTimeLowerBound,
           e);
       // If failed to get the generation time of the TsFile, we will extract the data in the TsFile
       // anyway.
@@ -416,7 +448,9 @@ public synchronized Event supply() {
           PipeResourceManager.tsfile().unpinTsFileResource(resource);
         } catch (IOException e) {
           LOGGER.warn(
-              "Pipe: failed to unpin TsFileResource after creating event, original path: {}",
+              "Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}",
+              pipeName,
+              dataRegionId,
               resource.getTsFilePath());
         }
 
@@ -441,7 +475,9 @@ public synchronized void close() {
             PipeResourceManager.tsfile().unpinTsFileResource(resource);
           } catch (IOException e) {
             LOGGER.warn(
-                "Pipe: failed to unpin TsFileResource after dropping pipe, original path: {}",
+                "Pipe {}@{}: failed to unpin TsFileResource after dropping pipe, original path: {}",
+                pipeName,
+                dataRegionId,
                 resource.getTsFilePath());
           }
         });
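
Note on the logging idiom the hunks above migrate to (a minimal sketch, not part of the patch): the patch replaces String.format(...) and plain messages with SLF4J parameterized logging, passing pipeName and dataRegionId as the first two arguments behind the new "Pipe {}@{}" prefix. SLF4J fills each "{}" placeholder positionally, and a trailing Throwable without a matching placeholder is still logged together with its stack trace, which is why `e` can simply be appended after the format arguments. In the sketch below the class, fields, and method are hypothetical stand-ins; only the logging pattern mirrors the patch.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PipeLoggingIdiomSketch {

      private static final Logger LOGGER = LoggerFactory.getLogger(PipeLoggingIdiomSketch.class);

      // Hypothetical stand-ins for the extractor's pipeName / dataRegionId fields.
      private final String pipeName = "examplePipe";
      private final int dataRegionId = 1;

      public void logStartFailure(Object historicalExtractor, Object realtimeExtractor, Exception e) {
        // The four "{}" placeholders are filled in order by the next four arguments;
        // the trailing Throwable has no placeholder, so SLF4J renders it as a stack trace.
        LOGGER.warn(
            "Pipe {}@{}: Start historical extractor {} and realtime extractor {} error.",
            pipeName,
            dataRegionId,
            historicalExtractor,
            realtimeExtractor,
            e);
      }
    }

Unlike the removed String.format calls, the parameterized message is only assembled when the log level is enabled, so an explicit LOGGER.isWarnEnabled() guard (as in the hybrid-extractor hunk) is mainly useful when computing the arguments themselves is expensive.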