Skip to content


merge with master
Browse files Browse the repository at this point in the history
  • Loading branch information
lianhuiwang committed Apr 21, 2016
2 parents 7ea7274 + 24f338b commit d16b5f3
Show file tree
Hide file tree
Showing 1,077 changed files with 10,369 additions and 10,577 deletions.
3 changes: 3 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ exportMethods("arrange",
Expand All @@ -125,6 +126,7 @@ exportMethods("%in%",
Expand Down Expand Up @@ -284,6 +286,7 @@ export("as.DataFrame",
Expand Down
47 changes: 40 additions & 7 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -2296,12 +2296,8 @@ setMethod("fillna",
#' }
signature(x = "DataFrame"),
function(x, ...) {
# Check if additional parameters have been passed
if (length(list(...)) > 0) {
stop(paste("Unused argument(s): ", paste(list(...), collapse = ", ")))
function(x, row.names = NULL, optional = FALSE, ...) {, row.names, optional, ...)

#' The specified DataFrame is attached to the R search path. This means that
Expand Down Expand Up @@ -2363,7 +2359,7 @@ setMethod("with",
#' @examples \dontrun{
#' # Create a DataFrame from the Iris dataset
#' irisDF <- createDataFrame(sqlContext, iris)
#' # Show the structure of the DataFrame
#' str(irisDF)
#' }
Expand Down Expand Up @@ -2468,3 +2464,40 @@ setMethod("drop",
function(x) {

#' Saves the content of the DataFrame to an external database table via JDBC.
#' Additional JDBC database connection properties can be set (...).
#' Also, mode is used to specify the behavior of the save operation when
#' data already exists in the data source. There are four modes: \cr
#' append: Contents of this DataFrame are expected to be appended to existing data. \cr
#' overwrite: Existing data is expected to be overwritten by the contents of this DataFrame. \cr
#' error: An exception is expected to be thrown. \cr
#' ignore: The save operation is expected to not save the contents of the DataFrame
#' and to not change the existing data. \cr
#' @param x A SparkSQL DataFrame
#' @param url JDBC database url of the form `jdbc:subprotocol:subname`
#' @param tableName The name of the table in the external database
#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @family DataFrame functions
#' @rdname write.jdbc
#' @name write.jdbc
#' @export
#' @examples \dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
#' write.jdbc(df, jdbcUrl, "table", user = "username", password = "password")
#' }
signature(x = "DataFrame", url = "character", tableName = "character"),
function(x, url, tableName, mode = "error", ...){
jmode <- convertToJSaveMode(mode)
jprops <- varargsToJProperties(...)
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
invisible(callJMethod(write, "jdbc", url, tableName, jprops))
58 changes: 58 additions & 0 deletions R/pkg/R/SQLContext.R
Original file line number Diff line number Diff line change
Expand Up @@ -583,3 +583,61 @@ createExternalTable <- function(sqlContext, tableName, path = NULL, source = NUL
sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)

#' Create a DataFrame representing the database table accessible via JDBC URL.
#' Additional JDBC database connection properties can be set (...).
#' Only one of partitionColumn or predicates should be set. Partitions of the table will be
#' retrieved in parallel based on the `numPartitions` or by the predicates.
#' Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
#' your external database systems.
#' @param sqlContext SQLContext to use
#' @param url JDBC database url of the form `jdbc:subprotocol:subname`
#' @param tableName the name of the table in the external database
#' @param partitionColumn the name of a column of integral type that will be used for partitioning
#' @param lowerBound the minimum value of `partitionColumn` used to decide partition stride
#' @param upperBound the maximum value of `partitionColumn` used to decide partition stride
#' @param numPartitions the number of partitions. This, along with `lowerBound` (inclusive),
#' `upperBound` (exclusive), form partition strides for generated WHERE
#' clause expressions used to split the column `partitionColumn` evenly.
#' This defaults to SparkContext.defaultParallelism when unset.
#' @param predicates a list of conditions in the where clause; each one defines one partition
#' @return DataFrame
#' @rdname read.jdbc
#' @name read.jdbc
#' @export
#' @examples \dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
#' df <- read.jdbc(sqlContext, jdbcUrl, "table", predicates = list("field<=123"), user = "username")
#' df2 <- read.jdbc(sqlContext, jdbcUrl, "table2", partitionColumn = "index", lowerBound = 0,
#' upperBound = 10000, user = "username", password = "password")
#' }

read.jdbc <- function(sqlContext, url, tableName,
partitionColumn = NULL, lowerBound = NULL, upperBound = NULL,
numPartitions = 0L, predicates = list(), ...) {
jprops <- varargsToJProperties(...)

read <- callJMethod(sqlContext, "read")
if (!is.null(partitionColumn)) {
if (is.null(numPartitions) || numPartitions == 0) {
sc <- callJMethod(sqlContext, "sparkContext")
numPartitions <- callJMethod(sc, "defaultParallelism")
} else {
numPartitions <- numToInt(numPartitions)
sdf <- callJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
} else if (length(predicates) > 0) {
sdf <- callJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)), jprops)
} else {
sdf <- callJMethod(read, "jdbc", url, tableName, jprops)
22 changes: 21 additions & 1 deletion R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ setMethod("rint",

#' round
#' Returns the value of the column `e` rounded to 0 decimal places.
#' Returns the value of the column `e` rounded to 0 decimal places using HALF_UP rounding mode.
#' @rdname round
#' @name round
Expand All @@ -1008,6 +1008,26 @@ setMethod("round",

#' bround
#' Returns the value of the column `e` rounded to `scale` decimal places using HALF_EVEN rounding
#' mode if `scale` >= 0 or at integral part when `scale` < 0.
#' This is also known as Gaussian rounding or bankers' rounding: ties are rounded to the
#' nearest even number, e.g. bround(2.5, 0) = 2 and bround(3.5, 0) = 4.
#' @rdname bround
#' @name bround
#' @family math_funcs
#' @export
#' @examples \dontrun{bround(df$c, 0)}
signature(x = "Column"),
function(x, scale = 0) {
jc <- callJStatic("org.apache.spark.sql.functions", "bround", x@jc, as.integer(scale))

#' rtrim
#' Trim the spaces from right end for the specified string value.
Expand Down
15 changes: 14 additions & 1 deletion R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,10 @@ setGeneric("arrange", function(x, col, ...) { standardGeneric("arrange") })

#' @rdname as.data.frame
#' @export
function(x, row.names = NULL, optional = FALSE, ...) {

#' @rdname attach
#' @export
Expand Down Expand Up @@ -577,6 +580,12 @@ setGeneric("saveDF", function(df, path, source = NULL, mode = "error", ...) {

#' @rdname write.jdbc
#' @export
setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {

#' @rdname write.json
#' @export
setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
Expand Down Expand Up @@ -751,6 +760,10 @@ setGeneric("bin", function(x) { standardGeneric("bin") })
#' @export
setGeneric("bitwiseNOT", function(x) { standardGeneric("bitwiseNOT") })

#' @rdname bround
#' @export
setGeneric("bround", function(x, ...) { standardGeneric("bround") })

#' @rdname cbrt
#' @export
setGeneric("cbrt", function(x) { standardGeneric("cbrt") })
Expand Down
11 changes: 11 additions & 0 deletions R/pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -650,3 +650,14 @@ convertToJSaveMode <- function(mode) {
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)

varargsToJProperties <- function(...) {
pairs <- list(...)
props <- newJObject("java.util.Properties")
if (length(pairs) > 0) {
lapply(ls(pairs), function(k) {
callJMethod(props, "setProperty", as.character(k), as.character(pairs[[k]]))
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_context.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ test_that("Check masked functions", {
maskedBySparkR <- masked[funcSparkROrEmpty]
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform", "drop", "window")
"summary", "transform", "drop", "window", "")
expect_equal(length(maskedBySparkR), length(namesOfMasked))
expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
# above are those reported as masked when `library(SparkR)`
Expand Down
8 changes: 8 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,11 @@ test_that("column functions", {
expect_equal(collect(select(df, last(df$age, TRUE)))[[1]], 19)
expect_equal(collect(select(df, last("age")))[[1]], 19)
expect_equal(collect(select(df, last("age", TRUE)))[[1]], 19)

# Test bround()
df <- createDataFrame(sqlContext, data.frame(x = c(2.5, 3.5)))
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

test_that("column binary mathfunctions", {
Expand Down Expand Up @@ -1863,6 +1868,9 @@ test_that("Method as a synonym for collect()", {
expect_equal(, collect(irisDF))
irisDF2 <- irisDF[irisDF$Species == "setosa", ]
expect_equal(, collect(irisDF2))

# Make sure as.data.frame in the R base package is not covered
expect_that(, 2)), not(throws_error()))

test_that("attach() on a DataFrame", {
Expand Down
24 changes: 24 additions & 0 deletions R/pkg/inst/tests/testthat/test_utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,27 @@ test_that("cleanClosure on R functions", {
expect_equal(ls(env), "aBroadcast")
expect_equal(get("aBroadcast", envir = env, inherits = FALSE), aBroadcast)

test_that("varargsToJProperties", {
jprops <- newJObject("java.util.Properties")
expect_true(class(jprops) == "jobj")

jprops <- varargsToJProperties(abc = "123")
expect_true(class(jprops) == "jobj")
expect_equal(callJMethod(jprops, "getProperty", "abc"), "123")

jprops <- varargsToJProperties(abc = "abc", b = 1)
expect_equal(callJMethod(jprops, "getProperty", "abc"), "abc")
expect_equal(callJMethod(jprops, "getProperty", "b"), "1")

jprops <- varargsToJProperties()
expect_equal(callJMethod(jprops, "size"), 0L)

test_that("convertToJSaveMode", {
s <- convertToJSaveMode("error")
expect_true(class(s) == "jobj")
expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ")
'mode should be one of "append", "overwrite", "error", "ignore"') #nolint
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
* of Executors. Each Executor must register its own configuration about where it stores its files
* (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
* from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
* from Spark's IndexShuffleBlockResolver.
public class ExternalShuffleBlockResolver {
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
Expand Down Expand Up @@ -185,8 +185,6 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {

if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) {
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
} else if ("hash".equals(executor.shuffleManager)) {
return getHashBasedShuffleBlockData(executor, blockId);
} else {
throw new UnsupportedOperationException(
"Unsupported shuffle manager: " + executor.shuffleManager);
Expand Down Expand Up @@ -250,15 +248,6 @@ private void deleteExecutorDirs(String[] dirs) {

* Hash-based shuffle data is simply stored as one file per block.
* This logic is from FileShuffleBlockResolver.
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());

* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ExecutorShuffleInfo implements Encodable {
public final String[] localDirs;
/** Number of subdirectories created within each localDir. */
public final int subDirsPerLocalDir;
/** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
/** Shuffle manager (SortShuffleManager) that the executor is using. */
public final String shuffleManager;

Expand Down

0 comments on commit d16b5f3

Please sign in to comment.