Proceed to next time window when no segment match for RealtimeToOfflineTask #7814

Merged 1 commit on Nov 23, 2021
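Summary of the change, ahead of the diff: when the current time bucket has no completed segments overlapping it, the task generator now advances the window to the next bucket (as long as the window stays older than the buffer time) instead of skipping task generation for that run. Below is a minimal, self-contained sketch of that loop, not the actual generator code; the Segment record, the method name, and the literal watermark/bucket/buffer values are illustrative only (the timestamps mirror the new testTimeGap case further down).

import java.util.ArrayList;
import java.util.List;

public class WindowAdvanceSketch {

  /** A completed segment reduced to its time range (hypothetical type, not Pinot's SegmentZKMetadata). */
  record Segment(String name, long startMs, long endMs) {
  }

  /**
   * Returns the first bucket [start, end) at or after the watermark that overlaps at least one
   * completed segment, or null once the bucket is no longer older than the buffer time.
   */
  static long[] findFirstMatchingWindow(List<Segment> completedSegments, long watermarkMs, long bucketMs,
      long bufferMs, long nowMs) {
    long windowStartMs = watermarkMs;
    long windowEndMs = windowStartMs + bucketMs;
    while (true) {
      // Same stop condition as the generator: the window must be older than the buffer time.
      if (windowEndMs > nowMs - bufferMs) {
        return null;
      }
      List<String> matched = new ArrayList<>();
      for (Segment segment : completedSegments) {
        // Overlap check: windowStart (inclusive) to windowEnd (exclusive).
        if (windowStartMs <= segment.endMs() && segment.startMs() < windowEndMs) {
          matched.add(segment.name());
        }
      }
      if (!matched.isEmpty()) {
        return new long[]{windowStartMs, windowEndMs};
      }
      // This is the new behaviour: instead of giving up, move on to the next time bucket.
      windowStartMs = windowEndMs;
      windowEndMs += bucketMs;
    }
  }

  public static void main(String[] args) {
    // Watermark 21 May 2020 00:00 UTC; one completed segment covering 23 May 08:00 - 24 May 08:00 UTC.
    List<Segment> segments = List.of(new Segment("testTable__0__1__12345", 1590220800000L, 1590307200000L));
    long[] window = findFirstMatchingWindow(segments, 1590019200000L, 86_400_000L, 172_800_000L,
        System.currentTimeMillis());
    // Prints 1590192000000 - 1590278400000, i.e. the [23 May, 24 May) bucket after two empty buckets.
    System.out.println(window[0] + " - " + window[1]);
  }
}

With these inputs the sketch skips the empty [21 May, 22 May) and [22 May, 23 May) buckets and settles on [23 May, 24 May), which is exactly what the real generator is expected to do in the new test.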
@@ -119,9 +119,8 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
Map<String, TaskState> incompleteTasks =
TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName, _clusterInfoAccessor);
if (!incompleteTasks.isEmpty()) {
LOGGER
.warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.", incompleteTasks.keySet(),
realtimeTableName);
LOGGER.warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.",
incompleteTasks.keySet(), realtimeTableName);
continue;
}

@@ -132,16 +131,15 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata, partitionToLatestCompletedSegmentName,
allPartitions);
if (completedSegmentsZKMetadata.isEmpty()) {
LOGGER
.info("No realtime-completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
taskType);
LOGGER.info("No realtime-completed segments found for table: {}, skipping task generation: {}",
realtimeTableName, taskType);
continue;
}
allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
if (!allPartitions.isEmpty()) {
LOGGER
.info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
allPartitions, realtimeTableName, taskType);
LOGGER.info(
"Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
allPartitions, realtimeTableName, taskType);
continue;
}

@@ -163,47 +161,53 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs);
long windowEndMs = windowStartMs + bucketMs;

// Check that execution window is older than bufferTime
if (windowEndMs > System.currentTimeMillis() - bufferMs) {
LOGGER.info(
"Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task "
+ "generation: {}",
windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
continue;
}

// Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd
// (exclusive)
List<String> segmentNames = new ArrayList<>();
List<String> downloadURLs = new ArrayList<>();
Set<String> lastCompletedSegmentPerPartition = new HashSet<>(partitionToLatestCompletedSegmentName.values());
boolean skipGenerate = false;
for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
String segmentName = segmentZKMetadata.getSegmentName();
long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();

// Check overlap with window
if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
// If last completed segment is being used, make sure that segment crosses over end of window.
// In the absence of this check, CONSUMING segments could contain some portion of the window. That data
// would be skipped forever.
if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
LOGGER.info(
"Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
+ "generation: {}",
segmentName, taskType);
skipGenerate = true;
break;
while (true) {
// Check that execution window is older than bufferTime
if (windowEndMs > System.currentTimeMillis() - bufferMs) {
LOGGER.info(
"Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task "
+ "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
skipGenerate = true;
break;
}

for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
String segmentName = segmentZKMetadata.getSegmentName();
long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();

// Check overlap with window
if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
// If last completed segment is being used, make sure that segment crosses over end of window.
// In the absence of this check, CONSUMING segments could contain some portion of the window. That data
// would be skipped forever.
if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
+ "generation: {}", segmentName, taskType);
skipGenerate = true;
break;
}
segmentNames.add(segmentName);
downloadURLs.add(segmentZKMetadata.getDownloadUrl());
}
segmentNames.add(segmentName);
downloadURLs.add(segmentZKMetadata.getDownloadUrl());
}
if (skipGenerate || !segmentNames.isEmpty()) {
break;
}

LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket",
taskType, windowStartMs, windowEndMs);
windowStartMs = windowEndMs;
windowEndMs += bucketMs;
}

if (segmentNames.isEmpty() || skipGenerate) {
LOGGER.info("Found no eligible segments for task: {} with window [{} - {}). Skipping task generation", taskType,
windowStartMs, windowEndMs);
if (skipGenerate) {
continue;
}
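One detail worth keeping in mind from the hunk above: the overflow check (the "Window data overflows into CONSUMING segments" branch) is also what stops the new loop from advancing past data that is still being consumed. A standalone illustration with made-up timestamps, assuming a one-day window and a partition whose latest completed segment ends mid-window; none of these names or values come from the Pinot code, only the condition itself does.

public class ConsumingOverflowExample {
  public static void main(String[] args) {
    long windowStartMs = 1590192000000L;               // 23 May 2020 00:00 UTC
    long windowEndMs = 1590278400000L;                 // 24 May 2020 00:00 UTC
    long latestCompletedSegmentEndMs = 1590228000000L; // 23 May 2020 10:00 UTC

    // The latest completed segment of the partition overlaps the window but ends before windowEnd,
    // so the rest of [23 May, 24 May) can only exist in the still-CONSUMING segment.
    boolean overflowsIntoConsuming =
        windowStartMs <= latestCompletedSegmentEndMs && latestCompletedSegmentEndMs < windowEndMs;

    // Prints true: the generator sets skipGenerate and waits rather than advancing past this data,
    // which would otherwise be skipped forever once the watermark moved on.
    System.out.println(overflowsIntoConsuming);
  }
}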

@@ -264,8 +268,8 @@ private void getCompletedSegmentsInfo(String realtimeTableName, List<SegmentZKMe

if (segmentZKMetadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
completedSegmentsZKMetadata.add(segmentZKMetadata);
latestLLCSegmentNameMap
.compute(llcSegmentName.getPartitionGroupId(), (partitionGroupId, latestLLCSegmentName) -> {
latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(),
(partitionGroupId, latestLLCSegmentName) -> {
if (latestLLCSegmentName == null) {
return llcSegmentName;
} else {
@@ -291,11 +295,12 @@ private void getCompletedSegmentsInfo(String realtimeTableName, List<SegmentZKMe
*/
private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata,
long bucketMs) {
ZNRecord realtimeToOfflineZNRecord = _clusterInfoAccessor
.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, realtimeTableName);
ZNRecord realtimeToOfflineZNRecord =
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
realtimeTableName);
RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata
.fromZNRecord(realtimeToOfflineZNRecord) : null;
realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
realtimeToOfflineZNRecord) : null;

if (realtimeToOfflineSegmentsTaskMetadata == null) {
// No ZNode exists. Cold-start.
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;

import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -403,6 +404,37 @@ public void testOverflowIntoConsuming() {
assertEquals(pinotTaskConfigs.size(), 1);
}

/**
* Tests for task generation when there is a time gap between segments.
*/
@Test
public void testTimeGap() {
Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);

ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
when(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE,
REALTIME_TABLE_NAME)).thenReturn(
new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L).toZNRecord()); // 21 May 2020 UTC
SegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata("testTable__0__1__12345", Status.DONE, 1590220800000L, 1590307200000L,
TimeUnit.MILLISECONDS, "download2"); // 05-23-2020T08:00:00 UTC to 05-24-2020T08:00:00 UTC
when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
Collections.singletonList(segmentZKMetadata));

RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
generator.init(mockClusterInfoProvide);

// Generated task should skip 2 days and have time window of [23 May 2020 UTC, 24 May 2020 UTC)
List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertEquals(pinotTaskConfigs.size(), 1);
Map<String, String> configs = pinotTaskConfigs.get(0).getConfigs();
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), "1590192000000");
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590278400000");
}

@Test
public void testBuffer() {
Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
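Returning to the expected window asserted in testTimeGap: the values follow directly from the bucket arithmetic. The class below is not part of the PR, just a quick way to verify the constants; the 1d bucket is the task default implied by the asserted one-day window.

import java.time.Instant;

public class TestTimeGapWindowMath {
  public static void main(String[] args) {
    long watermarkMs = 1590019200000L; // stored watermark: 2020-05-21T00:00:00Z
    long bucketMs = 86_400_000L;       // 1d bucket
    long expectedStartMs = watermarkMs + 2 * bucketMs; // two empty buckets skipped
    long expectedEndMs = expectedStartMs + bucketMs;

    // Prints 2020-05-23T00:00:00Z = 1590192000000 and 2020-05-24T00:00:00Z = 1590278400000,
    // matching WINDOW_START_MS_KEY and WINDOW_END_MS_KEY in the assertions above.
    System.out.println(Instant.ofEpochMilli(expectedStartMs) + " = " + expectedStartMs);
    System.out.println(Instant.ofEpochMilli(expectedEndMs) + " = " + expectedEndMs);
  }
}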