
Commit

Merge branch 'master' of github.com:apache/incubator-spark into kill
Conflicts:
	core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
rxin committed Oct 15, 2013
2 parents 88866ea + 3b11f43 commit 9cd8786
Showing 53 changed files with 652 additions and 457 deletions.
4 changes: 4 additions & 0 deletions core/pom.xml
@@ -48,6 +48,10 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
@@ -21,6 +21,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;

import org.apache.spark.storage.BlockId;

abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {

@@ -33,7 +34,7 @@ public boolean isComplete() {
}

public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header);
public abstract void handleError(String blockId);
public abstract void handleError(BlockId blockId);

@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -24,6 +24,7 @@
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.DefaultFileRegion;

import org.apache.spark.storage.BlockId;

class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {

@@ -34,8 +35,9 @@ public FileServerHandler(PathResolver pResolver){
}

@Override
public void messageReceived(ChannelHandlerContext ctx, String blockId) {
String path = pResolver.getAbsolutePath(blockId);
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
BlockId blockId = BlockId.apply(blockIdString);
String path = pResolver.getAbsolutePath(blockId.name());
// if getFilePath returns null, close the channel
if (path == null) {
//ctx.close();
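The two netty handler changes above replace raw block-id strings with the typed BlockId, reconstructed from the wire format via BlockId.apply. The BlockId classes themselves are not part of the visible hunks; the following is a rough, self-contained sketch of what such a hierarchy and parser could look like, with simplified names and formats rather than the actual org.apache.spark.storage classes.

sealed abstract class BlockId {
  def name: String
}

case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  def name = "rdd_" + rddId + "_" + splitIndex
}

case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

case class TestBlockId(id: String) extends BlockId {
  def name = "test_" + id
}

object BlockId {
  private val Rdd = "rdd_([0-9]+)_([0-9]+)".r
  private val Shuffle = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r

  // Parse a block name such as "shuffle_0_1_2" back into a typed BlockId.
  def apply(name: String): BlockId = name match {
    case Rdd(rddId, split) => RDDBlockId(rddId.toInt, split.toInt)
    case Shuffle(shuffle, map, reduce) => ShuffleBlockId(shuffle.toInt, map.toInt, reduce.toInt)
    case _ => throw new IllegalArgumentException("Unrecognized block id: " + name)
  }
}

With this shape, a handler can pass BlockId values around and call name only at the serialization boundary, as the FileServerHandler change above does.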
core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.HashMap

import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator


@@ -49,22 +49,21 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
}

val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] = splitsByAddress.toSeq.map {
val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
case (address, splits) =>
(address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
}

def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[T] = {
def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
case Some(block) => {
block.asInstanceOf[Iterator[T]]
}
case None => {
val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
blockId match {
case regex(shufId, mapId, _) =>
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
case _ =>
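In the shuffle fetcher above, the regex over formatted strings is replaced by pattern matching on the ShuffleBlockId case class. A hedged illustration of the difference, building on the BlockId sketch earlier (the describeFailure helpers are made-up names, not Spark APIs):

object FetchFailureExample {
  // Typed ids: the compiler guarantees the three shuffle coordinates are present, no regex needed.
  def describeFailure(blockId: BlockId): String = blockId match {
    case ShuffleBlockId(shuffleId, mapId, reduceId) =>
      "fetch failed for shuffle " + shuffleId + ", map " + mapId + ", reduce " + reduceId
    case other =>
      "fetch failed for non-shuffle block " + other.name
  }

  // The pre-patch equivalent had to re-parse the formatted string at every use site.
  private val ShuffleRegex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
  def describeFailureOld(blockId: String): String = blockId match {
    case ShuffleRegex(shuffleId, mapId, reduceId) =>
      "fetch failed for shuffle " + shuffleId + ", map " + mapId + ", reduce " + reduceId
    case _ =>
      "fetch failed for non-shuffle block " + blockId
  }
}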
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -18,7 +18,7 @@
package org.apache.spark

import scala.collection.mutable.{ArrayBuffer, HashSet}
import org.apache.spark.storage.{BlockManager, StorageLevel}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, RDDBlockId}
import org.apache.spark.rdd.RDD


@@ -28,12 +28,12 @@ import org.apache.spark.rdd.RDD
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Keys of RDD splits that are being computed/loaded. */
private val loading = new HashSet[String]()
private val loading = new HashSet[RDDBlockId]()

/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
val key = RDDBlockId(rdd.id, split.index)
logDebug("Looking for partition " + key)
blockManager.get(key) match {
case Some(values) =>
@@ -73,7 +73,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
if (context.runningLocally) { return computedValues }
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, true)
blockManager.put(key, elements, storageLevel, tellMaster = true)
return elements.iterator.asInstanceOf[Iterator[T]]
} finally {
loading.synchronized {
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -32,7 +32,7 @@ import akka.util.Duration

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}


private[spark] sealed trait MapOutputTrackerMessage
@@ -71,7 +71,7 @@ private[spark] class MapOutputTracker extends Logging {
var cacheEpoch = epoch
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]

val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup)

// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
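Several files in this commit switch MetadataCleaner from free-form string names to a MetadataCleanerType value. The definition itself is not part of the visible hunks; a plausible sketch, assuming it is a plain Scala Enumeration (the value names and the property scheme are assumptions):

object MetadataCleanerType extends Enumeration {
  type MetadataCleanerType = Value

  val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, BLOCK_MANAGER,
      SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value

  // One per-cleaner property key, derived from the enum name rather than an ad-hoc string.
  def systemProperty(which: MetadataCleanerType): String =
    "spark.cleaner.ttl." + which.toString
}

Under that assumption, a call such as new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup) could resolve its TTL from the corresponding property rather than a hand-typed name.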
37 changes: 16 additions & 21 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -62,7 +62,11 @@ import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util._
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
import scala.Some
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
@@ -119,7 +123,7 @@ class SparkContext(

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)

// Initalize the Spark UI
private[spark] val ui = new SparkUI(this)
@@ -331,7 +335,7 @@ class SparkContext(
}

/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*/
@@ -357,24 +361,15 @@
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}

/**
* Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a Hadoop Configuration
* that has already been broadcast, assuming that it's safe to use it to construct a
* HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be resued).
*/
def hadoopFile[K, V](
path: String,
confBroadcast: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int
): RDD[(K, V)] = {
new HadoopFileRDD(
this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minSplits)
}

/**
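The hadoopFile change above drops the broadcast-Configuration overload and instead hands a generic HadoopRDD an optional JobConf initialization function that binds the input path. A minimal sketch of that callback pattern under assumed helper names (buildJobConf is not a Spark API):

import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

object JobConfCallbackSketch {
  // Clone the shared base configuration and let the caller-supplied function
  // set job-specific state such as the input path.
  def buildJobConf(base: JobConf, init: Option[JobConf => Unit]): JobConf = {
    val conf = new JobConf(base)
    init.foreach(f => f(conf))
    conf
  }

  // Usage mirroring the diff: capture the path in a closure instead of a dedicated RDD subclass.
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, "/data/input")
  // buildJobConf(new JobConf(), Some(setInputPathsFunc))
}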
core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.{ListBuffer, Map, Set}
import scala.math

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.Utils

private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -36,7 +36,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:

def value = value_

def blockId: String = "broadcast_" + id
def blockId = BroadcastBlockId(id)

MultiTracker.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -25,16 +25,15 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream

import org.apache.spark.{HttpServer, Logging, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashSet}

import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashSet, Utils}

private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

def value = value_

def blockId: String = "broadcast_" + id
def blockId = BroadcastBlockId(id)

HttpBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
@@ -82,7 +81,7 @@ private object HttpBroadcast extends Logging {
private var server: HttpServer = null

private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)

private lazy val compressionCodec = CompressionCodec.createCodec()

@@ -121,7 +120,7 @@
}

def write(id: Long, value: Any) {
val file = new File(broadcastDir, "broadcast-" + id)
val file = new File(broadcastDir, BroadcastBlockId(id).name)
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
@@ -137,7 +136,7 @@
}

def read[T](id: Long): T = {
val url = serverUri + "/broadcast-" + id
val url = serverUri + "/" + BroadcastBlockId(id).name
val in = {
if (compress) {
compressionCodec.compressedInputStream(new URL(url).openStream())
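In HttpBroadcast above, both the on-disk file name and the fetch URL are now derived from BroadcastBlockId(id).name instead of two hand-built strings. A small self-contained sketch of that idea; the id format, directory, and server URI here are illustrative values, not Spark's actual ones:

import java.io.File

object BroadcastNamingSketch {
  // Simplified stand-in for org.apache.spark.storage.BroadcastBlockId.
  case class BroadcastBlockId(broadcastId: Long) {
    def name: String = "broadcast_" + broadcastId
  }

  val broadcastDir = new File("/tmp/spark-broadcast")   // assumed location
  val serverUri = "http://driver-host:33333"            // assumed server address

  // Writer and reader agree on the name because both derive it from the same id.
  def fileFor(id: Long): File = new File(broadcastDir, BroadcastBlockId(id).name)
  def urlFor(id: Long): String = serverUri + "/" + BroadcastBlockId(id).name
}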
core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
@@ -19,21 +19,19 @@ package org.apache.spark.broadcast

import java.io._
import java.net._
import java.util.{Comparator, Random, UUID}

import scala.collection.mutable.{ListBuffer, Map, Set}
import scala.math
import scala.collection.mutable.{ListBuffer, Set}

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.Utils

private[spark] class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

def value = value_

def blockId = "broadcast_" + id
def blockId = BroadcastBlockId(id)

MultiTracker.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable.HashMap

import org.apache.spark.scheduler._
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.Utils

/**
@@ -234,7 +234,7 @@ private[spark] class Executor(
val serializedResult = {
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
logInfo("Storing result for " + taskId + " in local BlockManager")
val blockId = "taskresult_" + taskId
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
ser.serialize(new IndirectTaskResult[Any](blockId))
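The Executor change above stores oversized task results in the block manager under a TaskResultBlockId and ships back only the reference. A hedged sketch of that decision; the 1 KB headroom matches the hunk, while the type and method names are simplified stand-ins rather than Spark's actual classes:

object ResultPackagingSketch {
  case class TaskResultBlockId(taskId: Long) {
    def name: String = "taskresult_" + taskId
  }

  sealed trait SerializedResult
  case class DirectResult(bytes: Array[Byte]) extends SerializedResult
  case class IndirectResult(blockId: TaskResultBlockId) extends SerializedResult

  // Results close to the RPC frame limit go through the block manager instead of the message.
  def packageResult(taskId: Long, bytes: Array[Byte], frameSizeBytes: Int): SerializedResult =
    if (bytes.length >= frameSizeBytes - 1024) IndirectResult(TaskResultBlockId(taskId))
    else DirectResult(bytes)
}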
22 changes: 10 additions & 12 deletions core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
@@ -20,17 +20,18 @@ package org.apache.spark.network.netty
import io.netty.buffer._

import org.apache.spark.Logging
import org.apache.spark.storage.{TestBlockId, BlockId}

private[spark] class FileHeader (
val fileLen: Int,
val blockId: String) extends Logging {
val blockId: BlockId) extends Logging {

lazy val buffer = {
val buf = Unpooled.buffer()
buf.capacity(FileHeader.HEADER_SIZE)
buf.writeInt(fileLen)
buf.writeInt(blockId.length)
blockId.foreach((x: Char) => buf.writeByte(x))
buf.writeInt(blockId.name.length)
blockId.name.foreach((x: Char) => buf.writeByte(x))
//padding the rest of header
if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
@@ -57,18 +58,15 @@ private[spark] object FileHeader {
for (i <- 1 to idLength) {
idBuilder += buf.readByte().asInstanceOf[Char]
}
val blockId = idBuilder.toString()
val blockId = BlockId(idBuilder.toString())
new FileHeader(length, blockId)
}


def main (args:Array[String]){

val header = new FileHeader(25,"block_0");
val buf = header.buffer;
val newheader = FileHeader.create(buf);
System.out.println("id="+newheader.blockId+",size="+newheader.fileLen)

def main (args:Array[String]) {
val header = new FileHeader(25, TestBlockId("my_block"))
val buf = header.buffer
val newHeader = FileHeader.create(buf)
System.out.println("id=" + newHeader.blockId + ",size=" + newHeader.fileLen)
}
}

[Remaining changed files not loaded.]