[SPARK-17665][SPARKR] Support options/mode all for read/write APIs and options in other types

## What changes were proposed in this pull request?

This PR includes the changes below:

  - Support `mode`/`options` in `read.parquet`, `write.parquet`, `read.orc`, `write.orc`, `read.text`, `write.text`, `read.json` and `write.json` APIs

  - Support other types (logical, numeric and string) as options for `write.df`, `read.df`, `read.parquet`, `write.parquet`, `read.orc`, `write.orc`, `read.text`, `write.text`, `read.json` and `write.json` (see the usage sketch below)
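
For example, with these changes calls like the following are expected to work (a usage sketch based on the tests added in this PR; the file paths are placeholders):

```r
df <- read.df("/tmp/people.json", "json")

# Format-specific writers now accept a save mode and extra data source options,
# e.g. writing gzip-compressed JSON and overwriting any existing output.
write.json(df, "/tmp/people-out", mode = "overwrite", compression = "gzip")
jsonDF <- read.json("/tmp/people-out")

# Options no longer have to be character: logical and numeric values are
# coerced to strings before being passed to the data source.
csvDF <- read.df("/tmp/people.csv", "csv", header = TRUE, inferSchema = TRUE)
```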

## How was this patch tested?

Unit tests in `test_sparkSQL.R` and `utils.R`.
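
The type coercion these tests exercise is implemented by the new `varargsToStrEnv` helper in `R/pkg/R/utils.R` (see the diff below): logical option values are lower-cased to `"true"`/`"false"`, numeric values become their string form, `NULL` passes through, and any other type raises an error. A minimal standalone sketch of that rule (the `coerceOption` wrapper is hypothetical, for illustration only):

```r
coerceOption <- function(value) {
  # Mirrors the rule in varargsToStrEnv: only logical, numeric, character
  # and NULL option values are accepted.
  if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
    stop("Unsupported type: ", class(value))
  }
  if (is.logical(value)) {
    tolower(as.character(value))  # TRUE -> "true", FALSE -> "false"
  } else if (is.null(value)) {
    value                         # NULL is kept as-is
  } else {
    as.character(value)           # e.g. 100 -> "100"; character passes through
  }
}

coerceOption(TRUE)    # "true"
coerceOption(100)     # "100"
coerceOption("gzip")  # "gzip"
```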

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15239 from HyukjinKwon/SPARK-17665.
HyukjinKwon authored and Felix Cheung committed Oct 7, 2016
1 parent bb1aaf2 commit 9d8ae85
Showing 6 changed files with 160 additions and 22 deletions.
43 changes: 31 additions & 12 deletions R/pkg/R/DataFrame.R
@@ -55,6 +55,19 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
.Object
})

#' Set options/mode and then return the write object
#' @noRd
setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
jmode <- convertToJSaveMode(mode)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write
}

#' @export
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the SparkDataFrame is cached
@@ -727,6 +740,8 @@ setMethod("toJSON",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @rdname write.json
@@ -743,8 +758,9 @@ setMethod("toJSON",
#' @note write.json since 1.6.0
setMethod("write.json",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "json", path))
})

@@ -755,6 +771,8 @@ setMethod("write.json",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.orc,SparkDataFrame,character-method
@@ -771,8 +789,9 @@ setMethod("write.json",
#' @note write.orc since 2.0.0
setMethod("write.orc",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "orc", path))
})

@@ -783,6 +802,8 @@ setMethod("write.orc",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @rdname write.parquet
@@ -800,8 +821,9 @@ setMethod("write.orc",
#' @note write.parquet since 1.6.0
setMethod("write.parquet",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "parquet", path))
})

@@ -825,6 +847,8 @@ setMethod("saveAsParquetFile",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.text,SparkDataFrame,character-method
@@ -841,8 +865,9 @@ setMethod("saveAsParquetFile",
#' @note write.text since 2.0.0
setMethod("write.text",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "text", path))
})

@@ -2637,15 +2662,9 @@ setMethod("write.df",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write <- setWriteOptions(write, path = path, mode = mode, ...)
write <- handledCallJMethod(write, "save")
})

@@ -2701,7 +2720,7 @@ setMethod("saveAsTable",
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
options <- varargsToStrEnv(...)

write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
23 changes: 17 additions & 6 deletions R/pkg/R/SQLContext.R
@@ -328,6 +328,7 @@ setMethod("toDF", signature(x = "RDD"),
#' It goes through the entire dataset once to determine the schema.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.json
#' @export
@@ -341,11 +342,13 @@ setMethod("toDF", signature(x = "RDD"),
#' @name read.json
#' @method read.json default
#' @note read.json since 1.6.0
read.json.default <- function(path) {
read.json.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "json", paths)
dataFrame(sdf)
}
@@ -405,16 +408,19 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
#' Loads an ORC file, returning the result as a SparkDataFrame.
#'
#' @param path Path of file to read.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.orc
#' @export
#' @name read.orc
#' @note read.orc since 2.0.0
read.orc <- function(path) {
read.orc <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the ORC file path
path <- suppressWarnings(normalizePath(path))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "orc", path)
dataFrame(sdf)
}
@@ -430,11 +436,13 @@ read.orc <- function(path) {
#' @name read.parquet
#' @method read.parquet default
#' @note read.parquet since 1.6.0
read.parquet.default <- function(path) {
read.parquet.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the Parquet file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "parquet", paths)
dataFrame(sdf)
}
@@ -467,6 +475,7 @@ parquetFile <- function(x, ...) {
#' Each line in the text file is a new row in the resulting SparkDataFrame.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.text
#' @export
@@ -479,11 +488,13 @@ parquetFile <- function(x, ...) {
#' @name read.text
#' @method read.text default
#' @note read.text since 1.6.1
read.text.default <- function(path) {
read.text.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "text", paths)
dataFrame(sdf)
}
@@ -779,7 +790,7 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
"in 'spark.sql.sources.default' configuration by default.")
}
sparkSession <- getSparkSession()
options <- varargsToEnv(...)
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
@@ -842,7 +853,7 @@ loadDF <- function(x = NULL, ...) {
#' @note createExternalTable since 1.4.0
createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
sparkSession <- getSparkSession()
options <- varargsToEnv(...)
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
10 changes: 6 additions & 4 deletions R/pkg/R/generics.R
@@ -651,23 +651,25 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {

#' @rdname write.json
#' @export
setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
setGeneric("write.json", function(x, path, ...) { standardGeneric("write.json") })

#' @rdname write.orc
#' @export
setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
setGeneric("write.orc", function(x, path, ...) { standardGeneric("write.orc") })

#' @rdname write.parquet
#' @export
setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
setGeneric("write.parquet", function(x, path, ...) {
standardGeneric("write.parquet")
})

#' @rdname write.parquet
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname write.text
#' @export
setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })

#' @rdname schema
#' @export
22 changes: 22 additions & 0 deletions R/pkg/R/utils.R
@@ -334,6 +334,28 @@ varargsToEnv <- function(...) {
env
}

# Utility function to capture the varargs into an environment object, with all values
# converted into strings.
varargsToStrEnv <- function(...) {
pairs <- list(...)
env <- new.env()
for (name in names(pairs)) {
value <- pairs[[name]]
if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
stop(paste0("Unsupported type for ", name, " : ", class(value),
". Supported types are logical, numeric, character and NULL."))
}
if (is.logical(value)) {
env[[name]] <- tolower(as.character(value))
} else if (is.null(value)) {
env[[name]] <- value
} else {
env[[name]] <- as.character(value)
}
}
env
}

getStorageLevel <- function(newLevel = c("DISK_ONLY",
"DISK_ONLY_2",
"MEMORY_AND_DISK",
75 changes: 75 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -256,6 +256,23 @@ test_that("read/write csv as DataFrame", {
unlink(csvPath2)
})

test_that("Support other types for options", {
csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
mockLinesCsv <- c("year,make,model,comment,blank",
"\"2012\",\"Tesla\",\"S\",\"No comment\",",
"1997,Ford,E350,\"Go get one now they are going fast\",",
"2015,Chevy,Volt",
"NA,Dummy,Placeholder")
writeLines(mockLinesCsv, csvPath)

csvDf <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
expected <- read.df(csvPath, "csv", header = TRUE, inferSchema = TRUE)
expect_equal(collect(csvDf), collect(expected))

expect_error(read.df(csvPath, "csv", header = TRUE, maxColumns = 3))
unlink(csvPath)
})

test_that("convert NAs to null type in DataFrames", {
rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
df <- createDataFrame(rdd, list("a", "b"))
@@ -497,6 +514,19 @@ test_that("read/write json files", {
unlink(jsonPath3)
})

test_that("read/write json files - compression option", {
df <- read.df(jsonPath, "json")

jsonPath <- tempfile(pattern = "jsonPath", fileext = ".json")
write.json(df, jsonPath, compression = "gzip")
jsonDF <- read.json(jsonPath)
expect_is(jsonDF, "SparkDataFrame")
expect_equal(count(jsonDF), count(df))
expect_true(length(list.files(jsonPath, pattern = ".gz")) > 0)

unlink(jsonPath)
})

test_that("jsonRDD() on a RDD with json string", {
sqlContext <- suppressWarnings(sparkRSQL.init(sc))
rdd <- parallelize(sc, mockLines)
@@ -1786,6 +1816,21 @@ test_that("read/write ORC files", {
unsetHiveContext()
})

test_that("read/write ORC files - compression option", {
setHiveContext(sc)
df <- read.df(jsonPath, "json")

orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
write.orc(df, orcPath2, compression = "ZLIB")
orcDF <- read.orc(orcPath2)
expect_is(orcDF, "SparkDataFrame")
expect_equal(count(orcDF), count(df))
expect_true(length(list.files(orcPath2, pattern = ".zlib.orc")) > 0)

unlink(orcPath2)
unsetHiveContext()
})

test_that("read/write Parquet files", {
df <- read.df(jsonPath, "json")
# Test write.df and read.df
@@ -1817,6 +1862,23 @@ test_that("read/write Parquet files", {
unlink(parquetPath4)
})

test_that("read/write Parquet files - compression option/mode", {
df <- read.df(jsonPath, "json")
tempPath <- tempfile(pattern = "tempPath", fileext = ".parquet")

# Test write.df and read.df
write.parquet(df, tempPath, compression = "GZIP")
df2 <- read.parquet(tempPath)
expect_is(df2, "SparkDataFrame")
expect_equal(count(df2), 3)
expect_true(length(list.files(tempPath, pattern = ".gz.parquet")) > 0)

write.parquet(df, tempPath, mode = "overwrite")
df3 <- read.parquet(tempPath)
expect_is(df3, "SparkDataFrame")
expect_equal(count(df3), 3)
})

test_that("read/write text files", {
# Test write.df and read.df
df <- read.df(jsonPath, "text")
@@ -1838,6 +1900,19 @@ test_that("read/write text files", {
unlink(textPath2)
})

test_that("read/write text files - compression option", {
df <- read.df(jsonPath, "text")

textPath <- tempfile(pattern = "textPath", fileext = ".txt")
write.text(df, textPath, compression = "GZIP")
textDF <- read.text(textPath)
expect_is(textDF, "SparkDataFrame")
expect_equal(count(textDF), count(df))
expect_true(length(list.files(textPath, pattern = ".gz")) > 0)

unlink(textPath)
})

test_that("describe() and summarize() on a DataFrame", {
df <- read.json(jsonPath)
stats <- describe(df, "age")
