-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SQL] Rewrite join implementation to allow streaming of one relation. #250
Conversation
Merged build triggered. |
Merged build started. |
Merged build finished. |
One or more automated tests failed |
Merged build triggered. One or more automated tests failed |
Merged build started. One or more automated tests failed |
/** Returns true if there are any NULL values in this row. */ | ||
def anyNull: Boolean = { | ||
var i = 0 | ||
while(i < length) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
space after while and if...
Merged build finished. All automated tests passed. |
All automated tests passed. |
Merged build triggered. Build is starting -or- tests failed to complete. |
Merged build started. Build is starting -or- tests failed to complete. |
Hey @rxin, thanks for looking this over. I added to TODOs for using Spark's collections, but did not make these changes. |
lgtm if travis or jenkins (whatever we are using nowadays ...) is happy |
Merged build triggered. Build is starting -or- tests failed to complete. |
Merged build started. Build is starting -or- tests failed to complete. |
Merged build finished. Build is starting -or- tests failed to complete. |
Build is starting -or- tests failed to complete. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
thanks. merged. |
README incorrectly suggests build sources spark-env.sh This is misleading because the build doesn't source that file. IMO it's better to force people to specify build environment variables on the command line always, like we do in every example, so I'm just removing this doc. (cherry picked from commit d2efe13) Signed-off-by: Patrick Wendell <pwendell@gmail.com>
Before we were materializing everything in memory. This also uses the projection interface so will be easier to plug in code gen (its ported from that branch). @rxin @liancheng Author: Michael Armbrust <michael@databricks.com> Closes apache#250 from marmbrus/hashJoin and squashes the following commits: 1ad873e [Michael Armbrust] Change hasNext logic back to the correct version. 8e6f2a2 [Michael Armbrust] Review comments. 1e9fb63 [Michael Armbrust] style bc0cb84 [Michael Armbrust] Rewrite join implementation to allow streaming of one relation.
## What changes were proposed in this pull request? A configuration parameter spark.databricks.debug.taskKiller.minOutputRows is added. It sets the minimum required number of records that need to be produced at some point in task execution, before the task can be terminated by DatabricksTaskDebugListener. ## How was this patch tested? Adds unit tests. Author: Ala Luszczak <ala@databricks.com> Closes apache#250 from ala/min-output-rows.
…is reused ## What changes were proposed in this pull request? With this change, we can easily identify the plan difference when subquery is reused. When the reuse is enabled, the plan looks like ``` == Physical Plan == CollectLimit 1 +- *(1) Project [(Subquery subquery240 + ReusedSubquery Subquery subquery240) AS (scalarsubquery() + scalarsubquery())#253] : :- Subquery subquery240 : : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)#250]) : : +- Exchange SinglePartition : : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#256, count#257L]) : : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : : +- Scan[obj#12] : +- ReusedSubquery Subquery subquery240 +- *(1) SerializeFromObject +- Scan[obj#12] ``` When the reuse is disabled, the plan looks like ``` == Physical Plan == CollectLimit 1 +- *(1) Project [(Subquery subquery286 + Subquery subquery287) AS (scalarsubquery() + scalarsubquery())#299] : :- Subquery subquery286 : : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)#296]) : : +- Exchange SinglePartition : : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#302, count#303L]) : : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : : +- Scan[obj#12] : +- Subquery subquery287 : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)#298]) : +- Exchange SinglePartition : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#306, count#307L]) : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : +- Scan[obj#12] +- *(1) SerializeFromObject +- Scan[obj#12] ``` ## How was this patch tested? Modified the existing test. Closes #24258 from gatorsmile/followupSPARK-27279. Authored-by: gatorsmile <gatorsmile@gmail.com> Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…is reused With this change, we can easily identify the plan difference when subquery is reused. When the reuse is enabled, the plan looks like ``` == Physical Plan == CollectLimit 1 +- *(1) Project [(Subquery subquery240 + ReusedSubquery Subquery subquery240) AS (scalarsubquery() + scalarsubquery())apache#253] : :- Subquery subquery240 : : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)apache#250]) : : +- Exchange SinglePartition : : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#256, count#257L]) : : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : : +- Scan[obj#12] : +- ReusedSubquery Subquery subquery240 +- *(1) SerializeFromObject +- Scan[obj#12] ``` When the reuse is disabled, the plan looks like ``` == Physical Plan == CollectLimit 1 +- *(1) Project [(Subquery subquery286 + Subquery subquery287) AS (scalarsubquery() + scalarsubquery())apache#299] : :- Subquery subquery286 : : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)apache#296]) : : +- Exchange SinglePartition : : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#302, count#303L]) : : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : : +- Scan[obj#12] : +- Subquery subquery287 : +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)apache#298]) : +- Exchange SinglePartition : +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#306, count#307L]) : +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13] : +- Scan[obj#12] +- *(1) SerializeFromObject +- Scan[obj#12] ``` Modified the existing test. Closes apache#24258 from gatorsmile/followupSPARK-27279. Authored-by: gatorsmile <gatorsmile@gmail.com> Signed-off-by: gatorsmile <gatorsmile@gmail.com>
Before we were materializing everything in memory. This also uses the projection interface so will be easier to plug in code gen (its ported from that branch).
@rxin @liancheng