
Commit

Merge remote-tracking branch 'origin/master' into object
cloud-fan committed Apr 19, 2016
2 parents 43644ef + d29e429 commit db476f5
Showing 334 changed files with 11,407 additions and 2,753 deletions.
3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
@@ -292,7 +292,8 @@ export("as.DataFrame",
"tableToDF",
"tableNames",
"tables",
"uncacheTable")
"uncacheTable",
"print.summary.GeneralizedLinearRegressionModel")

export("structField",
"structField.jobj",
49 changes: 46 additions & 3 deletions R/pkg/R/mllib.R
@@ -101,12 +101,55 @@ setMethod("summary", signature(object = "GeneralizedLinearRegressionModel"),
jobj <- object@jobj
features <- callJMethod(jobj, "rFeatures")
coefficients <- callJMethod(jobj, "rCoefficients")
coefficients <- as.matrix(unlist(coefficients))
colnames(coefficients) <- c("Estimate")
deviance.resid <- callJMethod(jobj, "rDevianceResiduals")
dispersion <- callJMethod(jobj, "rDispersion")
null.deviance <- callJMethod(jobj, "rNullDeviance")
deviance <- callJMethod(jobj, "rDeviance")
df.null <- callJMethod(jobj, "rResidualDegreeOfFreedomNull")
df.residual <- callJMethod(jobj, "rResidualDegreeOfFreedom")
aic <- callJMethod(jobj, "rAic")
iter <- callJMethod(jobj, "rNumIterations")
family <- callJMethod(jobj, "rFamily")

deviance.resid <- dataFrame(deviance.resid)
coefficients <- matrix(coefficients, ncol = 4)
colnames(coefficients) <- c("Estimate", "Std. Error", "t value", "Pr(>|t|)")
rownames(coefficients) <- unlist(features)
return(list(coefficients = coefficients))
ans <- list(deviance.resid = deviance.resid, coefficients = coefficients,
dispersion = dispersion, null.deviance = null.deviance,
deviance = deviance, df.null = df.null, df.residual = df.residual,
aic = aic, iter = iter, family = family)
class(ans) <- "summary.GeneralizedLinearRegressionModel"
return(ans)
})

#' Print the summary of GeneralizedLinearRegressionModel
#'
#' @rdname print
#' @name print.summary.GeneralizedLinearRegressionModel
#' @export
print.summary.GeneralizedLinearRegressionModel <- function(x, ...) {
x$deviance.resid <- setNames(unlist(approxQuantile(x$deviance.resid, "devianceResiduals",
c(0.0, 0.25, 0.5, 0.75, 1.0), 0.01)), c("Min", "1Q", "Median", "3Q", "Max"))
x$deviance.resid <- zapsmall(x$deviance.resid, 5L)
cat("\nDeviance Residuals: \n")
cat("(Note: These are approximate quantiles with relative error <= 0.01)\n")
print.default(x$deviance.resid, digits = 5L, na.print = "", print.gap = 2L)

cat("\nCoefficients:\n")
print.default(x$coefficients, digits = 5L, na.print = "", print.gap = 2L)

cat("\n(Dispersion parameter for ", x$family, " family taken to be ", format(x$dispersion),
")\n\n", apply(cbind(paste(format(c("Null", "Residual"), justify = "right"), "deviance:"),
format(unlist(x[c("null.deviance", "deviance")]), digits = 5L),
" on", format(unlist(x[c("df.null", "df.residual")])), " degrees of freedom\n"),
1L, paste, collapse = " "), sep = "")
cat("AIC: ", format(x$aic, digits = 4L), "\n\n",
"Number of Fisher Scoring iterations: ", x$iter, "\n", sep = "")
cat("\n")
invisible(x)
}

#' Make predictions from a generalized linear model
#'
#' Makes predictions from a generalized linear model produced by glm(), similarly to R's predict().
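
The new summary() and print.summary methods in this file pull everything from a JVM-side wrapper object via callJMethod(). For orientation, here is a hypothetical Scala outline of that wrapper: only the method names are taken from the calls in the diff; the class name, package, return types, and visibility are assumptions, and the bodies are left unimplemented.

```scala
package org.apache.spark.ml.r  // assumed location; the diff only shows the R side

import org.apache.spark.sql.DataFrame

// Hypothetical sketch of the JVM object behind `object@jobj` in the R code above.
class GeneralizedLinearRegressionWrapper {
  // Row labels for the coefficient matrix, e.g. "(Intercept)", "Sepal_Length", ...
  def rFeatures: Array[String] = ???
  // Flattened coefficient table; the R side reshapes it with matrix(coefficients, ncol = 4)
  // into Estimate / Std. Error / t value / Pr(>|t|) columns.
  def rCoefficients: Array[Double] = ???
  // Deviance residuals as a DataFrame, wrapped by dataFrame() on the R side.
  def rDevianceResiduals: DataFrame = ???
  def rDispersion: Double = ???
  def rNullDeviance: Double = ???
  def rDeviance: Double = ???
  def rResidualDegreeOfFreedomNull: Long = ???
  def rResidualDegreeOfFreedom: Long = ???
  def rAic: Double = ???
  def rNumIterations: Int = ???
  def rFamily: String = ???
}
```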
49 changes: 49 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib.R
@@ -77,6 +77,55 @@ test_that("glm and predict", {
expect_equal(length(predict(lm(y ~ x))), 15)
})

test_that("glm summary", {
# gaussian family
training <- suppressWarnings(createDataFrame(sqlContext, iris))
stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training))

rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))

coefs <- unlist(stats$coefficients)
rCoefs <- unlist(rStats$coefficients)
expect_true(all(abs(rCoefs - coefs) < 1e-4))
expect_true(all(
rownames(stats$coefficients) ==
c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
expect_equal(stats$dispersion, rStats$dispersion)
expect_equal(stats$null.deviance, rStats$null.deviance)
expect_equal(stats$deviance, rStats$deviance)
expect_equal(stats$df.null, rStats$df.null)
expect_equal(stats$df.residual, rStats$df.residual)
expect_equal(stats$aic, rStats$aic)

# binomial family
df <- suppressWarnings(createDataFrame(sqlContext, iris))
training <- df[df$Species %in% c("versicolor", "virginica"), ]
stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
family = binomial(link = "logit")))

rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ]
rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
family = binomial(link = "logit")))

coefs <- unlist(stats$coefficients)
rCoefs <- unlist(rStats$coefficients)
expect_true(all(abs(rCoefs - coefs) < 1e-4))
expect_true(all(
rownames(stats$coefficients) ==
c("(Intercept)", "Sepal_Length", "Sepal_Width")))
expect_equal(stats$dispersion, rStats$dispersion)
expect_equal(stats$null.deviance, rStats$null.deviance)
expect_equal(stats$deviance, rStats$deviance)
expect_equal(stats$df.null, rStats$df.null)
expect_equal(stats$df.residual, rStats$df.residual)
expect_equal(stats$aic, rStats$aic)

# Test summary works on base GLM models
baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
baseSummary <- summary(baseModel)
expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
})

test_that("kmeans", {
newIris <- iris
newIris$Species <- NULL
2 changes: 1 addition & 1 deletion bin/spark-class2.cmd
@@ -36,7 +36,7 @@ if exist "%SPARK_HOME%\RELEASE" (
)

if not exist "%SPARK_JARS_DIR%"\ (
echo Failed to find Spark assembly JAR.
echo Failed to find Spark jars directory.
echo You need to build Spark before running this program.
exit /b 1
)
1 change: 0 additions & 1 deletion core/pom.xml
@@ -192,7 +192,6 @@
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<version>3.2.10</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
@@ -114,7 +114,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
this.serializer = dep.serializer();
this.shuffleBlockResolver = shuffleBlockResolver;
}
@@ -118,7 +118,7 @@ public UnsafeShuffleWriter(
this.shuffleId = dep.shuffleId();
this.serializer = dep.serializer().newInstance();
this.partitioner = dep.partitioner();
this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
this.taskContext = taskContext;
this.sparkConf = sparkConf;
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
@@ -129,7 +129,7 @@ private UnsafeExternalSorter(
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();

if (existingInMemorySorter == null) {
this.inMemSorter = new UnsafeInMemorySorter(
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -278,9 +278,9 @@ private object ContextCleaner {
* Listener class used for testing when any item has been cleaned by the Cleaner class.
*/
private[spark] trait CleanerListener {
def rddCleaned(rddId: Int)
def shuffleCleaned(shuffleId: Int)
def broadcastCleaned(broadcastId: Long)
def accumCleaned(accId: Long)
def checkpointCleaned(rddId: Long)
def rddCleaned(rddId: Int): Unit
def shuffleCleaned(shuffleId: Int): Unit
def broadcastCleaned(broadcastId: Long): Unit
def accumCleaned(accId: Long): Unit
def checkpointCleaned(rddId: Long): Unit
}
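
The only change to this trait is spelling out the `: Unit` result types instead of Scala's procedure syntax; existing implementations are unaffected. A minimal illustrative listener follows — the class and field names are invented for this sketch, and it assumes a package with access to the private[spark] trait:

```scala
package org.apache.spark  // CleanerListener is private[spark], so an implementation lives here

import scala.collection.mutable

// Records which RDDs and shuffles the ContextCleaner has cleaned; useful in tests.
private[spark] class RecordingCleanerListener extends CleanerListener {
  val cleanedRdds = mutable.HashSet.empty[Int]
  val cleanedShuffles = mutable.HashSet.empty[Int]

  override def rddCleaned(rddId: Int): Unit = cleanedRdds += rddId
  override def shuffleCleaned(shuffleId: Int): Unit = cleanedShuffles += shuffleId
  override def broadcastCleaned(broadcastId: Long): Unit = ()
  override def accumCleaned(accId: Long): Unit = ()
  override def checkpointCleaned(rddId: Long): Unit = ()
}
```

A test would then register it through the cleaner's attachListener hook, assuming that hook's name is unchanged in this version.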
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -41,7 +41,7 @@ trait FutureAction[T] extends Future[T] {
/**
* Cancels the execution of this action.
*/
def cancel()
def cancel(): Unit

/**
* Blocks until this action completes.
@@ -65,7 +65,7 @@ trait FutureAction[T] extends Future[T] {
* When this action is completed, either through an exception, or a value, applies the provided
* function.
*/
def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit

/**
* Returns whether the action has already been completed with a value or an exception.
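
The FutureAction changes above only add explicit `Unit` result types to cancel() and onComplete; call sites do not change. A small self-contained usage sketch — the app name and master are placeholders, and countAsync() comes from AsyncRDDActions:

```scala
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

import org.apache.spark.{FutureAction, SparkConf, SparkContext}

object FutureActionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("future-action-sketch").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 1000)

    // countAsync() returns a FutureAction[Long]; onComplete registers a callback
    // exactly as on a plain scala.concurrent.Future.
    val action: FutureAction[Long] = rdd.countAsync()
    action.onComplete {
      case Success(n) => println(s"count finished: $n")
      case Failure(e) => println(s"count failed: ${e.getMessage}")
    }

    // action.cancel() would abort the underlying job; its signature now reads
    // `def cancel(): Unit`, but callers are unaffected.

    Await.ready(action, 1.minute)
    sc.stop()
  }
}
```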
@@ -68,14 +68,12 @@ private[spark] object InternalAccumulator {

// Names of output metrics
object output {
val WRITE_METHOD = OUTPUT_METRICS_PREFIX + "writeMethod"
val BYTES_WRITTEN = OUTPUT_METRICS_PREFIX + "bytesWritten"
val RECORDS_WRITTEN = OUTPUT_METRICS_PREFIX + "recordsWritten"
}

// Names of input metrics
object input {
val READ_METHOD = INPUT_METRICS_PREFIX + "readMethod"
val BYTES_READ = INPUT_METRICS_PREFIX + "bytesRead"
val RECORDS_READ = INPUT_METRICS_PREFIX + "recordsRead"
}
@@ -110,8 +108,6 @@ private[spark] object InternalAccumulator {
case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam
case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam
case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam
case input.READ_METHOD => StringAccumulatorParam
case output.WRITE_METHOD => StringAccumulatorParam
case _ => LongAccumulatorParam
}
}
@@ -165,7 +161,6 @@
*/
def createInputAccums(): Seq[Accumulator[_]] = {
Seq[String](
input.READ_METHOD,
input.BYTES_READ,
input.RECORDS_READ).map(create)
}
@@ -175,7 +170,6 @@
*/
def createOutputAccums(): Seq[Accumulator[_]] = {
Seq[String](
output.WRITE_METHOD,
output.BYTES_WRITTEN,
output.RECORDS_WRITTEN).map(create)
}
@@ -187,7 +181,7 @@
* add to the same set of accumulators. We do this to report the distribution of accumulator
* values across all tasks within each stage.
*/
def create(sc: SparkContext): Seq[Accumulator[_]] = {
def createAll(sc: SparkContext): Seq[Accumulator[_]] = {
val accums = createAll()
accums.foreach { accum =>
Accumulators.register(accum)
33 changes: 28 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.{Arrays, Properties, ServiceLoader, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

@@ -723,7 +723,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
(safeEnd - safeStart) / step + 1
}
}
parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => {
parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex { (i, _) =>
val partitionStart = (i * numElements) / numSlices * step + start
val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
def getSafeMargin(bi: BigInt): Long =
@@ -762,7 +762,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
ret
}
}
})
}
}

/** Distribute a local Scala collection to form an RDD.
@@ -2453,9 +2453,32 @@ object SparkContext extends Logging {
"in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)

case _ =>
throw new SparkException("Could not parse Master URL: '" + master + "'")
case masterUrl =>
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}

private def getClusterManager(url: String): Option[ExternalClusterManager] = {
val loader = Utils.getContextOrSparkClassLoader
val serviceLoaders =
ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
if (serviceLoaders.size > 1) {
throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
s"for the url $url:")
}
serviceLoaders.headOption
}
}
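
The new `case masterUrl =>` branch makes the scheduler pluggable: any ExternalClusterManager found on the classpath through java.util.ServiceLoader whose canCreate() accepts the master URL is asked to build the TaskScheduler and SchedulerBackend. A minimal sketch of such a plug-in, with method shapes inferred from the calls above — the "dummy://" scheme, class name, and package placement are assumptions, and the concrete scheduler types are left unimplemented:

```scala
package org.apache.spark.scheduler  // the trait is private[spark], so an implementation
                                    // needs to live somewhere under org.apache.spark

import org.apache.spark.SparkContext

// Hypothetical manager claiming master URLs of the form "dummy://host:port".
private[spark] class DummyExternalClusterManager extends ExternalClusterManager {

  // Only managers whose canCreate() returns true survive the ServiceLoader filter in
  // getClusterManager above; more than one match raises a SparkException.
  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("dummy://")

  // A real implementation returns concrete TaskScheduler / SchedulerBackend instances;
  // they are left unimplemented in this sketch.
  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = ???

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = ???

  // Called once with both objects before (backend, scheduler) is handed back to SparkContext.
  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = ()
}
```

ServiceLoader discovers the class through the standard provider-configuration resource — a classpath file named after the trait's fully qualified name (here assumed to be META-INF/services/org.apache.spark.scheduler.ExternalClusterManager) listing the implementing class — after which setMaster("dummy://host:port") resolves through this branch instead of failing with "Could not parse Master URL".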

4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -149,9 +149,7 @@ case class ExceptionFailure(
this(e, accumUpdates, preserveCause = true)
}

def exception: Option[Throwable] = exceptionWrapper.flatMap {
(w: ThrowableSerializationWrapper) => Option(w.exception)
}
def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))

override def toErrorString: String =
if (fullStackTrace == null) {
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -105,7 +105,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
new JavaDoubleRDD(rdd.map(f.call(_).doubleValue()))
}

/**
@@ -131,7 +131,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
new JavaDoubleRDD(rdd.flatMap(fn).map(_.doubleValue()))
}

/**
@@ -173,7 +173,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
new JavaDoubleRDD(rdd.mapPartitions(fn).map(_.doubleValue()))
}

/**
@@ -196,7 +196,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
(x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
.map(x => x.doubleValue()))
.map(_.doubleValue()))
}

/**
@@ -215,7 +215,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
rdd.foreachPartition((x => f.call(x.asJava)))
rdd.foreachPartition(x => f.call(x.asJava))
}

/**
@@ -62,7 +62,7 @@ object PythonRunner {
// ready to serve connections.
thread.join()

// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// Build up a PYTHONPATH that includes the Spark assembly (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
val pathElements = new ArrayBuffer[String]
pathElements ++= formattedPyFiles
@@ -478,7 +478,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
"""Usage: spark-submit [options] <app jar | python file> [app arguments]
|Usage: spark-submit --kill [submission ID] --master [spark://...]
|Usage: spark-submit --status [submission ID] --master [spark://...]""".stripMargin)
|Usage: spark-submit --status [submission ID] --master [spark://...]
|Usage: spark-submit run-example [options] example-class [example args]""".stripMargin)
outStream.println(command)

val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
@@ -33,7 +33,8 @@ private[spark] trait AppClientListener {
/** An application death is an unrecoverable failure condition. */
def dead(reason: String): Unit

def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)
def executorAdded(
fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}