[SPARK-47199][PYTHON][TESTS] Add prefix into TemporaryDirectory to avoid flakiness

### What changes were proposed in this pull request?

This PR proposes to set `prefix` for `TemporaryDirectory` to deflake the tests. Sometimes the tests fail because the temporary directory names are the same (https://github.com/apache/spark/actions/runs/8066850485/job/22036007390).

```
File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line ?, in pyspark.sql.dataframe.DataFrame.writeStream
Failed example:
    with tempfile.TemporaryDirectory() as d:
        # Create a table with Rate source.
        df.writeStream.toTable(
            "my_table", checkpointLocation=d)
Exception raised:
    Traceback (most recent call last):
      File "/usr/lib/python3.11/doctest.py", line 1353, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest pyspark.sql.dataframe.DataFrame.writeStream[3]>", line 1, in <module>
        with tempfile.TemporaryDirectory() as d:
      File "/usr/lib/python3.11/tempfile.py", line 1043, in __exit__
        self.cleanup()
      File "/usr/lib/python3.11/tempfile.py", line 1047, in cleanup
        self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
      File "/usr/lib/python3.11/tempfile.py", line 1029, in _rmtree
        _rmtree(name, onerror=onerror)
      File "/usr/lib/python3.11/shutil.py", line 738, in rmtree
        onerror(os.rmdir, path, sys.exc_info())
      File "/usr/lib/python3.11/shutil.py", line 736, in rmtree
        os.rmdir(path, dir_fd=dir_fd)
    OSError: [Errno 39] Directory not empty: '/__w/spark/spark/python/target/4f062b09-213f-4ac2-a10a-2d704990141b/tmp29irqweq'
```
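
For illustration, the pattern this PR applies throughout the diff below is shown here as a standalone sketch (not an excerpt from the patch itself): the doctests and tests keep using `tempfile.TemporaryDirectory`, but pass a `prefix` derived from the test name, so each directory is attributable to its test and, per the failure above, far less likely to clash with another test's directory.

```python
import tempfile

# Before: the directory gets a purely random name such as /tmp/tmp29irqweq,
# which (per the failure above) can end up shared across tests, and cleanup
# then fails with "Directory not empty".
with tempfile.TemporaryDirectory() as d:
    print(d)  # e.g. /tmp/tmp29irqweq

# After: the test name is used as a prefix; the random suffix is kept, so the
# directory is still unique but now identifiable, e.g. for DataFrame.writeStream.
with tempfile.TemporaryDirectory(prefix="writeStream") as d:
    print(d)  # e.g. /tmp/writeStreamx4kq9r2b
```

The diff below applies the same one-line change to each affected doctest and test.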

### Why are the changes needed?

To make the tests more robust by avoiding temporary-directory name collisions.

### Does this PR introduce _any_ user-facing change?

No, test-only. There is a small user-facing documentation change (the doctest examples now pass a `prefix`), but it is trivial.

### How was this patch tested?

Manually tested. The CI in this PR should verify the change as well.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45298 from HyukjinKwon/SPARK-47199.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
HyukjinKwon authored and ericm-db committed Mar 5, 2024
1 parent 6f583cf commit 5a3bccd
Showing 29 changed files with 154 additions and 149 deletions.
2 changes: 1 addition & 1 deletion dev/connect-check-protos.py
@@ -45,7 +45,7 @@ def run_cmd(cmd):

def check_connect_protos():
print("Start checking the generated codes in pyspark-connect.")
- with tempfile.TemporaryDirectory() as tmp:
+ with tempfile.TemporaryDirectory(prefix="check_connect_protos") as tmp:
run_cmd(f"{SPARK_HOME}/dev/connect-gen-protos.sh {tmp}")
result = filecmp.dircmp(
f"{SPARK_HOME}/python/pyspark/sql/connect/proto/",
6 changes: 3 additions & 3 deletions python/pyspark/broadcast.py
@@ -174,7 +174,7 @@ def dump(self, value: T, f: BinaryIO) -> None:
Write a pickled representation of `b` to the open temp file.
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="dump") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "wb") as f:
... b.dump(b.value, f)
@@ -215,7 +215,7 @@ def load_from_path(self, path: str) -> T:
Read the pickled representation of value from temp file.
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="load_from_path") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "wb") as f:
... b.dump(b.value, f)
@@ -250,7 +250,7 @@ def load(self, file: BinaryIO) -> T:
Read the pickled representation of value from the open temp file.
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="load") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "wb") as f:
... b.dump(b.value, f)
26 changes: 13 additions & 13 deletions python/pyspark/context.py
@@ -899,7 +899,7 @@ def pickleFile(self, name: str, minPartitions: Optional[int] = None) -> RDD[Any]
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="pickleFile") as d:
... # Write a temporary pickled file
... path1 = os.path.join(d, "pickled1")
... sc.parallelize(range(10)).saveAsPickleFile(path1, 3)
@@ -962,7 +962,7 @@ def textFile(
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="textFile") as d:
... path1 = os.path.join(d, "text1")
... path2 = os.path.join(d, "text2")
...
@@ -1052,7 +1052,7 @@ def wholeTextFiles(
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="wholeTextFiles") as d:
... # Write a temporary text file
... with open(os.path.join(d, "1.txt"), "w") as f:
... _ = f.write("123")
@@ -1107,7 +1107,7 @@ def binaryFiles(self, path: str, minPartitions: Optional[int] = None) -> RDD[Tup
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="binaryFiles") as d:
... # Write a temporary binary file
... with open(os.path.join(d, "1.bin"), "wb") as f1:
... _ = f1.write(b"binary data I")
@@ -1156,7 +1156,7 @@ def binaryRecords(self, path: str, recordLength: int) -> RDD[bytes]:
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="binaryRecords") as d:
... # Write a temporary file
... with open(os.path.join(d, "1.bin"), "w") as f:
... for i in range(3):
@@ -1247,7 +1247,7 @@ def sequenceFile(
>>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="sequenceFile") as d:
... path = os.path.join(d, "hadoop_file")
...
... # Write a temporary Hadoop file
@@ -1345,7 +1345,7 @@ def newAPIHadoopFile(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="newAPIHadoopFile") as d:
... path = os.path.join(d, "new_hadoop_file")
...
... # Write a temporary Hadoop file
@@ -1437,7 +1437,7 @@ def newAPIHadoopRDD(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="newAPIHadoopRDD") as d:
... path = os.path.join(d, "new_hadoop_file")
...
... # Create the conf for writing
@@ -1544,7 +1544,7 @@ def hadoopFile(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="hadoopFile") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Write a temporary Hadoop file
@@ -1634,7 +1634,7 @@ def hadoopRDD(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="hadoopRDD") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Create the conf for writing
@@ -1694,7 +1694,7 @@ def union(self, rdds: List[RDD[T]]) -> RDD[T]:
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="union") as d:
... # generate a text RDD
... with open(os.path.join(d, "union-text.txt"), "w") as f:
... _ = f.write("Hello")
@@ -1860,7 +1860,7 @@ def addFile(self, path: str, recursive: bool = False) -> None:
>>> import tempfile
>>> from pyspark import SparkFiles
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="addFile") as d:
... path1 = os.path.join(d, "test1.txt")
... with open(path1, "w") as f:
... _ = f.write("100")
@@ -1984,7 +1984,7 @@ def addArchive(self, path: str) -> None:
>>> import zipfile
>>> from pyspark import SparkFiles
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="addArchive") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "w") as f:
... _ = f.write("100")
2 changes: 1 addition & 1 deletion python/pyspark/files.py
@@ -73,7 +73,7 @@ def get(cls, filename: str) -> str:
>>> import tempfile
>>> from pyspark import SparkFiles
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="get") as d:
... path1 = os.path.join(d, "test.txt")
... with open(path1, "w") as f:
... _ = f.write("100")
@@ -132,7 +132,7 @@ def test_multi_classes_logistic_regression(self):
self._check_result(local_transform_result, expected_predictions, expected_probabilities)

def test_save_load(self):
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_save_load") as tmp_dir:
estimator = LORV2(maxIter=2, numTrainWorkers=2, learningRate=0.001)
local_path = os.path.join(tmp_dir, "estimator")
estimator.saveToLocal(local_path)
@@ -87,7 +87,7 @@ def test_regressor_evaluator(self):
np.testing.assert_almost_equal(r2_local, expected_r2)

# Test save / load
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_regressor_evaluator") as tmp_dir:
r2_evaluator.saveToLocal(f"{tmp_dir}/ev")
loaded_evaluator = RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev")
assert loaded_evaluator.getMetricName() == "r2"
@@ -133,7 +133,7 @@ def test_binary_classifier_evaluator(self):
np.testing.assert_almost_equal(auprc_local, expected_auprc, decimal=2)

# Test save / load
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_binary_classifier_evaluator") as tmp_dir:
auprc_evaluator.saveToLocal(f"{tmp_dir}/ev")
loaded_evaluator = RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev")
assert loaded_evaluator.getMetricName() == "areaUnderPR"
@@ -170,7 +170,7 @@ def test_multiclass_classifier_evaluator(self):
np.testing.assert_almost_equal(accuracy_local, expected_accuracy, decimal=2)

# Test save / load
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_multiclass_classifier_evaluator") as tmp_dir:
accuracy_evaluator.saveToLocal(f"{tmp_dir}/ev")
loaded_evaluator = RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev")
assert loaded_evaluator.getMetricName() == "accuracy"
6 changes: 3 additions & 3 deletions python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -69,7 +69,7 @@ def test_max_abs_scaler(self):

np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result)

- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_max_abs_scaler") as tmp_dir:
estimator_path = os.path.join(tmp_dir, "estimator")
scaler.saveToLocal(estimator_path)
loaded_scaler = MaxAbsScaler.loadFromLocal(estimator_path)
@@ -124,7 +124,7 @@ def test_standard_scaler(self):

np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result)

- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_standard_scaler") as tmp_dir:
estimator_path = os.path.join(tmp_dir, "estimator")
scaler.saveToLocal(estimator_path)
loaded_scaler = StandardScaler.loadFromLocal(estimator_path)
@@ -176,7 +176,7 @@ def test_array_assembler(self):
result2[1][1] = np.nan
np.testing.assert_allclose(result2, expected_result)

- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_array_assembler") as tmp_dir:
save_path = os.path.join(tmp_dir, "assembler")
assembler1.saveToLocal(save_path)
loaded_assembler = ArrayAssembler.loadFromLocal(save_path)
@@ -91,7 +91,7 @@ def test_pipeline(self):
pd.testing.assert_frame_equal(local_eval_dataset, local_eval_dataset_copy)
self._check_result(local_transform_result2, expected_predictions, expected_probabilities)

- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_pipeline") as tmp_dir:
pipeline_local_path = os.path.join(tmp_dir, "pipeline")
pipeline.saveToLocal(pipeline_local_path)
loaded_pipeline = Pipeline.loadFromLocal(pipeline_local_path)
2 changes: 1 addition & 1 deletion python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
@@ -229,7 +229,7 @@ def _verify_cv_saved_params(instance, loaded_instance):
assert instance.getEstimatorParamMaps() == loaded_instance.getEstimatorParamMaps()

# Test save / load
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_crossvalidator_on_pipeline") as tmp_dir:
cv.saveToLocal(f"{tmp_dir}/cv")
loaded_cv = CrossValidator.loadFromLocal(f"{tmp_dir}/cv")

2 changes: 1 addition & 1 deletion python/pyspark/ml/tests/test_als.py
@@ -39,7 +39,7 @@ def test_ambiguous_column(self):
seed=42,
).fit(data)

- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_ambiguous_column") as d:
model.write().overwrite().save(d)
loaded_model = ALSModel().load(d)

18 changes: 9 additions & 9 deletions python/pyspark/rdd.py
@@ -2970,7 +2970,7 @@ def saveAsNewAPIHadoopDataset(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsNewAPIHadoopDataset") as d:
... path = os.path.join(d, "new_hadoop_file")
...
... # Create the conf for writing
@@ -3059,7 +3059,7 @@ def saveAsNewAPIHadoopFile(
>>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsNewAPIHadoopFile") as d:
... path = os.path.join(d, "hadoop_file")
...
... # Write a temporary Hadoop file
@@ -3129,7 +3129,7 @@ def saveAsHadoopDataset(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsHadoopDataset") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Create the conf for writing
@@ -3224,7 +3224,7 @@ def saveAsHadoopFile(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsHadoopFile") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Write a temporary Hadoop file
@@ -3290,7 +3290,7 @@ def saveAsSequenceFile(
Set the related classes
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsSequenceFile") as d:
... path = os.path.join(d, "sequence_file")
...
... # Write a temporary sequence file
@@ -3332,7 +3332,7 @@ def saveAsPickleFile(self, path: str, batchSize: int = 10) -> None:
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsPickleFile") as d:
... path = os.path.join(d, "pickle_file")
...
... # Write a temporary pickled file
@@ -3374,7 +3374,7 @@ def saveAsTextFile(self, path: str, compressionCodecClass: Optional[str] = None)
>>> import tempfile
>>> from fileinput import input
>>> from glob import glob
- >>> with tempfile.TemporaryDirectory() as d1:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile1") as d1:
... path1 = os.path.join(d1, "text_file1")
...
... # Write a temporary text file
@@ -3386,7 +3386,7 @@ def saveAsTextFile(self, path: str, compressionCodecClass: Optional[str] = None)
Empty lines are tolerated when saving to text files.
- >>> with tempfile.TemporaryDirectory() as d2:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile2") as d2:
... path2 = os.path.join(d2, "text2_file2")
...
... # Write another temporary text file
@@ -3399,7 +3399,7 @@ def saveAsTextFile(self, path: str, compressionCodecClass: Optional[str] = None)
Using compressionCodecClass
>>> from fileinput import input, hook_compressed
- >>> with tempfile.TemporaryDirectory() as d3:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile3") as d3:
... path3 = os.path.join(d3, "text3")
... codec = "org.apache.hadoop.io.compress.GzipCodec"
...
8 changes: 4 additions & 4 deletions python/pyspark/sql/catalog.py
@@ -836,7 +836,7 @@ def createTable(
Creating an external table
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="createTable") as d:
... _ = spark.catalog.createTable(
... "tbl2", schema=spark.range(1).schema, path=d, source='parquet')
>>> _ = spark.sql("DROP TABLE tbl2")
@@ -1119,7 +1119,7 @@ def refreshTable(self, tableName: str) -> None:
The example below caches a table, and then removes the data.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="refreshTable") as d:
... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
... _ = spark.sql(
... "CREATE TABLE tbl1 (col STRING) USING TEXT LOCATION '{}'".format(d))
@@ -1170,7 +1170,7 @@ def recoverPartitions(self, tableName: str) -> None:
the partitioned table. After that, it recovers the partitions.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="recoverPartitions") as d:
... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
... spark.range(1).selectExpr(
... "id as key", "id as value").write.partitionBy("key").mode("overwrite").save(d)
@@ -1209,7 +1209,7 @@ def refreshByPath(self, path: str) -> None:
The example below caches a table, and then removes the data.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="refreshByPath") as d:
... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
... _ = spark.sql(
... "CREATE TABLE tbl1 (col STRING) USING TEXT LOCATION '{}'".format(d))
6 changes: 3 additions & 3 deletions python/pyspark/sql/dataframe.py
@@ -586,7 +586,7 @@ def writeStream(self) -> DataStreamWriter:
>>> type(df.writeStream)
<class '...streaming.readwriter.DataStreamWriter'>
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="writeStream") as d:
... # Create a table with Rate source.
... df.writeStream.toTable(
... "my_table", checkpointLocation=d)
@@ -1139,7 +1139,7 @@ def checkpoint(self, eager: bool = True) -> "DataFrame":
>>> import tempfile
>>> df = spark.createDataFrame([
... (14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="checkpoint") as d:
... spark.sparkContext.setCheckpointDir("/tmp/bb")
... df.checkpoint(False)
DataFrame[age: bigint, name: string]
@@ -6766,7 +6766,7 @@ def inputFiles(self) -> List[str]:
Examples
--------
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="inputFiles") as d:
... # Write a single-row DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]