
Use dynamic split sizes in hive connector #22051

Merged
merged 1 commit into from
Mar 5, 2024

Conversation

Contributor
@pranjalssh pranjalssh commented Feb 29, 2024

Description

Fixes #21911

The Presto scheduler creates splits according to file sizes, but it does not take into account whether we read only selected columns from the file. In general, we should be able to tune split sizes based on the amount of data the query selects from the files, so we can schedule fewer splits and Presto runs faster.
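To make the mechanism concrete, here is a minimal sketch of the idea, assuming a clamped read/total ratio is used to scale up the configured split size. The names `SplitRatioSketch`, `clampRatio`, and `effectiveSplitSize` are hypothetical, not the PR's actual code.

```java
// Illustrative sketch only: class and method names are hypothetical,
// not the hive connector's actual code.
public class SplitRatioSketch
{
    // Clamp the read/total ratio to [0.1, 1.0]: splits never grow more than
    // 10x, and never shrink below the configured size.
    static double clampRatio(double readSize, double totalSize)
    {
        if (!Double.isFinite(readSize) || !(totalSize > 0)) {
            return 1.0; // missing stats: fall back to the configured split size
        }
        double ratio = readSize / totalSize;
        return Math.max(0.1, Math.min(1.0, ratio));
    }

    // The effective split size grows as the scan ratio shrinks.
    static long effectiveSplitSize(long configuredSplitSize, double ratio)
    {
        return Math.round(configuredSplitSize / ratio);
    }

    public static void main(String[] args)
    {
        // A query reading ~1% of the bytes is clamped to a ratio of 0.1,
        // so a 64MB configured split becomes a 640MB effective split.
        double ratio = clampRatio(100, 10_000);
        System.out.println(ratio);
        System.out.println(effectiveSplitSize(64L << 20, ratio));
    }
}
```

With a 64MB configured split size and a 0.1 floor, this keeps effective split sizes within 1GB, matching the limits discussed in the review below.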

Motivation and Context

Impact

Test Plan

Added unit test, gated changes behind config, and ran test queries on a cluster

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with their default values), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

Hive Changes
* Add session property ``hive.dynamic_split_sizes_enabled`` to use dynamic split sizes based on data selected by query.

@pranjalssh pranjalssh requested a review from a team as a code owner February 29, 2024 23:21
@pranjalssh pranjalssh force-pushed the dynamic_splits2 branch 2 times, most recently from a5ada00 to 6e52f59 Compare March 1, 2024 00:53
@pranjalssh pranjalssh requested a review from a team as a code owner March 1, 2024 00:53
Contributor

@mbasmanova mbasmanova left a comment

@pranjalssh Thank you for working on this. What kind of speed up have you observed on production queries? It would be nice to update commit message to provide more details about this change. Is there a way to see whether this optimization kicked in and what was the 'ratio' applied after the query finished running?

@@ -126,6 +130,14 @@ private HiveSplitSource(
this.useRewindableSplitSource = useRewindableSplitSource;
this.remainingInitialSplits = new AtomicInteger(maxInitialSplits);
this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider();
// Clamp value within [0.1, 1.0].
// This ratio will be used to increase split sizes. The range implies
// 1) We do not increase more than 10x(>= 0.1)
Contributor

Curious: why this limit?

It would be helpful to update commit message to provide more details about the implementation and these kinds of limits.

Contributor

Can you answer this question @pranjalssh?

HiveTableHandle hiveTableHandle = new HiveTableHandle(tableName.getSchemaName(), tableName.getTableName());
List<HiveColumnHandle> allColumnHandles = new ArrayList<>();
allColumnHandles.addAll(getRegularColumnHandles(table));
allColumnHandles.addAll(getPartitionKeyColumnHandles(table));
Contributor

Why do we include partitioning keys? These are not part of the file.

double totalSize = tableStatistics.getTotalSize().getValue();
double requiredSize = 0;
double rowCount = tableStatistics.getRowCount().getValue();
for (Map.Entry<ColumnHandle, ColumnStatistics> entry : tableStatistics.getColumnStatistics().entrySet()) {
Contributor

Here we loop over all columns (can be thousands), but process only a set of readColumnHandles columns (can be a handful). Would it make sense to change this to loop over readColumnHandles instead?

Contributor Author

Refactored to only query for the read columns. I previously incorrectly assumed totalSize was summed over just the columns provided.
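The refactor can be sketched as follows, assuming per-column data sizes keyed by column name; the `Map`-based shape and the name `ReadSizeSketch` are illustrative, not the connector's actual stats API.

```java
import java.util.List;
import java.util.Map;

// Illustrative sketch only: the connector's real code queries the hive
// statistics provider; a plain Map stands in for column statistics here.
public class ReadSizeSketch
{
    // Sum estimated data sizes for only the columns the query reads,
    // instead of iterating over every column in the table.
    static double readSize(Map<String, Double> dataSizeByColumn, List<String> readColumns)
    {
        double total = 0;
        for (String column : readColumns) {
            Double size = dataSizeByColumn.get(column);
            if (size != null && Double.isFinite(size)) {
                total += size;
            }
        }
        return total;
    }

    public static void main(String[] args)
    {
        Map<String, Double> stats = Map.of("orderkey", 10.0, "comment", 30.0);
        System.out.println(readSize(stats, List.of("orderkey")));
    }
}
```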

if (readColumnHandles.contains(entry.getKey())) {
double value = entry.getValue().getDataSize().getValue();
// We do not compute total size stats for fixed width types, so count them manually.
if (!isFinite(value) && isFinite(rowCount)) {
Contributor

isFinite(rowCount)

Can we move this check before the loop?

for (Map.Entry<ColumnHandle, ColumnStatistics> entry : tableStatistics.getColumnStatistics().entrySet()) {
if (readColumnHandles.contains(entry.getKey())) {
double value = entry.getValue().getDataSize().getValue();
// We do not compute total size stats for fixed width types, so count them manually.
Contributor

Does that mean 'totalSize' doesn't include fixed-width columns?

Contributor Author

It does; it's just missing in the column stats. Refactored.
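The fixed-width fallback being discussed can be sketched like this, assuming a missing per-column data size shows up as NaN (consistent with the `isFinite` checks in the quoted snippet); `FixedWidthSizeSketch` and its parameters are hypothetical.

```java
// Illustrative sketch only: names are hypothetical, not the PR's code.
public class FixedWidthSizeSketch
{
    // Column-level data-size stats are often absent for fixed-width types
    // (e.g. BIGINT, DATE), so estimate the size as rowCount * byte width.
    static double estimateColumnSize(double dataSize, double rowCount, int fixedWidthBytes)
    {
        if (Double.isFinite(dataSize)) {
            return dataSize; // stats already cover this column
        }
        if (Double.isFinite(rowCount)) {
            return rowCount * fixedWidthBytes;
        }
        return Double.NaN; // not enough stats to estimate
    }

    public static void main(String[] args)
    {
        // A BIGINT column with no data-size stat over 1000 rows: 8000 bytes.
        System.out.println(estimateColumnSize(Double.NaN, 1000, 8));
    }
}
```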

TestHiveEventListenerPlugin.TestingHiveEventListener eventListener = getEventListener();

// Wait for previous events to finish
Thread.sleep(2000);
Contributor

Is there a way to avoid explicit sleep calls?

Contributor Author

I found that the event manager is synchronous in these tests, so we can just remove all the sleep calls.
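For cases where events really are delivered asynchronously, a common alternative to fixed sleeps is blocking on a latch. This is a generic sketch, not this test's actual event-listener API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: a listener that counts down once the expected
// number of events arrives, so a test can await completion instead of
// sleeping a fixed amount of time.
public class LatchSketch
{
    static final class CountingListener
    {
        private final CountDownLatch latch;

        CountingListener(int expectedEvents)
        {
            latch = new CountDownLatch(expectedEvents);
        }

        void onEvent()
        {
            latch.countDown();
        }

        // Returns true if all expected events arrived within the timeout.
        boolean awaitEvents(long timeout, TimeUnit unit)
        {
            try {
                return latch.await(timeout, unit);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args)
    {
        CountingListener listener = new CountingListener(2);
        listener.onEvent();
        listener.onEvent();
        System.out.println(listener.awaitEvents(1, TimeUnit.SECONDS)); // prints true
    }
}
```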

@kaikalur
Contributor

kaikalur commented Mar 1, 2024

I have seen a 90% reduction in the number of splits for some queries with this option (380K vs 3M), and a corresponding reduction in latency. So this can help sometimes, especially for things like count(*) queries.

@steveburnett
Contributor

@pranjalssh Would you document the new configuration property?

https://prestodb.io/docs/current/connector/hive.html#hive-configuration-properties

CC: @steveburnett

@pranjalssh, when you can, please add documentation as suggested by @mbasmanova and recommended in the Documentation topic of the Review and Commit Guidelines.

When you do, you can use the request review feature to tag me so I'll be notified and able to respond in a timely way and not delay the PR. Thanks!

@pranjalssh pranjalssh force-pushed the dynamic_splits2 branch 3 times, most recently from a47125b to 99d773e Compare March 4, 2024 19:52
@pranjalssh
Contributor Author

Addressed comments @steveburnett @mbasmanova


github-actions bot commented Mar 4, 2024

Codenotify: Notifying subscribers in CODENOTIFY files for diff ece7741...ce14167.

Notify File(s)
@steveburnett presto-docs/src/main/sphinx/connector/hive.rst

steveburnett previously approved these changes Mar 4, 2024
Contributor

@steveburnett steveburnett left a comment

LGTM! (docs)

Pull updated branch, new local build, everything looks good.

if (!isDynamicSplitSizesEnabled(session)) {
return ratio;
}
HiveTableHandle hiveTableHandle = new HiveTableHandle(tableName.getSchemaName(), tableName.getTableName());
Contributor

Use the function mergeRequestedAndPredicateColumns(). It will also handle the case where the same struct column appears in both sets with different subfields.

hiveSplitLoader.start(splitSource);

return splitSource;
}

private double getSplitScanRatio(
Contributor

Can you clarify what splitScanRatio means? The name is confusing to me. It looks like it's the proportion of bytes we expect to read based on column stats of columns we actually use compared to the total data size.

Contributor Author

Done


TableStatistics tableStatistics = metadata.getHiveStatisticsProvider().getTableStatistics(session, tableName, readColumns, readColumnTypes, partitions);
double totalSize = tableStatistics.getTotalSize().getValue();
double requiredSize = 0;
Contributor

What does requiredSize mean? It looks like we're getting the total size of all the columns we're selecting. What's "required" about it?

Contributor Author

Updated to readSize

Contributor

@rschlussel rschlussel left a comment

looks good

@@ -126,6 +130,14 @@ private HiveSplitSource(
this.useRewindableSplitSource = useRewindableSplitSource;
this.remainingInitialSplits = new AtomicInteger(maxInitialSplits);
this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider();
// Clamp value within [0.1, 1.0].
// This ratio will be used to increase split sizes. The range implies
// 1) We do not increase more than 10x(>= 0.1)
Contributor

Can you answer this question @pranjalssh?

If the data scanned by a query is smaller than the total size of the files, we use larger splits than configured for Hive. We schedule up to 10x larger splits, which is a generous limit, while being conservative not to schedule splits with too many rows.
@pranjalssh
Contributor Author

pranjalssh commented Mar 5, 2024

@rschlussel updated comment

        // We schedule only up to 10x larger splits, being conservative not to schedule splits with too many rows.
        // For the default size of 64MB, this will keep split sizes within 1GB. Usually files are smaller than this.

@pranjalssh pranjalssh merged commit 96187ad into prestodb:master Mar 5, 2024
57 checks passed
@wanglinsong wanglinsong mentioned this pull request May 1, 2024
48 tasks
Successfully merging this pull request may close these issues.

Presto Hive Connector creates too many small splits
5 participants