Skip to content

Commit

Permalink
Merge pull request #238 from sun-rui/SPARKR-154_4
Browse files Browse the repository at this point in the history
[SPARKR-154] Phase 3: implement intersection().
  • Loading branch information
shivaram authored and Davies Liu committed Apr 14, 2015
1 parent c9497a3 commit ba54e34
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 1 deletion.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ exportMethods(
"fullOuterJoin",
"glom",
"groupByKey",
"intersection",
"join",
"keyBy",
"keys",
Expand Down
37 changes: 37 additions & 0 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -1560,3 +1560,40 @@ setMethod("zipRDD",

PipelinedRDD(zippedRDD, partitionFunc)
})

#' Intersection of this RDD and another one.
#'
#' Return the intersection of this RDD and another one.
#' The output will not contain any duplicate elements,
#' even if the input RDDs did. Performs a hash partition
#' across the cluster.
#' Note that this method performs a shuffle internally.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @param numPartitions The number of partitions in the result RDD.
#' @return An RDD which is the intersection of these two RDDs.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
#' # list(1, 2, 3)
#'}
#' @rdname intersection
#' @aliases intersection,RDD
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR::numPartitions(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })

filterFunction <- function(elem) {
iters <- elem[[2]]
all(as.vector(
lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
}

keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
})
6 changes: 5 additions & 1 deletion R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ setGeneric("zipWithUniqueId", function(x) { standardGeneric("zipWithUniqueId") }


############ Binary Functions #############

#' @rdname countByKey
#' @export
setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
Expand All @@ -242,6 +241,11 @@ setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
#' @export
setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") })

#' @rdname intersection
#' @export
setGeneric("intersection", function(x, other, numPartitions = 1L) {
standardGeneric("intersection") })

#' @rdname keys
#' @export
setGeneric("keys", function(x) { standardGeneric("keys") })
Expand Down
16 changes: 16 additions & 0 deletions R/pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,22 @@ test_that("zipRDD() on RDDs", {
unlink(fileName)
})

test_that("intersection() on RDDs", {
# intersection with self
actual <- collect(intersection(rdd, rdd))
expect_equal(sort(as.integer(actual)), nums)

# intersection with an empty RDD
emptyRdd <- parallelize(sc, list())
actual <- collect(intersection(rdd, emptyRdd))
expect_equal(actual, list())

rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
actual <- collect(intersection(rdd1, rdd2))
expect_equal(sort(as.integer(actual)), 1:3)
})

test_that("join() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
Expand Down
38 changes: 38 additions & 0 deletions pkg/man/intersection.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{intersection,RDD,RDD-method}
\alias{intersection}
\alias{intersection,RDD}
\alias{intersection,RDD,RDD-method}
\title{Intersection of this RDD and another one.}
\usage{
\S4method{intersection}{RDD,RDD}(x, other,
numPartitions = SparkR::numPartitions(x))

intersection(x, other, numPartitions = 1L)
}
\arguments{
\item{x}{An RDD.}

\item{other}{An RDD.}
}
\value{
An RDD which is the intersection of these two RDDs.
}
\description{
Return the intersection of this RDD and another one.
The output will not contain any duplicate elements,
even if the input RDDs did. Performs a hash partition
across the cluster.
Note that this method performs a shuffle internally.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
# list(1, 2, 3)
}
}

0 comments on commit ba54e34

Please sign in to comment.