[SPARK-17790][SPARKR] Support for parallelizing R data.frame larger than 2GB #15375
Changes from 4 commits
@@ -87,6 +87,9 @@ objectFile <- function(sc, path, minPartitions = NULL) {
 #' in the list are split into \code{numSlices} slices and distributed to nodes
 #' in the cluster.
 #'
+#' If the size of the serialized slices is larger than 2GB (or INT_MAX bytes), the
+#' function will write it to disk and send the file name to the JVM.
+#'
 #' @param sc SparkContext to use
 #' @param coll collection to parallelize
 #' @param numSlices number of partitions to create in the RDD
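For orientation, here is a hedged usage sketch of the behavior the new doc lines describe. It is not part of the PR: it assumes a local SparkR session, `parallelize` and the other `:::` helpers are internal SparkR functions whose names may differ across versions, and `spark.r.maxAllocationLimit` is the config this diff introduces.

```r
# Hypothetical sketch: force even a small object down the new temp-file path
# by lowering the allocation limit (it normally defaults near INT_MAX bytes).
library(SparkR)
sparkR.session(sparkConfig = list(spark.r.maxAllocationLimit = "1024"))
sc <- SparkR:::getSparkContext()   # internal accessor; an assumption here
rdd <- SparkR:::parallelize(sc, as.list(1:100000), numSlices = 4)
```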
@@ -123,19 +126,46 @@ parallelize <- function(sc, coll, numSlices = 1) {
   if (numSlices > length(coll))
     numSlices <- length(coll)

+  sizeLimit <- as.numeric(
Review comment: shouldn't this be
Review reply: This number is not serialized anywhere. I think as.numeric is fine.
Review reply: agreed, probably not a big deal, a user could set
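The exchange above hinges on R's 32-bit integer type; a short illustration of why the default limit is kept as a double (standard R behavior, not code from the PR):

```r
# Integer arithmetic overflows to NA once a value passes .Machine$integer.max
# (2^31 - 1), so a near-INT_MAX default would be unusable as an R integer.
.Machine$integer.max + 1L               # NA, with an integer-overflow warning
as.numeric(.Machine$integer.max) + 1    # 2147483648, exact as a double
```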
+    sparkR.conf("spark.r.maxAllocationLimit", toString(.Machine$integer.max - 10240)))
+  objectSize <- object.size(coll)
Review comment: Since the guess of the size could easily be wrong, and writing to disk is not that bad anyway, should we have a much smaller default value (for example, 100M)?
Review reply: Done!
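The point that the size is only a guess is easy to see in plain R (illustrative, not from the PR):

```r
# object.size() estimates the in-memory footprint; the serialize()d payload
# that actually crosses the RPC boundary can differ, so a conservative limit
# is safer than cutting it close to 2GB.
x <- as.list(rnorm(1e5))
object.size(x)                            # estimated in-memory bytes
length(serialize(x, connection = NULL))   # actual serialized bytes
```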
+
+  # For large objects we make sure the size of each slice is also smaller than sizeLimit
+  numSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
+
   sliceLen <- ceiling(length(coll) / numSlices)
   slices <- split(coll, rep(1: (numSlices + 1), each = sliceLen)[1:length(coll)])
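As an aside, the `rep(...)[1:length(coll)]` idiom above assigns consecutive group ids and truncates to the collection length; a tiny standalone illustration with hypothetical values (plain R, not from the PR):

```r
# 7 elements into 3 slices: group ids become 1 1 1 2 2 2 3 after truncation.
coll <- as.list(1:7)
numSlices <- 3
sliceLen <- ceiling(length(coll) / numSlices)             # 3
rep(1:(numSlices + 1), each = sliceLen)[1:length(coll)]   # 1 1 1 2 2 2 3
split(coll, rep(1:(numSlices + 1), each = sliceLen)[1:length(coll)])
```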

   # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
   # 2-tuples of raws
   serializedSlices <- lapply(slices, serialize, connection = NULL)

-  jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
-                      "createRDDFromArray", sc, serializedSlices)
+  # The RPC backend cannot handle arguments larger than 2GB (INT_MAX).
+  # If the serialized data is safely below that threshold we send it over the RPC channel.
+  # Otherwise, we write it to a file and send the file name.
+  if (objectSize < sizeLimit) {
+    jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", sc, serializedSlices)
+  } else {
+    fileName <- writeToTempFile(serializedSlices)
+    jrdd <- callJStatic(
+      "org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, as.integer(numSlices))
+    file.remove(fileName)
Review comment: if the JVM call throws an exception, I don't think this line will execute, perhaps wrap this in tryCatch?
Review reply: Good point. Done!
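One plausible shape of the fix the reviewer asks for, using tryCatch's finally clause so cleanup always runs (a sketch only; the actual change lands in a later commit of this PR):

```r
# Guarantee the temp file is removed even if the JVM call throws.
fileName <- writeToTempFile(serializedSlices)
jrdd <- tryCatch(
  callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromFile",
              sc, fileName, as.integer(numSlices)),
  finally = file.remove(fileName))
```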
+  }

   RDD(jrdd, "byte")
 }

+
+writeToTempFile <- function(serializedSlices) {
+  fileName <- tempfile()
+  conn <- file(fileName, "wb")
+  for (slice in serializedSlices) {
+    writeBin(as.integer(length(slice)), conn, endian = "big")
+    writeBin(slice, conn, endian = "big")
+  }
+  close(conn)
+  fileName
+}

 #' Include this specified package on all workers
 #'
 #' This function can be used to include a package on all workers before the
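To make the temp-file framing concrete: writeToTempFile emits, for each slice, a 4-byte big-endian length followed by that many bytes of serialized data. The real consumer is the JVM side's createRDDFromFile; the readSlices function below is purely an illustrative R counterpart, not code from the PR:

```r
# Read back the length-prefixed records written by writeToTempFile.
readSlices <- function(fileName) {
  conn <- file(fileName, "rb")
  on.exit(close(conn))
  slices <- list()
  repeat {
    len <- readBin(conn, "integer", n = 1, endian = "big")  # 4-byte length
    if (length(len) == 0) break                             # end of file
    bytes <- readBin(conn, "raw", n = len)                  # payload
    slices[[length(slices) + 1]] <- unserialize(bytes)
  }
  slices
}
```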
Review comment: shouldn't this be 200MB now?
Review reply: Done.