Skip to content

Commit

Permalink
Merge pull request #237 from sun-rui/SPARKR-154_3
Browse files Browse the repository at this point in the history
[SPARKR-154] Phase 2: implement cartesian().
  • Loading branch information
Davies Liu committed Apr 14, 2015
1 parent 1bdcb63 commit ae78312
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 62 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ exportMethods(
"aggregateByKey",
"aggregateRDD",
"cache",
"cartesian",
"checkpoint",
"coalesce",
"cogroup",
Expand Down
98 changes: 36 additions & 62 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -1496,69 +1496,43 @@ setMethod("zipRDD",
stop("Can only zip RDDs which have the same number of partitions.")
}

if (getSerializedMode(x) != getSerializedMode(other) ||
getSerializedMode(x) == "byte") {
# Append the number of elements in each partition to that partition so that we can later
# check if corresponding partitions of both RDDs have the same number of elements.
#
# Note that this appending also serves the purpose of reserialization, because even if
# any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
# as a single byte array. For example, partitions of an RDD generated from partitionBy()
# may be encoded as multiple byte arrays.
appendLength <- function(part) {
part[[length(part) + 1]] <- length(part) + 1
part
}
x <- lapplyPartition(x, appendLength)
other <- lapplyPartition(other, appendLength)
}

zippedJRDD <- callJMethod(getJRDD(x), "zip", getJRDD(other))
# The zippedRDD's elements are of scala Tuple2 type. The serialized
# flag Here is used for the elements inside the tuples.
serializerMode <- getSerializedMode(x)
zippedRDD <- RDD(zippedJRDD, serializerMode)

partitionFunc <- function(split, part) {
len <- length(part)
if (len > 0) {
if (serializerMode == "byte") {
lengthOfValues <- part[[len]]
lengthOfKeys <- part[[len - lengthOfValues]]
stopifnot(len == lengthOfKeys + lengthOfValues)

# check if corresponding partitions of both RDDs have the same number of elements.
if (lengthOfKeys != lengthOfValues) {
stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
}

if (lengthOfKeys > 1) {
keys <- part[1 : (lengthOfKeys - 1)]
values <- part[(lengthOfKeys + 1) : (len - 1)]
} else {
keys <- list()
values <- list()
}
} else {
# Keys, values must have same length here, because this has
# been validated inside the JavaRDD.zip() function.
keys <- part[c(TRUE, FALSE)]
values <- part[c(FALSE, TRUE)]
}
mapply(
function(k, v) {
list(k, v)
},
keys,
values,
SIMPLIFY = FALSE,
USE.NAMES = FALSE)
} else {
part
}
}
rdds <- appendPartitionLengths(x, other)
jrdd <- callJMethod(getJRDD(rdds[[1]]), "zip", getJRDD(rdds[[2]]))
# The jrdd's elements are of scala Tuple2 type. The serialized
# flag here is used for the elements inside the tuples.
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))

mergePartitions(rdd, TRUE)
})

PipelinedRDD(zippedRDD, partitionFunc)
#' Cartesian product of this RDD and another one.
#'
#' Return the Cartesian product of this RDD and another one,
#' that is, the RDD of all pairs of elements (a, b) where a
#' is in this and b is in other.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @return A new RDD which is the Cartesian product of these two RDDs.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:2)
#' sortByKey(cartesian(rdd, rdd))
#' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
#'}
#' @rdname cartesian
#' @aliases cartesian,RDD,RDD-method
setMethod("cartesian",
signature(x = "RDD", other = "RDD"),
function(x, other) {
rdds <- appendPartitionLengths(x, other)
jrdd <- callJMethod(getJRDD(rdds[[1]]), "cartesian", getJRDD(rdds[[2]]))
# The jrdd's elements are of scala Tuple2 type. The serialized
# flag here is used for the elements inside the tuples.
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))

mergePartitions(rdd, FALSE)
})

#' Subtract an RDD with another RDD.
Expand Down
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
#' @export
setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") })

#' @rdname cartesian
#' @export
setGeneric("cartesian", function(x, other) { standardGeneric("cartesian") })

#' @rdname intersection
#' @export
setGeneric("intersection", function(x, other, numPartitions = 1L) {
Expand Down
80 changes: 80 additions & 0 deletions R/pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,83 @@ cleanClosure <- function(func, checkedFuncs = new.env()) {
}
func
}

# Append partition lengths to each partition in two input RDDs if needed.
# param
# x An RDD.
# Other An RDD.
# return value
# A list of two result RDDs.
appendPartitionLengths <- function(x, other) {
if (getSerializedMode(x) != getSerializedMode(other) ||
getSerializedMode(x) == "byte") {
# Append the number of elements in each partition to that partition so that we can later
# know the boundary of elements from x and other.
#
# Note that this appending also serves the purpose of reserialization, because even if
# any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
# as a single byte array. For example, partitions of an RDD generated from partitionBy()
# may be encoded as multiple byte arrays.
appendLength <- function(part) {
len <- length(part)
part[[len + 1]] <- len + 1
part
}
x <- lapplyPartition(x, appendLength)
other <- lapplyPartition(other, appendLength)
}
list (x, other)
}

# Perform zip or cartesian between elements from two RDDs in each partition
# param
# rdd An RDD.
# zip A boolean flag indicating this call is for zip operation or not.
# return value
# A result RDD.
mergePartitions <- function(rdd, zip) {
serializerMode <- getSerializedMode(rdd)
partitionFunc <- function(split, part) {
len <- length(part)
if (len > 0) {
if (serializerMode == "byte") {
lengthOfValues <- part[[len]]
lengthOfKeys <- part[[len - lengthOfValues]]
stopifnot(len == lengthOfKeys + lengthOfValues)

# For zip operation, check if corresponding partitions of both RDDs have the same number of elements.
if (zip && lengthOfKeys != lengthOfValues) {
stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
}

if (lengthOfKeys > 1) {
keys <- part[1 : (lengthOfKeys - 1)]
} else {
keys <- list()
}
if (lengthOfValues > 1) {
values <- part[(lengthOfKeys + 1) : (len - 1)]
} else {
values <- list()
}

if (!zip) {
return(mergeCompactLists(keys, values))
}
} else {
keys <- part[c(TRUE, FALSE)]
values <- part[c(FALSE, TRUE)]
}
mapply(
function(k, v) { list(k, v) },
keys,
values,
SIMPLIFY = FALSE,
USE.NAMES = FALSE)
} else {
part
}
}

PipelinedRDD(rdd, partitionFunc)
}
43 changes: 43 additions & 0 deletions R/pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,49 @@ test_that("zipRDD() on RDDs", {
unlink(fileName)
})

test_that("cartesian() on RDDs", {
rdd <- parallelize(sc, 1:3)
actual <- collect(cartesian(rdd, rdd))
expect_equal(sortKeyValueList(actual),
list(
list(1, 1), list(1, 2), list(1, 3),
list(2, 1), list(2, 2), list(2, 3),
list(3, 1), list(3, 2), list(3, 3)))

# test case where one RDD is empty
emptyRdd <- parallelize(sc, list())
actual <- collect(cartesian(rdd, emptyRdd))
expect_equal(actual, list())

mockFile = c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

rdd <- textFile(sc, fileName)
actual <- collect(cartesian(rdd, rdd))
expected <- list(
list("Spark is awesome.", "Spark is pretty."),
list("Spark is awesome.", "Spark is awesome."),
list("Spark is pretty.", "Spark is pretty."),
list("Spark is pretty.", "Spark is awesome."))
expect_equal(sortKeyValueList(actual), expected)

rdd1 <- parallelize(sc, 0:1)
actual <- collect(cartesian(rdd1, rdd))
expect_equal(sortKeyValueList(actual),
list(
list(0, "Spark is pretty."),
list(0, "Spark is awesome."),
list(1, "Spark is pretty."),
list(1, "Spark is awesome.")))

rdd1 <- map(rdd, function(x) { x })
actual <- collect(cartesian(rdd, rdd1))
expect_equal(sortKeyValueList(actual), expected)

unlink(fileName)
})

test_that("subtract() on RDDs", {
l <- list(1, 1, 2, 2, 3, 4)
rdd1 <- parallelize(sc, l)
Expand Down
33 changes: 33 additions & 0 deletions pkg/man/cartesian.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{cartesian,RDD,RDD-method}
\alias{cartesian}
\alias{cartesian,RDD,RDD-method}
\title{Cartesian product of this RDD and another one.}
\usage{
\S4method{cartesian}{RDD,RDD}(x, other)

cartesian(x, other)
}
\arguments{
\item{x}{An RDD.}

\item{other}{An RDD.}
}
\value{
A new RDD which is the Cartesian product of these two RDDs.
}
\description{
Return the Cartesian product of this RDD and another one,
that is, the RDD of all pairs of elements (a, b) where a
is in this and b is in other.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, 1:2)
sortByKey(cartesian(rdd, rdd))
# list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
}
}

0 comments on commit ae78312

Please sign in to comment.