Skip to content

Commit

Permalink
Minor refactoring of HivePageSourceProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
sdruzkin committed Mar 15, 2024
1 parent ee2e5c4 commit 0dd8d82
Showing 1 changed file with 18 additions and 17 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -98,7 +98,7 @@ public class HivePageSourceProvider
private final Set<HiveRecordCursorProvider> cursorProviders;
private final Set<HiveBatchPageSourceFactory> pageSourceFactories;
private final Set<HiveSelectivePageSourceFactory> selectivePageSourceFactories;
private Set<HiveAggregatedPageSourceFactory> aggregatedPageSourceFactories;
private final Set<HiveAggregatedPageSourceFactory> aggregatedPageSourceFactories;
private final TypeManager typeManager;
private final RowExpressionService rowExpressionService;
private final LoadingCache<RowExpressionCacheKey, RowExpression> optimizedRowExpressionCache;
Expand Down Expand Up @@ -255,16 +255,17 @@ private ConnectorPageSource createAggregatedPageSource(
HiveFileContext fileContext,
Optional<EncryptionInformation> encryptionInformation)
{
for (HiveAggregatedPageSourceFactory pageSourceFactory : aggregatedPageSourceFactories) {
List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
hiveSplit.getPartitionKeys(),
selectedColumns,
hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
hiveSplit.getTableToPartitionMapping(),
hiveSplit.getFileSplit(),
hiveSplit.getTableBucketNumber());
List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
hiveSplit.getPartitionKeys(),
selectedColumns,
hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
hiveSplit.getTableToPartitionMapping(),
hiveSplit.getFileSplit(),
hiveSplit.getTableBucketNumber());

List<ColumnMapping> regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings);

List<ColumnMapping> regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings);
for (HiveAggregatedPageSourceFactory pageSourceFactory : aggregatedPageSourceFactories) {
Optional<? extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
configuration,
session,
Expand Down Expand Up @@ -364,6 +365,10 @@ private static Optional<ConnectorPageSource> createSelectivePageSource(
return Optional.of(new HiveEmptySplitPageSource());
}

TupleDomain<Subfield> domainPredicate = splitContext.getDynamicFilterPredicate()
.map(filter -> filter.transform(handle -> new Subfield(((HiveColumnHandle) handle).getName())).intersect(layout.getDomainPredicate()))
.orElse(layout.getDomainPredicate());

for (HiveSelectivePageSourceFactory pageSourceFactory : selectivePageSourceFactories) {
Optional<? extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
configuration,
Expand All @@ -375,8 +380,7 @@ private static Optional<ConnectorPageSource> createSelectivePageSource(
coercers,
bucketAdaptation,
outputColumns,
splitContext.getDynamicFilterPredicate().map(filter -> filter.transform(
handle -> new Subfield(((HiveColumnHandle) handle).getName())).intersect(layout.getDomainPredicate())).orElse(layout.getDomainPredicate()),
domainPredicate,
optimizedRemainingPredicate,
hiveStorageTimeZone,
fileContext,
Expand Down Expand Up @@ -459,7 +463,6 @@ public static Optional<ConnectorPageSource> createHivePageSource(
fileSplit,
storage,
effectivePredicate,
hiveColumns,
hiveStorageTimeZone,
typeManager,
tableName,
Expand Down Expand Up @@ -522,7 +525,6 @@ public static Optional<ConnectorPageSource> createHivePageSource(
fileSplit,
storage,
effectivePredicate,
hiveColumns,
hiveStorageTimeZone,
typeManager,
tableName,
Expand All @@ -549,7 +551,6 @@ private static Optional<ConnectorPageSource> getPageSourceFromCursorProvider(
HiveFileSplit fileSplit,
Storage storage,
TupleDomain<HiveColumnHandle> effectivePredicate,
List<HiveColumnHandle> hiveColumns,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
SchemaTableName tableName,
Expand Down Expand Up @@ -585,8 +586,8 @@ private static Optional<ConnectorPageSource> getPageSourceFromCursorProvider(
tableParameters,
tableName.getSchemaName(),
tableName.getTableName(),
partitionKeyColumnHandles.stream().map(column -> column.getName()).collect(toImmutableList()),
partitionKeyColumnHandles.stream().map(column -> column.getHiveType()).collect(toImmutableList()));
partitionKeyColumnHandles.stream().map(BaseHiveColumnHandle::getName).collect(toImmutableList()),
partitionKeyColumnHandles.stream().map(HiveColumnHandle::getHiveType).collect(toImmutableList()));

Optional<RecordCursor> cursor = provider.createRecordCursor(
configuration,
Expand Down

0 comments on commit 0dd8d82

Please sign in to comment.