[SPARK-29248][SQL] Pass in number of partitions to WriteBuilder #25945
Closed
Changes from 2 commits
Commits (4), all by edrevo:
280986c [SPARK-29248][SQL] Pass in number of partitions to WriteBuilder
11c6dd8 PR feedback
c6b85f9 Merge branch 'master' of https://github.com/apache/spark into add-par…
f3dba5e moving away from withNumPartitions
@@ -98,6 +98,9 @@ class InMemoryTable(

     new WriteBuilder with SupportsTruncate with SupportsOverwrite with SupportsDynamicOverwrite {
       private var writer: BatchWrite = Append
+      private var numPartitionsProvided = false
+      private var queryIdProvided = false
+      private var inputDataSchemaProvided = false

       override def truncate(): WriteBuilder = {
         assert(writer == Append)

@@ -117,7 +120,36 @@ class InMemoryTable(
         this
       }

-      override def buildForBatch(): BatchWrite = writer
+      override def withNumPartitions(numPartitions: Int): WriteBuilder = {
+        assert(!numPartitionsProvided, "numPartitions provided twice")
+        numPartitionsProvided = true
+        this
+      }
+
+      override def withQueryId(queryId: String): WriteBuilder = {
+        assert(!queryIdProvided, "queryId provided twice")
+        queryIdProvided = true
+        this
+      }
+
+      override def withInputDataSchema(schema: StructType): WriteBuilder = {
+        // Check the schema flag here, not the query id flag.
+        assert(!inputDataSchemaProvided, "schema provided twice")
+        inputDataSchemaProvided = true
+        this
+      }
+
+      override def buildForBatch(): BatchWrite = {
+        assert(
+          inputDataSchemaProvided,
+          "Input data schema wasn't provided before calling buildForBatch")
+        assert(
+          queryIdProvided,
+          "Query id wasn't provided before calling buildForBatch")
+        assert(
+          numPartitionsProvided,
+          "Number of partitions wasn't provided before calling buildForBatch")
+        writer
+      }
     }
   }

Inline review comment on assert(!queryIdProvided, "queryId provided twice"):
Can we continue the work in #25990 later on? It still has value because it gives stronger guarantees, so that implementations don't need to do checks like this.
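To make the runtime nature of these checks concrete, here is a minimal, self-contained sketch of the same flag-based pattern. It is not the Spark code: `RuntimeCheckedSketch`, `CheckedWriteBuilder`, and the stand-in `BatchWrite`/`Append` types are hypothetical names chosen so the example runs without Spark on the classpath.

```scala
object RuntimeCheckedSketch {
  // Hypothetical stand-ins for Spark's types (assumptions, not the real API).
  trait BatchWrite
  case object Append extends BatchWrite

  // Simplified builder that mirrors the flag-based checks from the diff.
  class CheckedWriteBuilder {
    private var numPartitionsProvided = false
    private var queryIdProvided = false

    def withNumPartitions(numPartitions: Int): CheckedWriteBuilder = {
      assert(!numPartitionsProvided, "numPartitions provided twice")
      numPartitionsProvided = true
      this
    }

    def withQueryId(queryId: String): CheckedWriteBuilder = {
      assert(!queryIdProvided, "queryId provided twice")
      queryIdProvided = true
      this
    }

    def buildForBatch(): BatchWrite = {
      assert(queryIdProvided, "Query id wasn't provided before calling buildForBatch")
      assert(numPartitionsProvided, "Number of partitions wasn't provided before calling buildForBatch")
      Append
    }
  }

  def main(args: Array[String]): Unit = {
    // All information supplied: the build succeeds.
    val write = new CheckedWriteBuilder().withQueryId("query-1").withNumPartitions(4).buildForBatch()
    println(s"built: $write")

    // numPartitions omitted: this compiles, and the mistake only surfaces at run time.
    try {
      new CheckedWriteBuilder().withQueryId("query-1").buildForBatch()
    } catch {
      case e: AssertionError => println(s"failed at run time: ${e.getMessage}")
    }
  }
}
```

The second call shows the drawback: the compiler accepts a caller that forgets a `with...` method, so every implementation has to carry its own assertions.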
I'm OK with the approach here, but I want to share a few thoughts about how to make the API better. The use case is: there is some additional information (input schema, numPartitions, etc.) that Spark should always provide, and implementations should only need to write extra code if they need to access that information.
With the current API, implementations can override withNumPartitions and other methods if they need to access the additional information. But there is one drawback: we need to take extra effort to make sure the additional information is provided by Spark. It's better to guarantee this at compile time.
I think we can improve this API a little bit. Table#newWriteBuilder can be defined to accept a WriteInfo, where WriteInfo is an interface providing the additional information.
WriteInfo is implemented by Spark and called by data source implementations, so we can add more methods in future versions without breaking backward compatibility. WriteInfo also ensures at compile time that Spark always provides the additional information.
If you think it makes sense, we can do it in a followup.
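The code snippets from this comment did not survive on this page, so the following is only a sketch of what the proposal could look like, not the reviewer's actual code. `WriteInfo` and `newWriteBuilder` come from the comment; the accessor names, the `Schema` placeholder for `StructType`, and `SketchTable` are assumptions made for illustration.

```scala
object WriteInfoSketch {
  // Hypothetical placeholder for Spark's StructType.
  final case class Schema(fieldNames: Seq[String])

  // Implemented by Spark and only read by data sources, so accessors can be
  // added later without breaking existing sources (accessor names are assumed).
  trait WriteInfo {
    def queryId: String
    def inputDataSchema: Schema
    def numPartitions: Int
  }

  trait BatchWrite
  final case class SketchAppend(queryId: String, numPartitions: Int) extends BatchWrite

  trait WriteBuilder {
    def buildForBatch(): BatchWrite
  }

  trait Table {
    // The builder cannot be created without a WriteInfo, so the information
    // is guaranteed to be present by the compiler rather than by assertions.
    def newWriteBuilder(info: WriteInfo): WriteBuilder
  }

  // A source reads only the fields it needs; no "provided" flags are required.
  object SketchTable extends Table {
    override def newWriteBuilder(info: WriteInfo): WriteBuilder = new WriteBuilder {
      override def buildForBatch(): BatchWrite = SketchAppend(info.queryId, info.numPartitions)
    }
  }

  def main(args: Array[String]): Unit = {
    // Plays the role of Spark, which would supply the WriteInfo implementation.
    val info = new WriteInfo {
      override val queryId: String = "query-1"
      override val inputDataSchema: Schema = Schema(Seq("id", "data"))
      override val numPartitions: Int = 4
    }
    println(SketchTable.newWriteBuilder(info).buildForBatch())
  }
}
```

Compared with the flag-based builder, a caller that omits the information simply does not compile, which is the guarantee the comment asks for.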
I agree with you that the WriteInfo approach has better compile-time guarantees. I actually started implementing the change like that, but then felt it was maybe too much of a change and that I should focus on numPartitions.
I'm happy to change it in a followup PR, if that works for everyone.
If we want to take this approach, then let's do it now before a release. Otherwise we should use the original implementation to add an additional method because that is a compatible change.
@edrevo can you implement this approach here? I think adding numPartitions is really a small change, so we can make improving this API the main focus of this PR.