[SPARK-17658][SPARKR] read.df/write.df API taking path optionally in SparkR #15231
Conversation
cc @felixcheung and @shivaram
FWIW, the Python API seems to have this already: [1] https://github.com/apache/spark/blob/master/python/pyspark/sql/readwriter.py#L521
Test build #65865 has finished for PR 15231 at commit
retest this please
Test build #65866 has finished for PR 15231 at commit
Test build #65868 has finished for PR 15231 at commit
Let's add some tests for this?
@felixcheung Yeap, I just added. As we don't currently have an internal datasource allowing …
Test build #65880 has finished for PR 15231 at commit
Hmm, should we hold till #12601 is merged then? It seems like we shouldn't allow this unless internal data sources support this more broadly. Also, before, the path parameter type is in the signature, i.e. a wrong type would error with some descriptive error; with this change it would get some JVM exception, which seems to degrade the experience a bit. Similarly for the path-not-specified case. Could you add checks to …
@felixcheung, I usually don't like to answer by quote but let me do this just to clarify.
As omitting …
Yeap, I could add some type checks.
Also, yes. Maybe we could avoid the direct JVM message after catching this and make it pretty within R just like PySpark does [1] (although I am not sure if it sounds good in R).
Sure, I will try to address the points. [1] spark/python/pyspark/sql/utils.py, lines 64 to 80 in 9a50719
Oh, BTW it seems …
Test build #65890 has finished for PR 15231 at commit
(I just updated the PR description too)
Test build #65893 has finished for PR 15231 at commit
Test build #65894 has finished for PR 15231 at commit
@@ -2624,6 +2624,15 @@ setMethod("except",
setMethod("write.df",
          signature(df = "SparkDataFrame"),
          function(df, path = NULL, source = NULL, mode = "error", ...) {
            if (!is.character(path) && !is.null(path)) {
              stop("path should be charactor, null or omitted.")
"character"?
same below
minor point: is it more efficient to flip the checks, i.e. if (!is.null(path) && !is.character(path)), since path defaults to NULL?
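For illustration, a minimal standalone sketch of the flipped ordering (the helper below is hypothetical, not code from this PR):

```r
# Sketch only: test is.null() first so the check short-circuits cheaply
# when path keeps its NULL default.
validateCharOrNull <- function(x, name) {   # hypothetical helper
  if (!is.null(x) && !is.character(x)) {
    stop(paste(name, "should be character, null or omitted."))
  }
}

validateCharOrNull(NULL, "path")          # passes silently
validateCharOrNull("data.json", "path")   # passes silently
# validateCharOrNull(c(1, 2), "path")     # would stop with the message above
```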
              stop("path should be charactor, null or omitted.")
            }
            if (!is.character(source) && !is.null(source)) {
              stop("source should be charactor, null or omitted. It is 'parquet' by default.")
strictly speaking, it's the spark.sql.sources.default property, and when it is not set, then it is parquet
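As a hedged illustration of that point (assuming an active session and sparkR.conf(), available in SparkR 2.x):

```r
# Illustration only: the effective default source is whatever
# spark.sql.sources.default is set to; "parquet" is just the fallback
# used when the property is unset.
defaultSource <- sparkR.conf("spark.sql.sources.default", "parquet")
print(defaultSource)
```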
@@ -698,6 +698,21 @@ isSparkRShell <- function() {
  grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
}

captureJVMException <- function(e) {
  stacktrace <- as.character(e)
  if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
are there cases where the IllegalArgument should be checked on the R side first to avoid the exception in the first place?
Thanks! @felixcheung I will address all the other comments above. However, for this one, I was thinking hard about it but it does not seem easy, because we can't know on the R side first whether the given data source is valid.
I might be able to do this only for internal data sources or for known Databricks data sources such as "redshift" or "xml", e.g. by creating a map of our internal data sources and then checking whether a path is given. However, I am not sure it is a good idea to maintain another list of data sources.
I agree, I don't think we should couple the R code to the underlying data source implementations, and I was not suggesting that :)
I guess I'm saying there are still many (other) cases where the parameters are unchecked, and it would be good to see whether this check that converts the JVM IllegalArgumentException is sufficient or whether more checks should be added on the R side.
Ah, I see. Yeap, this might be a best-effort thing. I think I tried (if I am right) all combinations of missing/wrong parameters in the APIs. One exceptional case for both APIs is that they throw a ClassCastException
when the extra options are wrongly typed, which I think we should check on the R side; this will be handled in #15239.
I might open another PR for validating parameters across SparkR if you think it is okay.
great, thanks - generally I'd prefer having parameter checks in R; though in this case I think we need to balance the added code complexity against the reduced usability (by checking more, it might fail where it didn't before).
so I'm not 100% sure we should add parameter checks all across the board.
Yeap, I do understand and will investigate it, keeping this in mind :)
@@ -698,6 +698,21 @@ isSparkRShell <- function() {
  grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
}

captureJVMException <- function(e) {
It'd be great to add some tests that would trigger tryCatch and this function?
since there are 3 (or 2) cases here and their handling seems nontrivial, could you add a test for each of the main exception types?
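For context, a minimal sketch of what one such test could look like, wrapping callJStatic in a tryCatch so the handler runs; the method, argument, and expected message fragment are illustrative, not the exact assertions in the PR:

```r
# Sketch only: force a JVM IllegalArgumentException and check that
# captureJVMException rewrites it into a readable R error.
test_that("captureJVMException rewrites JVM errors", {
  expect_error(
    tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSQLDataType", "unknown"),
             error = function(e) { captureJVMException(e) }),
    "illegal argument")
})
```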
The commits I just pushed address the comments except for #15231 (comment)
Test build #65905 has finished for PR 15231 at commit
Test build #65904 has finished for PR 15231 at commit
since #12601 is merged, could you check if this should be updated or tested?
I just made this update and manually tested the JDBC data source with an H2 database as below, with test.json containing:

```json
{"NAME": "bcd"}
{"NAME": "abc"}
{"NAME": "Hyukjin"}
```

with the temp table below:

```scala
val url = "jdbc:h2:mem:testdb0"
val urlWithUserAndPass = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"
var conn: java.sql.Connection = null
Utils.classForName("org.h2.Driver")
// Extra properties that will be specified for our database. We need these to test
// usage of parameters from OPTIONS clause in queries.
val properties = new Properties()
properties.setProperty("user", "testUser")
properties.setProperty("password", "testPass")
properties.setProperty("rowId", "false")
conn = DriverManager.getConnection(url, properties)
conn.prepareStatement(
  "create table people (name TEXT(32) NOT NULL)").executeUpdate()
conn.prepareStatement("insert into people values ('fred')").executeUpdate()
conn.prepareStatement("insert into people values ('mary')").executeUpdate()
conn.commit()
```

and with the code below:

```r
> df <- read.df(source="jdbc",
+               driver="org.h2.Driver",
+               url="jdbc:h2:mem:testdb0;user=testUser;password=testPass",
+               dbtable="PEOPLE")
> collect(df)
  NAME
1 fred
2 mary
> jsonDF <- read.json("test.json")
> write.df(jsonDF,
+          source = "jdbc",
+          mode = "overwrite",
+          driver = "org.h2.Driver",
+          url = "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
+          dbtable = "PEOPLE")
> readback <- read.df(source = "jdbc",
+                     driver = "org.h2.Driver",
+                     url = "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
+                     dbtable = "PEOPLE")
> collect(readback)
     NAME
1     bcd
2     abc
3 Hyukjin
```
@felixcheung BTW, it seems pretty difficult to add a test for JDBC. Maybe I could do this later in another PR.
Test build #66169 has finished for PR 15231 at commit
We talked about loading a jar that provides the H2 source, and then calling read.jdbc/write.jdbc from R.
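Roughly along these lines, presumably; the H2 coordinates, version, and connection URL below are illustrative assumptions rather than the exact setup discussed:

```r
# Illustration only: start SparkR with the H2 driver jar on the classpath
# and read a table back through read.jdbc(); the package version and the
# in-memory database URL are placeholders.
library(SparkR)
sparkR.session(sparkPackages = "com.h2database:h2:1.4.192")
people <- read.jdbc("jdbc:h2:mem:testdb0;user=testUser;password=testPass", "PEOPLE")
head(people)
```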
I tested this, and this is the message I see:
I think we should lose the first part, "in value[3L]"? Perhaps we could have the function name, "getSQLDataType", instead? Also I think it'd be important to differentiate where the message is coming from, so how about adding that to the stop calls, something like …
@felixcheung I see. Yes, I think that is a good idea. I just read the code further. How about one like the original message before, such as …
for now, and then dealing with printing the original name (…)? It seems one of the easy ways to reach the goal would be a wrapper for …
I'd prefer … Since this is regarding …, @HyukjinKwon how about we make the message change here and merge this PR ASAP? Am I understanding the context or level of the changes required for this?
@felixcheung ah, yes, maybe. I just wanted to avoid changing a lot, but it seems making the change here is more sensible. Let me try to fix this here together. Thank you!
@felixcheung I just updated this and now it'd print the exceptions as below:

```r
> read.df(source="json")
Error in loadDF : analysis error - Unable to infer schema for JSON at . It must be specified manually;
> read.df("arbitrary_path")
Error in loadDF : analysis error - Path does not exist: file: .../arbitrary_path;
> write.df(df, source="csv")
Error in save : illegal argument - 'path' is not specified
```

Could you please take another look?
Test build #66311 has finished for PR 15231 at commit
Test build #66312 has finished for PR 15231 at commit
Test build #66313 has finished for PR 15231 at commit
  # "Error in invokeJava(...)". Here, it replaces the characters to
  # `paste("Error in", method, ":")` in order to identify which function
  # was called in JVM side.
  stacktrace <- strsplit(rawmsg, "Error in .*?: ")[[1]]
very minor nit: you could probably replace the double pass (the grep above plus this strsplit) with just the result from strsplit
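For readers skimming the diff, a rough sketch of the message-rewriting idea under review; the function name, signature, and message formats below are illustrative assumptions, not the merged code:

```r
# Sketch only: drop R's default "Error in invokeJava(...): " prefix and
# re-prefix the JVM message with the name of the method that was called.
rewriteJVMError <- function(e, method) {
  rawmsg <- as.character(e)
  # keep only the text after the first "Error in ...: " occurrence
  msg <- strsplit(rawmsg, "Error in .*?: ")[[1]][2]
  if (grepl("java.lang.IllegalArgumentException: ", rawmsg, fixed = TRUE)) {
    detail <- strsplit(msg, "java.lang.IllegalArgumentException: ", fixed = TRUE)[[1]][2]
    stop(paste0("Error in ", method, " : illegal argument - ", detail), call. = FALSE)
  } else {
    stop(paste0("Error in ", method, " : ", msg), call. = FALSE)
  }
}
```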
@@ -167,10 +167,13 @@ test_that("convertToJSaveMode", {
})

test_that("captureJVMException", {
  expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSQLDataType",
  method <- "getSQLDataType"
  expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", method,
let's change this test to handledCallJStatic too?
I think it would also be great to add to the examples one for read.df and one for write.df without the path parameter (like a JDBC one)
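Presumably something along these lines; the connection settings below are purely illustrative placeholders:

```r
# Illustration only: read.df/write.df driven entirely by options, no path.
df <- read.df(source = "jdbc",
              url = "jdbc:postgresql://localhost/test?user=me",
              dbtable = "people",
              driver = "org.postgresql.Driver")
write.df(df, source = "jdbc", mode = "overwrite",
         url = "jdbc:postgresql://localhost/test?user=me",
         dbtable = "people_copy",
         driver = "org.postgresql.Driver")
```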
I've merged this to master. We could take some example improvements along with SPARK-17665.
Thanks again for your close review. Will keep the comments in mind.
…SparkR ## What changes were proposed in this pull request? `write.df`/`read.df` API require path which is not actually always necessary in Spark. Currently, it only affects the datasources implementing `CreatableRelationProvider`. Currently, Spark currently does not have internal data sources implementing this but it'd affect other external datasources. In addition we'd be able to use this way in Spark's JDBC datasource after apache#12601 is merged. **Before** - `read.df` ```r > read.df(source = "json") Error in dispatchFunc("read.df(path = NULL, source = NULL, schema = NULL, ...)", : argument "x" is missing with no default ``` ```r > read.df(path = c(1, 2)) Error in dispatchFunc("read.df(path = NULL, source = NULL, schema = NULL, ...)", : argument "x" is missing with no default ``` ```r > read.df(c(1, 2)) Error in invokeJava(isStatic = TRUE, className, methodName, ...) : java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.String at org.apache.spark.sql.execution.datasources.DataSource.hasMetadata(DataSource.scala:300) at ... In if (is.na(object)) { : ... ``` - `write.df` ```r > write.df(df, source = "json") Error in (function (classes, fdef, mtable) : unable to find an inherited method for function ‘write.df’ for signature ‘"function", "missing"’ ``` ```r > write.df(df, source = c(1, 2)) Error in (function (classes, fdef, mtable) : unable to find an inherited method for function ‘write.df’ for signature ‘"SparkDataFrame", "missing"’ ``` ```r > write.df(df, mode = TRUE) Error in (function (classes, fdef, mtable) : unable to find an inherited method for function ‘write.df’ for signature ‘"SparkDataFrame", "missing"’ ``` **After** - `read.df` ```r > read.df(source = "json") Error in loadDF : analysis error - Unable to infer schema for JSON at . It must be specified manually; ``` ```r > read.df(path = c(1, 2)) Error in f(x, ...) : path should be charactor, null or omitted. ``` ```r > read.df(c(1, 2)) Error in f(x, ...) : path should be charactor, null or omitted. ``` - `write.df` ```r > write.df(df, source = "json") Error in save : illegal argument - 'path' is not specified ``` ```r > write.df(df, source = c(1, 2)) Error in .local(df, path, ...) : source should be charactor, null or omitted. It is 'parquet' by default. ``` ```r > write.df(df, mode = TRUE) Error in .local(df, path, ...) : mode should be charactor or omitted. It is 'error' by default. ``` ## How was this patch tested? Unit tests in `test_sparkSQL.R` Author: hyukjinkwon <gurwls223@gmail.com> Closes apache#15231 from HyukjinKwon/write-default-r.
What changes were proposed in this pull request?
The write.df/read.df API requires a path, which is not actually always necessary in Spark. Currently, this only affects the data sources implementing CreatableRelationProvider. Spark does not have internal data sources implementing this yet, but it affects other external data sources. In addition, we'd be able to use this in Spark's JDBC data source after #12601 is merged.
Before
read.df

```r
> read.df(source = "json")
Error in dispatchFunc("read.df(path = NULL, source = NULL, schema = NULL, ...)", : argument "x" is missing with no default
> read.df(path = c(1, 2))
Error in dispatchFunc("read.df(path = NULL, source = NULL, schema = NULL, ...)", : argument "x" is missing with no default
> read.df(c(1, 2))
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.String
  at org.apache.spark.sql.execution.datasources.DataSource.hasMetadata(DataSource.scala:300)
  at ...
In if (is.na(object)) { : ...
```

write.df

```r
> write.df(df, source = "json")
Error in (function (classes, fdef, mtable) : unable to find an inherited method for function ‘write.df’ for signature ‘"function", "missing"’
> write.df(df, source = c(1, 2))
Error in (function (classes, fdef, mtable) : unable to find an inherited method for function ‘write.df’ for signature ‘"SparkDataFrame", "missing"’
> write.df(df, mode = TRUE)
Error in (function (classes, fdef, mtable) : unable to find an inherited method for function ‘write.df’ for signature ‘"SparkDataFrame", "missing"’
```

After
read.df

```r
> read.df(source = "json")
Error in loadDF : analysis error - Unable to infer schema for JSON at . It must be specified manually;
> read.df(path = c(1, 2))
Error in f(x, ...) : path should be charactor, null or omitted.
> read.df(c(1, 2))
Error in f(x, ...) : path should be charactor, null or omitted.
```

write.df

```r
> write.df(df, source = "json")
Error in save : illegal argument - 'path' is not specified
> write.df(df, source = c(1, 2))
Error in .local(df, path, ...) : source should be charactor, null or omitted. It is 'parquet' by default.
> write.df(df, mode = TRUE)
Error in .local(df, path, ...) : mode should be charactor or omitted. It is 'error' by default.
```

How was this patch tested?
Unit tests in test_sparkSQL.R
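As a rough idea of what those tests can assert, a hedged sketch based on the "After" messages above (an active SparkSession and a SparkDataFrame df are assumed; the test name is illustrative):

```r
# Sketch only: the R-side type checks should fail fast with the messages
# shown above ("charactor" matches the misspelling in those messages).
test_that("read.df/write.df validate path, source and mode arguments", {
  expect_error(read.df(path = c(1, 2)),
               "path should be charactor, null or omitted.")
  expect_error(write.df(df, mode = TRUE),
               "mode should be charactor or omitted. It is 'error' by default.")
})
```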