
Commit

Fixed merge conflicts
djalova committed Oct 20, 2015
2 parents 9d66e81 + 67d468f commit aa87c40
Showing 236 changed files with 4,147 additions and 3,097 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -265,7 +265,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.8.2.1 - http://py4j.sourceforge.net/)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
5 changes: 3 additions & 2 deletions R/pkg/R/mllib.R
@@ -45,11 +45,12 @@ setClass("PipelineModel", representation(model = "jobj"))
#' summary(model)
#'}
setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFrame"),
function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0) {
function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0,
solver = "auto") {
family <- match.arg(family)
model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"fitRModelFormula", deparse(formula), data@sdf, family, lambda,
alpha)
alpha, solver)
return(new("PipelineModel", model = model))
})

2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_mllib.R
@@ -59,7 +59,7 @@ test_that("feature interaction vs native glm", {

test_that("summary coefficients match with native glm", {
training <- createDataFrame(sqlContext, iris)
stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training))
stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "l-bfgs"))
coefs <- as.vector(stats$coefficients)
rCoefs <- as.vector(coef(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)))
expect_true(all(abs(rCoefs - coefs) < 1e-6))
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -65,7 +65,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
4 changes: 2 additions & 2 deletions core/pom.xml
@@ -339,7 +339,7 @@
<dependency>
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
<version>4.4</version>
<version>4.9</version>
<exclusions>
<exclusion>
<groupId>net.razorvine</groupId>
@@ -350,7 +350,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.8.2.1</version>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -21,19 +21,18 @@ import org.apache.log4j.{LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.annotation.Private
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
* logging messages at different levels using methods that only evaluate parameters lazily if the
* log level is enabled.
*
* NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility.
* This will likely be changed or removed in future releases.
*/
@DeveloperApi
@Private
trait Logging {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -595,7 +595,9 @@ private[spark] object SparkConf extends Logging {
"spark.rpc.lookupTimeout" -> Seq(
AlternateConfig("spark.akka.lookupTimeout", "1.4")),
"spark.streaming.fileStream.minRememberDuration" -> Seq(
AlternateConfig("spark.streaming.minRememberDuration", "1.5"))
AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
"spark.yarn.max.executor.failures" -> Seq(
AlternateConfig("spark.yarn.max.worker.failures", "1.5"))
)

/**
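The new AlternateConfig entry keeps the retired YARN setting working: reads of the new key fall back to a value supplied under the old name. A minimal sketch of the expected behaviour (illustrative only, not part of this diff; the exact warning text may differ):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.max.worker.failures", "3")   // old key, deprecated as of 1.5
// Lookups of the new key should resolve through the alternate-config mapping,
// and setting the old key normally logs a deprecation warning.
val maxFailures = conf.get("spark.yarn.max.executor.failures")  // "3"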
22 changes: 6 additions & 16 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -90,11 +90,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// NOTE: this must be placed at the beginning of the SparkContext constructor.
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
// contains a map from hostname to a list of input format splits on the host.
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

val startTime = System.currentTimeMillis()

private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)
@@ -116,16 +111,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Alternative constructor for setting preferred locations where Spark will create executors.
*
* @param config a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
* Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
* @param preferredNodeLocationData not used. Left for backward compatibility.
*/
@deprecated("Passing in preferred locations has no effect at all, see SPARK-8949", "1.5.0")
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
this(config)
logWarning("Passing in preferred locations has no effect at all, see SPARK-8949")
this.preferredNodeLocationData = preferredNodeLocationData
}

/**
@@ -147,10 +139,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes.
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
* Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
* @param preferredNodeLocationData not used. Left for backward compatibility.
*/
@deprecated("Passing in preferred locations has no effect at all, see SPARK-10921", "1.6.0")
def this(
master: String,
appName: String,
@@ -163,7 +154,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (preferredNodeLocationData.nonEmpty) {
logWarning("Passing in preferred locations has no effect at all, see SPARK-8949")
}
this.preferredNodeLocationData = preferredNodeLocationData
}

// NOTE: The below constructors could be consolidated using default arguments. Due to
@@ -177,7 +167,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @param appName A name for your application, to display on the cluster web UI.
*/
private[spark] def this(master: String, appName: String) =
this(master, appName, null, Nil, Map(), Map())
this(master, appName, null, Nil, Map())

/**
* Alternative constructor that allows setting common Spark properties directly
@@ -187,7 +177,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @param sparkHome Location where Spark is installed on cluster nodes.
*/
private[spark] def this(master: String, appName: String, sparkHome: String) =
this(master, appName, sparkHome, Nil, Map(), Map())
this(master, appName, sparkHome, Nil, Map())

/**
* Alternative constructor that allows setting common Spark properties directly
@@ -199,7 +189,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
this(master, appName, sparkHome, jars, Map(), Map())
this(master, appName, sparkHome, jars, Map())

// log out Spark Version in Spark driver log
logInfo(s"Running Spark version $SPARK_VERSION")
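With preferredNodeLocationData now ignored (see SPARK-8949 and SPARK-10921 referenced above), passing the map only triggers a warning. A hedged migration sketch (illustrative, not taken from this diff):

import org.apache.spark.{SparkConf, SparkContext}

// Before: new SparkContext(conf, preferredNodeLocationData)  -- deprecated; the map is discarded.
// After: construct from the conf alone; locality preferences come from the input data at runtime.
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = new SparkContext(conf)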
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -24,10 +24,14 @@ import java.util.Arrays
import java.util.jar.{JarEntry, JarOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.google.common.io.{ByteStreams, Files}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils

/**
@@ -154,4 +158,51 @@ private[spark] object TestUtils {
" @Override public String toString() { return \"" + toStringValue + "\"; }}")
createCompiledClass(className, destDir, sourceFile, classpathUrls)
}

/**
* Run some code involving jobs submitted to the given context and assert that the jobs spilled.
*/
def assertSpilled[T](sc: SparkContext, identifier: String)(body: => T): Unit = {
val spillListener = new SpillListener
sc.addSparkListener(spillListener)
body
assert(spillListener.numSpilledStages > 0, s"expected $identifier to spill, but did not")
}

/**
* Run some code involving jobs submitted to the given context and assert that the jobs
* did not spill.
*/
def assertNotSpilled[T](sc: SparkContext, identifier: String)(body: => T): Unit = {
val spillListener = new SpillListener
sc.addSparkListener(spillListener)
body
assert(spillListener.numSpilledStages == 0, s"expected $identifier to not spill, but did")
}

}


/**
* A [[SparkListener]] that detects whether spills have occurred in Spark jobs.
*/
private class SpillListener extends SparkListener {
private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]]
private val spilledStageIds = new mutable.HashSet[Int]

def numSpilledStages: Int = spilledStageIds.size

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
stageIdToTaskMetrics.getOrElseUpdate(
taskEnd.stageId, new ArrayBuffer[TaskMetrics]) += taskEnd.taskMetrics
}

override def onStageCompleted(stageComplete: SparkListenerStageCompleted): Unit = {
val stageId = stageComplete.stageInfo.stageId
val metrics = stageIdToTaskMetrics.remove(stageId).toSeq.flatten
val spilled = metrics.map(_.memoryBytesSpilled).sum > 0
if (spilled) {
spilledStageIds += stageId
}
}
}
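A possible way to use the new helpers from a suite inside Spark's own test tree (a sketch under assumptions: TestUtils is private[spark], so this only compiles within the org.apache.spark package, and whether the job actually spills depends on the memory configuration of the test context):

package org.apache.spark

object SpillCheckExample {
  def run(sc: SparkContext): Unit = {
    // Fails if no stage in the body reported memoryBytesSpilled > 0.
    TestUtils.assertSpilled(sc, "groupByKey") {
      sc.parallelize(1 to 1000000, 2).map(i => (i % 4, i)).groupByKey().count()
    }
    // Converse check: fails if any stage in the body spilled.
    TestUtils.assertNotSpilled(sc, "small map") {
      sc.parallelize(1 to 10, 2).map(_ * 2).count()
    }
  }
}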
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
@@ -69,9 +69,14 @@ private[deploy] object DeployMessages {

// Master to Worker

sealed trait RegisterWorkerResponse

case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
with RegisterWorkerResponse

case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse

case class RegisterWorkerFailed(message: String) extends DeployMessage
case object MasterInStandby extends DeployMessage with RegisterWorkerResponse

case class ReconnectWorker(masterUrl: String) extends DeployMessage

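Folding the three replies into the sealed RegisterWorkerResponse trait lets the worker match on them exhaustively. A hypothetical handler sketch (helper names such as changeMaster are assumptions, not taken from this diff; it would live inside the worker, within org.apache.spark.deploy):

private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = msg match {
  case RegisteredWorker(masterRef, masterWebUiUrl) =>
    // Remember the active master and start heartbeating against it.
    changeMaster(masterRef, masterWebUiUrl)
  case RegisterWorkerFailed(message) =>
    // Rejected (e.g. duplicate worker ID); nothing sensible to do but exit.
    logError("Worker registration failed: " + message)
    System.exit(1)
  case MasterInStandby =>
    // A standby master does not register workers; wait for the active one to respond.
}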
@@ -27,7 +27,7 @@ import scala.collection.mutable
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.permission.AccessControlException
import org.apache.hadoop.security.AccessControlException

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
50 changes: 25 additions & 25 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -233,31 +233,6 @@ private[deploy] class Master(
System.exit(0)
}

case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else if (idToWorker.contains(id)) {
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerUiPort, publicAddress)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
workerRef.send(RegisteredWorker(self, masterWebUiUrl))
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
}

case RegisterApplication(description, driver) => {
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
@@ -387,6 +362,31 @@ }
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
context.reply(MasterInStandby)
} else if (idToWorker.contains(id)) {
context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerUiPort, publicAddress)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
}

case RequestSubmitDriver(description) => {
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
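Since RegisterWorker is now answered from receiveAndReply, the worker side is expected to register with an RPC ask and consume the typed reply instead of relying on a one-way send. A sketch under assumptions (the enclosing worker class, its Logging mixin, and the handleRegisterResponse helper sketched earlier are not part of this diff):

import scala.util.{Failure, Success}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.ThreadUtils

private def registerWithMaster(masterEndpoint: RpcEndpointRef, msg: RegisterWorker): Unit = {
  // ask() returns a Future[RegisterWorkerResponse]; route it to the handler on completion.
  masterEndpoint.ask[RegisterWorkerResponse](msg).onComplete {
    case Success(response) => handleRegisterResponse(response)
    case Failure(e) => logError(s"Cannot register with master ${masterEndpoint.address}", e)
  }(ThreadUtils.sameThread)
}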