From 84f6dc4df213f3416a46029971301d61e91f847b Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Mon, 22 Nov 2021 14:44:02 -0800 Subject: [PATCH] Proceed to next time window when no segment match for RealtimeToOfflineTask --- ...ealtimeToOfflineSegmentsTaskGenerator.java | 97 ++++++++++--------- ...imeToOfflineSegmentsTaskGeneratorTest.java | 32 ++++++ 2 files changed, 83 insertions(+), 46 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java index b715e550e32c..7b0b49022686 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java @@ -119,9 +119,8 @@ public List generateTasks(List tableConfigs) { Map incompleteTasks = TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName, _clusterInfoAccessor); if (!incompleteTasks.isEmpty()) { - LOGGER - .warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.", incompleteTasks.keySet(), - realtimeTableName); + LOGGER.warn("Found incomplete tasks: {} for same table: {}. 
Skipping task generation.", + incompleteTasks.keySet(), realtimeTableName); continue; } @@ -132,16 +131,15 @@ public List generateTasks(List tableConfigs) { getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata, partitionToLatestCompletedSegmentName, allPartitions); if (completedSegmentsZKMetadata.isEmpty()) { - LOGGER - .info("No realtime-completed segments found for table: {}, skipping task generation: {}", realtimeTableName, - taskType); + LOGGER.info("No realtime-completed segments found for table: {}, skipping task generation: {}", + realtimeTableName, taskType); continue; } allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet()); if (!allPartitions.isEmpty()) { - LOGGER - .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.", - allPartitions, realtimeTableName, taskType); + LOGGER.info( + "Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.", + allPartitions, realtimeTableName, taskType); continue; } @@ -163,47 +161,53 @@ public List generateTasks(List tableConfigs) { long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs); long windowEndMs = windowStartMs + bucketMs; - // Check that execution window is older than bufferTime - if (windowEndMs > System.currentTimeMillis() - bufferMs) { - LOGGER.info( - "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. 
Skipping task " - + "generation: {}", - windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType); - continue; - } - // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd // (exclusive) List segmentNames = new ArrayList<>(); List downloadURLs = new ArrayList<>(); Set lastCompletedSegmentPerPartition = new HashSet<>(partitionToLatestCompletedSegmentName.values()); boolean skipGenerate = false; - for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) { - String segmentName = segmentZKMetadata.getSegmentName(); - long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs(); - long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs(); - - // Check overlap with window - if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) { - // If last completed segment is being used, make sure that segment crosses over end of window. - // In the absence of this check, CONSUMING segments could contain some portion of the window. That data - // would be skipped forever. - if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) { - LOGGER.info( - "Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task " - + "generation: {}", - segmentName, taskType); - skipGenerate = true; - break; + while (true) { + // Check that execution window is older than bufferTime + if (windowEndMs > System.currentTimeMillis() - bufferMs) { + LOGGER.info( + "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. 
Skipping task " + + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType); + skipGenerate = true; + break; + } + + for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) { + String segmentName = segmentZKMetadata.getSegmentName(); + long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs(); + long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs(); + + // Check overlap with window + if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) { + // If last completed segment is being used, make sure that segment crosses over end of window. + // In the absence of this check, CONSUMING segments could contain some portion of the window. That data + // would be skipped forever. + if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) { + LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task " + + "generation: {}", segmentName, taskType); + skipGenerate = true; + break; + } + segmentNames.add(segmentName); + downloadURLs.add(segmentZKMetadata.getDownloadUrl()); } - segmentNames.add(segmentName); - downloadURLs.add(segmentZKMetadata.getDownloadUrl()); } + if (skipGenerate || !segmentNames.isEmpty()) { + break; + } + + LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket", + taskType, windowStartMs, windowEndMs); + windowStartMs = windowEndMs; + windowEndMs += bucketMs; } - if (segmentNames.isEmpty() || skipGenerate) { - LOGGER.info("Found no eligible segments for task: {} with window [{} - {}). 
Skipping task generation", taskType, - windowStartMs, windowEndMs); + if (skipGenerate) { continue; } @@ -264,8 +268,8 @@ private void getCompletedSegmentsInfo(String realtimeTableName, List { + latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(), + (partitionGroupId, latestLLCSegmentName) -> { if (latestLLCSegmentName == null) { return llcSegmentName; } else { @@ -291,11 +295,12 @@ private void getCompletedSegmentsInfo(String realtimeTableName, List completedSegmentsZKMetadata, long bucketMs) { - ZNRecord realtimeToOfflineZNRecord = _clusterInfoAccessor - .getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, realtimeTableName); + ZNRecord realtimeToOfflineZNRecord = + _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, + realtimeTableName); RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = - realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata - .fromZNRecord(realtimeToOfflineZNRecord) : null; + realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord( + realtimeToOfflineZNRecord) : null; if (realtimeToOfflineSegmentsTaskMetadata == null) { // No ZNode exists. Cold-start. 
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java index fc71b5bd939b..000383b7ff02 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments; import com.google.common.collect.Lists; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -403,6 +404,37 @@ public void testOverflowIntoConsuming() { assertEquals(pinotTaskConfigs.size(), 1); } + /** + * Tests for task generation when there is a time gap between segments. 
+ */ + @Test + public void testTimeGap() { + Map> taskConfigsMap = new HashMap<>(); + taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>()); + TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap); + + ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class); + when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>()); + when(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE, + REALTIME_TABLE_NAME)).thenReturn( + new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L).toZNRecord()); // 21 May 2020 UTC + SegmentZKMetadata segmentZKMetadata = + getSegmentZKMetadata("testTable__0__1__12345", Status.DONE, 1590220800000L, 1590307200000L, + TimeUnit.MILLISECONDS, "download2"); // 05-23-2020T08:00:00 UTC to 05-24-2020T08:00:00 UTC + when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn( + Collections.singletonList(segmentZKMetadata)); + + RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator(); + generator.init(mockClusterInfoProvide); + + // Generated task should skip 2 days and have time window of [23 May 2020 UTC, 24 May 2020 UTC) + List pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig)); + assertEquals(pinotTaskConfigs.size(), 1); + Map configs = pinotTaskConfigs.get(0).getConfigs(); + assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), "1590192000000"); + assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590278400000"); + } + @Test public void testBuffer() { Map> taskConfigsMap = new HashMap<>();