From f4b16380490b9ef6ea9737d4c9d8bf8d8952f580 Mon Sep 17 00:00:00 2001
From: Hossein
Date: Wed, 5 Oct 2016 23:58:11 -0700
Subject: [PATCH] Adjusting number of slices according to size

---
 R/pkg/R/context.R | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index c3bedb96a9ebe..7ed2cac9d626a 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -126,6 +126,13 @@ parallelize <- function(sc, coll, numSlices = 1) {
   if (numSlices > length(coll))
     numSlices <- length(coll)
 
+  sizeLimit <- .Machine$integer.max - 10240 # Safe margin below maximum allocation limit
+  objectSize <- object.size(coll)
+
+  # For large objects we make sure the size of each slice is also smaller than sizeLimit
+  numSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
+
+
   sliceLen <- ceiling(length(coll) / numSlices)
   slices <- split(coll, rep(1: (numSlices + 1), each = sliceLen)[1:length(coll)])
 
@@ -136,8 +143,7 @@ parallelize <- function(sc, coll, numSlices = 1) {
   # The PRC backend cannot handle arguments larger than 2GB (INT_MAX)
   # If serialized data is safely less than that threshold we send it over the PRC channel.
   # Otherwise, we write it to a file and send the file name
-  sizeLimit <- .Machine$integer.max - 1024 # Safe margin bellow maximum allocation limit
-  if (object.size(serializedSlices) < sizeLimit) {
+  if (objectSize < sizeLimit) {
     jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", sc, serializedSlices)
   } else {
     fileName <- writeToTempFile(serializedSlices)
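
For reviewers, a minimal standalone sketch of the slicing arithmetic this
patch introduces; the 3 GB object size and the one-slice starting point are
illustrative assumptions, not values from the patch:

  # Same safe margin below INT_MAX as in the patch
  sizeLimit <- .Machine$integer.max - 10240

  # Hypothetical 3 GB object: too large to send over the backend channel in
  # one piece, so the slice count is raised until each slice fits the limit.
  objectSize <- 3 * 1024^3
  numSlices <- 1
  numSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
  numSlices  # 2: two slices of roughly 1.5 GB each stay under the ~2 GB limit

Note that the patch uses object.size(coll), the in-memory size of the
collection, as a proxy for the serialized payload, and objectSize / numSlices
only approximates each slice's size when the elements of coll are roughly
uniform in size.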