diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 450aacfb51718..920e1b7f598de 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -28,6 +28,7 @@ exportMethods(
               "fullOuterJoin",
               "glom",
               "groupByKey",
+              "intersection",
               "join",
               "keyBy",
               "keys",
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index eb970a09c0f17..125ea0110001a 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1560,3 +1560,43 @@ setMethod("zipRDD",
 
             PipelinedRDD(zippedRDD, partitionFunc)
           })
+
+#' Intersection of this RDD and another one.
+#'
+#' Return the intersection of this RDD and another one.
+#' The output will not contain any duplicate elements,
+#' even if the input RDDs did. Performs a hash partition
+#' across the cluster.
+#' Note that this method performs a shuffle internally.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @param numPartitions The number of partitions in the result RDD.
+#' @return An RDD which is the intersection of these two RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
+#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
+#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
+#' # list(1, 2, 3)
+#'}
+#' @rdname intersection
+#' @aliases intersection,RDD
+setMethod("intersection",
+          signature(x = "RDD", other = "RDD"),
+          function(x, other, numPartitions = SparkR::numPartitions(x)) {
+            # Pair each element with a placeholder value so it can act as a cogroup key.
+            rdd1 <- map(x, function(v) { list(v, NA) })
+            rdd2 <- map(other, function(v) { list(v, NA) })
+
+            # Keep a key only when every cogrouped value list is non-empty
+            # (presumably one list per input RDD, i.e. the element is in both).
+            filterFunction <- function(elem) {
+              iters <- elem[[2]]
+              all(vapply(iters, function(iter) { length(iter) > 0 },
+                         logical(1)))
+            }
+
+            keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
+          })
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index b7d03944431e8..99e377cda4817 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -233,7 +233,6 @@ setGeneric("zipWithUniqueId", function(x) { standardGeneric("zipWithUniqueId") }
 
 
 ############ Binary Functions #############
-
 #' @rdname countByKey
 #' @export
 setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
@@ -242,6 +241,11 @@ setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
 #' @export
 setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") })
 
+#' @rdname intersection
+#' @export
+setGeneric("intersection", function(x, other, numPartitions = 1L) {
+  standardGeneric("intersection") })
+
 #' @rdname keys
 #' @export
 setGeneric("keys", function(x) { standardGeneric("keys") })
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
index f75e0817b9406..fc0607ae1de60 100644
--- a/R/pkg/inst/tests/test_rdd.R
+++ b/R/pkg/inst/tests/test_rdd.R
@@ -468,6 +468,22 @@ test_that("zipRDD() on RDDs", {
   unlink(fileName)
 })
 
+test_that("intersection() on RDDs", {
+  # intersection with self
+  actual <- collect(intersection(rdd, rdd))
+  expect_equal(sort(as.integer(actual)), nums)
+
+  # intersection with an empty RDD
+  emptyRdd <- parallelize(sc, list())
+  actual <- collect(intersection(rdd, emptyRdd))
+  expect_equal(actual, list())
+
+  rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
+  rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
+  actual <- collect(intersection(rdd1, rdd2))
+  expect_equal(sort(as.integer(actual)), 1:3)
+})
+
 test_that("join() on pairwise RDDs", {
   rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
   rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
diff --git a/pkg/man/intersection.Rd b/pkg/man/intersection.Rd
new file mode 100644
index 0000000000000..7ec6debb3dc08
--- /dev/null
+++ b/pkg/man/intersection.Rd
@@ -0,0 +1,40 @@
+% Generated by roxygen2 (4.0.2): do not edit by hand
+\docType{methods}
+\name{intersection,RDD,RDD-method}
+\alias{intersection}
+\alias{intersection,RDD}
+\alias{intersection,RDD,RDD-method}
+\title{Intersection of this RDD and another one.}
+\usage{
+\S4method{intersection}{RDD,RDD}(x, other,
+  numPartitions = SparkR::numPartitions(x))
+
+intersection(x, other, numPartitions = 1L)
+}
+\arguments{
+\item{x}{An RDD.}
+
+\item{other}{An RDD.}
+
+\item{numPartitions}{The number of partitions in the result RDD.}
+}
+\value{
+An RDD which is the intersection of these two RDDs.
+}
+\description{
+Return the intersection of this RDD and another one.
+The output will not contain any duplicate elements,
+even if the input RDDs did. Performs a hash partition
+across the cluster.
+Note that this method performs a shuffle internally.
+}
+\examples{
+\dontrun{
+sc <- sparkR.init()
+rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
+rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
+collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
+# list(1, 2, 3)
+}
+}
+