
[SPARK-29248][SQL] Pass in number of partitions to WriteBuilder #25945

Closed
wants to merge 4 commits

Conversation

@edrevo (Contributor) commented Sep 26, 2019

What changes were proposed in this pull request?

When implementing a ScanBuilder, we require the implementor to provide the schema of the data and the number of partitions.

However, when someone implements a WriteBuilder we only pass them the schema, not the number of partitions. This is an asymmetrical developer experience.

Why are the changes needed?

Passing in the number of partitions on the WriteBuilder would enable data sources to provision their write targets before starting to write. For example:

  • it could be used to provision a Kafka topic with a specific number of partitions
  • it could be used to scale a microservice prior to sending the data to it
  • it could be used to create a DSv2 source that sends the data to another Spark cluster (currently not possible, since the reader wouldn't be able to know the number of partitions)

Does this PR introduce any user-facing change?

No

How was this patch tested?

I ran the test, but I am getting an OOM error so I haven't been able to run the full suite.

@HeartSaVioR (Contributor) commented:

cc @rdblue @cloud-fan, as this proposes a change to the DSv2 API.

@rdblue (Contributor) commented Sep 26, 2019

The use case seems reasonable to me, as does the approach of adding the number of partitions via a defaulted method.

I'd like to make sure that all code paths call this method in tests. Could you update the InMemoryTable test class so that it throws an exception if this is not called before a write operation commits? That will ensure in tests that all code paths that commit to a table call this correctly.
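The guard rdblue is asking for can be sketched as a small standalone class. This is a hypothetical, heavily simplified stand-in for Spark's InMemoryTable test class, not its actual code: commit() fails fast unless withNumPartitions was called first, so any commit path that skips the call breaks loudly in tests.

```java
// Hypothetical, simplified sketch of the suggested test guard (not Spark's
// actual InMemoryTable): commit() throws if withNumPartitions was never
// called, and a second call to withNumPartitions also throws.
class GuardedWriteBuilder {
    private boolean numPartitionsProvided = false;
    private int numPartitions = -1;

    GuardedWriteBuilder withNumPartitions(int n) {
        if (numPartitionsProvided) {
            throw new IllegalStateException("numPartitions provided twice");
        }
        numPartitionsProvided = true;
        numPartitions = n;
        return this;
    }

    void commit() {
        if (!numPartitionsProvided) {
            throw new IllegalStateException(
                "withNumPartitions was not called before commit");
        }
        // ... persist the rows here in a real implementation ...
    }

    int numPartitions() {
        return numPartitions;
    }
}
```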

 * @return a new builder with the `numPartitions`. By default it returns `this`, which means the given
 * `numPartitions` is ignored. Please override this method to take the `numPartitions`.
 */
default WriteBuilder withNumPartitions(int numPartitions) {
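The defaulted-method pattern in this hunk can be illustrated with a self-contained sketch. The interface and class names below are invented for illustration (they are not Spark's exact types); only the shape of withNumPartitions mirrors the PR. Existing sources keep compiling because the new method has a default body, and sources that care override it.

```java
// Illustrative sketch of the defaulted-method pattern; names are invented.
interface SketchWriteBuilder {
    // Defaulted so that existing implementations keep compiling unchanged.
    default SketchWriteBuilder withNumPartitions(int numPartitions) {
        return this; // by default the hint is simply ignored
    }
}

// A source that wants the hint overrides the method, e.g. to provision a
// Kafka-like topic with that many partitions before the write starts.
class TopicProvisioningWriteBuilder implements SketchWriteBuilder {
    int provisionedPartitions = 0;

    @Override
    public SketchWriteBuilder withNumPartitions(int numPartitions) {
        provisionedPartitions = numPartitions;
        return this;
    }
}
```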
A contributor commented:

I'm OK with the approach here, but I want to share a few thoughts about how to make the API better. The use case: there is some additional information (input schema, numPartitions, etc.) that Spark should always provide, and implementations only need to write extra code if they need to access it.

With the current API, we can:

  1. add more additional information in future versions without breaking backward compatibility.
  2. implementations only need to override withNumPartitions (and similar methods) if they need to access the additional information.

But there is one drawback: we need to take extra care to make sure the additional information is actually provided by Spark. It would be better to guarantee this at compile time.

I think we can improve this API a little bit. For Table#newWriteBuilder, we can define it as

WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options, WriteInfo info);

While WriteInfo is an interface providing additional information:

interface WriteInfo {
  String queryId();
  StructType inputDataSchema();
  ...
}

WriteInfo is implemented by Spark and consumed by data source implementations, so we can add more methods in future versions without breaking backward compatibility. It also ensures, at compile time, that Spark always provides the additional information.

If you guys think it makes sense, we can do it in a followup.
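The compile-time guarantee argued for above can be shown with a few lines of standalone code. These are simplified, hypothetical types (not Spark's final API): because the factory method takes a WriteInfo parameter, the caller cannot forget to supply the information, so the runtime "was it provided?" check disappears.

```java
// Simplified sketch of the WriteInfo idea; standalone hypothetical types.
interface WriteInfo {
    String queryId();
    int numPartitions();
}

interface SketchTable {
    // The signature forces the caller (Spark) to pass a WriteInfo,
    // turning a runtime assertion into a compile-time guarantee.
    Object newWriteBuilder(WriteInfo info);
}

class RecordingTable implements SketchTable {
    String lastQueryId;
    int lastNumPartitions;

    @Override
    public Object newWriteBuilder(WriteInfo info) {
        lastQueryId = info.queryId();
        lastNumPartitions = info.numPartitions();
        return new Object(); // a real builder would be returned here
    }
}
```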

The PR author (@edrevo) replied:

I agree that the WriteInfo approach has better compile-time guarantees. I actually started implementing the change that way, but then felt it was too large a change and that I should focus on numPartitions.

I'm happy to change it in a followup PR, if that works for everyone.

A contributor replied:

If we want to take this approach, then let's do it now before a release. Otherwise we should use the original implementation to add an additional method because that is a compatible change.

A contributor replied:

@edrevo can you implement this approach here? Adding numPartitions is really a small change; we can make improving this API the main focus of this PR.

@edrevo (author) commented Sep 27, 2019

@rdblue, I have modified the tests.

@edrevo (author) commented Nov 7, 2019

@rdblue, since #26001 seems to be blocked, could we move forward with this PR, which contains the minimal changes required to propagate partition information, to ensure this gets in for Spark 3.0?

@edrevo (author) commented Nov 14, 2019

@cloud-fan, after today's Community Sync it seems better to move forward with this PR instead of #25990. You mentioned there was an alternative trait/interface where the number of physical partitions could be reported, but I didn't catch the specific name. Were you referring to BatchWrite?

Thanks for all the help with this, by the way! Much appreciated!

@cloud-fan (Contributor) commented:

Yes, we should add the method to BatchWrite and StreamingWrite, so that we can pass the physical info after the write instance is created.
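The idea of passing the physical partition count at writer-factory creation time, rather than on the builder, can be sketched as follows. These are simplified, hypothetical types: the point is that the count is only known once the physical plan exists, so it arrives when the factory is created.

```java
// Sketch of passing numPartitions when the writer factory is created
// (simplified hypothetical types, not Spark's exact interfaces).
interface SketchDataWriterFactory { }

interface SketchBatchWrite {
    SketchDataWriterFactory createBatchWriterFactory(int numPartitions);
}

class CountingBatchWrite implements SketchBatchWrite {
    int seenNumPartitions = -1;

    @Override
    public SketchDataWriterFactory createBatchWriterFactory(int numPartitions) {
        seenNumPartitions = numPartitions; // e.g. provision write targets here
        return new SketchDataWriterFactory() { };
    }
}
```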

@edrevo requested a review from cloud-fan, November 14, 2019 14:54
@@ -117,12 +119,32 @@ class InMemoryTable(
this
}

override def buildForBatch(): BatchWrite = writer
override def withQueryId(queryId: String): WriteBuilder = {
assert(!queryIdProvided, "queryId provided twice")
A contributor commented:
The reason will be displayed to describe this comment to others. Learn more.

Later on, can we continue the work in #25990? It still has value to give stronger guarantees, so that implementations don't need to do checks like this.

@cloud-fan (Contributor) commented:
ok to test

@@ -36,8 +36,8 @@ class MicroBatchWrite(epochId: Long, val writeSupport: StreamingWrite) extends B
writeSupport.abort(epochId, messages)
}

override def createBatchWriterFactory(): DataWriterFactory = {
new MicroBatchWriterFactory(epochId, writeSupport.createStreamingWriterFactory())
override def createBatchWriterFactory(numPartitions: Int): DataWriterFactory = {
A contributor commented:

Sorry to come up with this at the last minute: can we create a PhysicalWriteInfo interface, in case we want to add more physical information in the future?
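The suggestion to wrap the int in an interface can be sketched in a few lines. Names below are illustrative (Spark's eventual type is called PhysicalWriteInfo, but this is a simplified stand-in): new physical properties can later be added to the interface without breaking the createBatchWriterFactory signature.

```java
// Hypothetical sketch of wrapping the partition count in an info interface
// so the factory signature never has to change again. Names illustrative.
interface PhysicalWriteInfoSketch {
    int numPartitions();
    // future physical properties would be added here without breaking callers
}

interface EvolvableBatchWrite {
    Object createBatchWriterFactory(PhysicalWriteInfoSketch info);
}

class InfoReadingBatchWrite implements EvolvableBatchWrite {
    int seenNumPartitions = -1;

    @Override
    public Object createBatchWriterFactory(PhysicalWriteInfoSketch info) {
        seenNumPartitions = info.numPartitions();
        return new Object(); // a real factory would be returned here
    }
}
```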

The PR author (@edrevo) replied:

No problem! Since we still want to move forward with the interface-based approach, I've decided to evolve #25990 to include both PhysicalWriteInfo and WriteInfo.

@SparkQA commented Nov 15, 2019

Test build #113832 has finished for PR 25945 at commit f3dba5e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@edrevo (author) commented Nov 15, 2019

Closing this in favor of #25990

@edrevo closed this Nov 15, 2019