Skip to content

Commit

Permalink
Pipe: make PipeHistoricalDataRegionExtractor & PipeRealtimeDataRegion…
Browse files Browse the repository at this point in the history
…Extractor log info much clear with pipe name and region id added (apache#11920)
  • Loading branch information
SteveYurongSu authored Jan 17, 2024
1 parent 9aceb9f commit e9e01f3
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,21 @@ private void constructRealtimeExtractor(PipeParameters parameters) {
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
LOGGER.info(
"'{}' is set to false, use fake realtime extractor.", EXTRACTOR_REALTIME_ENABLE_KEY);
"Pipe {}@{}: '{}' is set to false, use fake realtime extractor.",
pipeName,
dataRegionId,
EXTRACTOR_REALTIME_ENABLE_KEY);
return;
}

// Use hybrid mode by default
if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
LOGGER.info("'{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE_KEY);
LOGGER.info(
"Pipe {}@{}: '{}' is not set, use hybrid mode by default.",
pipeName,
dataRegionId,
EXTRACTOR_REALTIME_MODE_KEY);
return;
}

Expand All @@ -248,7 +255,9 @@ private void constructRealtimeExtractor(PipeParameters parameters) {
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
"Unsupported extractor realtime mode: {}, create a hybrid extractor.",
"Pipe {}@{}: Unsupported extractor realtime mode: {}, create a hybrid extractor.",
pipeName,
dataRegionId,
parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY));
}
}
Expand Down Expand Up @@ -321,9 +330,11 @@ private void startHistoricalExtractorAndRealtimeExtractor(
} catch (Exception e) {
exceptionHolder.set(e);
LOGGER.warn(
String.format(
"Start historical extractor %s and realtime extractor %s error.",
historicalExtractor, realtimeExtractor),
"Pipe {}@{}: Start historical extractor {} and realtime extractor {} error.",
pipeName,
dataRegionId,
historicalExtractor,
realtimeExtractor,
e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ public void customize(
.contains("time");

LOGGER.info(
"historical data extraction time range, start time {}({}), end time {}({}), sloppy time range {}",
"Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy time range {}",
pipeName,
dataRegionId,
DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
historicalDataExtractionStartTime,
DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
Expand Down Expand Up @@ -286,20 +288,42 @@ public synchronized void start() {
dataRegion.writeLock("Pipe: start to extract historical TsFile");
final long startHistoricalExtractionTime = System.currentTimeMillis();
try {
LOGGER.info("Pipe {}@{}: start to flush data region", pipeName, dataRegionId);
synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
final long lastFlushedByPipeTime =
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
dataRegion.syncCloseAllWorkingTsFileProcessors();
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis());
LOGGER.info(
"Pipe {}@{}: finish to flush data region, took {} ms",
pipeName,
dataRegionId,
System.currentTimeMillis() - startHistoricalExtractionTime);
} else {
LOGGER.info(
"Pipe {}@{}: skip to flush data region, last flushed time {} ms ago",
pipeName,
dataRegionId,
System.currentTimeMillis() - lastFlushedByPipeTime);
}
}

final TsFileManager tsFileManager = dataRegion.getTsFileManager();
tsFileManager.readLock();
try {
final int originalSequenceTsFileCount = tsFileManager.size(true);
final int originalUnsequenceTsFileCount = tsFileManager.size(false);
final List<TsFileResource> resourceList =
new ArrayList<>(tsFileManager.size(true) + tsFileManager.size(false));
new ArrayList<>(originalSequenceTsFileCount + originalUnsequenceTsFileCount);
LOGGER.info(
"Pipe {}@{}: start to extract historical TsFile, original sequence file count {}, "
+ "original unsequence file count {}, start progress index {}",
pipeName,
dataRegionId,
originalSequenceTsFileCount,
originalUnsequenceTsFileCount,
startIndex);

final Collection<TsFileResource> sequenceTsFileResources =
tsFileManager.getTsFileList(true).stream()
Expand Down Expand Up @@ -343,11 +367,16 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
pendingQueue = new ArrayDeque<>(resourceList);

LOGGER.info(
"Pipe: start to extract historical TsFile, data region {}, "
+ "sequence file count {}, unsequence file count {}, historical extraction time {} ms",
"Pipe {}@{}: finish to extract historical TsFile, extracted sequence file count {}/{}, "
+ "extracted unsequence file count {}/{}, extracted file count {}/{}, took {} ms",
pipeName,
dataRegionId,
sequenceTsFileResources.size(),
originalSequenceTsFileCount,
unsequenceTsFileResources.size(),
originalUnsequenceTsFileCount,
resourceList.size(),
originalSequenceTsFileCount + originalUnsequenceTsFileCount,
System.currentTimeMillis() - startHistoricalExtractionTime);
} finally {
tsFileManager.readUnlock();
Expand All @@ -373,9 +402,12 @@ private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(TsFileResource re
<= TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
} catch (IOException e) {
LOGGER.warn(
String.format(
"failed to get the generation time of TsFile %s, extract it anyway",
resource.getTsFilePath()),
"Pipe {}@{}: failed to get the generation time of TsFile {}, extract it anyway"
+ " (historical data extraction time lower bound: {})",
pipeName,
dataRegionId,
resource.getTsFilePath(),
historicalDataExtractionTimeLowerBound,
e);
// If failed to get the generation time of the TsFile, we will extract the data in the TsFile
// anyway.
Expand Down Expand Up @@ -416,7 +448,9 @@ public synchronized Event supply() {
PipeResourceManager.tsfile().unpinTsFileResource(resource);
} catch (IOException e) {
LOGGER.warn(
"Pipe: failed to unpin TsFileResource after creating event, original path: {}",
"Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}",
pipeName,
dataRegionId,
resource.getTsFilePath());
}

Expand All @@ -441,7 +475,9 @@ public synchronized void close() {
PipeResourceManager.tsfile().unpinTsFileResource(resource);
} catch (IOException e) {
LOGGER.warn(
"Pipe: failed to unpin TsFileResource after dropping pipe, original path: {}",
"Pipe {}@{}: failed to unpin TsFileResource after dropping pipe, original path: {}",
pipeName,
dataRegionId,
resource.getTsFilePath());
}
});
Expand Down

0 comments on commit e9e01f3

Please sign in to comment.