Merge branch 'master' of github.com:apache/spark into dag-viz-skipped
Andrew Or committed May 18, 2015
2 parents 0eda358 + 775e6f9 commit f261797
Showing 48 changed files with 1,338 additions and 952 deletions.
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{SignalLogger, Utils}
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}

private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@@ -55,18 +55,19 @@ private[spark] class CoarseGrainedExecutorBackend(
private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

override def onStart() {
import scala.concurrent.ExecutionContext.Implicits.global
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[RegisteredExecutor.type](
RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
} onComplete {
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) => Utils.tryLogNonFatalError {
Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
}
case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
}
}(ThreadUtils.sameThread)
}

def extractLogUrls: Map[String, String] = {
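The change above replaces the implicit "scala.concurrent.ExecutionContext.Implicits.global" with an explicitly supplied "ThreadUtils.sameThread" context: the registration callbacks are cheap, so they can run on the thread that completes the future instead of occupying the shared global pool. The following standalone sketch (not part of the commit) shows the same pattern, with a plain same-thread ExecutionContext standing in for Spark's "ThreadUtils.sameThread":

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object SameThreadCallbackExample {
  // Stand-in for ThreadUtils.sameThread: runs callbacks on the calling/completing thread.
  val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  def main(args: Array[String]): Unit = {
    // The expensive part (the "ask") runs elsewhere; only the callback is scheduled here.
    val registration = Future(42)(ExecutionContext.global)
    // The callback is trivial, so run it on whichever thread completes the future.
    registration.onComplete {
      case Success(msg) => println(s"registered: $msg")
      case Failure(e)   => println(s"cannot register: $e")
    }(sameThread)
    Thread.sleep(100) // crude wait so the example exits after the callback fires
  }
}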
49 changes: 47 additions & 2 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@

package org.apache.spark.io

import java.io.{InputStream, OutputStream}
import java.io.{IOException, InputStream, OutputStream}

import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
new SnappyOutputStream(s, blockSize)
new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
}

override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}

/**
* Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
* issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
* of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
*/
private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {

private[this] var closed: Boolean = false

override def write(b: Int): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b)
}

override def write(b: Array[Byte]): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b)
}

override def write(b: Array[Byte], off: Int, len: Int): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b, off, len)
}

override def flush(): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.flush()
}

override def close(): Unit = {
if (!closed) {
closed = true
os.close()
}
}
}
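
SnappyOutputStreamWrapper above makes close() idempotent and turns write-after-close into an immediate IOException, which is the workaround for the snappy-java buffer-recycling bug tracked as SPARK-7660. The sketch below (illustrative, not part of the commit) shows the same close-guard pattern on a plain java.io stream, plus a tiny usage that exercises both guards:

import java.io.{ByteArrayOutputStream, IOException, OutputStream}

// Generic version of the close-guard pattern used by SnappyOutputStreamWrapper:
// close() is forwarded exactly once, and any later write/flush fails fast.
class CloseGuardedOutputStream(underlying: OutputStream) extends OutputStream {
  private[this] var closed = false
  private def ensureOpen(): Unit =
    if (closed) throw new IOException("Stream is closed")

  override def write(b: Int): Unit = { ensureOpen(); underlying.write(b) }
  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    ensureOpen(); underlying.write(b, off, len)
  }
  override def flush(): Unit = { ensureOpen(); underlying.flush() }
  override def close(): Unit = if (!closed) { closed = true; underlying.close() }
}

object CloseGuardDemo extends App {
  val out = new CloseGuardedOutputStream(new ByteArrayOutputStream())
  out.write(Array[Byte](1, 2, 3))
  out.close()
  out.close()                              // second close is a harmless no-op
  try out.write(4) catch {                 // write-after-close now fails loudly
    case e: IOException => println(s"rejected as expected: ${e.getMessage}")
  }
}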
13 changes: 11 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -19,8 +19,10 @@ package org.apache.spark.rdd

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.util.ThreadUtils

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag

import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
@@ -66,6 +68,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
val f = new ComplexFutureAction[Seq[T]]

f.run {
// This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
// is a cached thread pool.
val results = new ArrayBuffer[T](num)
val totalParts = self.partitions.length
var partsScanned = 0
@@ -101,7 +105,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
partsScanned += numPartsToTry
}
results.toSeq
}
}(AsyncRDDActions.futureExecutionContext)

f
}
@@ -123,3 +127,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
(index, data) => Unit, Unit)
}
}

private object AsyncRDDActions {
val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("AsyncRDDActions-future", 128))
}
23 changes: 21 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -40,6 +40,9 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
var metrics: TaskMetrics)
extends TaskResult[T] with Externalizable {

private var valueObjectDeserialized = false
private var valueObject: T = _

def this() = this(null.asInstanceOf[ByteBuffer], null, null)

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
@@ -72,10 +75,26 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
}
}
metrics = in.readObject().asInstanceOf[TaskMetrics]
valueObjectDeserialized = false
}

/**
* When `value()` is called for the first time, it needs to deserialize `valueObject` from
* `valueBytes`. This may take dozens of seconds for a large value, so the first call to
* `value()` should be made where blocking other threads is acceptable, i.e. not while
* holding a lock.
*
* After the first call, `value()` is trivial and just returns the already deserialized
* `valueObject`.
*/
def value(): T = {
val resultSer = SparkEnv.get.serializer.newInstance()
resultSer.deserialize(valueBytes)
if (valueObjectDeserialized) {
valueObject
} else {
// This should not run when holding a lock because it may cost dozens of seconds for a large
// value.
val resultSer = SparkEnv.get.serializer.newInstance()
valueObject = resultSer.deserialize(valueBytes)
valueObjectDeserialized = true
valueObject
}
}
}
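
value() above is a memoized accessor: the expensive deserialization runs once, and every later call returns the cached object. A compact sketch of the same idea, with an arbitrary deserialization function standing in for Spark's serializer (class and names are illustrative, not part of the commit):

// Memoize an expensive one-time computation behind a cheap accessor, mirroring
// DirectTaskResult.value(): deserialize once, serve from the cache afterwards.
class LazyResult[T](bytes: Array[Byte], deserialize: Array[Byte] => T) {
  private var materialized = false
  private var cached: T = _

  def value(): T = {
    if (!materialized) {
      // Potentially slow: call this once, outside any lock.
      cached = deserialize(bytes)
      materialized = true
    }
    cached
  }
}

object LazyResultDemo extends App {
  val result = new LazyResult[String](Array[Byte](104, 105), bytes => {
    println("deserializing...")        // printed only once
    new String(bytes, "UTF-8")
  })
  println(result.value())              // slow path
  println(result.value())              // cached, no second "deserializing..."
}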
@@ -54,6 +54,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
return
}
// deserialize "value" without holding any lock so that it won't block other threads.
// We should call it here, so that when it's called again in
// "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
directResult.value()
(directResult, serializedData.limit())
case IndirectTaskResult(blockId, size) =>
if (!taskSetManager.canFetchMoreResults(size)) {
@@ -620,6 +620,12 @@ private[spark] class TaskSetManager(
val index = info.index
info.markSuccessful()
removeRunningTask(tid)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
if (!successful(index)) {
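The comments in the two hunks above describe the same fix from both ends: TaskResultGetter forces the slow deserialization before any scheduler lock is taken, so TaskSetManager.handleSuccessfulTask, which runs while the "TaskSchedulerImpl" lock is held, only reads the cached value. A small sketch of that pre-compute-outside-the-lock discipline (class and method names are illustrative, not Spark's):

object OutsideLockDemo {
  private val schedulerLock = new Object

  class Result(bytes: Array[Byte]) {
    @volatile private var cached: Option[String] = None
    // Slow the first time, then served from the cache.
    def value(): String = cached.getOrElse {
      Thread.sleep(200)                       // stands in for slow deserialization
      val v = new String(bytes, "UTF-8")
      cached = Some(v)
      v
    }
  }

  def enqueueSuccessfulTask(result: Result): Unit = {
    result.value()                            // slow path runs here, with no lock held
    handleSuccessfulTask(result)
  }

  private def handleSuccessfulTask(result: Result): Unit = schedulerLock.synchronized {
    println(result.value())                   // cheap: returns the cached value under the lock
  }

  def main(args: Array[String]): Unit =
    enqueueSuccessfulTask(new Result("task output".getBytes("UTF-8")))
}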
17 changes: 13 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -21,8 +21,7 @@ import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream,
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._
import scala.util.Random

@@ -77,6 +76,9 @@ private[spark] class BlockManager(

private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

private val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

// Actual storage of where blocks are kept
private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
@@ -266,11 +268,13 @@ private[spark] class BlockManager(
asyncReregisterLock.synchronized {
if (asyncReregisterTask == null) {
asyncReregisterTask = Future[Unit] {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
reregister()
asyncReregisterLock.synchronized {
asyncReregisterTask = null
}
}
}(futureExecutionContext)
}
}
}
@@ -744,7 +748,11 @@ private[spark] class BlockManager(
case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
Future { replicate(blockId, bufferView, putLevel) }
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
replicate(blockId, bufferView, putLevel)
}(futureExecutionContext)
case _ => null
}

@@ -1218,6 +1226,7 @@ private[spark] class BlockManager(
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
}
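BlockManager now keeps its own cached daemon pool ("block-manager-future") for the blocking re-registration and replication futures and tears it down in stop(), instead of borrowing threads from the implicit global context. The sketch below (illustrative names, not Spark's code) shows the lifecycle pattern: futures run on a component-owned executor, and the executor is shut down with the component.

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

class ReplicatingStore {
  // Stand-in for ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128).
  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    Executors.newCachedThreadPool())

  def putWithReplication(blockId: String, bytes: Array[Byte]): Unit = {
    storeLocally(blockId, bytes)
    Future {
      // Blocking network call; must not run on the caller's thread or the global pool.
      replicate(blockId, bytes)
    }(futureExecutionContext)
  }

  private def storeLocally(blockId: String, bytes: Array[Byte]): Unit =
    println(s"stored $blockId locally (${bytes.length} bytes)")

  private def replicate(blockId: String, bytes: Array[Byte]): Unit = {
    Thread.sleep(100)
    println(s"replicated $blockId")
  }

  def stop(): Unit = {
    futureExecutionContext.shutdownNow() // don't leak the pool when the component stops
    println("store stopped")
  }
}

object ReplicatingStoreDemo extends App {
  val store = new ReplicatingStore
  store.putWithReplication("rdd_0_0", Array.fill[Byte](16)(1))
  Thread.sleep(300) // let the async replication finish before stopping
  store.stop()
}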
@@ -17,13 +17,14 @@

package org.apache.spark.storage

import scala.collection.Iterable
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.RpcUtils
import org.apache.spark.util.{ThreadUtils, RpcUtils}

private[spark]
class BlockManagerMaster(
@@ -102,8 +103,8 @@ class BlockManagerMaster(
val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
}
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
}(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
}
@@ -114,8 +115,8 @@ class BlockManagerMaster(
val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
}
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
}(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
}
@@ -128,8 +129,8 @@ class BlockManagerMaster(
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove broadcast $broadcastId" +
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
}
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
}(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
}
@@ -169,11 +170,17 @@ class BlockManagerMaster(
val response = driverEndpoint.
askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
val (blockManagerIds, futures) = response.unzip
val result = Await.result(Future.sequence(futures), timeout)
if (result == null) {
implicit val sameThread = ThreadUtils.sameThread
val cbf =
implicitly[
CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
Option[BlockStatus],
Iterable[Option[BlockStatus]]]]
val blockStatus = Await.result(
Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread), timeout)
if (blockStatus == null) {
throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
}
val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
status.map { s => (blockManagerId, s) }
}.toMap
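getBlockStatus above now passes the CanBuildFrom and the ExecutionContext to Future.sequence explicitly, so combining the per-BlockManager futures happens on "ThreadUtils.sameThread" rather than on the implicit global pool. A simplified standalone sketch of the same idea follows; it declares a same-thread context as the implicit so the CanBuildFrom boilerplate is not needed (Scala 2.11/2.12 collection signatures assumed, and the same-thread context stands in for ThreadUtils.sameThread):

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequenceOnSameThread {
  // Same-thread ExecutionContext, standing in for ThreadUtils.sameThread.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  def main(args: Array[String]): Unit = {
    // Pretend these are per-BlockManager status queries already running elsewhere.
    val futures: Iterable[Future[Option[Int]]] =
      Seq(Future.successful(Some(1)), Future.successful(None), Future.successful(Some(3)))

    // Future.sequence picks up the implicit same-thread context for its combining step,
    // so no global-pool thread is used just to zip the results together.
    val combined = Future.sequence(futures)
    println(Await.result(combined, 1.second))
  }
}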
@@ -35,7 +35,6 @@
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.xerial.snappy.buffer.CachedBufferAllocator;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
@@ -97,13 +96,6 @@ public OutputStream apply(OutputStream stream) {
@After
public void tearDown() {
Utils.deleteRecursively(tempDir);
// This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
// suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
// preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
// needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
synchronized (CachedBufferAllocator.class) {
CachedBufferAllocator.queueTable.clear();
}
final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
if (leakedMemory != 0) {
fail("Test leaked " + leakedMemory + " bytes of managed memory");
6 changes: 5 additions & 1 deletion ec2/spark_ec2.py
@@ -48,7 +48,7 @@
from urllib.request import urlopen, Request
from urllib.error import HTTPError

SPARK_EC2_VERSION = "1.2.1"
SPARK_EC2_VERSION = "1.3.1"
SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))

VALID_SPARK_VERSIONS = set([
@@ -65,6 +65,8 @@
"1.1.1",
"1.2.0",
"1.2.1",
"1.3.0",
"1.3.1",
])

SPARK_TACHYON_MAP = {
@@ -75,6 +77,8 @@
"1.1.1": "0.5.0",
"1.2.0": "0.5.0",
"1.2.1": "0.5.0",
"1.3.0": "0.5.0",
"1.3.1": "0.5.0",
}

DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
@@ -94,7 +94,7 @@ public String call(Row row) {

System.out.println("=== Data source: Parquet File ===");
// DataFrames can be saved as parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");
schemaPeople.write().parquet("people.parquet");

// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
@@ -151,7 +151,7 @@ public String call(Row row) {
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
DataFrame peopleFromJsonRDD = sqlContext.jsonRDD(anotherPeopleRDD.rdd());
DataFrame peopleFromJsonRDD = sqlContext.read().json(anotherPeopleRDD.rdd());

// Take a look at the schema of this new DataFrame.
peopleFromJsonRDD.printSchema();
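The Java example above moves from the deprecated saveAsParquetFile and jsonRDD calls to the DataFrameReader/DataFrameWriter API (sqlContext.read() / df.write()) introduced alongside this release cycle. For reference, a hedged Scala sketch of the same round trip (app name, paths and data are illustrative; assumes spark-core and spark-sql 1.4+ on the classpath):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Scala counterpart of the updated Java calls: df.write.parquet(...) replaces
// saveAsParquetFile, and sqlContext.read.json(...) replaces jsonRDD.
object DataFrameReadWriteExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rw-example").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    val jsonData = Seq("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""")
    val people = sqlContext.read.json(sc.parallelize(jsonData))

    people.write.parquet("people.parquet")            // was: people.saveAsParquetFile(...)
    val reloaded = sqlContext.read.parquet("people.parquet")
    reloaded.printSchema()

    sc.stop()
  }
}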