Commit
Merge remote-tracking branch 'upstream/master' into ctasOptimizedTestCases
gatorsmile committed Oct 24, 2016
2 parents 765d104 + 4ecbe1b commit 1f2e7b8
Showing 588 changed files with 17,194 additions and 6,059 deletions.
4 changes: 1 addition & 3 deletions .github/PULL_REQUEST_TEMPLATE
@@ -2,11 +2,9 @@

(Please fill in changes proposed in this fix)


## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)


(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.
2 changes: 1 addition & 1 deletion LICENSE
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.3 - http://py4j.sourceforge.net/)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.4 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
@@ -3,7 +3,7 @@
importFrom("methods", "setGeneric", "setMethod", "setOldClass")
importFrom("methods", "is", "new", "signature", "show")
importFrom("stats", "gaussian", "setNames")
importFrom("utils", "download.file", "packageVersion", "untar")
importFrom("utils", "download.file", "object.size", "packageVersion", "untar")

# Disable native libraries till we figure out how to package it
# See SPARKR-7839
@@ -71,6 +71,7 @@ exportMethods("arrange",
"covar_samp",
"covar_pop",
"createOrReplaceTempView",
"crossJoin",
"crosstab",
"dapply",
"dapplyCollect",
104 changes: 78 additions & 26 deletions R/pkg/R/DataFrame.R
@@ -55,6 +55,19 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
.Object
})

#' Set options/mode and then return the write object
#' @noRd
setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
jmode <- convertToJSaveMode(mode)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write
}
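The helper above centralizes the mode/options plumbing that each writer previously repeated. As a rough sketch (not part of the patch), the calling pattern the modified writers follow looks like this, where df is a hypothetical SparkDataFrame and callJMethod is SparkR's internal JVM bridge; the path and 'compression' option are illustrative:

# Sketch of how a writer method chains the helper.
write <- callJMethod(df@sdf, "write")                # obtain the Java DataFrameWriter
write <- setWriteOptions(write, mode = "overwrite",  # set SaveMode plus any extra options
                         compression = "gzip")
invisible(callJMethod(write, "json", "/tmp/out"))    # hand the path to the format method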

#' @export
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the SparkDataFrame is cached
@@ -352,7 +365,7 @@ setMethod("colnames<-",

# Check if the column names have . in it
if (any(regexec(".", value, fixed = TRUE)[[1]][1] != -1)) {
stop("Colum names cannot contain the '.' symbol.")
stop("Column names cannot contain the '.' symbol.")
}

sdf <- callJMethod(x@sdf, "toDF", as.list(value))
@@ -727,6 +740,8 @@ setMethod("toJSON",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode save mode; one of 'append', 'overwrite', 'error', or 'ignore' (default 'error')
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @rdname write.json
@@ -743,8 +758,9 @@ setMethod("toJSON",
#' @note write.json since 1.6.0
setMethod("write.json",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "json", path))
})
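From the user's side, the added mode argument means existing output no longer has to be removed by hand before rewriting. A minimal usage sketch, assuming an active SparkR session (the path is illustrative):

df <- createDataFrame(faithful)
write.json(df, "/tmp/faithful_json")                      # default 'error' mode: fails if the path exists
write.json(df, "/tmp/faithful_json", mode = "overwrite")  # replaces the previous output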

@@ -755,6 +771,8 @@ setMethod("write.json",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode save mode; one of 'append', 'overwrite', 'error', or 'ignore' (default 'error')
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.orc,SparkDataFrame,character-method
@@ -771,8 +789,9 @@ setMethod("write.json",
#' @note write.orc since 2.0.0
setMethod("write.orc",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "orc", path))
})

@@ -783,6 +802,8 @@ setMethod("write.orc",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode save mode; one of 'append', 'overwrite', 'error', or 'ignore' (default 'error')
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @rdname write.parquet
@@ -800,8 +821,9 @@ setMethod("write.orc",
#' @note write.parquet since 1.6.0
setMethod("write.parquet",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "parquet", path))
})

@@ -825,6 +847,8 @@ setMethod("saveAsParquetFile",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode save mode; one of 'append', 'overwrite', 'error', or 'ignore' (default 'error')
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.text,SparkDataFrame,character-method
@@ -841,8 +865,9 @@ setMethod("saveAsParquetFile",
#' @note write.text since 2.0.0
setMethod("write.text",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "text", path))
})
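The same signature change is applied uniformly to write.orc, write.parquet, and write.text above. One sketch of what it enables, assuming an active session (data and path illustrative):

df1 <- createDataFrame(mtcars[1:16, ])
df2 <- createDataFrame(mtcars[17:32, ])
write.parquet(df1, "/tmp/cars_parquet")                   # first batch, default 'error' mode
write.parquet(df2, "/tmp/cars_parquet", mode = "append")  # second batch appended to the same path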

@@ -2246,12 +2271,13 @@ setMethod("dropDuplicates",

#' Join
#'
#' Join two SparkDataFrames based on the given join expression.
#' Joins two SparkDataFrames based on the given join expression.
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
#' Column expression. If joinExpr is omitted, an inner join is attempted by default, and an error is
#' thrown if it would amount to a Cartesian product. For a Cartesian join, use crossJoin instead.
#' @param joinType The type of join to perform. The following join types are available:
#' 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
@@ -2260,23 +2286,24 @@ setMethod("dropDuplicates",
#' @aliases join,SparkDataFrame,SparkDataFrame-method
#' @rdname join
#' @name join
#' @seealso \link{merge}
#' @seealso \link{merge} \link{crossJoin}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' join(df1, df2) # Performs a Cartesian
#' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression
#' join(df1, df2, df1$col1 == df2$col2, "right_outer")
#' join(df1, df2) # Attempts an inner join
#' }
#' @note join since 1.4.0
setMethod("join",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y, joinExpr = NULL, joinType = NULL) {
if (is.null(joinExpr)) {
sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
# this may not fail until the planner checks for Cartesian join later on.
sdf <- callJMethod(x@sdf, "join", y@sdf)
} else {
if (class(joinExpr) != "Column") stop("joinExpr must be a Column")
if (is.null(joinType)) {
@@ -2297,22 +2324,52 @@ setMethod("join",
dataFrame(sdf)
})

#' CrossJoin
#'
#' Returns Cartesian Product on two SparkDataFrames.
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
#' @return A SparkDataFrame containing the result of the join operation.
#' @family SparkDataFrame functions
#' @aliases crossJoin,SparkDataFrame,SparkDataFrame-method
#' @rdname crossJoin
#' @name crossJoin
#' @seealso \link{merge} \link{join}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' crossJoin(df1, df2) # Performs a Cartesian
#' }
#' @note crossJoin since 2.1.0
setMethod("crossJoin",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
dataFrame(sdf)
})
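Together with the join change above, this makes the Cartesian case explicit at the call site: an unkeyed join(x, y) now asks the planner for an inner join (and may be rejected as an implicit cross product), while crossJoin(x, y) always succeeds. A small sketch with illustrative frames:

df1 <- createDataFrame(data.frame(id = 1:3, x = c("a", "b", "c")))
df2 <- createDataFrame(data.frame(id = 2:4, y = c("p", "q", "r")))
keyed <- join(df1, df2, df1$id == df2$id)  # inner join on an explicit expression
cart  <- crossJoin(df1, df2)               # explicit Cartesian product: 3 x 3 rows
count(cart)                                # 9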

#' Merges two data frames
#'
#' @name merge
#' @param x the first data frame to be joined
#' @param y the second data frame to be joined
#' @param x the first data frame to be joined.
#' @param y the second data frame to be joined.
#' @param by a character vector specifying the join columns. If by is not
#' specified, the common column names in \code{x} and \code{y} will be used.
#' If by or both by.x and by.y are explicitly set to NULL or of length 0, the Cartesian
#' Product of x and y will be returned.
#' @param by.x a character vector specifying the joining columns for x.
#' @param by.y a character vector specifying the joining columns for y.
#' @param all a boolean value setting \code{all.x} and \code{all.y}
#' if any of them are unset.
#' @param all.x a boolean value indicating whether all the rows in x should
#' be including in the join
#' be including in the join.
#' @param all.y a boolean value indicating whether all the rows in y should
#' be including in the join
#' @param sort a logical argument indicating whether the resulting columns should be sorted
#' be including in the join.
#' @param sort a logical argument indicating whether the resulting columns should be sorted.
#' @param suffixes a string vector of length 2 used to make colnames of
#' \code{x} and \code{y} unique.
#' The first element is appended to each colname of \code{x}.
@@ -2326,20 +2383,21 @@ setMethod("join",
#' @family SparkDataFrame functions
#' @aliases merge,SparkDataFrame,SparkDataFrame-method
#' @rdname merge
#' @seealso \link{join}
#' @seealso \link{join} \link{crossJoin}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' merge(df1, df2) # Performs a Cartesian
#' merge(df1, df2) # Performs an inner join by common columns
#' merge(df1, df2, by = "col1") # Performs an inner join based on expression
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE, all.y = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all = TRUE, sort = FALSE)
#' merge(df1, df2, by = "col1", all = TRUE, suffixes = c("-X", "-Y"))
#' merge(df1, df2, by = NULL) # Performs a Cartesian join
#' }
#' @note merge since 1.5.0
setMethod("merge",
@@ -2376,7 +2434,7 @@ setMethod("merge",
joinY <- by
} else {
# if by or both by.x and by.y have length 0, use Cartesian Product
joinRes <- join(x, y)
joinRes <- crossJoin(x, y)
return (joinRes)
}
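Routing the empty-by case through crossJoin keeps merge's documented Cartesian behaviour while leaving plain join free to reject implicit cross products. A sketch with illustrative data:

df1 <- createDataFrame(data.frame(a = 1:2))
df2 <- createDataFrame(data.frame(b = c("x", "y", "z")))
merged <- merge(df1, df2, by = NULL)  # delegates to crossJoin: 2 x 3 = 6 rows
nrow(collect(merged))                 # 6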

@@ -2637,15 +2695,9 @@ setMethod("write.df",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write <- setWriteOptions(write, path = path, mode = mode, ...)
write <- handledCallJMethod(write, "save")
})
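After the refactor, write.df keeps its call shape but shares setWriteOptions with the format-specific writers. A usage sketch with an explicit source, assuming an active session (the path and 'header' option are illustrative):

df <- createDataFrame(mtcars)
write.df(df, path = "/tmp/cars_csv", source = "csv",
         mode = "overwrite", header = "true")  # 'header' flows through setWriteOptions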

@@ -2701,7 +2753,7 @@ setMethod("saveAsTable",
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
options <- varargsToStrEnv(...)

write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
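The switch from varargsToEnv to varargsToStrEnv means the extra saveAsTable options are collected as strings for the JVM side, matching the writers above. A usage sketch (table name and the 'compression' option are illustrative):

df <- createDataFrame(mtcars)
saveAsTable(df, "cars_table", source = "parquet",
            mode = "overwrite", compression = "snappy")  # options pass through varargsToStrEnv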