[SPARK-17790][SPARKR] Support for parallelizing R data.frame larger than 2GB #15375

Closed · wants to merge 14 commits
36 changes: 34 additions & 2 deletions R/pkg/R/context.R
@@ -87,6 +87,9 @@ objectFile <- function(sc, path, minPartitions = NULL) {
#' in the list are split into \code{numSlices} slices and distributed to nodes
#' in the cluster.
#'
#' If the size of the serialized slices is larger than spark.r.maxAllocationLimit (200MB by default), the function

@felixcheung (Member) commented on Oct 11, 2016:
shouldn't this be 200MB now?

Author (Contributor) replied:
Done.

#' will write it to disk and send the file name to the JVM.
#'
#' @param sc SparkContext to use
#' @param coll collection to parallelize
#' @param numSlices number of partitions to create in the RDD
@@ -123,19 +126,48 @@ parallelize <- function(sc, coll, numSlices = 1) {
if (numSlices > length(coll))
numSlices <- length(coll)

sizeLimit <- as.numeric(sparkR.conf(
"spark.r.maxAllocationLimit",
toString(.Machine$integer.max / 10) # Default to a safe value: 200MB
))
objectSize <- object.size(coll)

Contributor commented:
Since the guess of the size could easily be wrong, and writing to disk is not that bad anyway, should we have a much smaller default value (for example, 100MB)?

Author (Contributor) replied:
Done!

# For large objects we make sure the size of each slice is also smaller than sizeLimit
numSlices <- max(numSlices, ceiling(objectSize / sizeLimit))

sliceLen <- ceiling(length(coll) / numSlices)
slices <- split(coll, rep(1: (numSlices + 1), each = sliceLen)[1:length(coll)])

# Serialize each slice: obtain a list of raws, or a list of lists (slices) of
# 2-tuples of raws
serializedSlices <- lapply(slices, serialize, connection = NULL)

- jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
-                     "createRDDFromArray", sc, serializedSlices)
# The RPC backend cannot handle arguments larger than 2GB (INT_MAX).
# If the serialized data is safely below that threshold, we send it over the RPC channel.
# Otherwise, we write it to a file and send the file name.
if (objectSize < sizeLimit) {
jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", sc, serializedSlices)
} else {
fileName <- writeToTempFile(serializedSlices)
# Make sure the temporary file is removed even if the JVM call fails
jrdd <- tryCatch(callJStatic(
"org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, as.integer(numSlices)),
finally = {
file.remove(fileName)
})

Member commented:
If the JVM call throws an exception, I don't think this line will execute, perhaps wrap this in tryCatch?

Author (Contributor) replied:
Good point. Done!
}

RDD(jrdd, "byte")
}

writeToTempFile <- function(serializedSlices) {
fileName <- tempfile()
conn <- file(fileName, "wb")
# Write each serialized slice as a length-prefixed record: a 4-byte
# big-endian integer with the slice length, followed by the raw bytes
for (slice in serializedSlices) {
writeBin(as.integer(length(slice)), conn, endian = "big")
writeBin(slice, conn, endian = "big")
}
close(conn)
fileName
}

#' Include this specified package on all workers
#'
#' This function can be used to include a package on all workers before the
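For reference, the temporary file that writeToTempFile produces is just a sequence of length-prefixed records (a 4-byte big-endian length followed by one serialized slice), which is the format createRDDFromFile consumes on the JVM side via PythonRDD.readRDDFromFile. As a minimal sketch for inspecting that format locally, a hypothetical helper readSlicesBack (not part of this patch) could read the records back in R:

# Hypothetical helper (not part of this patch): read back the records written
# by writeToTempFile, to inspect the length-prefixed file format locally.
readSlicesBack <- function(fileName) {
  conn <- file(fileName, "rb")
  on.exit(close(conn))
  slices <- list()
  repeat {
    # Each record starts with a 4-byte big-endian integer holding the length...
    len <- readBin(conn, integer(), n = 1, endian = "big")
    if (length(len) == 0) break  # end of file reached
    # ...followed by that many raw bytes: one serialized slice
    slices[[length(slices) + 1]] <- readBin(conn, raw(), n = len)
  }
  # unserialize() turns each raw slice back into the original R objects
  lapply(slices, unserialize)
}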
10 changes: 10 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -208,6 +208,16 @@ test_that("create DataFrame from RDD", {
unsetHiveContext()
})

test_that("createDataFrame uses files for large objects", {
# To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
conf <- callJMethod(sparkSession, "conf")
callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")

df <- createDataFrame(iris)

expect_equal(dim(df), dim(iris))
})

test_that("read/write csv as DataFrame", {
csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
mockLinesCsv <- c("year,make,model,comment,blank",
core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -168,7 +168,7 @@ private[r] class RBackendHandler(server: RBackend)
}
} catch {
case e: Exception =>
logError(s"$methodName on $objId failed")
logError(s"$methodName on $objId failed", e)
writeInt(dos, -1)
// Writing the error message of the cause for the exception. This will be returned
// to user in the R process.
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.python.PythonRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -140,4 +141,16 @@ private[r] object RRDD {
def createRDDFromArray(jsc: JavaSparkContext, arr: Array[Array[Byte]]): JavaRDD[Array[Byte]] = {
JavaRDD.fromRDD(jsc.sc.parallelize(arr, arr.length))
}

/**
* Create an RRDD given a temporary file name. This is used to create RRDD when parallelize is
* called on large R objects.
*
* @param fileName name of temporary file on driver machine
* @param parallelism number of slices (defaults to 4)
*/
def createRDDFromFile(jsc: JavaSparkContext, fileName: String, parallelism: Int):
JavaRDD[Array[Byte]] = {
PythonRDD.readRDDFromFile(jsc, fileName, parallelism)
}
}
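For completeness, a minimal end-to-end sketch (assuming a SparkR 2.x session API) of how a user could force the file-based path this patch adds, mirroring the new test above; the 100-byte limit is artificially small, the real default being roughly 200MB:

library(SparkR)

# Start a session with an artificially low allocation limit (100 bytes) so that
# even a small data.frame exceeds it and takes the write-to-file path.
sparkR.session(sparkConfig = list(spark.r.maxAllocationLimit = "100"))

# createDataFrame() parallelizes the local data.frame; because the serialized
# slices exceed the limit, they are written to a temporary file and loaded on
# the JVM side via createRDDFromFile instead of being sent over the RPC channel.
df <- createDataFrame(iris)
head(df)

sparkR.session.stop()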