diff --git a/assembly/pom.xml b/assembly/pom.xml index 963357b9ab167..0c60b66c3daca 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index 355f437c5b16a..c8e39a415af28 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml diff --git a/bin/run-example b/bin/run-example index 7caab31daef39..e7a5fe3914fbd 100755 --- a/bin/run-example +++ b/bin/run-example @@ -51,7 +51,7 @@ if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS" fi -./bin/spark-submit \ +"$FWDIR"/bin/spark-submit \ --master $EXAMPLE_MASTER \ --class $EXAMPLE_CLASS \ "$SPARK_EXAMPLES_JAR" \ diff --git a/core/pom.xml b/core/pom.xml index 0777c5b1f03d4..c3d6b00a443f1 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT ../pom.xml @@ -235,7 +235,7 @@ org.easymock - easymock + easymockclassextension test diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 9155159cf6aeb..e7f75481939a8 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -83,11 +83,17 @@ class HashPartitioner(partitions: Int) extends Partitioner { case _ => false } + + override def hashCode: Int = numPartitions } /** * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly * equal ranges. The ranges are determined by sampling the content of the RDD passed in. + * + * Note that the actual number of partitions created by the RangePartitioner might not be the same + * as the `partitions` parameter, in the case where the number of sampled records is less than + * the value of `partitions`. 
*/ class RangePartitioner[K : Ordering : ClassTag, V]( partitions: Int, @@ -119,7 +125,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } } - def numPartitions = partitions + def numPartitions = rangeBounds.length + 1 private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K] @@ -155,4 +161,16 @@ class RangePartitioner[K : Ordering : ClassTag, V]( case _ => false } + + override def hashCode(): Int = { + val prime = 31 + var result = 1 + var i = 0 + while (i < rangeBounds.length) { + result = prime * result + rangeBounds(i).hashCode + i += 1 + } + result = prime * result + ascending.hashCode + result + } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d941aea9d7eb2..d721aba709600 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -455,7 +455,7 @@ class SparkContext(config: SparkConf) extends Logging { */ def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], - minPartitions).map(pair => pair._2.toString) + minPartitions).map(pair => pair._2.toString).setName(path) } /** @@ -496,7 +496,7 @@ class SparkContext(config: SparkConf) extends Logging { classOf[String], classOf[String], updateConf, - minPartitions) + minPartitions).setName(path) } /** @@ -551,7 +551,7 @@ class SparkContext(config: SparkConf) extends Logging { inputFormatClass, keyClass, valueClass, - minPartitions) + minPartitions).setName(path) } /** @@ -623,7 +623,7 @@ class SparkContext(config: SparkConf) extends Logging { val job = new NewHadoopJob(conf) NewFileInputFormat.addInputPath(job, new Path(path)) val updatedConf = job.getConfiguration - new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf) + new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path) } /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 4c8f9ed6fbc02..7dcfbf741c4f1 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. Uses the provided - * Partitioner to partition the output RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. + * @param partitioner partitioner of the resulting RDD. 
*/ - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD, partitioner) + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] = + { + fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)) } /** - * Return approximate number of distinct values for each key this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of - * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism - * level. + * Return approximate number of distinct values for each key in this RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. + * @param numPartitions number of partitions of the resulting RDD. */ - def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD) + def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)) } - /** * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. HashPartitions the - * output RDD into numPartitions. * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. */ - def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { - rdd.countApproxDistinctByKey(relativeSD, numPartitions) + def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD)) } /** Assign a name to this RDD */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index dc698dea75e43..23d13710794af 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -108,6 +108,28 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] = wrapRDD(rdd.sample(withReplacement, fraction, seed)) + + /** + * Randomly splits this RDD with the provided weights. + * + * @param weights weights for splits, will be normalized if they don't sum to 1 + * + * @return split RDDs in an array + */ + def randomSplit(weights: Array[Double]): Array[JavaRDD[T]] = + randomSplit(weights, Utils.random.nextLong) + + /** + * Randomly splits this RDD with the provided weights. 
+ * + * @param weights weights for splits, will be normalized if they don't sum to 1 + * @param seed random seed + * + * @return split RDDs in an array + */ + def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]] = + rdd.randomSplit(weights, seed).map(wrapRDD) + /** * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 619bfd75be8eb..330569a8d8837 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return approximate number of distinct elements in the RDD. * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of - * relativeSD is 0.05. + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. */ - def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) + def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD) def name(): String = rdd.name diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala index 95bec5030bfdd..e230d222b8604 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala @@ -50,4 +50,6 @@ private[spark] class PythonPartitioner( case _ => false } + + override def hashCode: Int = 31 * numPartitions + pyPartitionFunctionId.hashCode } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 153eee3bc5889..f1032ea8dbada 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -360,6 +360,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { | | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). | + | --help, -h Show this help message and exit + | --verbose, -v Print additional debug output + | | Spark standalone with cluster deploy mode only: | --driver-cores NUM Cores for driver (Default: 1). | --supervise If given, restarts the driver on failure. 
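For context, a minimal sketch of how the randomSplit behaviour surfaced by the new JavaRDD wrappers above can be exercised from the Scala API they delegate to (illustrative only, not part of this patch; assumes an existing SparkContext named sc):

    // Split an RDD into roughly 70%/30% train/test samples. Weights are
    // normalized if they don't sum to 1; a fixed seed makes the split reproducible.
    val data = sc.parallelize(1 to 1000)
    val Array(train, test) = data.randomSplit(Array(0.7, 0.3), seed = 42L)
    // For a given seed, every element lands in exactly one of the splits.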
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 223fef79261d0..8909980957058 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -28,7 +28,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import com.clearspring.analytics.stream.cardinality.HyperLogLog +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType @@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.SparkContext._ import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer -import org.apache.spark.util.SerializableHyperLogLog /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. @@ -214,39 +213,88 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } /** + * :: Experimental :: + * * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vice versa. Uses the provided - * Partitioner to partition the output RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p` + * would trigger sparse representation of registers, which may reduce the memory consumption + * and increase accuracy when the cardinality is small. + * + * @param p The precision value for the normal set. + * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max). + * @param sp The precision value for the sparse set, between 0 and 32. + * If `sp` equals 0, the sparse representation is skipped. + * @param partitioner Partitioner to use for the resulting RDD. */ - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { - val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v) - val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v) - val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) + @Experimental + def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = { + require(p >= 4, s"p ($p) must be >= 4") + require(sp <= 32, s"sp ($sp) must be <= 32") + require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") + val createHLL = (v: V) => { + val hll = new HyperLogLogPlus(p, sp) + hll.offer(v) + hll + } + val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => { + hll.offer(v) + hll + } + val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { + h1.addAll(h2) + h1 + } + + combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality()) + } - combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) + /** + * Return approximate number of distinct values for each key in this RDD. 
+ * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. + * @param partitioner partitioner of the resulting RDD + */ + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { + require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017") + val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt + assert(p <= 32) + countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner) } /** * Return approximate number of distinct values for each key in this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vice versa. HashPartitions the - * output RDD into numPartitions. * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. + * @param numPartitions number of partitions of the resulting RDD */ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) } /** - * Return approximate number of distinct values for each key this RDD. - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vice versa. The default value of - * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism - * level. + * Return approximate number of distinct values for each key in this RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. 
*/ def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) @@ -689,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val outfmt = job.getOutputFormatClass val jobFormat = outfmt.newInstance - if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) { + if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) && + jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) { // FileOutputFormat ignores the filesystem parameter jobFormat.checkOutputSpecs(job) } @@ -755,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")") - if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) { + if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) && + outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) { // FileOutputFormat ignores the filesystem parameter val ignoredFs = FileSystem.get(conf) conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index aa03e9276fb34..54bdc3e7cbc7a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -19,12 +19,11 @@ package org.apache.spark.rdd import java.util.Random -import scala.collection.Map -import scala.collection.mutable +import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer import scala.reflect.{classTag, ClassTag} -import com.clearspring.analytics.stream.cardinality.HyperLogLog +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.io.NullWritable @@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils} +import org.apache.spark.util.{BoundedPriorityQueue, Utils} import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler} @@ -655,7 +654,19 @@ abstract class RDD[T: ClassTag]( * partitions* and the *same number of elements in each partition* (e.g. one was made through * a map on the other). */ - def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other) + def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = { + zipPartitions(other, true) { (thisIter, otherIter) => + new Iterator[(T, U)] { + def hasNext = (thisIter.hasNext, otherIter.hasNext) match { + case (true, true) => true + case (false, false) => false + case _ => throw new SparkException("Can only zip RDDs with " + + "same number of elements in each partition") + } + def next = (thisIter.next, otherIter.next) + } + } + } /** * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by @@ -921,15 +932,49 @@ abstract class RDD[T: ClassTag]( * :: Experimental :: * Return approximate number of distinct elements in the RDD. * - * The accuracy of approximation can be controlled through the relative standard deviation - * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. 
The default value of - relativeSD is 0.05. + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p` + * would trigger sparse representation of registers, which may reduce the memory consumption + * and increase accuracy when the cardinality is small. + * + * @param p The precision value for the normal set. + * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max). + * @param sp The precision value for the sparse set, between 0 and 32. + * If `sp` equals 0, the sparse representation is skipped. */ @Experimental + def countApproxDistinct(p: Int, sp: Int): Long = { + require(p >= 4, s"p ($p) must be >= 4") + require(sp <= 32, s"sp ($sp) cannot be greater than 32") + require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") + val zeroCounter = new HyperLogLogPlus(p, sp) + aggregate(zeroCounter)( + (hll: HyperLogLogPlus, v: T) => { + hll.offer(v) + hll + }, + (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { + h1.addAll(h2) + h1 + }).cardinality() + } + + /** + * Return approximate number of distinct elements in the RDD. + * + * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: + * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available + * here. + * + * @param relativeSD Relative accuracy. Smaller values create counters that require more space. + * It must be greater than 0.000017. + */ def countApproxDistinct(relativeSD: Double = 0.05): Long = { - val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) - aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality() + val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt + countApproxDistinct(if (p < 4) 4 else p, 0) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala deleted file mode 100644 index b8110ffc42f2d..0000000000000 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.spark.rdd - -import java.io.{IOException, ObjectOutputStream} - -import scala.reflect.ClassTag - -import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext} - -private[spark] class ZippedPartition[T: ClassTag, U: ClassTag]( - idx: Int, - @transient rdd1: RDD[T], - @transient rdd2: RDD[U] - ) extends Partition { - - var partition1 = rdd1.partitions(idx) - var partition2 = rdd2.partitions(idx) - override val index: Int = idx - - def partitions = (partition1, partition2) - - @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { - // Update the reference to parent partition at the time of task serialization - partition1 = rdd1.partitions(idx) - partition2 = rdd2.partitions(idx) - oos.defaultWriteObject() - } -} - -private[spark] class ZippedRDD[T: ClassTag, U: ClassTag]( - sc: SparkContext, - var rdd1: RDD[T], - var rdd2: RDD[U]) - extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) { - - override def getPartitions: Array[Partition] = { - if (rdd1.partitions.size != rdd2.partitions.size) { - throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions") - } - val array = new Array[Partition](rdd1.partitions.size) - for (i <- 0 until rdd1.partitions.size) { - array(i) = new ZippedPartition(i, rdd1, rdd2) - } - array - } - - override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = { - val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions - rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context)) - } - - override def getPreferredLocations(s: Partition): Seq[String] = { - val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions - val pref1 = rdd1.preferredLocations(partition1) - val pref2 = rdd2.preferredLocations(partition2) - // Check whether there are any hosts that match both RDDs; otherwise return the union - val exactMatchLocations = pref1.intersect(pref2) - if (!exactMatchLocations.isEmpty) { - exactMatchLocations - } else { - (pref1 ++ pref2).distinct - } - } - - override def clearDependencies() { - super.clearDependencies() - rdd1 = null - rdd2 = null - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ccff6a3d1aebc..e09a4221e8315 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId} -import org.apache.spark.util.Utils +import org.apache.spark.util.{SystemClock, Clock, Utils} /** * The high-level scheduling layer that implements stage-oriented scheduling. 
It computes a DAG of @@ -61,7 +61,8 @@ class DAGScheduler( listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, - env: SparkEnv) + env: SparkEnv, + clock: Clock = SystemClock) extends Logging { import DAGScheduler._ @@ -781,7 +782,7 @@ class DAGScheduler( logDebug("New pending tasks: " + myPending) taskScheduler.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) - stageToInfos(stage).submissionTime = Some(System.currentTimeMillis()) + stageToInfos(stage).submissionTime = Some(clock.getTime()) } else { logDebug("Stage " + stage + " is actually done; %b %d %d".format( stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions)) @@ -807,11 +808,11 @@ class DAGScheduler( def markStageAsFinished(stage: Stage) = { val serviceTime = stageToInfos(stage).submissionTime match { - case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0) + case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0) case _ => "Unknown" } logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) - stageToInfos(stage).completionTime = Some(System.currentTimeMillis()) + stageToInfos(stage).completionTime = Some(clock.getTime()) listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) runningStages -= stage } @@ -1015,7 +1016,7 @@ class DAGScheduler( return } val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq - stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis()) + stageToInfos(failedStage).completionTime = Some(clock.getTime()) for (resultStage <- dependentStages) { val job = resultStageToJob(resultStage) failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason", diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index cbe9bb093d1c9..9f45400bcf852 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -207,10 +207,12 @@ private[spark] class CoarseMesosSchedulerBackend( .addResources(createResource("cpus", cpusToUse)) .addResources(createResource("mem", sc.executorMemory)) .build() - d.launchTasks(offer.getId, Collections.singletonList(task), filters) + d.launchTasks( + Collections.singleton(offer.getId), Collections.singletonList(task), filters) } else { // Filter it out - d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters) + d.launchTasks( + Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters) } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index f08b19e6782e3..a089a02d42170 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -223,7 +223,7 @@ private[spark] class MesosSchedulerBackend( // Reply to the offers val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? 
for (i <- 0 until offers.size) { - d.launchTasks(offers(i).getId, mesosTasks(i), filters) + d.launchTasks(Collections.singleton(offers(i).getId), mesosTasks(i), filters) } } } finally { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a41286d3e4a00..9cd79d262ea53 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1024,7 +1024,7 @@ private[spark] class BlockManager( if (blockId.isShuffle) { // Reducer may need to read many local shuffle blocks and will wrap them into Iterators // at the beginning. The wrapping will cost some memory (compression instance - // initialization, etc.). Reducer read shuffle blocks one by one so we could do the + // initialization, etc.). Reducer reads shuffle blocks one by one so we could do the // wrapping lazily to save memory. class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] { lazy val proxy = f diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index a43314f48112f..1b104253d545d 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -168,7 +168,7 @@ private[spark] object UIUtils extends Logging {
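As a sanity check on the relativeSD-to-precision mapping used by the new countApproxDistinct and countApproxDistinctByKey overloads above (a sketch based on the documented approximation relativeSD ≈ 1.054 / sqrt(2^p); the helper name precisionFor is illustrative, not part of the patch):

    // Same formula as in the patch: p = ceil(2 * log2(1.054 / relativeSD)).
    def precisionFor(relativeSD: Double): Int =
      math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt

    precisionFor(0.05)  // 9  -> roughly 4.7% relative error (the old default accuracy)
    precisionFor(0.01)  // 14 -> roughly 0.8% relative error, at the cost of larger counters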