Skip to content

Commit 6453f9d

Browse files
committed
Error samples data service changed to iterate over partitions and stop when data is found.
1 parent 8e62053 commit 6453f9d

File tree

1 file changed

+88
-74
lines changed

1 file changed

+88
-74
lines changed

dqops/src/main/java/com/dqops/data/errorsamples/services/ErrorSamplesDataServiceImpl.java

+88-74
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@
2727
import com.dqops.data.errorsamples.snapshot.ErrorSamplesSnapshot;
2828
import com.dqops.data.errorsamples.snapshot.ErrorSamplesSnapshotFactory;
2929
import com.dqops.data.normalization.CommonTableNormalizationService;
30+
import com.dqops.data.storage.LoadedMonthlyPartition;
31+
import com.dqops.data.storage.ParquetPartitionId;
3032
import com.dqops.metadata.id.HierarchyId;
3133
import com.dqops.metadata.sources.PhysicalTableName;
3234
import com.dqops.metadata.timeseries.TimePeriodGradient;
3335
import com.dqops.utils.datetime.LocalDateTimePeriodUtility;
3436
import com.dqops.utils.datetime.LocalDateTimeTruncateUtility;
3537
import com.dqops.utils.tables.TableRowUtility;
3638
import com.google.common.base.Strings;
39+
import com.google.common.collect.Lists;
3740
import org.apache.commons.lang3.StringUtils;
3841
import org.jetbrains.annotations.NotNull;
3942
import org.springframework.beans.factory.annotation.Autowired;
@@ -47,10 +50,7 @@
4750
import java.time.Instant;
4851
import java.time.LocalDate;
4952
import java.time.LocalDateTime;
50-
import java.util.LinkedHashMap;
51-
import java.util.List;
52-
import java.util.Map;
53-
import java.util.Objects;
53+
import java.util.*;
5454
import java.util.stream.Collectors;
5555

5656
/**
@@ -84,87 +84,102 @@ public ErrorSamplesListModel[] readErrorSamplesDetailed(AbstractRootChecksContai
8484
String connectionName = checksContainerHierarchyId.getConnectionName();
8585
PhysicalTableName physicalTableName = checksContainerHierarchyId.getPhysicalTableName();
8686

87-
Table errorSamplesTable = loadRecentErrorSamples(loadParameters, connectionName, physicalTableName, userDomainIdentity);
88-
if (errorSamplesTable == null || errorSamplesTable.isEmpty()) {
87+
ErrorSamplesSnapshot samplesSnapshot = loadRecentErrorSamples(loadParameters, connectionName, physicalTableName, userDomainIdentity);
88+
Map<ParquetPartitionId, LoadedMonthlyPartition> loadedMonthlyPartitions = samplesSnapshot.getLoadedMonthlyPartitions();
89+
if (loadedMonthlyPartitions == null || loadedMonthlyPartitions.isEmpty()) {
8990
return new ErrorSamplesListModel[0]; // empty array
9091
}
9192

92-
Table filteredTable = filterTableToRootChecksContainerAndFilterParameters(rootChecksContainerSpec, errorSamplesTable, loadParameters);
93-
if (filteredTable.isEmpty()) {
94-
return new ErrorSamplesListModel[0]; // empty array
95-
}
96-
97-
Table filteredTableByDataGroup = filteredTable;
98-
if (!Strings.isNullOrEmpty(loadParameters.getDataGroupName())) {
99-
TextColumn dataGroupNameFilteredColumn = filteredTable.textColumn(ErrorSamplesColumnNames.DATA_GROUP_NAME_COLUMN_NAME);
100-
filteredTableByDataGroup = filteredTable.where(dataGroupNameFilteredColumn.isEqualTo(loadParameters.getDataGroupName()));
101-
}
102-
103-
if (filteredTableByDataGroup.isEmpty()) {
104-
return new ErrorSamplesListModel[0]; // empty array
105-
}
93+
ArrayList<LoadedMonthlyPartition> loadedPartitions = new ArrayList<>(loadedMonthlyPartitions.values());
94+
Lists.reverse(loadedPartitions);
10695

107-
Table sortedTable = filteredTableByDataGroup.sortDescendingOn(
108-
ErrorSamplesColumnNames.EXECUTED_AT_COLUMN_NAME); // most recent execution first
96+
for (LoadedMonthlyPartition loadedMonthlyPartition : loadedPartitions) {
97+
if (loadedMonthlyPartition == null || loadedMonthlyPartition.getData() == null) {
98+
continue;
99+
}
109100

110-
LongColumn checkHashColumn = sortedTable.longColumn(ErrorSamplesColumnNames.CHECK_HASH_COLUMN_NAME);
111-
LongColumn checkHashColumnUnsorted = filteredTable.longColumn(ErrorSamplesColumnNames.CHECK_HASH_COLUMN_NAME);
112-
TextColumn allDataGroupColumnUnsorted = filteredTable.textColumn(ErrorSamplesColumnNames.DATA_GROUP_NAME_COLUMN_NAME);
113-
TextColumn allDataGroupColumn = sortedTable.textColumn(ErrorSamplesColumnNames.DATA_GROUP_NAME_COLUMN_NAME);
101+
Table errorSamplesTable = loadedMonthlyPartition.getData();
102+
Table filteredTable = filterTableToRootChecksContainerAndFilterParameters(rootChecksContainerSpec, errorSamplesTable, loadParameters);
103+
if (filteredTable.isEmpty()) {
104+
continue; // empty array
105+
}
114106

115-
int rowCount = sortedTable.rowCount();
116-
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
117-
Long checkHash = checkHashColumn.get(rowIndex);
118-
String dataGroupNameForCheck = allDataGroupColumn.get(rowIndex);
119-
ErrorSamplesListModel errorsListModel = errorMap.get(checkHash);
107+
Table filteredTableByDataGroup = filteredTable;
108+
if (!Strings.isNullOrEmpty(loadParameters.getDataGroupName())) {
109+
TextColumn dataGroupNameFilteredColumn = filteredTable.textColumn(ErrorSamplesColumnNames.DATA_GROUP_NAME_COLUMN_NAME);
110+
filteredTableByDataGroup = filteredTable.where(dataGroupNameFilteredColumn.isEqualTo(loadParameters.getDataGroupName()));
111+
}
120112

121-
ErrorSampleEntryModel singleModel = null;
113+
if (filteredTableByDataGroup.isEmpty()) {
114+
continue; // empty array
115+
}
122116

123-
if (errorsListModel != null) {
124-
if (errorsListModel.getErrorSamplesEntries().size() >= loadParameters.getMaxResultsPerCheck()) {
125-
continue; // enough results loaded
117+
Table sortedTable = filteredTableByDataGroup.sortDescendingOn(
118+
ErrorSamplesColumnNames.EXECUTED_AT_COLUMN_NAME); // most recent execution first
119+
120+
LongColumn checkHashColumn = sortedTable.longColumn(ErrorSamplesColumnNames.CHECK_HASH_COLUMN_NAME);
121+
LongColumn checkHashColumnUnsorted = filteredTable.longColumn(ErrorSamplesColumnNames.CHECK_HASH_COLUMN_NAME);
122+
TextColumn allDataGroupColumnUnsorted = filteredTable.textColumn(ErrorSamplesColumnNames.DATA_GROUP_NAME_COLUMN_NAME);
123+
TextColumn allDataGroupColumn = sortedTable.textColumn(ErrorSamplesColumnNames.DATA_GROUP_NAME_COLUMN_NAME);
124+
125+
int rowCount = sortedTable.rowCount();
126+
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
127+
Long checkHash = checkHashColumn.get(rowIndex);
128+
String dataGroupNameForCheck = allDataGroupColumn.get(rowIndex);
129+
ErrorSamplesListModel errorsListModel = errorMap.get(checkHash);
130+
131+
ErrorSampleEntryModel singleModel = null;
132+
133+
if (errorsListModel != null) {
134+
if (errorsListModel.getErrorSamplesEntries().size() >= loadParameters.getMaxResultsPerCheck()) {
135+
continue; // enough results loaded
136+
}
137+
138+
if (!Objects.equals(dataGroupNameForCheck, errorsListModel.getDataGroup())) {
139+
continue; // we are not mixing groups, results for a different group were already loaded
140+
}
141+
} else {
142+
Row row = sortedTable.row(rowIndex);
143+
singleModel = createErrorSingleModel(row);
144+
String checkCategory = row.getString(ErrorSamplesColumnNames.CHECK_CATEGORY_COLUMN_NAME);
145+
String checkDisplayName = row.getString(ErrorSamplesColumnNames.CHECK_DISPLAY_NAME_COLUMN_NAME);
146+
String checkName = row.getString(ErrorSamplesColumnNames.CHECK_NAME_COLUMN_NAME);
147+
String checkTypeString = row.getString(ErrorSamplesColumnNames.CHECK_TYPE_COLUMN_NAME);
148+
149+
errorsListModel = new ErrorSamplesListModel();
150+
errorsListModel.setCheckCategory(checkCategory);
151+
errorsListModel.setCheckName(checkName);
152+
errorsListModel.setCheckHash(checkHash);
153+
errorsListModel.setCheckType(CheckType.fromString(checkTypeString));
154+
errorsListModel.setCheckDisplayName(checkDisplayName);
155+
errorsListModel.setDataGroup(dataGroupNameForCheck);
156+
157+
Selection resultsForCheckHash = checkHashColumnUnsorted.isIn(checkHash);
158+
List<String> dataGroupsForCheck = allDataGroupColumnUnsorted.where(resultsForCheckHash)
159+
.unique().asList().stream().sorted().collect(Collectors.toList());
160+
161+
if (dataGroupsForCheck.size() > 1 && dataGroupsForCheck.contains(CommonTableNormalizationService.NO_GROUPING_DATA_GROUP_NAME)) {
162+
dataGroupsForCheck.remove(CommonTableNormalizationService.NO_GROUPING_DATA_GROUP_NAME);
163+
dataGroupsForCheck.add(0, CommonTableNormalizationService.NO_GROUPING_DATA_GROUP_NAME);
164+
}
165+
166+
errorsListModel.setDataGroupsNames(dataGroupsForCheck);
167+
errorMap.put(checkHash, errorsListModel);
126168
}
127169

128-
if (!Objects.equals(dataGroupNameForCheck, errorsListModel.getDataGroup())) {
129-
continue; // we are not mixing groups, results for a different group were already loaded
130-
}
131-
} else {
132-
Row row = sortedTable.row(rowIndex);
133-
singleModel = createErrorSingleModel(row);
134-
String checkCategory = row.getString(ErrorSamplesColumnNames.CHECK_CATEGORY_COLUMN_NAME);
135-
String checkDisplayName = row.getString(ErrorSamplesColumnNames.CHECK_DISPLAY_NAME_COLUMN_NAME);
136-
String checkName = row.getString(ErrorSamplesColumnNames.CHECK_NAME_COLUMN_NAME);
137-
String checkTypeString = row.getString(ErrorSamplesColumnNames.CHECK_TYPE_COLUMN_NAME);
138-
139-
errorsListModel = new ErrorSamplesListModel();
140-
errorsListModel.setCheckCategory(checkCategory);
141-
errorsListModel.setCheckName(checkName);
142-
errorsListModel.setCheckHash(checkHash);
143-
errorsListModel.setCheckType(CheckType.fromString(checkTypeString));
144-
errorsListModel.setCheckDisplayName(checkDisplayName);
145-
errorsListModel.setDataGroup(dataGroupNameForCheck);
146-
147-
Selection resultsForCheckHash = checkHashColumnUnsorted.isIn(checkHash);
148-
List<String> dataGroupsForCheck = allDataGroupColumnUnsorted.where(resultsForCheckHash)
149-
.unique().asList().stream().sorted().collect(Collectors.toList());
150-
151-
if (dataGroupsForCheck.size() > 1 && dataGroupsForCheck.contains(CommonTableNormalizationService.NO_GROUPING_DATA_GROUP_NAME)) {
152-
dataGroupsForCheck.remove(CommonTableNormalizationService.NO_GROUPING_DATA_GROUP_NAME);
153-
dataGroupsForCheck.add(0, CommonTableNormalizationService.NO_GROUPING_DATA_GROUP_NAME);
170+
if (singleModel == null) {
171+
singleModel = createErrorSingleModel(sortedTable.row(rowIndex));
154172
}
155173

156-
errorsListModel.setDataGroupsNames(dataGroupsForCheck);
157-
errorMap.put(checkHash, errorsListModel);
174+
errorsListModel.getErrorSamplesEntries().add(singleModel);
158175
}
159176

160-
if (singleModel == null) {
161-
singleModel = createErrorSingleModel(sortedTable.row(rowIndex));
177+
if (!errorMap.isEmpty()) {
178+
return errorMap.values().toArray(ErrorSamplesListModel[]::new);
162179
}
163-
164-
errorsListModel.getErrorSamplesEntries().add(singleModel);
165180
}
166181

167-
return errorMap.values().toArray(ErrorSamplesListModel[]::new);
182+
return new ErrorSamplesListModel[0]; // no results found
168183
}
169184

170185
/**
@@ -347,10 +362,10 @@ protected Table filterTableToRootChecksContainerAndFilterParameters(AbstractRoot
347362
* @param userDomainIdentity User identity within the data domain.
348363
* @return Table with error samples for the most recent two months inside the specified range or null when no data found.
349364
*/
350-
protected Table loadRecentErrorSamples(ErrorSamplesFilterParameters loadParameters,
351-
String connectionName,
352-
PhysicalTableName physicalTableName,
353-
UserDomainIdentity userDomainIdentity) {
365+
protected ErrorSamplesSnapshot loadRecentErrorSamples(ErrorSamplesFilterParameters loadParameters,
366+
String connectionName,
367+
PhysicalTableName physicalTableName,
368+
UserDomainIdentity userDomainIdentity) {
354369
ErrorSamplesSnapshot errorSamplesSnapshot = this.errorSamplesSnapshotFactory.createReadOnlySnapshot(connectionName,
355370
physicalTableName, ErrorSamplesColumnNames.COLUMN_NAMES_FOR_READ_ONLY_ACCESS, userDomainIdentity);
356371
int maxMonthsToLoad = DEFAULT_MAX_RECENT_LOADED_MONTHS;
@@ -365,7 +380,6 @@ protected Table loadRecentErrorSamples(ErrorSamplesFilterParameters loadParamete
365380
}
366381

367382
errorSamplesSnapshot.ensureNRecentMonthsAreLoaded(loadParameters.getStartMonth(), loadParameters.getEndMonth(), maxMonthsToLoad);
368-
Table ruleResultsData = errorSamplesSnapshot.getAllData();
369-
return ruleResultsData;
383+
return errorSamplesSnapshot;
370384
}
371385
}

0 commit comments

Comments
 (0)