Merge branch 'master' into caseWhen
Conflicts:
	sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
concretevitamin committed Jun 11, 2014
2 parents 9f84b40 + fe78b8b commit 2cf08bb
Showing 148 changed files with 4,097 additions and 586 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -7,7 +7,7 @@
sbt/*.jar
.settings
.cache
-.generated-mima-excludes
+.generated-mima*
/build/
work/
out/
4 changes: 2 additions & 2 deletions bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -80,7 +80,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
test("large number of iterations") {
// This tests whether jobs with a large number of iterations finish in a reasonable time,
// because non-memoized recursion in RDD or DAGScheduler used to cause them to hang
-failAfter(10 seconds) {
+failAfter(30 seconds) {
sc = new SparkContext("local", "test")
val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
val msgs = sc.parallelize(Array[(String, TestMessage)]())
@@ -101,7 +101,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
sc = new SparkContext("local", "test")
val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
val msgs = sc.parallelize(Array[(String, TestMessage)]())
-val numSupersteps = 50
+val numSupersteps = 20
val result =
Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
(self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
14 changes: 12 additions & 2 deletions bin/pyspark
@@ -45,7 +45,7 @@ fi
. $FWDIR/bin/load-spark-env.sh

# Figure out which Python executable to use
-if [ -z "$PYSPARK_PYTHON" ] ; then
+if [[ -z "$PYSPARK_PYTHON" ]]; then
PYSPARK_PYTHON="python"
fi
export PYSPARK_PYTHON
@@ -59,7 +59,7 @@ export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py

# If IPython options are specified, assume user wants to run IPython
-if [ -n "$IPYTHON_OPTS" ]; then
+if [[ -n "$IPYTHON_OPTS" ]]; then
IPYTHON=1
fi

@@ -76,6 +76,16 @@ for i in "$@"; do
done
export PYSPARK_SUBMIT_ARGS

# For pyspark tests
if [[ -n "$SPARK_TESTING" ]]; then
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_PYTHON" -m doctest $1
else
exec "$PYSPARK_PYTHON" $1
fi
exit
fi

# If a python file is provided, directly run spark-submit.
if [[ "$1" =~ \.py$ ]]; then
echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -823,9 +823,11 @@ class SparkContext(config: SparkConf) extends Logging {
}

/**
* :: DeveloperApi ::
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
}
@@ -837,8 +839,10 @@
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap

/**
* :: DeveloperApi ::
* Return information about blocks stored in all of the slaves
*/
@DeveloperApi
def getExecutorStorageStatus: Array[StorageStatus] = {
env.blockManager.master.getStorageStatus
}
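The two methods annotated @DeveloperApi above are callable from driver code. Below is a minimal, illustrative sketch (not part of this commit) of how an application might inspect cache usage through them; the cached RDD, the app name, and the printed messages are assumptions made for the example.

import org.apache.spark.{SparkConf, SparkContext}

object StorageInfoExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("storage-info"))

    // Cache a small RDD and materialize it so there is something to report.
    val nums = sc.parallelize(1 to 1000).cache()
    nums.count()

    // Which RDDs are cached, and how much memory/disk space they occupy.
    sc.getRDDStorageInfo.foreach(info => println(info))

    // Storage status as reported by each block manager.
    println("Block managers reporting: " + sc.getExecutorStorageStatus.length)

    sc.stop()
  }
}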
129 changes: 129 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.python

import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
import org.apache.spark.annotation.Experimental


/**
* :: Experimental ::
* A trait for use with reading custom classes in PySpark. Implement this trait and add custom
* transformation code by overriding the convert method.
*/
@Experimental
trait Converter[T, U] extends Serializable {
  def convert(obj: T): U
}

private[python] object Converter extends Logging {

  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
    converterClass.map { cc =>
      Try {
        val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
        logInfo(s"Loaded converter: $cc")
        c
      } match {
        case Success(c) => c
        case Failure(err) =>
          logError(s"Failed to load converter: $cc")
          throw err
      }
    }.getOrElse { new DefaultConverter }
  }
}

/**
* A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
* Other objects are passed through without conversion.
*/
private[python] class DefaultConverter extends Converter[Any, Any] {

  /**
   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
   * object representation
   */
  private def convertWritable(writable: Writable): Any = {
    import collection.JavaConversions._
    writable match {
      case iw: IntWritable => iw.get()
      case dw: DoubleWritable => dw.get()
      case lw: LongWritable => lw.get()
      case fw: FloatWritable => fw.get()
      case t: Text => t.toString
      case bw: BooleanWritable => bw.get()
      case byw: BytesWritable => byw.getBytes
      case n: NullWritable => null
      case aw: ArrayWritable => aw.get().map(convertWritable(_))
      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
        (convertWritable(k), convertWritable(v))
      }.toMap)
      case other => other
    }
  }

  def convert(obj: Any): Any = {
    obj match {
      case writable: Writable =>
        convertWritable(writable)
      case _ =>
        obj
    }
  }
}

/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {

  /**
   * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
   */
  def mapToConf(map: java.util.Map[String, String]): Configuration = {
    import collection.JavaConversions._
    val conf = new Configuration()
    map.foreach { case (k, v) => conf.set(k, v) }
    conf
  }

  /**
   * Merges two configurations, returns a copy of left with keys from right overwriting
   * any matching keys in left
   */
  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
    import collection.JavaConversions._
    val copy = new Configuration(left)
    right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
    copy
  }

  /**
   * Converts an RDD of key-value pairs, where key and/or value could be instances of
   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
   */
  def convertRDD[K, V](rdd: RDD[(K, V)],
                       keyConverter: Converter[Any, Any],
                       valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
    rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
  }

}
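The Converter trait added here is the public hook for custom PySpark input conversion ("implement this trait and add custom transformation code by overriding the convert method"). The sketch below is not part of the commit; the class name UpperCaseTextConverter is invented for illustration, and the exact Python-side option used to pass a converter class name is not shown in this diff.

import org.apache.hadoop.io.Text
import org.apache.spark.api.python.Converter

// Hypothetical converter: upper-cases Text records and, like DefaultConverter,
// passes anything it does not recognize through unchanged.
class UpperCaseTextConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case t: Text => t.toString.toUpperCase
    case other => other
  }
}

Handing the fully qualified class name of such a converter to the PySpark Hadoop input path would let Converter.getInstance load it by reflection (via Class.forName, as above), after which convertRDD applies it to every key and value of the resulting RDD.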