[SPARK-17665][SPARKR] Support options/mode all for read/write APIs and options in other types #15239

Closed
wants to merge 12 commits
43 changes: 31 additions & 12 deletions R/pkg/R/DataFrame.R
@@ -55,6 +55,19 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
.Object
})

#' Set options/mode and then return the write object
#' @noRd
setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
jmode <- convertToJSaveMode(mode)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write
Member:

do you think it makes sense to have a generic write method? i.e. one that includes write <- callJMethod(x@sdf, "write") and invisible(callJMethod(write, source, path))

Member Author (@HyukjinKwon), Sep 28, 2016:

Yes, I think that sounds good. Actually, I was thinking of a single common method for options in both read/write (maybe in utils.R?) and two common methods for reading and writing. I am wondering if you think it is okay for me to try this in another PR after this one and #15231 are hopefully merged.
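
A rough sketch of the kind of shared write helper being discussed (the name writeToFileSource and its placement in utils.R are assumptions about the follow-up, not code in this PR):

writeToFileSource <- function(x, source, path, mode = "error", ...) {
  write <- callJMethod(x@sdf, "write")
  write <- setWriteOptions(write, mode = mode, ...)
  # `source` here is the writer method name, e.g. "json", "orc", "parquet" or "text"
  invisible(callJMethod(write, source, path))
}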

Member:

that's fine

}

#' @export
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the SparkDataFrame is cached
@@ -727,6 +740,8 @@ setMethod("toJSON",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @rdname write.json
@@ -743,8 +758,9 @@ setMethod("toJSON",
#' @note write.json since 1.6.0
setMethod("write.json",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
Member:

does this change the default on the JVM side when mode was previously unset?

Member Author (@HyukjinKwon), Sep 27, 2016:

The default is SaveMode.ErrorIfExists [1], which is what "error" maps to [2].

[1]

private var mode: SaveMode = SaveMode.ErrorIfExists

[2]
case "error" | "default" => SaveMode.ErrorIfExists

write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "json", path))
})

@@ -755,6 +771,8 @@ setMethod("write.json",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.orc,SparkDataFrame,character-method
@@ -771,8 +789,9 @@ setMethod("write.json",
#' @note write.orc since 2.0.0
setMethod("write.orc",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "orc", path))
})

@@ -783,6 +802,8 @@ setMethod("write.orc",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @rdname write.parquet
@@ -800,8 +821,9 @@ setMethod("write.orc",
#' @note write.parquet since 1.6.0
setMethod("write.parquet",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "parquet", path))
})

@@ -825,6 +847,8 @@ setMethod("saveAsParquetFile",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.text,SparkDataFrame,character-method
@@ -841,8 +865,9 @@ setMethod("saveAsParquetFile",
#' @note write.text since 2.0.0
setMethod("write.text",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
Member:

I guess similarly here, what if someone calls

write.text(df, path = "a", path = "b")

?

invisible(callJMethod(write, "text", path))
})

@@ -2637,15 +2662,9 @@ setMethod("write.df",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write <- setWriteOptions(write, path = path, mode = mode, ...)
write <- handledCallJMethod(write, "save")
})

@@ -2701,7 +2720,7 @@ setMethod("saveAsTable",
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
options <- varargsToStrEnv(...)

write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
23 changes: 17 additions & 6 deletions R/pkg/R/SQLContext.R
@@ -328,6 +328,7 @@ setMethod("toDF", signature(x = "RDD"),
#' It goes through the entire dataset once to determine the schema.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
Member:

this is odd - was it not complaining about this missing?

Member Author (@HyukjinKwon), Sep 28, 2016:

Yes, it was not. I am not very sure about this (I am actually not used to this CRAN check). My guess is that it combines the arguments. For example, the Parquet API is as below:

 #' @param path path of file to read. A vector of multiple paths is allowed.
 #' @return SparkDataFrame
 #' @rdname read.parquet
...
read.parquet.default <- function(path, ...) {
 #' @param ... argument(s) passed to the method.
 #' @rdname read.parquet
...
 parquetFile.default <- function(...) {

It complained about duplicated @params when I tried to add @param ... to read.parquet.default. So, I ended up removing it again.

On the other hand, for JSON APIs,

#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.json
...
read.json.default <- function(path, ...) {
 #' @rdname read.json
 #' @name jsonFile
...
 jsonFile.default <- function(path) {

It seems jsonFile does not describe @param. So, I think it passed.

If you meant another problem, could you please guide me?

Member (@felixcheung), Sep 29, 2016:

right - when two functions share the same @rdname, they are documented on the same Rd page, and the CRAN check requires one and only one @param ... entry if either/both functions have ... as a parameter.

I haven't checked, but my guess is you need to add @param ... for @rdname read.json since ... is new.
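
A minimal roxygen sketch of that rule (illustrative only; both functions share one Rd page, so ... is documented exactly once):

#' @param path Path of file to read.
#' @param ... additional external data source specific named properties.
#' @rdname read.json
read.json.default <- function(path, ...) { NULL }

#' @rdname read.json
#' @name jsonFile
jsonFile.default <- function(path) { NULL }  # no second "@param ..." for the shared page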

Member Author:

Thank you for your advice. I will try to deal with this as far as I can!

Member Author (@HyukjinKwon), Sep 30, 2016:

Hm, do you think it is okay as it is? I tried to make them look more consistent and clean but it kind of failed.

#' @return SparkDataFrame
#' @rdname read.json
#' @export
@@ -341,11 +342,13 @@ setMethod("toDF", signature(x = "RDD"),
#' @name read.json
#' @method read.json default
#' @note read.json since 1.6.0
read.json.default <- function(path) {
read.json.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definiton of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
Member:

just noticed this in the read.* functions:
what if someone calls

read.json(path = "a", path = "b")

Which path will be used?

Member:

This might be an existing problem but would go down a different code path in data sources depending on their implementation...

Member Author:

Let me test this first and then try to show the results together!

Member Author (@HyukjinKwon), Oct 7, 2016:

Oh, I see.

In R, it seems we can't set the path twice.

In Scala and Python, reading takes all paths set both in the options and as arguments. For writing, the argument overwrites the path set in the options.

For R, in more detail, it seems we can't specify the same keyword argument twice.

With the data below,

hyukjin.json

{"NAME": "Hyukjin"}

felix.json

{"NAME": "Felix"}
  • read.json()

    Duplicated keywords

    > read.json(path = "hyukjin.json", path = "felix.json")
    Error in dispatchFunc("read.json(path)", x, ...) :
      argument "x" is missing, with no default

    With a single keyword argument

    > collect(read.json("hyukjin.json", path = "felix.json"))
       NAME
    1 Felix
  • read.df()

    Duplicated keywords

    > read.df(path = "hyukjin.json", path = "felix.json", source = "json")
    Error in f(x, ...) :
      formal argument "path" matched by multiple actual arguments

    With a single keyword argument

    > read.df("hyukjin.json", path = "felix.json", source = "json")
    Error: class(schema) == "structType" is not TRUE

    In this case, it seems "hyukjin.json" became the third argument, schema.

In the "With a single keyword argument" case, it seems path becomes felix.json. For example, as below:

> tmp <- function(path, ...) {
+   print(path)
+ }
>
> tmp("a", path = "b")
[1] "b"

For ... arguments, it seems an exception is thrown when positional and keyword arguments are mixed, as below:

> varargsToStrEnv("a", path="b")
Error in env[[name]] <- value :
  attempt to use zero-length variable name

However, it seems fine if they are all positional arguments or all keyword arguments, as below:

> varargsToStrEnv("a", 1, 2, 3)
<environment: 0x7f815ba34d18>
> varargsToStrEnv(a="a", b=1, c=2, d=3)
<environment: 0x7f815ba6a440>

Member:

ah, thank you for the very detailed analysis and tests.
I think it would generally be great to match the Scala/Python behavior (though not only for the sake of matching it) and have read include all path(s).

> read.json(path = "hyukjin.json", path = "felix.json")
Error in dispatchFunc("read.json(path)", x, ...) :
  argument "x" is missing, with no default

This is because of the parameter hack.

> read.df(path = "hyukjin.json", path = "felix.json", source = "json")
Error in f(x, ...) :
  formal argument "path" matched by multiple actual arguments

I think read.df is somewhat unique in the sense that the first parameter is named path - this is either helpful (if we don't want to support multiple paths like this) or bad (the user can't specify multiple paths)

> varargsToStrEnv("a", 1, 2, 3)
<environment: 0x7f815ba34d18>

This case is somewhat dangerous - I think we end up passing a list of properties without names to the JVM side - it might be a good idea to check for zero-length variable names - perhaps you could open a JIRA on that?
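
One possible shape for that guard (an assumption for the suggested JIRA, not part of this PR): reject unnamed varargs up front so nameless options never reach the JVM side.

checkNamedVarargs <- function(...) {
  pairs <- list(...)
  nms <- names(pairs)
  if (length(pairs) > 0 && (is.null(nms) || any(nms == ""))) {
    stop("All options should be named, e.g. header = \"true\".")
  }
  invisible(TRUE)
}

checkNamedVarargs(header = "true", inferSchema = "true")  # OK
# checkNamedVarargs("a", 1, 2, 3)                         # would stop() with an error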

Member Author:

Yeap, let me try to organise the unresolved comments here and in #15231 if there are any! Thank you.

sdf <- callJMethod(read, "json", paths)
dataFrame(sdf)
}
@@ -405,16 +408,19 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
#' Loads an ORC file, returning the result as a SparkDataFrame.
#'
#' @param path Path of file to read.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.orc
#' @export
#' @name read.orc
#' @note read.orc since 2.0.0
read.orc <- function(path) {
read.orc <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definiton of the ORC file path
path <- suppressWarnings(normalizePath(path))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "orc", path)
dataFrame(sdf)
}
@@ -430,11 +436,13 @@ read.orc <- function(path) {
#' @name read.parquet
#' @method read.parquet default
#' @note read.parquet since 1.6.0
read.parquet.default <- function(path) {
read.parquet.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definiton of the Parquet file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "parquet", paths)
dataFrame(sdf)
}
@@ -467,6 +475,7 @@ parquetFile <- function(x, ...) {
#' Each line in the text file is a new row in the resulting SparkDataFrame.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.text
#' @export
@@ -479,11 +488,13 @@ parquetFile <- function(x, ...) {
#' @name read.text
#' @method read.text default
#' @note read.text since 1.6.1
read.text.default <- function(path) {
read.text.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definiton of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "text", paths)
dataFrame(sdf)
}
@@ -779,7 +790,7 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
"in 'spark.sql.sources.default' configuration by default.")
}
sparkSession <- getSparkSession()
options <- varargsToEnv(...)
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
@@ -842,7 +853,7 @@ loadDF <- function(x = NULL, ...) {
#' @note createExternalTable since 1.4.0
createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
sparkSession <- getSparkSession()
options <- varargsToEnv(...)
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
10 changes: 6 additions & 4 deletions R/pkg/R/generics.R
@@ -651,23 +651,25 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {

#' @rdname write.json
#' @export
setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
setGeneric("write.json", function(x, path, ...) { standardGeneric("write.json") })

#' @rdname write.orc
#' @export
setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
setGeneric("write.orc", function(x, path, ...) { standardGeneric("write.orc") })

#' @rdname write.parquet
#' @export
setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
setGeneric("write.parquet", function(x, path, ...) {
standardGeneric("write.parquet")
})

#' @rdname write.parquet
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname write.text
#' @export
setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })

#' @rdname schema
#' @export
22 changes: 22 additions & 0 deletions R/pkg/R/utils.R
@@ -334,6 +334,28 @@ varargsToEnv <- function(...) {
env
}

# Utility function to capture the varargs into environment object but all values are converted
# into string.
varargsToStrEnv <- function(...) {
pairs <- list(...)
env <- new.env()
for (name in names(pairs)) {
value <- pairs[[name]]
if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
stop(paste0("Unsupported type for ", name, " : ", class(value),
". Supported types are logical, numeric, character and NULL."))
}
if (is.logical(value)) {
env[[name]] <- tolower(as.character(value))
} else if (is.null(value)) {
env[[name]] <- value
} else {
env[[name]] <- as.character(value)
}
}
env
}
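
For reference, a quick illustration of the conversions this helper performs (the outputs in the comments are my reading of the code above, not test output):

env <- varargsToStrEnv(header = TRUE, maxColumns = 3, nullValue = NULL)
env[["header"]]      # "true" - logicals become lower-cased strings
env[["maxColumns"]]  # "3"    - numerics become character
env[["nullValue"]]   # NULL   - NULL values are kept as-is
# varargsToStrEnv(schema = list("a")) stops with "Unsupported type for schema : list. ..."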

getStorageLevel <- function(newLevel = c("DISK_ONLY",
"DISK_ONLY_2",
"MEMORY_AND_DISK",
75 changes: 75 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -256,6 +256,23 @@ test_that("read/write csv as DataFrame", {
unlink(csvPath2)
})

test_that("Support other types for options", {
csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
mockLinesCsv <- c("year,make,model,comment,blank",
"\"2012\",\"Tesla\",\"S\",\"No comment\",",
"1997,Ford,E350,\"Go get one now they are going fast\",",
"2015,Chevy,Volt",
"NA,Dummy,Placeholder")
writeLines(mockLinesCsv, csvPath)

csvDf <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
expected <- read.df(csvPath, "csv", header = TRUE, inferSchema = TRUE)
expect_equal(collect(csvDf), collect(expected))

expect_error(read.df(csvPath, "csv", header = TRUE, maxColumns = 3))
unlink(csvPath)
})

test_that("convert NAs to null type in DataFrames", {
rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
df <- createDataFrame(rdd, list("a", "b"))
@@ -497,6 +514,19 @@ test_that("read/write json files", {
unlink(jsonPath3)
})

test_that("read/write json files - compression option", {
df <- read.df(jsonPath, "json")

jsonPath <- tempfile(pattern = "jsonPath", fileext = ".json")
write.json(df, jsonPath, compression = "gzip")
jsonDF <- read.json(jsonPath)
expect_is(jsonDF, "SparkDataFrame")
expect_equal(count(jsonDF), count(df))
expect_true(length(list.files(jsonPath, pattern = ".gz")) > 0)

unlink(jsonPath)
})

test_that("jsonRDD() on a RDD with json string", {
sqlContext <- suppressWarnings(sparkRSQL.init(sc))
rdd <- parallelize(sc, mockLines)
@@ -1786,6 +1816,21 @@ test_that("read/write ORC files", {
unsetHiveContext()
})

test_that("read/write ORC files - compression option", {
setHiveContext(sc)
df <- read.df(jsonPath, "json")

orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
write.orc(df, orcPath2, compression = "ZLIB")
orcDF <- read.orc(orcPath2)
expect_is(orcDF, "SparkDataFrame")
expect_equal(count(orcDF), count(df))
expect_true(length(list.files(orcPath2, pattern = ".zlib.orc")) > 0)

unlink(orcPath2)
unsetHiveContext()
})

test_that("read/write Parquet files", {
df <- read.df(jsonPath, "json")
# Test write.df and read.df
@@ -1817,6 +1862,23 @@ test_that("read/write Parquet files", {
unlink(parquetPath4)
})

test_that("read/write Parquet files - compression option/mode", {
df <- read.df(jsonPath, "json")
tempPath <- tempfile(pattern = "tempPath", fileext = ".parquet")

# Test write.df and read.df
write.parquet(df, tempPath, compression = "GZIP")
df2 <- read.parquet(tempPath)
expect_is(df2, "SparkDataFrame")
expect_equal(count(df2), 3)
expect_true(length(list.files(tempPath, pattern = ".gz.parquet")) > 0)

write.parquet(df, tempPath, mode = "overwrite")
df3 <- read.parquet(tempPath)
expect_is(df3, "SparkDataFrame")
expect_equal(count(df3), 3)
})

test_that("read/write text files", {
# Test write.df and read.df
df <- read.df(jsonPath, "text")
@@ -1838,6 +1900,19 @@ test_that("read/write text files", {
unlink(textPath2)
})

test_that("read/write text files - compression option", {
df <- read.df(jsonPath, "text")

textPath <- tempfile(pattern = "textPath", fileext = ".txt")
write.text(df, textPath, compression = "GZIP")
textDF <- read.text(textPath)
expect_is(textDF, "SparkDataFrame")
expect_equal(count(textDF), count(df))
expect_true(length(list.files(textPath, pattern = ".gz")) > 0)

unlink(textPath)
})

test_that("describe() and summarize() on a DataFrame", {
df <- read.json(jsonPath)
stats <- describe(df, "age")