Commit
SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions
sryza committed Jul 18, 2014
1 parent 30b8d36 commit be10f8a
Showing 1 changed file with 25 additions and 36 deletions.
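
The change applied throughout this diff is mechanical: function literals that destructure each element with a pattern (`case (k, v) => ...`) become plain lambdas that read the tuple fields directly (`pair._1`, `pair._2`). A `case` literal compiles to a pattern match, so every element pays for a type test and extraction; direct accessor calls avoid that work on hot paths. A minimal standalone sketch of the two forms (the `sumValues*` names are illustrative, not from the patch):

// Before: each element is pattern-matched, running a type test and
// Tuple2 extraction per iteration.
def sumValuesByMatch(iter: Iterator[(Int, Int)]): Long = {
  var total = 0L
  iter.foreach { case (_, v) => total += v }
  total
}

// After: direct field access on the tuple, with no match machinery.
def sumValuesByAccess(iter: Iterator[(Int, Int)]): Long = {
  var total = 0L
  iter.foreach { pair => total += pair._2 }
  total
}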
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala (25 additions, 36 deletions)
@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

     val reducePartition = (iter: Iterator[(K, V)]) => {
       val map = new JHashMap[K, V]
-      iter.foreach { case (k, v) =>
-        val old = map.get(k)
-        map.put(k, if (old == null) v else func(old, v))
+      iter.foreach { pair =>
+        val old = map.get(pair._1)
+        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       Iterator(map)
     } : Iterator[JHashMap[K, V]]

     val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
-      m2.foreach { case (k, v) =>
-        val old = m1.get(k)
-        m1.put(k, if (old == null) v else func(old, v))
+      m2.foreach { pair =>
+        val old = m1.get(pair._1)
+        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       m1
     } : JHashMap[K, V]
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs; w <- ws) yield (v, w)
-    }
+    this.cogroup(other, partitioner).flatMapValues( pair =>
+      for (v <- pair._1; w <- pair._2) yield (v, w)
+    )
   }

   /**
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * partition the output RDD.
    */
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (ws.isEmpty) {
-        vs.map(v => (v, None))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._2.isEmpty) {
+        pair._1.map(v => (v, None))
       } else {
-        for (v <- vs; w <- ws) yield (v, Some(w))
+        for (v <- pair._1; w <- pair._2) yield (v, Some(w))
       }
     }
   }
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Option[V], W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (vs.isEmpty) {
-        ws.map(w => (None, w))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._1.isEmpty) {
+        pair._2.map(w => (None, w))
       } else {
-        for (v <- vs; w <- ws) yield (Some(v), w)
+        for (v <- pair._1; w <- pair._2) yield (Some(v), w)
       }
     }
   }
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val data = self.collect()
     val map = new mutable.HashMap[K, V]
     map.sizeHint(data.length)
-    data.foreach { case (k, v) => map.put(k, v) }
+    data.foreach { pair => map.put(pair._1, pair._2) }
     map
   }

@@ -571,12 +571,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
-    cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-      (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]],
-        w3s.asInstanceOf[Seq[W3]])
-    }
+    cg.mapValues { seq => seq.asInstanceOf[(Seq[V], Seq[W1], Seq[W2], Seq[W3])] }
   }

   /**
@@ -589,9 +584,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    cg.mapValues { case Seq(vs, ws) =>
-      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
-    }
+    cg.mapValues { pair => pair.asInstanceOf[(Seq[V], Seq[W])] }
   }

   /**
@@ -604,11 +597,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
-    cg.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]])
-    }
+    cg.mapValues { seq => seq.asInstanceOf[(Seq[V], Seq[W1], Seq[W2])] }
   }

   /**
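
Note on the three cogroup hunks above: the old code destructured the grouped value with a Seq extractor and then built a fresh tuple per key; the new code reinterprets the value with a single `asInstanceOf`, a checked cast with no per-key construction, which assumes the cogrouped values already carry that runtime shape. A standalone sketch of the two styles (not from the patch):

val value: Any = (Seq(1, 2), Seq("a", "b"))

// Extractor style: matches the shape and rebuilds a tuple.
val viaMatch: (Seq[Int], Seq[String]) = value match {
  case (vs: Seq[_], ws: Seq[_]) => (vs.asInstanceOf[Seq[Int]], ws.asInstanceOf[Seq[String]])
}

// Cast style: one runtime class check, no new allocation.
val viaCast = value.asInstanceOf[(Seq[Int], Seq[String])]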
@@ -712,8 +701,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val index = p.getPartition(key)
     val process = (it: Iterator[(K, V)]) => {
       val buf = new ArrayBuffer[V]
-      for ((k, v) <- it if k == key) {
-        buf += v
+      for (pair <- it if pair._1 == key) {
+        buf += pair._2
       }
       buf
     } : Seq[V]
@@ -858,8 +847,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       try {
         while (iter.hasNext) {
-          val (k, v) = iter.next()
-          writer.write(k, v)
+          val pair = iter.next()
+          writer.write(pair._1, pair._2)
         }
       } finally {
         writer.close(hadoopContext)
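To sanity-check the effect of the rewrite in isolation, a rough harness like the one below can time the two forms. The numbers depend heavily on JIT warm-up, the Scala version, and the JVM, so treat the output as indicative only (all names and sizes here are arbitrary):

object TupleAccessTiming {
  def timeIt(label: String)(body: => Long): Unit = {
    val start = System.nanoTime()
    val result = body
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(f"$label%-7s result=$result elapsed=$elapsedMs%.1f ms")
  }

  def main(args: Array[String]): Unit = {
    val data = Array.tabulate(5000000)(i => (i, i))
    // Run several rounds so the later ones reflect JIT-compiled code.
    for (round <- 1 to 3) {
      println(s"round $round")
      timeIt("match") {
        var total = 0L
        data.foreach { case (_, v) => total += v }
        total
      }
      timeIt("access") {
        var total = 0L
        data.foreach { pair => total += pair._2 }
        total
      }
    }
  }
}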
