rebase with upstream
JihongMA committed Nov 5, 2015
2 parents 402971c + d0b5633 commit e3417aa
Showing 47 changed files with 1,527 additions and 1,400 deletions.
10 changes: 5 additions & 5 deletions LICENSE
@@ -250,11 +250,11 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(Interpreter classes (all .scala files in repl/src/main/scala
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala)
(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.10.4 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.10.4 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.10.4 - http://www.scala-lang.org/)
(BSD-like) Scala Library (org.scala-lang:scala-library:2.10.4 - http://www.scala-lang.org/)
(BSD-like) Scalap (org.scala-lang:scalap:2.10.4 - http://www.scala-lang.org/)
(BSD-like) Scala Actors library (org.scala-lang:scala-actors:2.10.5 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-compiler:2.10.5 - http://www.scala-lang.org/)
(BSD-like) Scala Compiler (org.scala-lang:scala-reflect:2.10.5 - http://www.scala-lang.org/)
(BSD-like) Scala Library (org.scala-lang:scala-library:2.10.5 - http://www.scala-lang.org/)
(BSD-like) Scalap (org.scala-lang:scalap:2.10.5 - http://www.scala-lang.org/)
(BSD-style) scalacheck (org.scalacheck:scalacheck_2.10:1.10.0 - http://www.scalacheck.org)
(BSD-style) spire (org.spire-math:spire_2.10:0.7.1 - http://spire-math.org)
(BSD-style) spire-macros (org.spire-math:spire-macros_2.10:0.7.1 - http://spire-math.org)
@@ -254,8 +254,8 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =
sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
final boolean fastMergeIsSupported =
!compressionEnabled || compressionCodec instanceof LZFCompressionCodec;
final boolean fastMergeIsSupported = !compressionEnabled ||
CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
try {
if (spills.length == 0) {
new FileOutputStream(outputFile).close(); // Create an empty file
58 changes: 15 additions & 43 deletions core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -35,7 +35,6 @@
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.bitset.BitSet;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.MemoryLocation;
@@ -123,12 +122,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
*/
private boolean canGrowArray = true;

/**
* A {@link BitSet} used to track location of the map where the key is set.
* Size of the bitset should be half of the size of the long array.
*/
@Nullable private BitSet bitset;

private final double loadFactor;

/**
@@ -427,7 +420,6 @@ public Location lookup(Object keyBase, long keyOffset, int keyLength) {
* This is a thread-safe version of `lookup`, could be used by multiple threads.
*/
public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc) {
assert(bitset != null);
assert(longArray != null);

if (enablePerfMetrics) {
@@ -440,7 +432,7 @@ public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc) {
if (enablePerfMetrics) {
numProbes++;
}
if (!bitset.isSet(pos)) {
if (longArray.get(pos * 2) == 0) {
// This is a new key.
loc.with(pos, hashcode, false);
return;
@@ -644,7 +636,6 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
assert (!isDefined) : "Can only set value once for a key";
assert (keyLength % 8 == 0);
assert (valueLength % 8 == 0);
assert(bitset != null);
assert(longArray != null);

if (numElements == MAX_CAPACITY || !canGrowArray) {
@@ -678,7 +669,6 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
pageCursor += recordLength;
numElements++;
bitset.set(pos);
final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
currentPage, recordOffset);
longArray.set(pos * 2, storedKeyAddress);
@@ -734,7 +724,6 @@ private void allocate(int capacity) {
assert (capacity <= MAX_CAPACITY);
acquireMemory(capacity * 16);
longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2]));
bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));

this.growthThreshold = (int) (capacity * loadFactor);
this.mask = capacity - 1;
@@ -749,7 +738,6 @@ public void freeArray() {
long used = longArray.memoryBlock().size();
longArray = null;
releaseMemory(used);
bitset = null;
}
}

@@ -795,9 +783,7 @@ public long getTotalMemoryConsumption() {
for (MemoryBlock dataPage : dataPages) {
totalDataPagesSize += dataPage.size();
}
return totalDataPagesSize +
((bitset != null) ? bitset.memoryBlock().size() : 0L) +
((longArray != null) ? longArray.memoryBlock().size() : 0L);
return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L);
}

private void updatePeakMemoryUsed() {
@@ -852,7 +838,6 @@ public int getNumDataPages() {
*/
@VisibleForTesting
void growAndRehash() {
assert(bitset != null);
assert(longArray != null);

long resizeStartTime = -1;
@@ -861,39 +846,26 @@ void growAndRehash() {
}
// Store references to the old data structures to be used when we re-hash
final LongArray oldLongArray = longArray;
final BitSet oldBitSet = bitset;
final int oldCapacity = (int) oldBitSet.capacity();
final int oldCapacity = (int) oldLongArray.size() / 2;

// Allocate the new data structures
try {
allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
} catch (OutOfMemoryError oom) {
longArray = oldLongArray;
bitset = oldBitSet;
throw oom;
}
allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));

// Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
for (int pos = oldBitSet.nextSetBit(0); pos >= 0; pos = oldBitSet.nextSetBit(pos + 1)) {
final long keyPointer = oldLongArray.get(pos * 2);
final int hashcode = (int) oldLongArray.get(pos * 2 + 1);
for (int i = 0; i < oldLongArray.size(); i += 2) {
final long keyPointer = oldLongArray.get(i);
if (keyPointer == 0) {
continue;
}
final int hashcode = (int) oldLongArray.get(i + 1);
int newPos = hashcode & mask;
int step = 1;
boolean keepGoing = true;

// No need to check for equality here when we insert so this has one less if branch than
// the similar code path in addWithoutResize.
while (keepGoing) {
if (!bitset.isSet(newPos)) {
bitset.set(newPos);
longArray.set(newPos * 2, keyPointer);
longArray.set(newPos * 2 + 1, hashcode);
keepGoing = false;
} else {
newPos = (newPos + step) & mask;
step++;
}
while (longArray.get(newPos * 2) != 0) {
newPos = (newPos + step) & mask;
step++;
}
longArray.set(newPos * 2, keyPointer);
longArray.set(newPos * 2 + 1, hashcode);
}
releaseMemory(oldLongArray.memoryBlock().size());

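Note: the rewritten growAndRehash above works because the long array now doubles as the occupancy check. Slot `pos` spans longs `2*pos` (key pointer, where 0 means empty) and `2*pos + 1` (full 32-bit hashcode), so the separate BitSet can be dropped. The following is a minimal, standalone Scala sketch of that layout and probe sequence; names are illustrative, not Spark's Java implementation, and the real map grows at the load factor so the probe loop always terminates.

```scala
// Hedged sketch of the packed layout used above: slot `pos` occupies
// longs (2*pos, 2*pos + 1); a zero key pointer marks an empty slot, so no
// separate BitSet is needed. Capacity must be a power of two.
final class PackedOpenAddressingSketch(capacityPow2: Int) {
  require((capacityPow2 & (capacityPow2 - 1)) == 0, "capacity must be a power of 2")
  private val mask = capacityPow2 - 1
  // (key pointer, hashcode) pairs; zero-initialised, i.e. all slots empty.
  private val longArray = new Array[Long](capacityPow2 * 2)

  /** Insert a non-zero key pointer using the same probe step as growAndRehash. */
  def put(keyPointer: Long, hashcode: Int): Unit = {
    require(keyPointer != 0L, "0 is reserved to mean 'empty slot'")
    var pos = hashcode & mask
    var step = 1
    while (longArray(pos * 2) != 0L) { // occupied: keep probing
      pos = (pos + step) & mask
      step += 1
    }
    longArray(pos * 2) = keyPointer
    longArray(pos * 2 + 1) = hashcode.toLong
  }

  /** Follow the probe chain until an empty slot or a matching entry is found. */
  def contains(keyPointer: Long, hashcode: Int): Boolean = {
    var pos = hashcode & mask
    var step = 1
    while (longArray(pos * 2) != 0L) {
      if (longArray(pos * 2) == keyPointer && longArray(pos * 2 + 1).toInt == hashcode) {
        return true
      }
      pos = (pos + step) & mask
      step += 1
    }
    false
  }
}

object PackedOpenAddressingSketchDemo extends App {
  val m = new PackedOpenAddressingSketch(64)
  m.put(keyPointer = 42L, hashcode = 7)
  println(m.contains(42L, 7)) // true
  println(m.contains(99L, 7)) // false
}
```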
@@ -157,11 +157,14 @@ public void closeCurrentPage() {
*/
@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
assert(inMemSorter != null);
if (trigger != this) {
if (readingIterator != null) {
return readingIterator.spill();
} else {

}
return 0L;
return 0L; // this should throw exception
}

if (inMemSorter == null || inMemSorter.numRecords() <= 0) {
@@ -47,6 +47,11 @@ trait CompressionCodec {
private[spark] object CompressionCodec {

private val configKey = "spark.io.compression.codec"

private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
}

private val shortCompressionCodecNames = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
"lzf" -> classOf[LZFCompressionCodec].getName,
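Note: the new supportsConcatenationOfSerializedStreams helper above is what the shuffle writer's fastMergeIsSupported check earlier in this commit switches to, since raw byte-for-byte concatenation of compressed spill files is only valid when the codec can decode back-to-back streams. Below is a small standalone Scala sketch of that decision; the trait and object names are stand-ins (Spark's CompressionCodec object is private[spark]), not the real API.

```scala
// Standalone sketch of the decision encoded by the change above. The sealed
// trait and case objects are illustrative stand-ins for Spark's codec classes.
sealed trait Codec
case object Lz4Codec extends Codec
case object LzfCodec extends Codec
case object SnappyCodec extends Codec

object FastMergeSketch {
  // Mirrors supportsConcatenationOfSerializedStreams: Snappy and LZF can decode
  // back-to-back compressed streams written by independent spill files.
  def supportsConcatenation(codec: Codec): Boolean = codec match {
    case SnappyCodec | LzfCodec => true
    case Lz4Codec               => false
  }

  // Mirrors the shuffle writer's new fastMergeIsSupported expression: raw
  // concatenation of spill files is safe when compression is off, or when the
  // codec tolerates concatenated streams.
  def fastMergeIsSupported(compressionEnabled: Boolean, codec: Codec): Boolean =
    !compressionEnabled || supportsConcatenation(codec)
}

object FastMergeSketchDemo extends App {
  println(FastMergeSketch.fastMergeIsSupported(compressionEnabled = true, SnappyCodec)) // true
  println(FastMergeSketch.fastMergeIsSupported(compressionEnabled = true, Lz4Codec))    // false
  println(FastMergeSketch.fastMergeIsSupported(compressionEnabled = false, Lz4Codec))   // true
}
```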
@@ -947,7 +947,13 @@ class DAGScheduler(
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
outputCommitCoordinator.stageStart(stage.id)
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
@@ -47,6 +47,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private type PartitionId = Int
private type TaskAttemptNumber = Int

private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1

/**
* Map from active stage's id => partition id => task attempt with exclusive lock on committing
* output for that partition.
@@ -56,9 +58,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
*
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
private type CommittersByStageMap =
mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
private val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]()

/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -95,9 +95,21 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
}
}

// Called by DAGScheduler
private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
/**
* Called by the DAGScheduler when a stage starts.
*
* @param stage the stage id.
* @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
* the maximum possible value of `context.partitionId`).
*/
private[scheduler] def stageStart(
stage: StageId,
maxPartitionId: Int): Unit = {
val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
synchronized {
authorizedCommittersByStage(stage) = arr
}
}

// Called by DAGScheduler
Expand All @@ -122,10 +134,10 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
s"attempt: $attemptNumber")
case otherReason =>
if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
if (authorizedCommitters(partition) == attemptNumber) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
authorizedCommitters.remove(partition)
authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
}
}
}
@@ -145,16 +157,16 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
attemptNumber: TaskAttemptNumber): Boolean = synchronized {
authorizedCommittersByStage.get(stage) match {
case Some(authorizedCommitters) =>
authorizedCommitters.get(partition) match {
case Some(existingCommitter) =>
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition; existingCommitter = $existingCommitter")
false
case None =>
authorizedCommitters(partition) match {
case NO_AUTHORIZED_COMMITTER =>
logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition")
authorizedCommitters(partition) = attemptNumber
true
case existingCommitter =>
logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition; existingCommitter = $existingCommitter")
false
}
case None =>
logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
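Note: the change above swaps the per-stage HashMap[PartitionId, TaskAttemptNumber] for a pre-sized array indexed by partition id, with NO_AUTHORIZED_COMMITTER (-1) marking a free slot; that is why DAGScheduler.stageStart now has to pass maxPartitionId. The following is a compact standalone Scala sketch of that bookkeeping with illustrative names; the real coordinator also handles RPC, failure reasons, and stage cleanup.

```scala
// Hedged sketch: one array slot per partition, -1 meaning "nobody has been
// authorized to commit this partition yet". Names are illustrative.
class CommitterTableSketch(maxPartitionId: Int) {
  private val NoCommitter = -1
  private val committers: Array[Int] = Array.fill(maxPartitionId + 1)(NoCommitter)

  /** First attempt to ask for a partition wins the commit lock; later asks are denied. */
  def canCommit(partition: Int, attemptNumber: Int): Boolean = synchronized {
    if (committers(partition) == NoCommitter) {
      committers(partition) = attemptNumber
      true
    } else {
      false // some attempt already holds the lock for this partition
    }
  }

  /** Clear the lock if the authorized attempt fails, as taskCompleted does above. */
  def attemptFailed(partition: Int, attemptNumber: Int): Unit = synchronized {
    if (committers(partition) == attemptNumber) {
      committers(partition) = NoCommitter
    }
  }
}

object CommitterTableSketchDemo extends App {
  val table = new CommitterTableSketch(maxPartitionId = 2)
  println(table.canCommit(partition = 2, attemptNumber = 3))   // true
  println(table.canCommit(partition = 2, attemptNumber = 100)) // false
  table.attemptFailed(partition = 2, attemptNumber = 3)
  println(table.canCommit(partition = 2, attemptNumber = 100)) // true, lock was cleared
}
```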
@@ -100,12 +100,10 @@ class CompressionCodecSuite extends SparkFunSuite {
testCodec(codec)
}

test("snappy does not support concatenation of serialized streams") {
test("snappy supports concatenation of serialized streams") {
val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
assert(codec.getClass === classOf[SnappyCompressionCodec])
intercept[Exception] {
testConcatenationOfSerializedStreams(codec)
}
testConcatenationOfSerializedStreams(codec)
}

test("bad compression codec") {
@@ -171,7 +171,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
val partition: Int = 2
val authorizedCommitter: Int = 3
val nonAuthorizedCommitter: Int = 100
outputCommitCoordinator.stageStart(stage)
outputCommitCoordinator.stageStart(stage, maxPartitionId = 2)

assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
2 changes: 1 addition & 1 deletion dev/audit-release/README.md
@@ -4,7 +4,7 @@ run them locally by setting appropriate environment variables.

```
$ cd sbt_app_core
$ SCALA_VERSION=2.10.4 \
$ SCALA_VERSION=2.10.5 \
SPARK_VERSION=1.0.0-SNAPSHOT \
SPARK_RELEASE_REPOSITORY=file:///home/patrick/.ivy2/local \
sbt run
2 changes: 1 addition & 1 deletion dev/audit-release/audit_release.py
@@ -35,7 +35,7 @@
RELEASE_KEY = "XXXXXXXX" # Your 8-digit hex
RELEASE_REPOSITORY = "https://repository.apache.org/content/repositories/orgapachespark-1033"
RELEASE_VERSION = "1.1.1"
SCALA_VERSION = "2.10.4"
SCALA_VERSION = "2.10.5"
SCALA_BINARY_VERSION = "2.10"

# Do not set these
2 changes: 1 addition & 1 deletion docker/spark-test/base/Dockerfile
@@ -25,7 +25,7 @@ RUN apt-get update && \
apt-get install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server && \
rm -rf /var/lib/apt/lists/*

ENV SCALA_VERSION 2.10.4
ENV SCALA_VERSION 2.10.5
ENV CDH_VERSION cdh4
ENV SCALA_HOME /opt/scala-$SCALA_VERSION
ENV SPARK_HOME /opt/spark
2 changes: 1 addition & 1 deletion docs/_config.yml
@@ -17,7 +17,7 @@ include:
SPARK_VERSION: 1.6.0-SNAPSHOT
SPARK_VERSION_SHORT: 1.6.0
SCALA_BINARY_VERSION: "2.10"
SCALA_VERSION: "2.10.4"
SCALA_VERSION: "2.10.5"
MESOS_VERSION: 0.21.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.api.python

import org.apache.spark.mllib.fpm.PrefixSpanModel
import org.apache.spark.rdd.RDD

/**
* A wrapper of PrefixSpanModel that provides a helper method for Python
*/
private[python] class PrefixSpanModelWrapper(model: PrefixSpanModel[Any])
extends PrefixSpanModel(model.freqSequences) {

def getFreqSequences: RDD[Array[Any]] = {
SerDe.fromTuple2RDD(model.freqSequences.map(x => (x.javaSequence, x.freq)))
}
}