-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-17665][SPARKR] Support options/mode all for read/write APIs and options in other types #15239
[SPARK-17665][SPARKR] Support options/mode all for read/write APIs and options in other types #15239
Changes from all commits
9c1439e
837e321
08b8795
c8baba5
1ac3c7b
28df54b
20cda71
ba772b3
8e10415
63b55f2
f268305
eeb7db5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -55,6 +55,19 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) { | |||||
.Object | ||||||
}) | ||||||
|
||||||
#' Apply a save mode and named options to a JVM DataFrameWriter.
#'
#' Gathers any extra named arguments into a string environment, folds in the
#' optional \code{path}, sets the converted save mode on the writer, and
#' returns the configured writer object.
#' @noRd
setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
  jmode <- convertToJSaveMode(mode)
  writeOpts <- varargsToStrEnv(...)
  if (!is.null(path)) {
    writeOpts[["path"]] <- path
  }
  write <- callJMethod(write, "mode", jmode)
  callJMethod(write, "options", writeOpts)
}
|
||||||
#' @export | ||||||
#' @param sdf A Java object reference to the backing Scala DataFrame | ||||||
#' @param isCached TRUE if the SparkDataFrame is cached | ||||||
|
@@ -727,6 +740,8 @@ setMethod("toJSON", | |||||
#' | ||||||
#' @param x A SparkDataFrame | ||||||
#' @param path The directory where the file is saved | ||||||
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default) | ||||||
#' @param ... additional argument(s) passed to the method. | ||||||
#' | ||||||
#' @family SparkDataFrame functions | ||||||
#' @rdname write.json | ||||||
|
@@ -743,8 +758,9 @@ setMethod("toJSON", | |||||
#' @note write.json since 1.6.0 | ||||||
setMethod("write.json", | ||||||
signature(x = "SparkDataFrame", path = "character"), | ||||||
function(x, path) { | ||||||
function(x, path, mode = "error", ...) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this change the default on the JVM side when mode was previously unset? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Default is [1]
[2]
|
||||||
write <- callJMethod(x@sdf, "write") | ||||||
write <- setWriteOptions(write, mode = mode, ...) | ||||||
invisible(callJMethod(write, "json", path)) | ||||||
}) | ||||||
|
||||||
|
@@ -755,6 +771,8 @@ setMethod("write.json", | |||||
#' | ||||||
#' @param x A SparkDataFrame | ||||||
#' @param path The directory where the file is saved | ||||||
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default) | ||||||
#' @param ... additional argument(s) passed to the method. | ||||||
#' | ||||||
#' @family SparkDataFrame functions | ||||||
#' @aliases write.orc,SparkDataFrame,character-method | ||||||
|
@@ -771,8 +789,9 @@ setMethod("write.json", | |||||
#' @note write.orc since 2.0.0
setMethod("write.orc",
          signature(x = "SparkDataFrame", path = "character"),
          function(x, path, mode = "error", ...) {
            # Obtain the JVM DataFrameWriter, apply mode/options, then save as ORC.
            writer <- callJMethod(x@sdf, "write")
            writer <- setWriteOptions(writer, mode = mode, ...)
            invisible(callJMethod(writer, "orc", path))
          })
|
||||||
|
@@ -783,6 +802,8 @@ setMethod("write.orc", | |||||
#' | ||||||
#' @param x A SparkDataFrame | ||||||
#' @param path The directory where the file is saved | ||||||
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default) | ||||||
#' @param ... additional argument(s) passed to the method. | ||||||
#' | ||||||
#' @family SparkDataFrame functions | ||||||
#' @rdname write.parquet | ||||||
|
@@ -800,8 +821,9 @@ setMethod("write.orc", | |||||
#' @note write.parquet since 1.6.0
setMethod("write.parquet",
          signature(x = "SparkDataFrame", path = "character"),
          function(x, path, mode = "error", ...) {
            # Obtain the JVM DataFrameWriter, apply mode/options, then save as Parquet.
            writer <- callJMethod(x@sdf, "write")
            writer <- setWriteOptions(writer, mode = mode, ...)
            invisible(callJMethod(writer, "parquet", path))
          })
|
||||||
|
@@ -825,6 +847,8 @@ setMethod("saveAsParquetFile", | |||||
#' | ||||||
#' @param x A SparkDataFrame | ||||||
#' @param path The directory where the file is saved | ||||||
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default) | ||||||
#' @param ... additional argument(s) passed to the method. | ||||||
#' | ||||||
#' @family SparkDataFrame functions | ||||||
#' @aliases write.text,SparkDataFrame,character-method | ||||||
|
@@ -841,8 +865,9 @@ setMethod("saveAsParquetFile", | |||||
#' @note write.text since 2.0.0 | ||||||
setMethod("write.text", | ||||||
signature(x = "SparkDataFrame", path = "character"), | ||||||
function(x, path) { | ||||||
function(x, path, mode = "error", ...) { | ||||||
write <- callJMethod(x@sdf, "write") | ||||||
write <- setWriteOptions(write, mode = mode, ...) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess similarly here, what if someone calls
? |
||||||
invisible(callJMethod(write, "text", path)) | ||||||
}) | ||||||
|
||||||
|
@@ -2637,15 +2662,9 @@ setMethod("write.df", | |||||
if (is.null(source)) { | ||||||
source <- getDefaultSqlSource() | ||||||
} | ||||||
jmode <- convertToJSaveMode(mode) | ||||||
options <- varargsToEnv(...) | ||||||
if (!is.null(path)) { | ||||||
options[["path"]] <- path | ||||||
} | ||||||
write <- callJMethod(df@sdf, "write") | ||||||
write <- callJMethod(write, "format", source) | ||||||
write <- callJMethod(write, "mode", jmode) | ||||||
write <- callJMethod(write, "options", options) | ||||||
write <- setWriteOptions(write, path = path, mode = mode, ...) | ||||||
write <- handledCallJMethod(write, "save") | ||||||
}) | ||||||
|
||||||
|
@@ -2701,7 +2720,7 @@ setMethod("saveAsTable", | |||||
source <- getDefaultSqlSource() | ||||||
} | ||||||
jmode <- convertToJSaveMode(mode) | ||||||
options <- varargsToEnv(...) | ||||||
options <- varargsToStrEnv(...) | ||||||
|
||||||
write <- callJMethod(df@sdf, "write") | ||||||
write <- callJMethod(write, "format", source) | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -328,6 +328,7 @@ setMethod("toDF", signature(x = "RDD"), | |
#' It goes through the entire dataset once to determine the schema. | ||
#' | ||
#' @param path Path of file to read. A vector of multiple paths is allowed. | ||
#' @param ... additional external data source specific named properties. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is odd - was it not complaining about this missing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it was not. I am not very sure on this (as actually I am not used to this CRAN check). My guess is, it seems they combine the arguments? For example, Parquet API is as below: #' @param path path of file to read. A vector of multiple paths is allowed.
#' @return SparkDataFrame
#' @rdname read.parquet
...
read.parquet.default <- function(path, ...) { #' @param ... argument(s) passed to the method.
#' @rdname read.parquet
...
parquetFile.default <- function(...) { It complained about duplicated On the other hand, for JSON APIs, #' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.json
...
read.json.default <- function(path, ...) { #' @rdname read.json
#' @name jsonFile
...
jsonFile.default <- function(path) { It seems If you meant another problem, could you please guide me? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right - when 2 functions share the same @Rdname, they are documented on the same Rd page and CRAN checks requirement is to have 1 and only 1 I haven't check, but my guess is you need to add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for your advice. I will try to deal with this as far as I can! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, do you think it is okay as it is? I tried to make them look more consistent and clean but it kind of failed. |
||
#' @return SparkDataFrame | ||
#' @rdname read.json | ||
#' @export | ||
|
@@ -341,11 +342,13 @@ setMethod("toDF", signature(x = "RDD"), | |
#' @name read.json | ||
#' @method read.json default | ||
#' @note read.json since 1.6.0 | ||
read.json.default <- function(path) { | ||
read.json.default <- function(path, ...) { | ||
sparkSession <- getSparkSession() | ||
options <- varargsToStrEnv(...) | ||
# Allow the user to have a more flexible definition of the text file path | ||
paths <- as.list(suppressWarnings(normalizePath(path))) | ||
read <- callJMethod(sparkSession, "read") | ||
read <- callJMethod(read, "options", options) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just notice this in read.* function:
Which path will be used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might be an existing problem but would go down a different code path in data sources depending on their implementation... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me test this first and will try to show together! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I see. In R, it seems we can't duplicatedly set paths. In Scala and Python, for reading it takes all paths set in option and as arguments for reading. For writing, the argument overwrites the path set in option. For R, in more details, It seems we can't simply specify the same keyword argument both. With the data below,
{"NAME": "Hyukjin"}
{"NAME": "Felix"}
In the case of With a single keyword argument, it seems > tmp <- function(path, ...) {
+ print(path)
+ }
>
> tmp("a", path = "b")
[1] "b" For > varargsToStrEnv("a", path="b")
Error in env[[name]] <- value :
attempt to use zero-length variable name
<environment: 0x7f815ba34d18> > varargsToStrEnv(a="a", b=1, c=2, d=3)
<environment: 0x7f815ba6a440> There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, thank you for the very detailed analysis and tests.
This is because of the parameter hack.
I think read.df is somewhat unique in the sense that its first parameter is named.
This case is somewhat dangerous - I think we end up passing a list of properties without a name to the JVM side - it might be a good idea to check for
||
sdf <- callJMethod(read, "json", paths) | ||
dataFrame(sdf) | ||
} | ||
|
@@ -405,16 +408,19 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) { | |
#' Loads an ORC file, returning the result as a SparkDataFrame.
#'
#' @param path Path of file to read.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.orc
#' @export
#' @name read.orc
#' @note read.orc since 2.0.0
read.orc <- function(path, ...) {
  sparkSession <- getSparkSession()
  readOpts <- varargsToStrEnv(...)
  # Allow the user to have a more flexible definition of the ORC file path
  normalized <- suppressWarnings(normalizePath(path))
  reader <- callJMethod(sparkSession, "read")
  reader <- callJMethod(reader, "options", readOpts)
  dataFrame(callJMethod(reader, "orc", normalized))
}
|
@@ -430,11 +436,13 @@ read.orc <- function(path) { | |
#' @name read.parquet
#' @method read.parquet default
#' @note read.parquet since 1.6.0
read.parquet.default <- function(path, ...) {
  sparkSession <- getSparkSession()
  readOpts <- varargsToStrEnv(...)
  # Allow the user to have a more flexible definition of the Parquet file path;
  # a vector of multiple paths is accepted.
  paths <- as.list(suppressWarnings(normalizePath(path)))
  reader <- callJMethod(sparkSession, "read")
  reader <- callJMethod(reader, "options", readOpts)
  dataFrame(callJMethod(reader, "parquet", paths))
}
|
@@ -467,6 +475,7 @@ parquetFile <- function(x, ...) { | |
#' Each line in the text file is a new row in the resulting SparkDataFrame. | ||
#' | ||
#' @param path Path of file to read. A vector of multiple paths is allowed. | ||
#' @param ... additional external data source specific named properties. | ||
#' @return SparkDataFrame | ||
#' @rdname read.text | ||
#' @export | ||
|
@@ -479,11 +488,13 @@ parquetFile <- function(x, ...) { | |
#' @name read.text
#' @method read.text default
#' @note read.text since 1.6.1
read.text.default <- function(path, ...) {
  sparkSession <- getSparkSession()
  readOpts <- varargsToStrEnv(...)
  # Allow the user to have a more flexible definition of the text file path;
  # a vector of multiple paths is accepted.
  paths <- as.list(suppressWarnings(normalizePath(path)))
  reader <- callJMethod(sparkSession, "read")
  reader <- callJMethod(reader, "options", readOpts)
  dataFrame(callJMethod(reader, "text", paths))
}
|
@@ -779,7 +790,7 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string | |
"in 'spark.sql.sources.default' configuration by default.") | ||
} | ||
sparkSession <- getSparkSession() | ||
options <- varargsToEnv(...) | ||
options <- varargsToStrEnv(...) | ||
if (!is.null(path)) { | ||
options[["path"]] <- path | ||
} | ||
|
@@ -842,7 +853,7 @@ loadDF <- function(x = NULL, ...) { | |
#' @note createExternalTable since 1.4.0 | ||
createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) { | ||
sparkSession <- getSparkSession() | ||
options <- varargsToEnv(...) | ||
options <- varargsToStrEnv(...) | ||
if (!is.null(path)) { | ||
options[["path"]] <- path | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think if it make sense to have a generic write method? ie. include
write <- callJMethod(x@sdf, "write")
andinvisible(callJMethod(write, source, path))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think that sounds good. Actually, I was thinking single common method for options in both
read
/write
(maybe inutils.R
?) and two common methods for reading/writing inread
/write
. I am wondering maybe if you think it is okay for me to try this in another PR after this one/#15231 are hopefully merged.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's fine