[NOT A BUG] Why comet does not convert the HashAggregate expression to native in my query? #503

Closed
SemyonSinchenko opened this issue Jun 3, 2024 · 5 comments · Fixed by #512
Labels
bug Something isn't working

SemyonSinchenko commented Jun 3, 2024

Describe the bug

I'm running a query that does the following:

  1. Read parquet files
  2. Generate a lot of case-when columns
  3. Run groupBy + agg on top of those columns (sum, min, max, mean)
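
For readers without access to the benchmark code, here is a minimal PySpark sketch of the query shape described in the steps above. It is not the actual benchmark: the helper names `flag_specs` and `build_features` are hypothetical, only `sum` is shown (the real query also uses min, max and mean), and the column names are taken from the plans in this issue.

```python
from itertools import product


def flag_specs(card_types, trx_types, windows):
    """Return (column_name, card_type, trx_type, window_days) tuples,
    one per case-when flag column."""
    return [
        (f"{card}_{trx}_{days}d_flag", card, trx, days)
        for card, trx, days in product(card_types, trx_types, windows)
    ]


def build_features(df, specs):
    """Project one case-when flag per spec, then aggregate per customer.

    `df` is assumed to have the columns customer_id, card_type, trx_type
    and t_minus, as in the parquet relation of the logical plan.
    """
    # Imported lazily so flag_specs stays usable without a Spark install.
    from pyspark.sql import functions as F

    flags = [
        F.when(
            (F.col("t_minus") <= days)
            & (F.col("card_type") == card)
            & (F.col("trx_type") == trx),
            1,
        ).otherwise(0).alias(name)
        for name, card, trx, days in specs
    ]
    projected = df.select(F.col("customer_id"), *flags)

    # Backticks keep the hyphenated column names from being misparsed.
    aggs = [
        F.sum(F.col(f"`{name}`")).alias(name.replace("_flag", "_count"))
        for name, *_ in specs
    ]
    return projected.groupBy("customer_id").agg(*aggs)
```

Calling `build_features(df, flag_specs(["DC"], ["food-and-household"], [7]))` would produce a single `DC_food-and-household_7d_count` column; the real query generates thousands of such columns.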

I have the following logical plan (I manually truncated some parts):

== Parsed Logical Plan ==
'Aggregate ['customer_id], ['customer_id, sum('DC_food-and-household_7d_flag) AS DC_food-and-household_7d_count#18, ..., ..., ... 2057 more fields]
+- Project [customer_id AS customer_id#5422, CASE WHEN (((true AND (t_minus#5L <= cast(7 as bigint))) AND (card_type#1 = DC)) AND (trx_type#2 = food-and-household)) THEN 1 ELSE 0 END AS DC_food-and-household_7d_flag#14, ..., ... 1225 more fields]
   +- Relation [customer_id#0L,card_type#1,trx_type#2,channel#3,trx_amnt#4,t_minus#5L,part_col#6] parquet

It is converted to the following Comet plan:

== Physical Plan ==
HashAggregate(keys=[customer_id#10003], functions=[sum(DC_food-and-household_7d_flag#14), ..., ... 2057 more fields])
+- Exchange hashpartitioning(customer_id#10003, 11), ENSURE_REQUIREMENTS, [plan_id=35]
   +- ColumnarToRow
      +- CometHashAggregate [DC_food-and-household_7d_flag#14, DC_food-and-household_7d_or_none#15, ..., ... 2056 more fields]
         +- CometProject [DC_food-and-household_7d_flag#14, DC_food-and-household_7d_or_none#15, ..., ... 1224 more fields], [CASE WHEN (((t_minus#5L <= 7) AND (card_type#1 = DC)) AND (trx_type#2 = food-and-household)) THEN 1 ELSE 0 END AS DC_food-and-household_7d_flag#14, ..., ... 1224 more fields]
            +- CometScan parquet [...] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...>

Visualization: [image]

Steps to reproduce

I'm running my own benchmark:

  1. Generation of the dataset (link to github): generator --prefix test_data_tiny;
  2. PySpark code (link to github);
  3. Entry point (link to github)

Expected behavior

I expected to see a fully native plan, but for some reason the final HashAggregate runs in Spark. It even looks like it runs in Spark's interpreted mode (my guess is that I request too many aggregations and exceed the code-size limit for whole-stage codegen, but I'm not 100% sure).

I checked the documentation of the Comet project, and it looks like case-when expressions and sum/min/max/mean aggregations are supported. HashAggregate is supported too. Exchange should be supported as well, because I turned on Comet shuffle (--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager, --conf spark.comet.exec.shuffle.enabled=true, --conf spark.comet.exec.shuffle.mode=native).

Why does the partial aggregation run in Comet while the final one does not, with a ColumnarToRow in between?
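
A quick way to spot which operators fell back to Spark is to scan the physical plan text for operator names that lack the `Comet` prefix. The sketch below is a rough, hypothetical helper (`non_native_operators` is not part of Comet or Spark) and assumes each operator sits on its own line, as in the plan above; the sample plan is abbreviated from this issue.

```python
import re

# Leading tree decoration ("+- ", ":- ", "|", spaces), then the operator name.
_OPERATOR = re.compile(r"^[\s:|+\-]*([A-Za-z]\w*)")


def non_native_operators(plan_text):
    """List operator names in a physical plan that do not start with 'Comet'."""
    found = []
    for line in plan_text.splitlines():
        match = _OPERATOR.match(line)
        if match and not match.group(1).startswith("Comet"):
            found.append(match.group(1))
    return found


# Abbreviated copy of the physical plan from this issue.
plan = """\
== Physical Plan ==
HashAggregate(keys=[customer_id#10003], functions=[...])
+- Exchange hashpartitioning(customer_id#10003, 11), ENSURE_REQUIREMENTS, [plan_id=35]
   +- ColumnarToRow
      +- CometHashAggregate [...]
         +- CometProject [...]
            +- CometScan parquet [...]
"""

print(non_native_operators(plan))
```

For this plan the helper reports the final HashAggregate, the Exchange and the ColumnarToRow, which is exactly the part of the query that is not native.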

Additional context

I'm ready to provide any additional information or to run any debug query.

Thanks in advance!

@SemyonSinchenko SemyonSinchenko added the bug Something isn't working label Jun 3, 2024

viirya commented Jun 3, 2024

Could you run a simple query to verify if Comet shuffle can be triggered?

viirya commented Jun 3, 2024

Oh, could you disable spark.sql.adaptive.coalescePartitions.enabled and retry? Comet shuffle is disabled when spark.sql.adaptive.coalescePartitions.enabled is enabled. Even though you disabled AQE, this config is still enabled by default (i.e., it isn't set to false when you disable AQE).
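
As a sketch of the resulting setup, the configuration keys mentioned in this thread can be collected in one place and rendered as `spark-submit` arguments. The names `COMET_SHUFFLE_CONF` and `to_submit_args` are hypothetical; the keys and values are the ones quoted above, and the need to disable coalescing may only apply while this limitation exists.

```python
# Settings quoted in this issue; the last entry is the workaround.
COMET_SHUFFLE_CONF = {
    "spark.shuffle.manager": (
        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
    ),
    "spark.comet.exec.shuffle.enabled": "true",
    "spark.comet.exec.shuffle.mode": "native",
    # Stays enabled by default even when AQE itself is disabled.
    "spark.sql.adaptive.coalescePartitions.enabled": "false",
}


def to_submit_args(conf):
    """Render a conf dict as a flat list of spark-submit '--conf k=v' arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(COMET_SHUFFLE_CONF)))
```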

SemyonSinchenko (Member, Author) commented

Wow! With spark.sql.adaptive.coalescePartitions.enabled disabled, it works! May I open a PR to update the documentation? It looks like this page (source) is the one to update.

viirya commented Jun 3, 2024

Yes, you can open a PR to update the documentation. It should be just a temporary limitation, though, and we are working on removing it. We can update the documentation again once the limitation is removed.

andygrove (Member) commented

You may also want to set spark.comet.explainFallback.enabled=true so that you can see the reasons why parts of your query are not native (this would show up as logging in the driver log).

I wonder if we should default this to true.
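
A minimal sketch of turning the fallback logging on when building a session from PySpark. The helpers `with_fallback_logging` and `build_session` are hypothetical, and the sketch assumes the Comet jar and the rest of the Comet configuration are already supplied elsewhere.

```python
def with_fallback_logging(conf):
    """Return a copy of `conf` with Comet fallback explanations enabled."""
    updated = dict(conf)
    updated["spark.comet.explainFallback.enabled"] = "true"
    return updated


def build_session(conf, app_name="comet-debug"):
    """Create (or reuse) a SparkSession with every key in `conf` applied."""
    # Imported lazily so the conf helper works without a Spark install.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

With this in place, `build_session(with_fallback_logging({...}))` should make the fallback reasons appear in the driver log, as described above.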

kazuyukitanimura pushed a commit that referenced this issue Jun 5, 2024
## Which issue does this PR close?
Closes #503
Closes #191 

## Rationale for this change

1. Provide a way to build Comet from source in isolated environments with access to github.com
2. Update documentation in part, related to compatibility of Spark AQE and Comet Shuffle

## What changes are included in this PR?

- Update tuning section about the compatibility of Shuffle and Spark AQE
- Add `release-nogit` for building in isolated environments
- Update docs in the section about an installation process


 Changes to be committed:
	modified:   Makefile
	modified:   docs/source/user-guide/installation.md
	modified:   docs/source/user-guide/tuning.md

## How are these changes tested?

I ran both `make release` and `make release-nogit`. The first one created a properties file in `common/target/classes`, but the second did not. The flag `-Dmaven.gitcommitid.skip=true` is described in [this comment](git-commit-id/git-commit-id-maven-plugin#392 (comment)).
himadripal pushed a commit to himadripal/datafusion-comet that referenced this issue Sep 7, 2024