REL-505 merge Apache branch-1.1 bug fixes and add new ByteswapPartitioner #27

Closed
wants to merge 78 commits into from
Changes from all commits
Commits (78)
b528367
Revert "[SPARK-4075] [Deploy] Jar url validation is not enough for Ja…
Nov 17, 2014
cf8d0ef
Revert "[maven-release-plugin] prepare for next development iteration"
Nov 17, 2014
e4f5695
Revert "[maven-release-plugin] prepare release v1.1.1-rc1"
Nov 17, 2014
aa9ebda
[SPARK-4467] Partial fix for fetch failure in sort-based shuffle (1.1)
Nov 18, 2014
91b5fa8
[SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTa…
sarutak Nov 18, 2014
ae9b1f6
[SPARK-4433] fix a racing condition in zipWithIndex
mengxr Nov 19, 2014
f9739b9
[SPARK-4468][SQL] Backports #3334 to branch-1.1
liancheng Nov 19, 2014
e22a759
[SPARK-4380] Log more precise number of bytes spilled (1.1)
Nov 19, 2014
1713c7e
Merge branch 'branch-1.1' of github.com:apache/spark into csd-1.1
markhamstra Nov 19, 2014
16bf5f3
[SPARK-4480] Avoid many small spills in external data structures (1.1)
Nov 19, 2014
aa3c794
Update CHANGES.txt for 1.1.1-rc2
andrewor14 Nov 19, 2014
3693ae5
[maven-release-plugin] prepare release v1.1.1-rc2
andrewor14 Nov 19, 2014
1df1c1d
[maven-release-plugin] prepare for next development iteration
andrewor14 Nov 19, 2014
b838cef
Merge tag 'v1.1.1-rc2' of github.com:apache/spark into csd-1.1
markhamstra Nov 24, 2014
1b2b7dd
Fixed merge typo
markhamstra Nov 24, 2014
6371737
Update versions to 1.1.2-SNAPSHOT
Nov 24, 2014
7aa592c
[SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDS…
tdas Nov 25, 2014
1a7f414
[HOTFIX] Fixing broken build due to missing imports.
tdas Nov 25, 2014
a59c445
[Release] Automate generation of contributors list
Nov 27, 2014
f8a4fd3
[BRANCH-1.1][SPARK-4626] Kill a task only if the executorId is (still…
roxchkplusony Nov 28, 2014
24b5c03
[SPARK-4597] Use proper exception and reset variable in Utils.createT…
viirya Nov 29, 2014
1a2508b
SPARK-2143 [WEB UI] Add Spark version to UI footer
srowen Nov 30, 2014
90d90b2
[HOTFIX] Fix build break in 1a2508b73f6d46a0faf7740b85a5c216c925c25a
JoshRosen Nov 30, 2014
91eadd2
[DOC] Fixes formatting typo in SQL programming guide
liancheng Dec 1, 2014
f333e4f
[SPARK-4686] Link to allowed master URLs is broken
kayousterhout Dec 2, 2014
aec20af
[Release] Translate unknown author names automatically
Dec 3, 2014
e484b8a
[SPARK-4701] Typo in sbt/sbt
tsudukim Dec 3, 2014
af76954
[SPARK-4715][Core] Make sure tryToAcquire won't return a negative value
zsxwing Dec 3, 2014
3e3cd5a
[SPARK-4642] Add description about spark.yarn.queue to running-on-YAR…
tsudukim Dec 3, 2014
17dfd41
[SPARK-4498][core] Don't transition ExecutorInfo to RUNNING until Dri…
markhamstra Dec 3, 2014
6c53225
[Release] Correctly translate contributors name in release notes
Dec 4, 2014
5ac55c8
[SPARK-4253] Ignore spark.driver.host in yarn-cluster and standalone-…
WangTaoTheTonic Dec 4, 2014
d01fdd3
[SPARK-4745] Fix get_existing_cluster() function with multiple securi…
alexdebrie Dec 4, 2014
e98aa54
[SPARK-4459] Change groupBy type parameter from K to U
Dec 4, 2014
bf637e0
[SPARK-4652][DOCS] Add docs about spark-git-repo option
Lewuathe Dec 4, 2014
b09382a
[SPARK-4421] Wrong link in spark-standalone.html
tsudukim Dec 5, 2014
8ee2d18
Fix typo in Spark SQL docs.
andyk Dec 5, 2014
a290486
Merge branch 'branch-1.1' of github.com:apache/spark into csd-1.1
markhamstra Dec 5, 2014
16bc77b
[SPARK-4764] Ensure that files are fetched atomically
Dec 8, 2014
fe7d7a9
SPARK-3926 [CORE] Reopened: result of JavaRDD collectAsMap() is not s…
srowen Dec 9, 2014
7bf3aa3
Merge branch 'branch-1.1' of github.com:apache/spark into csd-1.1
markhamstra Dec 9, 2014
9b99237
[SPARK-4714] BlockManager.dropFromMemory() should check whether block…
suyanNone Dec 9, 2014
6dcafa7
[SPARK-4772] Clear local copies of accumulators as soon as we're done…
Dec 10, 2014
273f2c8
[SPARK-4771][Docs] Document standalone cluster supervise mode
Dec 10, 2014
396de67
[SPARK-4759] Fix driver hanging from coalescing partitions
Dec 10, 2014
0faea17
fixed spelling errors in documentation
peterklipfel Dec 14, 2014
fa3b3e3
SPARK-785 [CORE] ClosureCleaner not invoked on most PairRDDFunctions
srowen Dec 16, 2014
892685b
SPARK-4814 [CORE] Enable assertions in SBT, Maven tests / AssertionEr…
srowen Dec 16, 2014
581f866
[Release] Major improvements to generate contributors script
Dec 17, 2014
991748d
[Release] Cache known author translations locally
Dec 17, 2014
0efd691
[Release] Update contributors list format and sort it
Dec 17, 2014
c15e7f2
[HOTFIX] Fix RAT exclusion for known_translations file
JoshRosen Dec 17, 2014
bed4807
Merge branch 'branch-1.1' of github.com:apache/spark into csd-1.1
markhamstra Dec 18, 2014
f4e6ffc
[SPARK-4884]: Improve Partition docs
msiddalingaiah Dec 19, 2014
2d66463
SPARK-3428. TaskMetrics for running tasks is missing GC time metrics
sryza Dec 19, 2014
546a239
[SPARK-4896] don’t redundantly overwrite executor JAR deps
ryan-williams Dec 19, 2014
3597c2e
SPARK-2641: Passing num executors to spark arguments from properties …
Dec 20, 2014
e5f2752
[Minor] Build Failed: value defaultProperties not found
SaintBacchus Dec 20, 2014
3bce43f
[SPARK-4818][Core] Add 'iterator' to reduce memory consumed by join
zsxwing Dec 22, 2014
b1de461
[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-regi…
ilayaperumalg Dec 23, 2014
dd0287c
[SPARK-4606] Send EOF to child JVM when there's no more data to read.
Dec 24, 2014
d21347d
[SPARK-4537][Streaming] Expand StreamingSource to add more metrics
jerryshao Dec 26, 2014
3442b7b
[SPARK-4952][Core]Handle ConcurrentModificationExceptions in SparkEnv…
witgo Dec 27, 2014
d5e0a45
[HOTFIX] Add SPARK_VERSION to Spark package object.
JoshRosen Dec 30, 2014
822a0b4
[SPARK-4882] Register PythonBroadcast with Kryo so that PySpark works…
JoshRosen Dec 30, 2014
d6b8d2c
Revert "[SPARK-4882] Register PythonBroadcast with Kryo so that PySpa…
JoshRosen Dec 30, 2014
eac740e
[SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handl…
zsxwing Dec 30, 2014
babcafa
[SPARK-1010] Clean up uses of System.setProperty in unit tests
JoshRosen Dec 31, 2014
08d4f70
[SPARK-4298][Core] - The spark-submit cannot read Main-Class from Man…
Dec 31, 2014
1034707
[HOTFIX] Disable Spark UI in SparkSubmitSuite tests
JoshRosen Dec 12, 2014
61eb9be
[SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializ…
JoshRosen Jan 1, 2015
c532acf
[HOTFIX] Bind web UI to ephemeral port in DriverSuite
JoshRosen Jan 1, 2015
6d1ca23
[SPARK-4787] Stop SparkContext if a DAGScheduler init error occurs
tigerquoll Jan 4, 2015
55325af
[SPARK-5132][Core]Correct stage Attempt Id key in stageInfofromJson
suyanNone Jan 7, 2015
c07a691
Merge branch 'branch-1.1' of github.com:apache/spark into csd-1.1
markhamstra Jan 9, 2015
677281e
Added ByteswapPartitioner and made it the defaultPartitioner
markhamstra Jan 2, 2015
470f026
Added spark.default.partitioner to conf
markhamstra Jan 5, 2015
99910b1
spark.default.partitioner docs
markhamstra Jan 6, 2015
5 changes: 4 additions & 1 deletion .gitignore
@@ -3,6 +3,7 @@
*.ipr
*.iml
*.iws
*.pyc
.idea/
sbt/*.jar
.settings
@@ -45,7 +46,9 @@ dependency-reduced-pom.xml
checkpoint
derby.log
dist/
spark-*-bin.tar.gz
dev/create-release/*txt
dev/create-release/*final
spark-*-bin-*.tgz
unit-tests.log
/lib/
rat-results.txt
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -57,3 +57,5 @@ dist/*
.*iws
logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
known_translations
65 changes: 65 additions & 0 deletions CHANGES.txt
@@ -3,6 +3,71 @@ Spark Change Log

Release 1.1.1

[SPARK-4480] Avoid many small spills in external data structures (1.1)
Andrew Or <andrew@databricks.com>
2014-11-19 10:45:42 -0800
Commit: 16bf5f3, github.com/apache/spark/pull/3354

[SPARK-4380] Log more precise number of bytes spilled (1.1)
Andrew Or <andrew@databricks.com>
2014-11-18 20:15:00 -0800
Commit: e22a759, github.com/apache/spark/pull/3355

[SPARK-4468][SQL] Backports #3334 to branch-1.1
Cheng Lian <lian@databricks.com>
2014-11-18 17:40:24 -0800
Commit: f9739b9, github.com/apache/spark/pull/3338

[SPARK-4433] fix a racing condition in zipWithIndex
Xiangrui Meng <meng@databricks.com>
2014-11-18 16:25:44 -0800
Commit: ae9b1f6, github.com/apache/spark/pull/3291

[SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use HashedWheelTimer (For branch-1.1)
Kousuke Saruta <sarutak@oss.nttdata.co.jp>
2014-11-18 12:09:18 -0800
Commit: 91b5fa8, github.com/apache/spark/pull/3321

[SPARK-4467] Partial fix for fetch failure in sort-based shuffle (1.1)
Andrew Or <andrew@databricks.com>
2014-11-17 18:10:49 -0800
Commit: aa9ebda, github.com/apache/spark/pull/3330

Revert "[SPARK-4075] [Deploy] Jar url validation is not enough for Jar file"
Andrew Or <andrew@databricks.com>
2014-11-17 11:25:38 -0800
Commit: b528367

[branch-1.1][SPARK-4355] OnlineSummarizer doesn't merge mean correctly
Xiangrui Meng <meng@databricks.com>
2014-11-13 15:36:03 -0800
Commit: 4b1c77c, github.com/apache/spark/pull/3251

[Release] Correct make-distribution.sh log path
Andrew Or <andrew@databricks.com>
2014-11-12 13:46:26 -0800
Commit: ba6d81d

[Release] Bring audit scripts up-to-date
Andrew Or <andrewor14@gmail.com>
2014-11-13 00:30:58 +0000
Commit: 88bc482

[Release] Log build output for each distribution
Andrew Or <andrew@databricks.com>
2014-11-11 18:02:59 -0800
Commit: e3a5ee9

Revert "SPARK-3039: Allow spark to be built using avro-mapred for hadoop2"
Andrew Or <andrew@databricks.com>
2014-11-12 00:04:30 -0800
Commit: 45a01b6

Update CHANGES.txt
Andrew Or <andrewor14@gmail.com>
2014-11-11 23:11:32 +0000
Commit: 131c626

[SPARK-4295][External]Fix exception in SparkSinkSuite
maji2014 <maji3@asiainfo.com>
2014-11-11 02:18:27 -0800
3 changes: 2 additions & 1 deletion LICENSE
@@ -363,7 +363,8 @@ THE SOFTWARE.

========================================================================
For Scala Interpreter classes (all .scala files in repl/src/main/scala
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala):
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala:
========================================================================

Copyright (c) 2002-2013 EPFL
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.1-candidate-csd-4-SNAPSHOT</version>
<version>1.1.2-candidate-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.1-candidate-csd-4-SNAPSHOT</version>
<version>1.1.2-candidate-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.1-candidate-csd-4-SNAPSHOT</version>
<version>1.1.2-candidate-csd-1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.lang.ThreadLocal

import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -246,10 +247,12 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private object Accumulators {
private[spark] object Accumulators {
// TODO: Use soft references? => need to make readObject work properly then
val originals = Map[Long, Accumulable[_, _]]()
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}
var lastId: Long = 0

def newId: Long = synchronized {
@@ -261,22 +264,21 @@ private object Accumulators {
if (original) {
originals(a.id) = a
} else {
val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
accums(a.id) = a
localAccums.get()(a.id) = a
}
}

// Clear the local (non-original) accumulators for the current thread
def clear() {
synchronized {
localAccums.remove(Thread.currentThread)
localAccums.get.clear
}
}

// Get the values of the local accumulators for the current thread (by ID)
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
for ((id, accum) <- localAccums.get) {
ret(id) = accum.localValue
}
return ret
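For readers skimming the Accumulators change: the leak fix swaps a Map keyed by Thread for a ThreadLocal, so each thread sees only its own registrations and clear() cannot leave references to finished threads behind. Below is a minimal standalone sketch of that pattern; the names are illustrative, not Spark APIs.

import scala.collection.mutable

object PerThreadRegistry {
  // Each thread lazily gets its own map; no global Map[Thread, ...] keeps
  // finished threads (and everything they reference) alive.
  private val locals = new ThreadLocal[mutable.Map[Long, String]]() {
    override protected def initialValue() = mutable.Map.empty[Long, String]
  }

  def register(id: Long, value: String): Unit = locals.get()(id) = value

  // Clears only the calling thread's entries, mirroring Accumulators.clear().
  def clear(): Unit = locals.get().clear()

  def values: Map[Long, String] = locals.get().toMap
}

object PerThreadRegistryDemo {
  def main(args: Array[String]): Unit = {
    PerThreadRegistry.register(1L, "a")
    println(PerThreadRegistry.values)   // Map(1 -> a)
    PerThreadRegistry.clear()
    println(PerThreadRegistry.values)   // Map()
  }
}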
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Partition.scala
@@ -18,11 +18,11 @@
package org.apache.spark

/**
* A partition of an RDD.
* An identifier for a partition in an RDD.
*/
trait Partition extends Serializable {
/**
* Get the split's index within its parent RDD
* Get the partition's index within its parent RDD
*/
def index: Int

25 changes: 23 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,14 +55,23 @@ object Partitioner {
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val shortPartitionerNames = Map(
"hash" -> "org.apache.spark.HashPartitioner",
"byteswap" -> "org.apache.spark.ByteswapPartitioner"
)
val defaultPartitionerName = rdd.conf.get("spark.default.partitioner", "hash")
val className =
shortPartitionerNames.getOrElse(defaultPartitionerName.toLowerCase, defaultPartitionerName)
val ctor = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
.getConstructor(classOf[Int])
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
for (r <- bySize if r.partitioner.isDefined) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
ctor.newInstance(rdd.context.defaultParallelism: java.lang.Integer).asInstanceOf[Partitioner]
} else {
new HashPartitioner(bySize.head.partitions.size)
ctor.newInstance(bySize.head.partitions.size: java.lang.Integer).asInstanceOf[Partitioner]
}
}
}
@@ -93,6 +102,18 @@ class HashPartitioner(partitions: Int) extends Partitioner {
override def hashCode: Int = numPartitions
}

/**
* A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
* Java's `Object.hashCode`. In order to spread-out hashCodes that are divisible by
* `numPartitions`, `byteswap32` is applied to the hashCodes before modding by `numPartitions`.
*/
class ByteswapPartitioner(partitions: Int) extends HashPartitioner(partitions) {
override def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(byteswap32(key.hashCode), numPartitions)
}
}

/**
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
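To make the intent of the new configuration concrete, here is a hedged usage sketch (not part of the patch). It assumes only what the diff above shows: spark.default.partitioner accepts the short names hash and byteswap, and defaultPartitioner instantiates the chosen class. The sample data and app name are made up.

import org.apache.spark.{SparkConf, SparkContext}

object ByteswapPartitionerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("byteswap-demo")
      .set("spark.default.partitioner", "byteswap") // resolved to org.apache.spark.ByteswapPartitioner
    val sc = new SparkContext(conf)

    // Keys that are all multiples of the partition count collapse into one
    // partition under plain hashCode % numPartitions; byteswap32 spreads them out.
    val pairs = sc.parallelize(Seq(0, 8, 16, 24, 32).map(k => (k, k)), 4)
    val partitionSizes = pairs.groupByKey()    // picks up the configured default partitioner
      .glom()
      .map(_.length)
      .collect()
    println(partitionSizes.mkString(", "))

    sc.stop()
  }
}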
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -324,8 +324,13 @@ class SparkContext(config: SparkConf) extends Logging {
try {
dagScheduler = new DAGScheduler(this)
} catch {
case e: Exception => throw
new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
case e: Exception => {
try {
stop()
} finally {
throw new SparkException("Error while constructing DAGScheduler", e)
}
}
}

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
@@ -1334,7 +1339,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
object SparkContext extends Logging {

private[spark] val SPARK_VERSION = "1.1.1"
private[spark] val SPARK_VERSION = "1.1.2-SNAPSHOT"

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

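The SparkContext hunk above follows a small but easy-to-get-wrong pattern: if constructing a component fails, release whatever was already set up, then rethrow with the original exception attached as the cause. A self-contained sketch of that pattern, with all names invented for illustration:

object CleanupOnInitError {
  final class InitFailure(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

  private def releasePartialState(): Unit = println("released partially-initialized state")

  def init(fail: Boolean): Unit =
    try {
      if (fail) throw new IllegalStateException("scheduler construction failed")
      println("initialized")
    } catch {
      case e: Exception =>
        // Clean up first, but let the wrapped exception (carrying the cause) win,
        // as the nested try/finally in the SparkContext change does.
        try releasePartialState()
        finally throw new InitFailure("Error while constructing component", e)
    }

  def main(args: Array[String]): Unit =
    try init(fail = true)
    catch { case e: InitFailure => println(s"caught: ${e.getMessage}; cause: ${e.getCause}") }
}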
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -318,7 +318,7 @@ object SparkEnv extends Logging {
val sparkProperties = (conf.getAll ++ schedulerMode).sorted

// System properties that are not java classpaths
val systemProperties = System.getProperties.iterator.toSeq
val systemProperties = Utils.getSystemProperties.toSeq
val otherProperties = systemProperties.filter { case (k, _) =>
k != "java.class.path" && !k.startsWith("spark.")
}.sorted
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -210,8 +210,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
}
@@ -220,10 +221,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, JIterable[T]] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[U])))
}

/**
@@ -458,8 +460,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
implicit val ctag: ClassTag[K] = fakeClassTag
def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctag: ClassTag[U] = fakeClassTag
JavaPairRDD.fromRDD(rdd.keyBy(f))
}

62 changes: 60 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.api.java

import com.google.common.base.Optional

import scala.collection.convert.Wrappers.MapWrapper
import java.{util => ju}
import scala.collection.mutable

private[spark] object JavaUtils {
def optionToOptional[T](option: Option[T]): Optional[T] =
@@ -32,7 +33,64 @@ private[spark] object JavaUtils {
def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
new SerializableMapWrapper(underlying)

// Implementation is copied from scala.collection.convert.Wrappers.MapWrapper,
// but implements java.io.Serializable. It can't just be subclassed to make it
// Serializable since the MapWrapper class has no no-arg constructor. This class
// doesn't need a no-arg constructor though.
class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
extends MapWrapper(underlying) with java.io.Serializable
extends ju.AbstractMap[A, B] with java.io.Serializable { self =>

override def size = underlying.size

override def get(key: AnyRef): B = try {
underlying get key.asInstanceOf[A] match {
case None => null.asInstanceOf[B]
case Some(v) => v
}
} catch {
case ex: ClassCastException => null.asInstanceOf[B]
}

override def entrySet: ju.Set[ju.Map.Entry[A, B]] = new ju.AbstractSet[ju.Map.Entry[A, B]] {
def size = self.size

def iterator = new ju.Iterator[ju.Map.Entry[A, B]] {
val ui = underlying.iterator
var prev : Option[A] = None

def hasNext = ui.hasNext

def next() = {
val (k, v) = ui.next
prev = Some(k)
new ju.Map.Entry[A, B] {
import scala.util.hashing.byteswap32
def getKey = k
def getValue = v
def setValue(v1 : B) = self.put(k, v1)
override def hashCode = byteswap32(k.hashCode) + (byteswap32(v.hashCode) << 16)
override def equals(other: Any) = other match {
case e: ju.Map.Entry[_, _] => k == e.getKey && v == e.getValue
case _ => false
}
}
}

def remove() {
prev match {
case Some(k) =>
underlying match {
case mm: mutable.Map[a, _] =>
mm remove k
prev = None
case _ =>
throw new UnsupportedOperationException("remove")
}
case _ =>
throw new IllegalStateException("next must be called at least once before remove")
}
}
}
}
}
}
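As a quick sanity check of what this rewrite buys (SPARK-3926), the hedged sketch below serializes the wrapper handed back to Java callers; the old MapWrapper-based implementation failed at this point with NotSerializableException. The demo package name is invented, and it must live under org.apache.spark because JavaUtils is private[spark].

package org.apache.spark.demo

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.spark.api.java.JavaUtils

object SerializableMapWrapperCheck {
  def main(args: Array[String]): Unit = {
    val wrapped = JavaUtils.mapAsSerializableJavaMap(Map("a" -> 1, "b" -> 2))
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(wrapped) // threw NotSerializableException before this change
    out.close()
    println(s"serialized ${wrapped.size()} entries")
  }
}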