Skip to content

Commit

Permalink
Merge pull request apache#112 from sun-rui/outer_join
Browse files Browse the repository at this point in the history
Add leftOuterJoin() and rightOuterJoin() to the RDD class.
  • Loading branch information
concretevitamin committed Dec 3, 2014
2 parents ba01358 + 5c8e46e commit d0347ce
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 17 deletions.
2 changes: 2 additions & 0 deletions pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ exportMethods(
"lapply",
"lapplyPartition",
"lapplyPartitionsWithIndex",
"leftOuterJoin",
"lookup",
"map",
"mapPartitions",
Expand All @@ -37,6 +38,7 @@ exportMethods(
"persist",
"reduce",
"reduceByKey",
"rightOuterJoin",
"sampleRDD",
"take",
"takeSample",
Expand Down
175 changes: 161 additions & 14 deletions pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -1483,8 +1483,13 @@ setMethod("unionRDD",

#' Join two RDDs
#'
#' @param rdd1 An RDD.
#' @param rdd2 An RDD.
#' This function joins two RDDs where every element is of the form list(K, V).
#' The key types of the two RDDs should be the same.
#'
#' @param rdd1 An RDD to be joined. Should be an RDD where each element is
#' list(K, V).
#' @param rdd2 An RDD to be joined. Should be an RDD where each element is
#' list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return a new RDD containing all pairs of elements with matching keys in
#' two input RDDs.
Expand All @@ -1504,29 +1509,171 @@ setGeneric("join", function(rdd1, rdd2, numPartitions) { standardGeneric("join")
setMethod("join",
          signature(rdd1 = "RDD", rdd2 = "RDD", numPartitions = "integer"),
          function(rdd1, rdd2, numPartitions) {
            # Tag every value with the side it came from (1L = rdd1,
            # 2L = rdd2) so the two sides can be told apart again after the
            # tagged RDDs have been unioned and grouped by key.
            rdd1Tagged <- lapply(rdd1, function(x) {
              list(x[[1]], list(1L, x[[2]]))
            })
            rdd2Tagged <- lapply(rdd2, function(x) {
              list(x[[1]], list(2L, x[[2]]))
            })

            # Inner join over one group of tagged values.
            # `v` is a list whose elements are list(tag, value); presumably
            # it holds all values that groupByKey() collected under a single
            # key -- confirm against flatMapValues().
            # Returns a list with one list(value1, value2) entry for every
            # combination of an rdd1 value and an rdd2 value, so the result
            # is empty when either side contributed nothing.
            doJoin <- function(v) {
              # Neither side can hold more than length(v) values:
              # preallocate both, fill them, then trim to what was written.
              t1 <- vector("list", length(v))
              t2 <- vector("list", length(v))
              index1 <- 1
              index2 <- 1
              for (x in v) {
                if (x[[1]] == 1L) {
                  t1[[index1]] <- x[[2]]
                  index1 <- index1 + 1
                } else {
                  t2[[index2]] <- x[[2]]
                  index2 <- index2 + 1
                }
              }
              length(t1) <- index1 - 1
              length(t2) <- index2 - 1

              # Cartesian product of the two sides, written into a result
              # list that is sized up front.
              result <- list()
              length(result) <- length(t1) * length(t2)
              index <- 1
              for (i in t1) {
                for (j in t2) {
                  result[[index]] <- list(i, j)
                  index <- index + 1
                }
              }
              result
            }

            joined <- flatMapValues(
              groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions),
              doJoin)
          })

#' Left outer join two RDDs
#'
#' Performs a left outer join of two pairwise RDDs, i.e. RDDs in which every
#' element has the form list(K, V). Both RDDs should use the same key type.
#'
#' @param rdd1 The left-hand RDD of the join. Each element should be
#'             list(K, V).
#' @param rdd2 The right-hand RDD of the join. Each element should be
#'             list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return An RDD that contains, for each element (k, v) in rdd1, either all
#'         pairs (k, (v, w)) for (k, w) in rdd2, or the single pair
#'         (k, (v, NULL)) when no element of rdd2 has key k.
#' @rdname leftOuterJoin
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' leftOuterJoin(rdd1, rdd2, 2L)
#' # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
#'}
setGeneric(
  "leftOuterJoin",
  function(rdd1, rdd2, numPartitions) {
    standardGeneric("leftOuterJoin")
  }
)

#' @rdname leftOuterJoin
#' @aliases leftOuterJoin,RDD,RDD-method
setMethod("leftOuterJoin",
          signature(rdd1 = "RDD", rdd2 = "RDD", numPartitions = "integer"),
          function(rdd1, rdd2, numPartitions) {
            # Tag every value with the side it came from (1L = rdd1,
            # 2L = rdd2) so the two sides can be told apart again after the
            # tagged RDDs have been unioned and grouped by key.
            rdd1Tagged <- lapply(rdd1, function(x) {
              list(x[[1]], list(1L, x[[2]]))
            })
            rdd2Tagged <- lapply(rdd2, function(x) {
              list(x[[1]], list(2L, x[[2]]))
            })

            # Left outer join over one group of tagged values.
            # `v` is a list whose elements are list(tag, value); presumably
            # it holds all values that groupByKey() collected under a single
            # key -- confirm against flatMapValues().
            # Returns a list with one list(value1, value2) entry for every
            # combination of an rdd1 value and an rdd2 value. When the group
            # has no rdd2 value, each rdd1 value is paired with NULL.
            doJoin <- function(v) {
              # Neither side can hold more than length(v) values:
              # preallocate both, fill them, then trim to what was written.
              t1 <- vector("list", length(v))
              t2 <- vector("list", length(v))
              index1 <- 1
              index2 <- 1
              for (x in v) {
                if (x[[1]] == 1L) {
                  t1[[index1]] <- x[[2]]
                  index1 <- index1 + 1
                } else {
                  t2[[index2]] <- x[[2]]
                  index2 <- index2 + 1
                }
              }
              length(t1) <- index1 - 1
              len2 <- index2 - 1
              if (len2 == 0) {
                # No match on the right: a single NULL placeholder keeps
                # every left value in the product below.
                t2 <- list(NULL)
              } else {
                length(t2) <- len2
              }

              # Cartesian product of the two sides, written into a result
              # list that is sized up front.
              result <- list()
              length(result) <- length(t1) * length(t2)
              index <- 1
              for (i in t1) {
                for (j in t2) {
                  result[[index]] <- list(i, j)
                  index <- index + 1
                }
              }
              result
            }

            joined <- flatMapValues(
              groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions),
              doJoin)
          })

#' Right outer join two RDDs
#'
#' Performs a right outer join of two pairwise RDDs, i.e. RDDs in which every
#' element has the form list(K, V). Both RDDs should use the same key type.
#'
#' @param rdd1 The left-hand RDD of the join. Each element should be
#'             list(K, V).
#' @param rdd2 The right-hand RDD of the join. Each element should be
#'             list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return An RDD that contains, for each element (k, w) in rdd2, either all
#'         pairs (k, (v, w)) for (k, v) in rdd1, or the single pair
#'         (k, (NULL, w)) when no element of rdd1 has key k.
#' @rdname rightOuterJoin
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rightOuterJoin(rdd1, rdd2, 2L)
#' # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
#'}
setGeneric(
  "rightOuterJoin",
  function(rdd1, rdd2, numPartitions) {
    standardGeneric("rightOuterJoin")
  }
)

#' @rdname rightOuterJoin
#' @aliases rightOuterJoin,RDD,RDD-method
setMethod("rightOuterJoin",
signature(rdd1 = "RDD", rdd2 = "RDD", numPartitions = "integer"),
function(rdd1, rdd2, numPartitions) {
rdd1Tagged <- lapply(rdd1, function(x) { list(x[[1]], list(1L, x[[2]])) })
rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) })

doJoin <- function(v) {
t1 <- Filter(function(x) { x[[1]] == 1L }, v)
t1 <- lapply(t1, function(x) { x[[2]] })
t2 <- Filter(function(x) { x[[1]] == 2L }, v)
t2 <- lapply(t2, function(x) { x[[2]] })
t1 <- vector("list", length(v))
t2 <- vector("list", length(v))
index1 <- 1
index2 <- 1
for (x in v) {
if (x[[1]] == 1L) {
t1[[index1]] <- x[[2]]
index1 <- index1 + 1
} else {
t2[[index2]] <- x[[2]]
index2 <- index2 + 1
}
}
len1 <- index1 - 1
if (len1 == 0) {
t1 <- list(NULL)
} else {
length(t1) <- len1
}
length(t2) <- index2 - 1

result <- list()
length(result) <- length(t1) * length(t2)
index <- 1L
index <- 1
for (i in t1) {
for (j in t2) {
result[[index]] <- list(i, j)
index <- index + 1L
index <- index + 1
}
}
result
Expand Down
54 changes: 54 additions & 0 deletions pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -268,5 +268,59 @@ test_that("join() on pairwise RDDs", {
rdd2 <- parallelize(sc, list(list("a",2), list("a",3)))
actual <- collect(join(rdd1, rdd2, 2L))
expect_equal(actual, list(list("a", list(1, 2)), list("a", list(1, 3))))

rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
actual <- collect(join(rdd1, rdd2, 2L))
expect_equal(actual, list())

rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
actual <- collect(join(rdd1, rdd2, 2L))
expect_equal(actual, list())
})

test_that("leftOuterJoin() on pairwise RDDs", {
  # The documented result of leftOuterJoin() says which pairs are returned,
  # not the order in which collect() yields them, so both the actual and the
  # expected list are put into the same canonical order before comparison.
  canonical <- function(pairs) {
    reprs <- vapply(pairs,
                    function(p) paste(deparse(p), collapse = ""),
                    character(1))
    pairs[order(reprs)]
  }

  # Numeric keys, one key matched twice and one key unmatched.
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(1, 2)), list(1, list(1, 3)),
                   list(2, list(4, NULL)))
  expect_equal(canonical(actual), canonical(expected))

  # Same shape with character keys.
  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
  rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)),
                   list("a", list(1, 3)))
  expect_equal(canonical(actual), canonical(expected))

  # Disjoint numeric keys: every left element is paired with NULL.
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
  rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
  expect_equal(canonical(actual), canonical(expected))

  # Disjoint character keys.
  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
  rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
  expect_equal(canonical(actual), canonical(expected))
})

test_that("rightOuterJoin() on pairwise RDDs", {
  # The documented result of rightOuterJoin() says which pairs are returned,
  # not the order in which collect() yields them, so both the actual and the
  # expected list are put into the same canonical order before comparison.
  canonical <- function(pairs) {
    reprs <- vapply(pairs,
                    function(p) paste(deparse(p), collapse = ""),
                    character(1))
    pairs[order(reprs)]
  }

  # Numeric keys, one key matched twice and one key unmatched.
  rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
  rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
                   list(2, list(NULL, 4)))
  expect_equal(canonical(actual), canonical(expected))

  # Same shape with character keys.
  rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3)))
  rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
                   list("a", list(3, 1)))
  expect_equal(canonical(actual), canonical(expected))

  # Disjoint numeric keys: every right element is paired with NULL.
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
  rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))
  expect_equal(canonical(actual), canonical(expected))

  # Disjoint character keys.
  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
  rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))
  expect_equal(canonical(actual), canonical(expected))
})

9 changes: 6 additions & 3 deletions pkg/man/join.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ join(rdd1, rdd2, numPartitions)
\S4method{join}{RDD,RDD,integer}(rdd1, rdd2, numPartitions)
}
\arguments{
\item{rdd1}{An RDD.}
\item{rdd1}{An RDD to be joined. Should be an RDD where each element is
list(K, V).}

\item{rdd2}{An RDD.}
\item{rdd2}{An RDD to be joined. Should be an RDD where each element is
list(K, V).}

\item{numPartitions}{Number of partitions to create.}
}
Expand All @@ -22,7 +24,8 @@ a new RDD containing all pairs of elements with matching keys in
two input RDDs.
}
\description{
Join two RDDs
This function joins two RDDs where every element is of the form list(K, V).
The key types of the two RDDs should be the same.
}
\examples{
\dontrun{
Expand Down
39 changes: 39 additions & 0 deletions pkg/man/leftOuterJoin.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{leftOuterJoin}
\alias{leftOuterJoin}
\alias{leftOuterJoin,RDD,RDD,integer-method}
\alias{leftOuterJoin,RDD,RDD-method}
\title{Left outer join two RDDs}
\usage{
leftOuterJoin(rdd1, rdd2, numPartitions)

\S4method{leftOuterJoin}{RDD,RDD,integer}(rdd1, rdd2, numPartitions)
}
\arguments{
\item{rdd1}{An RDD to be joined. Should be an RDD where each element is
list(K, V).}

\item{rdd2}{An RDD to be joined. Should be an RDD where each element is
list(K, V).}

\item{numPartitions}{Number of partitions to create.}
}
\value{
For each element (k, v) in rdd1, the resulting RDD will either contain
all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL))
if no elements in rdd2 have key k.
}
\description{
This function left-outer-joins two RDDs where every element is of the form list(K, V).
The key types of the two RDDs should be the same.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
leftOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
}
}

39 changes: 39 additions & 0 deletions pkg/man/rightOuterJoin.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{rightOuterJoin}
\alias{rightOuterJoin}
\alias{rightOuterJoin,RDD,RDD,integer-method}
\alias{rightOuterJoin,RDD,RDD-method}
\title{Right outer join two RDDs}
\usage{
rightOuterJoin(rdd1, rdd2, numPartitions)

\S4method{rightOuterJoin}{RDD,RDD,integer}(rdd1, rdd2, numPartitions)
}
\arguments{
\item{rdd1}{An RDD to be joined. Should be an RDD where each element is
list(K, V).}

\item{rdd2}{An RDD to be joined. Should be an RDD where each element is
list(K, V).}

\item{numPartitions}{Number of partitions to create.}
}
\value{
For each element (k, w) in rdd2, the resulting RDD will either contain
all pairs (k, (v, w)) for (k, v) in rdd1, or the pair (k, (NULL, w))
if no elements in rdd1 have key k.
}
\description{
This function right-outer-joins two RDDs where every element is of the form list(K, V).
The key types of the two RDDs should be the same.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rightOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
}
}

0 comments on commit d0347ce

Please sign in to comment.