Use dynamic split sizes in hive connector #22051

Merged · 1 commit · Mar 5, 2024
3 changes: 3 additions & 0 deletions presto-docs/src/main/sphinx/connector/hive.rst
@@ -153,6 +153,9 @@ Property Name Description

``hive.max-partitions-per-scan`` Maximum number of partitions for a single table scan. 100,000

``hive.dynamic-split-sizes-enabled`` Enable dynamic sizing of splits based on data scanned by ``false``
the query.

``hive.metastore.authentication.type`` Hive metastore authentication type. ``NONE``
Possible values are ``NONE`` or ``KERBEROS``.
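
The new property defaults to ``false``. To try it, set it in the Hive catalog properties file, or toggle the matching session property for a single query (this assumes the catalog is registered as ``hive``)::

    # etc/catalog/hive.properties
    hive.dynamic-split-sizes-enabled=true

    -- per-query alternative, using the session property added in this change
    SET SESSION hive.dynamic_split_sizes_enabled = true;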

@@ -190,6 +190,7 @@ public class HiveClientConfig

private boolean sizeBasedSplitWeightsEnabled = true;
private double minimumAssignedSplitWeight = 0.05;
private boolean dynamicSplitSizesEnabled;

private boolean userDefinedTypeEncodingEnabled;

@@ -1559,6 +1560,18 @@ public boolean isSizeBasedSplitWeightsEnabled()
return sizeBasedSplitWeightsEnabled;
}

@Config("hive.dynamic-split-sizes-enabled")
public HiveClientConfig setDynamicSplitSizesEnabled(boolean dynamicSplitSizesEnabled)
{
this.dynamicSplitSizesEnabled = dynamicSplitSizesEnabled;
return this;
}

public boolean isDynamicSplitSizesEnabled()
{
return dynamicSplitSizesEnabled;
}

@Config("hive.minimum-assigned-split-weight")
@ConfigDescription("Minimum weight that a split can be assigned when size based split weights are enabled")
public HiveClientConfig setMinimumAssignedSplitWeight(double minimumAssignedSplitWeight)
@@ -485,6 +485,12 @@ public SemiTransactionalHiveMetastore getMetastore()
return metastore;
}

@Override
public HiveStatisticsProvider getHiveStatisticsProvider()
{
return hiveStatisticsProvider;
}

@Override
public List<String> listSchemaNames(ConnectorSession session)
{
@@ -129,6 +129,7 @@ public final class HiveSessionProperties
public static final String QUICK_STATS_ENABLED = "quick_stats_enabled";
public static final String QUICK_STATS_INLINE_BUILD_TIMEOUT = "quick_stats_inline_build_timeout";
public static final String QUICK_STATS_BACKGROUND_BUILD_TIMEOUT = "quick_stats_background_build_timeout";
public static final String DYNAMIC_SPLIT_SIZES_ENABLED = "dynamic_split_sizes_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -558,6 +559,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Enable estimating split weights based on size in bytes",
hiveClientConfig.isSizeBasedSplitWeightsEnabled(),
false),
booleanProperty(
DYNAMIC_SPLIT_SIZES_ENABLED,
"Enable dynamic sizing of splits based on column statistics",
hiveClientConfig.isDynamicSplitSizesEnabled(),
false),
new PropertyMetadata<>(
MINIMUM_ASSIGNED_SPLIT_WEIGHT,
"Minimum assigned split weight when size based split weighting is enabled",
@@ -1032,6 +1038,11 @@ public static boolean isSizeBasedSplitWeightsEnabled(ConnectorSession session)
return session.getProperty(SIZE_BASED_SPLIT_WEIGHTS_ENABLED, Boolean.class);
}

public static boolean isDynamicSplitSizesEnabled(ConnectorSession session)
{
return session.getProperty(DYNAMIC_SPLIT_SIZES_ENABLED, Boolean.class);
}

public static double getMinimumAssignedSplitWeight(ConnectorSession session)
{
return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class);
@@ -21,6 +21,7 @@
import com.facebook.presto.common.predicate.SortedRangeSet;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.HiveBucketing.HiveBucketFilter;
import com.facebook.presto.hive.metastore.Column;
@@ -46,6 +47,8 @@
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
@@ -89,6 +92,7 @@
import static com.facebook.presto.hive.HivePartition.UNPARTITIONED_ID;
import static com.facebook.presto.hive.HiveSessionProperties.getHiveMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getLeaseDuration;
import static com.facebook.presto.hive.HiveSessionProperties.isDynamicSplitSizesEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isPartitionStatisticsBasedOptimizationEnabled;
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
@@ -113,11 +117,14 @@
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Iterables.transform;
import static java.lang.Double.isFinite;
import static java.lang.Float.floatToIntBits;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.reducing;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE;
@@ -281,6 +288,8 @@ public ConnectorSplitSource getSplits(
layout.getPredicateColumns(),
layout.getDomainPredicate().getDomains());

double ratio = getSplitScanRatio(session, tableName, layout, metadata, partitions);

HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(
table,
hivePartitions,
@@ -296,16 +305,63 @@
splitSchedulingContext.schedulerUsesHostAddresses(),
layout.isPartialAggregationsPushedDown());

HiveSplitSource splitSource = computeSplitSource(splitSchedulingContext, table, session, hiveSplitLoader);
HiveSplitSource splitSource = computeSplitSource(splitSchedulingContext, table, session, hiveSplitLoader, ratio);
hiveSplitLoader.start(splitSource);

return splitSource;
}

// Get the estimated split scan ratio: data read by the operator / data present in the split.
// This ratio is used to increase split sizes when the operator is expected to read only a small fraction of each split.
private double getSplitScanRatio(
Review comment (Contributor): Can you clarify what splitScanRatio means? The name is confusing to me. It looks like it's the proportion of bytes we expect to read based on column stats of columns we actually use compared to the total data size.
Reply (Author): Done

ConnectorSession session,
SchemaTableName tableName,
HiveTableLayoutHandle layout,
TransactionalMetadata metadata,
List<HivePartition> partitions)
{
if (!isDynamicSplitSizesEnabled(session)) {
return 1.0;
}
HiveTableHandle hiveTableHandle = new HiveTableHandle(tableName.getSchemaName(), tableName.getTableName());
Review comment (Contributor): use the function mergeRequestedAndPredicateColumns(). It will also handle if the same struct columns appear in both sets with different subfields.

Set<HiveColumnHandle> readColumnHandles = mergeRequestedAndPredicateColumns(
layout.getRequestedColumns(),
layout.getPredicateColumns().values().stream().collect(toImmutableSet())
).orElseGet(ImmutableSet::of);

Map<String, Type> readColumnTypes = readColumnHandles.stream().collect(toImmutableMap(HiveColumnHandle::getName, handle -> metadata.getColumnMetadata(session, hiveTableHandle, handle).getType()));
Map<String, ColumnHandle> readColumns = readColumnHandles.stream().collect(toImmutableMap(HiveColumnHandle::getName, identity()));

TableStatistics tableStatistics = metadata.getHiveStatisticsProvider().getTableStatistics(session, tableName, readColumns, readColumnTypes, partitions);
double totalSize = tableStatistics.getTotalSize().getValue();
double readSize = 0;
double rowCount = tableStatistics.getRowCount().getValue();
for (Map.Entry<ColumnHandle, ColumnStatistics> entry : tableStatistics.getColumnStatistics().entrySet()) {
double value = entry.getValue().getDataSize().getValue();
// Data-size statistics are not collected for fixed-width types, so estimate their size manually.
if (!isFinite(value) && isFinite(rowCount)) {
HiveColumnHandle columnHandle = (HiveColumnHandle) entry.getKey();
Type type = metadata.getColumnMetadata(session, hiveTableHandle, columnHandle).getType();
if (type instanceof FixedWidthType) {
int size = ((FixedWidthType) type).getFixedSize();
value = size * rowCount;
}
}
readSize += value;
}

if (totalSize > 0 && isFinite(totalSize) && isFinite(readSize)) {
return readSize / totalSize;
}
return 1.0;
}
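
To make the ratio concrete, here is a minimal, self-contained sketch of the same idea with made-up statistics (the class, column names, and numbers are hypothetical and do not use the connector's statistics API): the ratio is the estimated bytes the query will read, per column statistics, divided by the table's total size, with fixed-width columns estimated as fixed size times row count when no data size was recorded.

    // Sketch only (not part of this change): approximates getSplitScanRatio() with hypothetical statistics.
    import java.util.Map;

    public class SplitScanRatioSketch
    {
        static double splitScanRatio(Map<String, Double> readColumnDataSizes, double totalSize)
        {
            // Sum the estimated data size of the columns the query actually reads.
            double readSize = readColumnDataSizes.values().stream()
                    .mapToDouble(Double::doubleValue)
                    .sum();
            if (totalSize > 0 && Double.isFinite(totalSize) && Double.isFinite(readSize)) {
                return readSize / totalSize;
            }
            return 1.0; // no usable statistics: leave split sizes unchanged
        }

        public static void main(String[] args)
        {
            // Hypothetical 1 GB table; the query reads a BIGINT column (8 bytes * 10M rows = 80 MB,
            // the fixed-width fallback) and a VARCHAR column with 120 MB of recorded data size.
            double ratio = splitScanRatio(
                    Map.of("orderkey", 8.0 * 10_000_000, "comment", 120e6),
                    1e9);
            System.out.printf("scan ratio = %.2f%n", ratio); // ~0.20, so splits can safely be ~5x larger
        }
    }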

private HiveSplitSource computeSplitSource(SplitSchedulingContext splitSchedulingContext,
Table table,
ConnectorSession session,
HiveSplitLoader hiveSplitLoader)
HiveSplitLoader hiveSplitLoader,
double splitScanRatio)
{
HiveSplitSource splitSource;
CacheQuotaRequirement cacheQuotaRequirement = cacheQuotaRequirementProvider.getCacheQuotaRequirement(table.getDatabaseName(), table.getTableName());
@@ -321,7 +377,8 @@ private HiveSplitSource computeSplitSource(SplitSchedulingContext splitSchedulin
maxOutstandingSplitsSize,
hiveSplitLoader,
executor,
new CounterStat());
new CounterStat(),
splitScanRatio);
break;
case GROUPED_SCHEDULING:
splitSource = HiveSplitSource.bucketed(
@@ -334,7 +391,8 @@ private HiveSplitSource computeSplitSource(SplitSchedulingContext splitSchedulin
maxOutstandingSplitsSize,
hiveSplitLoader,
executor,
new CounterStat());
new CounterStat(),
splitScanRatio);
break;
case REWINDABLE_GROUPED_SCHEDULING:
splitSource = HiveSplitSource.bucketedRewindable(
@@ -346,7 +404,8 @@ private HiveSplitSource computeSplitSource(SplitSchedulingContext splitSchedulin
maxOutstandingSplitsSize,
hiveSplitLoader,
executor,
new CounterStat());
new CounterStat(),
splitScanRatio);
break;
default:
throw new IllegalArgumentException("Unknown splitSchedulingStrategy: " + splitSchedulingContext.getSplitSchedulingStrategy());
@@ -909,7 +968,7 @@ static Optional<Set<HiveColumnHandle>> mergeRequestedAndPredicateColumns(Optiona
static boolean isBucketCountCompatible(int tableBucketCount, int partitionBucketCount)
{
checkArgument(tableBucketCount > 0 && partitionBucketCount > 0);
int larger = Math.max(tableBucketCount, partitionBucketCount);
int larger = max(tableBucketCount, partitionBucketCount);
int smaller = Math.min(tableBucketCount, partitionBucketCount);
if (larger % smaller != 0) {
// must be evenly divisible
@@ -69,6 +69,8 @@
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

@@ -98,6 +100,7 @@ class HiveSplitSource
private final CounterStat highMemorySplitSourceCounter;
private final AtomicBoolean loggedHighMemoryWarning = new AtomicBoolean();
private final HiveSplitWeightProvider splitWeightProvider;
private final double splitScanRatio;

private HiveSplitSource(
ConnectorSession session,
@@ -109,7 +112,8 @@ private HiveSplitSource(
DataSize maxOutstandingSplitsSize,
HiveSplitLoader splitLoader,
CounterStat highMemorySplitSourceCounter,
boolean useRewindableSplitSource)
boolean useRewindableSplitSource,
double splitScanRatio)
{
requireNonNull(session, "session is null");
this.queryId = session.getQueryId();
@@ -126,6 +130,16 @@ private HiveSplitSource(
this.useRewindableSplitSource = useRewindableSplitSource;
this.remainingInitialSplits = new AtomicInteger(maxInitialSplits);
this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider();
// Clamp the value to [0.1, 1.0].
// This ratio is used to increase split sizes. The range implies:
// 1) We do not increase split sizes by more than 10x (ratio >= 0.1).
Review comment (Contributor): Curious why is this limit? It would be helpful to update commit message to provide more details about the implementation and these kinds of limits.
Review comment (Contributor): Can you answer this question @pranjalssh?
// 2) We never decrease split sizes (ratio <= 1.0).
// We schedule splits at most 10x larger, to be conservative and avoid splits with too many rows.
// With the default max split size of 64MB, this keeps the split sizes we send within 1GB. Files are usually smaller than this.
if (!Double.isFinite(splitScanRatio)) {
splitScanRatio = 1.0;
}
this.splitScanRatio = max(min(splitScanRatio, 1.0), 0.1);
}
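
A short illustration of what the clamp means for the target split size (a sketch with an assumed 64MB default max split size, per the comment above; in the actual code the division happens later, in getNextBatch):

    // Sketch only (not part of this change): how the clamped splitScanRatio scales the per-split byte target.
    public class SplitSizeAdjustmentSketch
    {
        public static void main(String[] args)
        {
            long maxSplitBytes = 64L << 20; // assumed 64MB default max split size
            for (double raw : new double[] {Double.NaN, 0.5, 0.25, 0.01}) {
                double ratio = Double.isFinite(raw) ? Math.max(Math.min(raw, 1.0), 0.1) : 1.0;
                long adjusted = (long) (maxSplitBytes / ratio);
                // NaN -> 64MB (no statistics, unchanged), 0.5 -> 128MB,
                // 0.25 -> 256MB, 0.01 -> clamped to 0.1 -> 640MB (the 10x cap)
                System.out.printf("raw=%s adjustedMB=%d%n", raw, adjusted >> 20);
            }
        }
    }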

public static HiveSplitSource allAtOnce(
Expand All @@ -138,7 +152,8 @@ public static HiveSplitSource allAtOnce(
DataSize maxOutstandingSplitsSize,
HiveSplitLoader splitLoader,
Executor executor,
CounterStat highMemorySplitSourceCounter)
CounterStat highMemorySplitSourceCounter,
double splitScanRatio)
{
return new HiveSplitSource(
session,
@@ -186,7 +201,8 @@ public int rewind(OptionalInt bucketNumber)
maxOutstandingSplitsSize,
splitLoader,
highMemorySplitSourceCounter,
false);
false,
splitScanRatio);
}

public static HiveSplitSource bucketed(
Expand All @@ -199,7 +215,8 @@ public static HiveSplitSource bucketed(
DataSize maxOutstandingSplitsSize,
HiveSplitLoader splitLoader,
Executor executor,
CounterStat highMemorySplitSourceCounter)
CounterStat highMemorySplitSourceCounter,
double splitScanRatio)
{
return new HiveSplitSource(
session,
@@ -267,7 +284,8 @@ private AsyncQueue<InternalHiveSplit> queueFor(OptionalInt bucketNumber)
maxOutstandingSplitsSize,
splitLoader,
highMemorySplitSourceCounter,
false);
false,
splitScanRatio);
}

public static HiveSplitSource bucketedRewindable(
Expand All @@ -279,7 +297,8 @@ public static HiveSplitSource bucketedRewindable(
DataSize maxOutstandingSplitsSize,
HiveSplitLoader splitLoader,
Executor executor,
CounterStat highMemorySplitSourceCounter)
CounterStat highMemorySplitSourceCounter,
double splitScanRatio)
{
return new HiveSplitSource(
session,
@@ -359,7 +378,8 @@ public int decrementAndGetPartitionReferences(InternalHiveSplit split)
maxOutstandingSplitsSize,
splitLoader,
highMemorySplitSourceCounter,
true);
true,
splitScanRatio);
}

/**
@@ -469,6 +489,8 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
maxSplitBytes = maxInitialSplitSize.toBytes();
}
}
// Increase the split size when the bytes scanned per split are expected to be smaller than the configured size.
maxSplitBytes = (long) (maxSplitBytes / splitScanRatio);
InternalHiveBlock block = internalSplit.currentBlock();
long splitBytes;
if (internalSplit.isSplittable()) {
@@ -14,6 +14,7 @@
package com.facebook.presto.hive;

import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.hive.statistics.HiveStatisticsProvider;
import com.facebook.presto.spi.connector.ConnectorCommitHandle;
import com.facebook.presto.spi.connector.ConnectorMetadata;

@@ -25,4 +26,6 @@ public interface TransactionalMetadata
void rollback();

SemiTransactionalHiveMetastore getMetastore();

HiveStatisticsProvider getHiveStatisticsProvider();
}
@@ -46,6 +46,7 @@
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -242,11 +243,12 @@ public static DistributedQueryRunner createQueryRunner(
.put("hive.compression-codec", "NONE")
.build();

Map<String, String> hiveBucketedProperties = ImmutableMap.<String, String>builder()
.putAll(storageProperties)
.put("hive.max-initial-split-size", "10kB") // so that each bucket has multiple splits
.put("hive.max-split-size", "10kB") // so that each bucket has multiple splits
.build();
Map<String, String> hiveBucketedProperties = new HashMap<>();
hiveBucketedProperties.putAll(storageProperties);
hiveBucketedProperties.put("hive.max-initial-split-size", "10kB"); // so that each bucket has multiple splits
hiveBucketedProperties.put("hive.max-split-size", "10kB"); // so that each bucket has multiple splits
hiveBucketedProperties = ImmutableMap.copyOf(hiveBucketedProperties);

queryRunner.createCatalog(HIVE_CATALOG, HIVE_CATALOG, hiveProperties);
queryRunner.createCatalog(HIVE_BUCKETED_CATALOG, HIVE_CATALOG, hiveBucketedProperties);

@@ -634,7 +634,8 @@ private static HiveSplitSource hiveSplitSource(BackgroundHiveSplitLoader backgro
new DataSize(32, MEGABYTE),
backgroundHiveSplitLoader,
EXECUTOR,
new CounterStat());
new CounterStat(),
1);
}

private static Table table(