[SPARK-23253][CORE][SHUFFLE] Only write shuffle temporary index file when there is not an existing one

## What changes were proposed in this pull request?

The shuffle index temporary file is used to create the shuffle index file atomically. It is not needed when the index file already exists, i.e. when another attempt of the same task has already written it.
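In outline, the change makes the commit path look roughly like the sketch below. This is a simplified, self-contained illustration, not the actual Spark method: the real code synchronizes on the resolver and uses `Utils.tempFileWith` and `checkIndexAndDataFile` to validate both index and data files, while this sketch reduces the consistency check to a plain `exists()` test.

```scala
import java.io.{BufferedOutputStream, DataOutputStream, File, FileOutputStream, IOException}

// Minimal sketch of the new control flow (illustration only, not Spark code):
// the temporary index file is created only when no committed index exists yet.
object IndexCommitSketch {
  def writeIndexFileAndCommit(indexFile: File, lengths: Array[Long]): Unit = synchronized {
    if (indexFile.exists()) {
      // Another attempt of the same task already committed the index:
      // skip writing entirely, so no temporary file is ever created.
    } else {
      val indexTmp = File.createTempFile(indexFile.getName, ".tmp", indexFile.getParentFile)
      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
      try {
        // Convert per-block lengths into cumulative offsets: n blocks => n + 1 longs.
        var offset = 0L
        out.writeLong(offset)
        for (length <- lengths) {
          offset += length
          out.writeLong(offset)
        }
      } finally {
        out.close()
      }
      // The rename is the atomic "commit" step.
      if (!indexTmp.renameTo(indexFile)) {
        throw new IOException(s"fail to rename file $indexTmp to $indexFile")
      }
    }
  }
}
```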

## How was this patch tested?

Existing unit tests.

cc squito

Author: Kent Yao <yaooqinn@hotmail.com>

Closes #20422 from yaooqinn/SPARK-23253.
yaooqinn authored and squito committed Feb 2, 2018
1 parent b9503fc commit dd52681
Showing 2 changed files with 56 additions and 30 deletions.
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -141,19 +141,6 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
-      Utils.tryWithSafeFinally {
-        // We take in lengths of each block, need to convert it to offsets.
-        var offset = 0L
-        out.writeLong(offset)
-        for (length <- lengths) {
-          offset += length
-          out.writeLong(offset)
-        }
-      } {
-        out.close()
-      }
-
       val dataFile = getDataFile(shuffleId, mapId)
       // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
       // the following check and rename are atomic.
@@ -166,10 +153,22 @@ private[spark] class IndexShuffleBlockResolver(
         if (dataTmp != null && dataTmp.exists()) {
           dataTmp.delete()
         }
-        indexTmp.delete()
       } else {
         // This is the first successful attempt in writing the map outputs for this task,
         // so override any existing index and data files with the ones we wrote.
+        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
+        Utils.tryWithSafeFinally {
+          // We take in lengths of each block, need to convert it to offsets.
+          var offset = 0L
+          out.writeLong(offset)
+          for (length <- lengths) {
+            offset += length
+            out.writeLong(offset)
+          }
+        } {
+          out.close()
+        }
+
         if (indexFile.exists()) {
           indexFile.delete()
         }
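For context on the index-size assertions added to the test below: an index file for n blocks stores n + 1 longs (a leading zero plus one cumulative offset per block), so its length is (n + 1) * 8 bytes. A standalone illustration in plain Scala, not Spark code:

```scala
object IndexLayoutExample extends App {
  // The first commit in the test writes blocks of lengths 10, 0 and 20.
  val lengths = Array[Long](10, 0, 20)
  val offsets = lengths.scanLeft(0L)(_ + _)                 // Array(0, 10, 10, 30)
  println(offsets.mkString(", "))
  println(s"index file size: ${offsets.length * 8} bytes")  // (3 + 1) * 8 = 32
}
```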
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle.sort
 
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{DataInputStream, File, FileInputStream, FileOutputStream}
 
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Answers.RETURNS_SMART_NULLS
@@ -64,6 +64,9 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
   }
 
   test("commit shuffle files multiple times") {
+    val shuffleId = 1
+    val mapId = 2
+    val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
     val resolver = new IndexShuffleBlockResolver(conf, blockManager)
     val lengths = Array[Long](10, 0, 20)
     val dataTmp = File.createTempFile("shuffle", null, tempDir)
@@ -73,9 +76,13 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     } {
       out.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths, dataTmp)
 
-    val dataFile = resolver.getDataFile(1, 2)
+    val indexFile = new File(tempDir.getAbsolutePath, idxName)
+    val dataFile = resolver.getDataFile(shuffleId, mapId)
+
+    assert(indexFile.exists())
+    assert(indexFile.length() === (lengths.length + 1) * 8)
     assert(dataFile.exists())
     assert(dataFile.length() === 30)
     assert(!dataTmp.exists())
@@ -89,26 +96,37 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     } {
       out2.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths2, dataTmp2)
+
+    assert(indexFile.length() === (lengths.length + 1) * 8)
     assert(lengths2.toSeq === lengths.toSeq)
     assert(dataFile.exists())
     assert(dataFile.length() === 30)
     assert(!dataTmp2.exists())
 
     // The dataFile should be the previous one
     val firstByte = new Array[Byte](1)
-    val in = new FileInputStream(dataFile)
+    val dataIn = new FileInputStream(dataFile)
     Utils.tryWithSafeFinally {
-      in.read(firstByte)
+      dataIn.read(firstByte)
     } {
-      in.close()
+      dataIn.close()
     }
     assert(firstByte(0) === 0)
 
+    // The index file should not change
+    val indexIn = new DataInputStream(new FileInputStream(indexFile))
+    Utils.tryWithSafeFinally {
+      indexIn.readLong() // the first offset is always 0
+      assert(indexIn.readLong() === 10, "The index file should not change")
+    } {
+      indexIn.close()
+    }
+
     // remove data file
     dataFile.delete()
 
-    val lengths3 = Array[Long](10, 10, 15)
+    val lengths3 = Array[Long](7, 10, 15, 3)
    val dataTmp3 = File.createTempFile("shuffle", null, tempDir)
     val out3 = new FileOutputStream(dataTmp3)
     Utils.tryWithSafeFinally {
@@ -117,20 +135,29 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     } {
       out3.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths3, dataTmp3)
+    assert(indexFile.length() === (lengths3.length + 1) * 8)
     assert(lengths3.toSeq != lengths.toSeq)
     assert(dataFile.exists())
     assert(dataFile.length() === 35)
     assert(!dataTmp2.exists())
     assert(!dataTmp3.exists())
 
-    // The dataFile should be the previous one
-    val firstByte2 = new Array[Byte](1)
-    val in2 = new FileInputStream(dataFile)
+    // The dataFile should be the new one, since we deleted the dataFile from the first attempt
+    val dataIn2 = new FileInputStream(dataFile)
     Utils.tryWithSafeFinally {
-      in2.read(firstByte2)
+      dataIn2.read(firstByte)
     } {
-      in2.close()
+      dataIn2.close()
     }
-    assert(firstByte2(0) === 2)
+    assert(firstByte(0) === 2)
+
+    // The index file should be updated, since we deleted the dataFile from the first attempt
+    val indexIn2 = new DataInputStream(new FileInputStream(indexFile))
+    Utils.tryWithSafeFinally {
+      indexIn2.readLong() // the first offset is always 0
+      assert(indexIn2.readLong() === 7, "The index file should be updated")
+    } {
+      indexIn2.close()
+    }
   }
 }
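A note on the final numbers: the third commit writes block lengths (7, 10, 15, 3), so the stored offsets are 0, 7, 17, 32, 35 (a 35-byte data file, a 40-byte index), which is why the test expects 7 as the second long. Below is a sketch of reading every offset back with the same DataInputStream pattern the test uses; readOffsets is a hypothetical helper, not part of the suite:

```scala
import java.io.{DataInputStream, File, FileInputStream}

object ReadIndexExample {
  // Reads every offset stored in a shuffle index file.
  def readOffsets(indexFile: File): Array[Long] = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      val n = (indexFile.length() / 8).toInt  // 8 bytes per stored long
      Array.fill(n)(in.readLong())
    } finally {
      in.close()
    }
  }
}
```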
