Use Utils.createTempDir() to replace other temp file mechanisms used in some tests, to further ensure they are cleaned up, and simplify
srowen committed Mar 16, 2015
1 parent 00e730b commit 57609e4
Showing 23 changed files with 72 additions and 98 deletions.
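Every file in this commit follows the same pattern: rather than dropping a file into the system temp directory and relying on File.deleteOnExit(), each test creates its files inside a directory obtained from Utils.createTempDir(), which the commit message indicates is tracked and cleaned up for the tests. A minimal sketch of the before/after shape — the class name, test name, and prefixes below are hypothetical, not taken from the diff:

```scala
package org.apache.spark

import java.io.File

import org.scalatest.FunSuite

import org.apache.spark.util.Utils

// Hypothetical test, not part of this commit: it only illustrates the pattern.
class TempDirPatternSketch extends FunSuite {

  test("create test files under a Spark-managed temp directory") {
    // Old pattern being removed: deleteOnExit() only runs on a clean JVM exit
    // and cannot remove a directory that still contains files.
    //   val file = File.createTempFile("someprefix", "somesuffix")
    //   file.deleteOnExit()

    // New pattern: put the file inside a directory from Utils.createTempDir(),
    // which is assumed (per the commit message) to be cleaned up afterwards.
    val dir: File = Utils.createTempDir()
    val file = File.createTempFile("someprefix", "somesuffix", dir)
    assert(file.exists())
  }
}
```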
@@ -33,6 +33,7 @@ import org.json4s.jackson.JsonMethods

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
import org.apache.spark.util.Utils

/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
@@ -405,8 +406,7 @@ private[spark] object SparkDocker {

private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
val ipPromise = promise[String]()
val outFile = File.createTempFile("fault-tolerance-test", "")
outFile.deleteOnExit()
val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
val outStream: FileWriter = new FileWriter(outFile)
def findIpAndLog(line: String): Unit = {
if (line.startsWith("CONTAINER_IP=")) {
3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -33,8 +33,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {

override def beforeEach() {
super.beforeEach()
checkpointDir = File.createTempFile("temp", "")
checkpointDir.deleteOnExit()
checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
checkpointDir.delete()
sc = new SparkContext("local", "test")
sc.setCheckpointDir(checkpointDir.toString)
@@ -21,6 +21,8 @@ import java.io.File

import org.scalatest.FunSuite

import org.apache.spark.util.Utils

class SecurityManagerSuite extends FunSuite {

test("set security with conf") {
@@ -160,8 +162,7 @@ class SecurityManagerSuite extends FunSuite {
}

test("ssl off setup") {
val file = File.createTempFile("SSLOptionsSuite", "conf")
file.deleteOnExit()
val file = File.createTempFile("SSLOptionsSuite", "conf", Utils.createTempDir())

System.setProperty("spark.ssl.configFile", file.getAbsolutePath)
val conf = new SparkConf()
11 changes: 6 additions & 5 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -79,13 +79,14 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
val byteArray2 = converter.convert(bytesWritable)
assert(byteArray2.length === 0)
}

test("addFile works") {
val file1 = File.createTempFile("someprefix1", "somesuffix1")
val dir = Utils.createTempDir()

val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
val absolutePath1 = file1.getAbsolutePath

val pluto = Utils.createTempDir()
val file2 = File.createTempFile("someprefix2", "somesuffix2", pluto)
val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
val absolutePath2 = file2.getAbsolutePath

@@ -129,7 +130,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
sc.stop()
}
}

test("addFile recursive works") {
val pluto = Utils.createTempDir()
val neptune = Utils.createTempDir(pluto.getAbsolutePath)
@@ -402,8 +402,10 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles

val tmpDir = Utils.createTempDir()

// Test jars and files
val f1 = File.createTempFile("test-submit-jars-files", "")
val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
val writer1 = new PrintWriter(f1)
writer1.println("spark.jars " + jars)
writer1.println("spark.files " + files)
@@ -420,7 +422,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
sysProps("spark.files") should be(Utils.resolveURIs(files))

// Test files and archives (Yarn)
val f2 = File.createTempFile("test-submit-files-archives", "")
val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
val writer2 = new PrintWriter(f2)
writer2.println("spark.yarn.dist.files " + files)
writer2.println("spark.yarn.dist.archives " + archives)
@@ -437,7 +439,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))

// Test python files
val f3 = File.createTempFile("test-submit-python-files", "")
val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
val writer3 = new PrintWriter(f3)
writer3.println("spark.submit.pyFiles " + pyFiles)
writer3.close()
6 changes: 4 additions & 2 deletions core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -22,14 +22,16 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, JobConf, TextInputFormat}
import org.apache.spark._
import org.scalatest.FunSuite

import scala.collection.Map
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try

import org.apache.spark._
import org.apache.spark.util.Utils

class PipedRDDSuite extends FunSuite with SharedSparkContext {

test("basic pipe") {
@@ -141,7 +143,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
// make sure symlinks were created
assert(pipedLs.length > 0)
// clean up top level tasks directory
new File("tasks").delete()
Utils.deleteRecursively(new File("tasks"))
} else {
assert(true)
}
@@ -16,16 +16,18 @@
*/
package org.apache.spark.storage

import org.scalatest.FunSuite
import java.io.File

import org.scalatest.FunSuite

import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

class BlockObjectWriterSuite extends FunSuite {
test("verify write metrics") {
val file = new File("somefile")
file.deleteOnExit()
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
@@ -47,8 +49,7 @@
}

test("verify write metrics on revert") {
val file = new File("somefile")
file.deleteOnExit()
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
@@ -71,8 +72,7 @@
}

test("Reopening a closed block writer") {
val file = new File("somefile")
file.deleteOnExit()
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
@@ -32,7 +32,7 @@ import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolic

class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {

val testFile = new File("FileAppenderSuite-test-" + System.currentTimeMillis).getAbsoluteFile
val testFile = new File(Utils.createTempDir(), "FileAppenderSuite-test").getAbsoluteFile

before {
cleanup()
7 changes: 3 additions & 4 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -122,7 +122,6 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {

test("reading offset bytes of a file") {
val tmpDir2 = Utils.createTempDir()
tmpDir2.deleteOnExit()
val f1Path = tmpDir2 + "/f1"
val f1 = new FileOutputStream(f1Path)
f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(UTF_8))
@@ -151,7 +150,6 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {

test("reading offset bytes across multiple files") {
val tmpDir = Utils.createTempDir()
tmpDir.deleteOnExit()
val files = (1 to 3).map(i => new File(tmpDir, i.toString))
Files.write("0123456789", files(0), UTF_8)
Files.write("abcdefghij", files(1), UTF_8)
@@ -357,7 +355,8 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
}

test("loading properties from file") {
val outFile = File.createTempFile("test-load-spark-properties", "test")
val tmpDir = Utils.createTempDir()
val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
try {
System.setProperty("spark.test.fileNameLoadB", "2")
Files.write("spark.test.fileNameLoadA true\n" +
@@ -370,7 +369,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
} finally {
outFile.delete()
Utils.deleteRecursively(tmpDir)
}
}

@@ -25,16 +25,15 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

import com.google.common.io.Files
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils

class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {

@@ -60,18 +59,15 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
)

ssc = new StreamingContext(sparkConf, Milliseconds(500))
tempDirectory = Files.createTempDir()
tempDirectory = Utils.createTempDir()
ssc.checkpoint(tempDirectory.getAbsolutePath)
}

after {
if (ssc != null) {
ssc.stop()
}
if (tempDirectory != null && tempDirectory.exists()) {
FileUtils.deleteDirectory(tempDirectory)
tempDirectory = null
}
Utils.deleteRecursively(tempDirectory)
tearDownKafka()
}

@@ -19,13 +19,12 @@ package org.apache.spark.graphx

import org.scalatest.FunSuite

import com.google.common.io.Files

import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class GraphSuite extends FunSuite with LocalSparkContext {

@@ -369,8 +368,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}

test("checkpoint") {
val checkpointDir = Files.createTempDir()
checkpointDir.deleteOnExit()
val checkpointDir = Utils.createTempDir()
withSpark { sc =>
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
@@ -389,6 +387,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(graph.isCheckpointed)
assert(graph.getCheckpointFiles.size === 2)
}
Utils.deleteRecursively(checkpointDir)
}

test("cache, getStorageLevel") {
@@ -21,11 +21,9 @@ import java.io._
import java.net.URLClassLoader

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.tools.nsc.interpreter.SparkILoop

import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.SparkContext
@@ -196,8 +194,7 @@ class ReplSuite extends FunSuite {
}

test("interacting with files") {
val tempDir = Files.createTempDir()
tempDir.deleteOnExit()
val tempDir = Utils.createTempDir()
val out = new FileWriter(tempDir + "/input")
out.write("Hello world!\n")
out.write("What's up?\n")
@@ -19,6 +19,8 @@ package org.apache.spark.sql

import java.io.File

import org.apache.spark.util.Utils

import scala.beans.{BeanInfo, BeanProperty}

import org.apache.spark.rdd.RDD
@@ -98,13 +100,13 @@ class UserDefinedTypeSuite extends QueryTest {


test("UDTs with Parquet") {
val tempDir = File.createTempFile("parquet", "test")
val tempDir = Utils.createTempDir()
tempDir.delete()
pointsRDD.saveAsParquetFile(tempDir.getCanonicalPath)
}

test("Repartition UDTs with Parquet") {
val tempDir = File.createTempFile("parquet", "test")
val tempDir = Utils.createTempDir()
tempDir.delete()
pointsRDD.repartition(1).saveAsParquetFile(tempDir.getCanonicalPath)
}
@@ -81,10 +81,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
Utils.registerShutdownDeleteDir(new File(metastorePath))
}

val testTempDir = File.createTempFile("testTempFiles", "spark.hive.tmp")
testTempDir.delete()
testTempDir.mkdir()
Utils.registerShutdownDeleteDir(testTempDir)
val testTempDir = Utils.createTempDir()

// For some hive test case which contain ${system:test.tmp.dir}
System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath)
@@ -21,12 +21,11 @@ import java.io.File

import org.scalatest.BeforeAndAfter

import com.google.common.io.Files

import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/* Implicits */
import org.apache.spark.sql.hive.test.TestHive._
@@ -112,7 +111,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {

test("SPARK-4203:random partition directory order") {
sql("CREATE TABLE tmp_table (key int, value string)")
val tmpDir = Files.createTempDir()
val tmpDir = Utils.createTempDir()
sql(s"CREATE TABLE table_with_partition(c1 string) PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) location '${tmpDir.toURI.toString}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='1') SELECT 'blarr' FROM tmp_table")
sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='2') SELECT 'blarr' FROM tmp_table")
@@ -136,6 +135,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
assert(listFolders(tmpDir,List()).sortBy(_.toString()) == expected.sortBy(_.toString))
sql("DROP TABLE table_with_partition")
sql("DROP TABLE tmp_table")
Utils.deleteRecursively(tmpDir)
}

test("Insert ArrayType.containsNull == false") {
@@ -154,7 +154,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
}

test("check change without refresh") {
val tempDir = File.createTempFile("sparksql", "json")
val tempDir = File.createTempFile("sparksql", "json", Utils.createTempDir())
tempDir.delete()
sparkContext.parallelize(("a", "b") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
@@ -192,7 +192,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
}

test("drop, change, recreate") {
val tempDir = File.createTempFile("sparksql", "json")
val tempDir = File.createTempFile("sparksql", "json", Utils.createTempDir())
tempDir.delete()
sparkContext.parallelize(("a", "b") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
(The remaining changed files in this commit are not shown here.)
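A few of the updated suites shown above (UtilsSuite, GraphSuite, InsertIntoHiveTableSuite) also delete the temporary directory explicitly with Utils.deleteRecursively rather than waiting for JVM shutdown. A sketch of that variant, again with hypothetical names:

```scala
package org.apache.spark

import java.io.File

import org.scalatest.FunSuite

import org.apache.spark.util.Utils

// Hypothetical test, not part of this commit: explicit-cleanup variant.
class ExplicitCleanupSketch extends FunSuite {

  test("delete the temp directory as soon as the test finishes") {
    val tmpDir = Utils.createTempDir()
    try {
      val outFile = new File(tmpDir, "output.txt")
      // ... exercise the code under test with outFile ...
      assert(tmpDir.isDirectory)
    } finally {
      // Unlike File.delete(), this also removes anything inside the directory.
      Utils.deleteRecursively(tmpDir)
    }
  }
}
```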
