
Commit

Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
jkbradley committed Aug 18, 2014
2 parents dafebe2 + 3abd0c1 commit ea5c047
Showing 16 changed files with 613 additions and 108 deletions.
41 changes: 14 additions & 27 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -72,7 +72,6 @@ private[spark] class Worker(
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

val testing: Boolean = sys.props.contains("spark.testing")
val masterLock: Object = new Object()
var master: ActorSelection = null
var masterAddress: Address = null
var activeMasterUrl: String = ""
@@ -145,18 +144,16 @@ private[spark] class Worker(
}

def changeMaster(url: String, uiUrl: String) {
masterLock.synchronized {
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(_host, _port) =>
Address("akka.tcp", Master.systemName, _host, _port.toInt)
case x =>
throw new SparkException("Invalid spark URL: " + x)
}
connected = true
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(_host, _port) =>
Address("akka.tcp", Master.systemName, _host, _port.toInt)
case x =>
throw new SparkException("Invalid spark URL: " + x)
}
connected = true
}

def tryRegisterAllMasters() {
@@ -199,9 +196,7 @@ private[spark] class Worker(
}

case SendHeartbeat =>
masterLock.synchronized {
if (connected) { master ! Heartbeat(workerId) }
}
if (connected) { master ! Heartbeat(workerId) }

case WorkDirCleanup =>
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
@@ -244,27 +239,21 @@ private[spark] class Worker(
manager.start()
coresUsed += cores_
memoryUsed += memory_
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
}
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
} catch {
case e: Exception => {
logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
}
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
}
}
}

case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
}
master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
@@ -330,9 +319,7 @@ private[spark] class Worker(
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
masterLock.synchronized {
master ! DriverStateChanged(driverId, state, exception)
}
master ! DriverStateChanged(driverId, state, exception)
val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
memoryUsed -= driver.driverDesc.mem
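Note on the Worker.scala hunks above: they drop the `masterLock.synchronized` wrappers around `changeMaster` and every `master !` send. A hedged reading of why that is safe: `Worker` is an Akka actor, and an actor processes one message at a time, so fields such as `activeMasterUrl` and `connected` that are only touched from the message handler cannot race. A minimal standalone sketch of that invariant, written against the plain Akka classic API of that era (toy names, not Spark code):

    import akka.actor.{Actor, ActorSystem, Props}

    // Toy messages; any resemblance to Spark's own messages is illustrative only.
    case class MasterChanged(url: String)
    case object SendPing

    // The mutable fields below need no lock: the actor framework already
    // serializes message handling per actor instance.
    class Registrar extends Actor {
      private var masterUrl = ""        // analogous to Worker.activeMasterUrl
      private var connected = false     // analogous to Worker.connected

      def receive = {
        case MasterChanged(url) =>
          masterUrl = url
          connected = true
        case SendPing =>
          if (connected) println(s"ping $masterUrl")
      }
    }

    object RegistrarDemo extends App {
      val system = ActorSystem("demo")
      val registrar = system.actorOf(Props[Registrar], "registrar")
      registrar ! MasterChanged("spark://localhost:7077")
      registrar ! SendPing
      Thread.sleep(500)        // let the mailbox drain before shutting down
      system.shutdown()        // Akka 2.2/2.3-era API, as used by Spark 1.x builds
    }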
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
* If the elements in the RDD do not vary (max == min), always returns a single bucket.
*/
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
// Compute the minimum and the maxium
// Scala's built-in range has issues. See #SI-8782
def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
val span = max - min
Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
}
// Compute the minimum and the maximum
val (max: Double, min: Double) = self.mapPartitions { items =>
Iterator(items.foldRight(Double.NegativeInfinity,
Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
@@ -107,9 +112,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
throw new UnsupportedOperationException(
"Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
}
val increment = (max-min)/bucketCount.toDouble
val range = if (increment != 0) {
Range.Double.inclusive(min, max, increment)
val range = if (min != max) {
// Range.Double.inclusive(min, max, increment)
// The above code doesn't always work. See Scala bug #SI-8782.
// https://issues.scala-lang.org/browse/SI-8782
customRange(min, max, bucketCount)
} else {
List(min, min)
}
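The hunk above replaces `Range.Double.inclusive(min, max, increment)` with an explicit `customRange` because of Scala standard-library bug SI-8782: building the range by stepping a floating-point increment can yield the wrong number of bucket edges. A self-contained sketch of the difference, using the same edge formula the patch adopts (the concrete numbers are illustrative, not taken from this diff):

    // Compares the stdlib range (Scala 2.10/2.11 era, as used by Spark then)
    // with the index-based edge formula from the patch.
    object HistogramRangeDemo extends App {
      val (min, max, buckets) = (6.0, 99.0, 9)
      val step = (max - min) / buckets

      // What the old code relied on: a floating-point-stepped range, which
      // may drop or duplicate an edge for non-representable steps (SI-8782).
      val builtIn = Range.Double.inclusive(min, max, step)

      // What the patch adopts: derive each edge from an integer index and
      // append max, so there are always buckets + 1 edges and the last edge
      // is exactly max.
      val custom = (0 until buckets).map(i => min + (i * (max - min)) / buckets) :+ max

      println(s"Range.Double.inclusive -> ${builtIn.size} edges")
      println(s"index-based customRange -> ${custom.size} edges")  // always 10 here
    }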
23 changes: 23 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -245,6 +245,29 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
assert(histogramBuckets === expectedHistogramBuckets)
}

test("WorksWithoutBucketsForLargerDatasets") {
// Verify the case of slightly larger datasets
val rdd = sc.parallelize(6 to 99)
val (histogramBuckets, histogramResults) = rdd.histogram(8)
val expectedHistogramResults =
Array(12, 12, 11, 12, 12, 11, 12, 12)
val expectedHistogramBuckets =
Array(6.0, 17.625, 29.25, 40.875, 52.5, 64.125, 75.75, 87.375, 99.0)
assert(histogramResults === expectedHistogramResults)
assert(histogramBuckets === expectedHistogramBuckets)
}

test("WorksWithoutBucketsWithIrrationalBucketEdges") {
// Verify the case of buckets with irrational edges. See #SPARK-2862.
val rdd = sc.parallelize(6 to 99)
val (histogramBuckets, histogramResults) = rdd.histogram(9)
val expectedHistogramResults =
Array(11, 10, 11, 10, 10, 11, 10, 10, 11)
assert(histogramResults === expectedHistogramResults)
assert(histogramBuckets(0) === 6.0)
assert(histogramBuckets(9) === 99.0)
}

// Test the failure mode with an invalid RDD
test("ThrowsExceptionOnInvalidRDDs") {
// infinity
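For orientation, the API these new tests exercise looks roughly like the following from a driver program; this is a hedged usage sketch (it assumes an existing `SparkContext` named `sc`) and restates the numbers of the first new test:

    import org.apache.spark.SparkContext._   // RDD[Double] implicits in Spark 1.x

    val data = sc.parallelize((6 to 99).map(_.toDouble))
    val (buckets, counts) = data.histogram(8)
    // buckets: 9 evenly spaced edges from 6.0 to 99.0
    // counts:  8 per-bucket counts that sum to 94 (the number of elements)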
5 changes: 5 additions & 0 deletions docs/sql-programming-guide.md
@@ -605,6 +605,11 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.

You may also use the beeline script that comes with Hive.

To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session,
users can set the `spark.sql.thriftserver.scheduler.pool` variable:

SET spark.sql.thriftserver.scheduler.pool=accounting;

### Migration Guide for Shark Users

#### Reducer number
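The new documentation paragraph sets the pool with a plain `SET` statement. For completeness, here is a hedged sketch of issuing the same statement from a Scala JDBC client; the URL, port, and credentials are placeholders, and the Hive JDBC driver is assumed to be on the classpath:

    import java.sql.DriverManager

    Class.forName("org.apache.hive.jdbc.HiveDriver")   // assumed available

    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val stmt = conn.createStatement()
      // Later statements on this connection's session run in the 'accounting' pool.
      stmt.execute("SET spark.sql.thriftserver.scheduler.pool=accounting")
      val rs = stmt.executeQuery("SELECT 1")
      while (rs.next()) println(rs.getInt(1))
    } finally {
      conn.close()
    }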
1 change: 0 additions & 1 deletion project/SparkBuild.scala
@@ -228,7 +228,6 @@ object SQL {
object Hive {

lazy val settings = Seq(

javaOptions += "-XX:MaxPermSize=1g",
// Multiple queries rely on the TestHive singleton. See comments there for more details.
parallelExecution in Test := false,
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -32,6 +32,10 @@ private[spark] object SQLConf {
val CODEGEN_ENABLED = "spark.sql.codegen"
val DIALECT = "spark.sql.dialect"
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"

// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
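`THRIFTSERVER_POOL` only names the configuration key. A hedged sketch of how a per-session value of this key would typically be turned into a fair-scheduler assignment: Spark's scheduler reads the standard `spark.scheduler.pool` local property, so a helper like the hypothetical one below (not code from this commit) can bridge the two:

    import org.apache.spark.SparkContext

    // Hypothetical helper: map a session-level SQL setting onto the
    // thread-local property that the fair scheduler actually consults.
    def applySessionPool(sc: SparkContext, sessionConf: Map[String, String]): Unit = {
      sessionConf.get("spark.sql.thriftserver.scheduler.pool").foreach { pool =>
        sc.setLocalProperty("spark.scheduler.pool", pool)
      }
    }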
@@ -303,3 +303,15 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}

/**
* :: DeveloperApi ::
* A plan node that does nothing but lie about the output of its child. Used to splice a
* (hopefully structurally equivalent) tree from a different optimization sequence into an already
* resolved tree.
*/
@DeveloperApi
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children = child :: Nil
def execute() = child.execute()
}
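A hedged sketch of the kind of call site the doc comment describes: swapping in an already-planned, structurally equivalent subtree while preserving the attribute references the parent plan was resolved against. `expectedOutput` and `replanned` are hypothetical names, and `Attribute`/`SparkPlan` are assumed to be in scope as in the file above:

    // Hypothetical usage, not code from this commit.
    def spliceIn(expectedOutput: Seq[Attribute], replanned: SparkPlan): SparkPlan =
      if (replanned.output == expectedOutput) replanned
      else OutputFaker(expectedOutput, replanned)   // execution is delegated to `replanned`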
