[SPARK-47199][PYTHON][TESTS] Add prefix into TemporaryDirectory to avoid flakiness #45298

Closed
wants to merge 4 commits
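For context: tempfile.TemporaryDirectory accepts a prefix keyword that is prepended to the otherwise random directory name. The sketch below is a minimal illustration, not part of the diff, and the prefix string is just an example; it shows the pattern this PR applies across the doctests and tests. A named prefix makes it obvious which test created a temporary directory, which helps when diagnosing flaky runs that leave behind or collide on temp directories.

import tempfile

# Without a prefix: an anonymous directory such as /tmp/tmpa1b2c3d4,
# which is hard to attribute to a particular test.
with tempfile.TemporaryDirectory() as d:
    print(d)

# With a prefix (the pattern used throughout this PR): the directory is
# created as e.g. /tmp/saveAsTextFile1a1b2c3d4, so a leftover or colliding
# directory immediately points at the doctest or test that made it.
with tempfile.TemporaryDirectory(prefix="saveAsTextFile1") as d:
    print(d)

# In both cases the directory and its contents are removed when the
# "with" block exits.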
2 changes: 1 addition & 1 deletion dev/connect-check-protos.py
@@ -45,7 +45,7 @@ def run_cmd(cmd):

def check_connect_protos():
print("Start checking the generated codes in pyspark-connect.")
with tempfile.TemporaryDirectory() as tmp:
with tempfile.TemporaryDirectory(prefix="check_connect_protos") as tmp:
run_cmd(f"{SPARK_HOME}/dev/connect-gen-protos.sh {tmp}")
result = filecmp.dircmp(
f"{SPARK_HOME}/python/pyspark/sql/connect/proto/",
6 changes: 3 additions & 3 deletions python/pyspark/broadcast.py
@@ -174,7 +174,7 @@ def dump(self, value: T, f: BinaryIO) -> None:

Write a pickled representation of `b` to the open temp file.

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="dump") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "wb") as f:
... b.dump(b.value, f)
@@ -215,7 +215,7 @@ def load_from_path(self, path: str) -> T:

Read the pickled representation of value from temp file.

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="load_from_path") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "wb") as f:
... b.dump(b.value, f)
@@ -250,7 +250,7 @@ def load(self, file: BinaryIO) -> T:

Read the pickled representation of value from the open temp file.

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="load") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "wb") as f:
... b.dump(b.value, f)
26 changes: 13 additions & 13 deletions python/pyspark/context.py
@@ -899,7 +899,7 @@ def pickleFile(self, name: str, minPartitions: Optional[int] = None) -> RDD[Any]
--------
>>> import os
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="pickleFile") as d:
... # Write a temporary pickled file
... path1 = os.path.join(d, "pickled1")
... sc.parallelize(range(10)).saveAsPickleFile(path1, 3)
@@ -962,7 +962,7 @@ def textFile(
--------
>>> import os
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="textFile") as d:
... path1 = os.path.join(d, "text1")
... path2 = os.path.join(d, "text2")
...
@@ -1052,7 +1052,7 @@ def wholeTextFiles(
--------
>>> import os
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="wholeTextFiles") as d:
... # Write a temporary text file
... with open(os.path.join(d, "1.txt"), "w") as f:
... _ = f.write("123")
@@ -1107,7 +1107,7 @@ def binaryFiles(self, path: str, minPartitions: Optional[int] = None) -> RDD[Tup
--------
>>> import os
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="binaryFiles") as d:
... # Write a temporary binary file
... with open(os.path.join(d, "1.bin"), "wb") as f1:
... _ = f1.write(b"binary data I")
@@ -1156,7 +1156,7 @@ def binaryRecords(self, path: str, recordLength: int) -> RDD[bytes]:
--------
>>> import os
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="binaryRecords") as d:
... # Write a temporary file
... with open(os.path.join(d, "1.bin"), "w") as f:
... for i in range(3):
@@ -1247,7 +1247,7 @@ def sequenceFile(

>>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="sequenceFile") as d:
... path = os.path.join(d, "hadoop_file")
...
... # Write a temporary Hadoop file
@@ -1345,7 +1345,7 @@ def newAPIHadoopFile(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="newAPIHadoopFile") as d:
... path = os.path.join(d, "new_hadoop_file")
...
... # Write a temporary Hadoop file
@@ -1437,7 +1437,7 @@ def newAPIHadoopRDD(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="newAPIHadoopRDD") as d:
... path = os.path.join(d, "new_hadoop_file")
...
... # Create the conf for writing
@@ -1544,7 +1544,7 @@ def hadoopFile(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="hadoopFile") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Write a temporary Hadoop file
@@ -1634,7 +1634,7 @@ def hadoopRDD(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="hadoopRDD") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Create the conf for writing
@@ -1694,7 +1694,7 @@ def union(self, rdds: List[RDD[T]]) -> RDD[T]:
--------
>>> import os
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="union") as d:
... # generate a text RDD
... with open(os.path.join(d, "union-text.txt"), "w") as f:
... _ = f.write("Hello")
@@ -1860,7 +1860,7 @@ def addFile(self, path: str, recursive: bool = False) -> None:
>>> import tempfile
>>> from pyspark import SparkFiles

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="addFile") as d:
... path1 = os.path.join(d, "test1.txt")
... with open(path1, "w") as f:
... _ = f.write("100")
@@ -1984,7 +1984,7 @@ def addArchive(self, path: str) -> None:
>>> import zipfile
>>> from pyspark import SparkFiles

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="addArchive") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "w") as f:
... _ = f.write("100")
2 changes: 1 addition & 1 deletion python/pyspark/files.py
@@ -73,7 +73,7 @@ def get(cls, filename: str) -> str:
>>> import tempfile
>>> from pyspark import SparkFiles

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="get") as d:
... path1 = os.path.join(d, "test.txt")
... with open(path1, "w") as f:
... _ = f.write("100")
@@ -132,7 +132,7 @@ def test_multi_classes_logistic_regression(self):
self._check_result(local_transform_result, expected_predictions, expected_probabilities)

def test_save_load(self):
with tempfile.TemporaryDirectory() as tmp_dir:
with tempfile.TemporaryDirectory(prefix="test_save_load") as tmp_dir:
estimator = LORV2(maxIter=2, numTrainWorkers=2, learningRate=0.001)
local_path = os.path.join(tmp_dir, "estimator")
estimator.saveToLocal(local_path)
@@ -87,7 +87,7 @@ def test_regressor_evaluator(self):
np.testing.assert_almost_equal(r2_local, expected_r2)

# Test save / load
with tempfile.TemporaryDirectory() as tmp_dir:
with tempfile.TemporaryDirectory(prefix="test_regressor_evaluator") as tmp_dir:
r2_evaluator.saveToLocal(f"{tmp_dir}/ev")
loaded_evaluator = RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev")
assert loaded_evaluator.getMetricName() == "r2"
@@ -133,7 +133,7 @@ def test_binary_classifier_evaluator(self):
np.testing.assert_almost_equal(auprc_local, expected_auprc, decimal=2)

# Test save / load
with tempfile.TemporaryDirectory() as tmp_dir:
with tempfile.TemporaryDirectory(prefix="test_binary_classifier_evaluator") as tmp_dir:
auprc_evaluator.saveToLocal(f"{tmp_dir}/ev")
loaded_evaluator = RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev")
assert loaded_evaluator.getMetricName() == "areaUnderPR"
@@ -170,7 +170,7 @@ def test_multiclass_classifier_evaluator(self):
np.testing.assert_almost_equal(accuracy_local, expected_accuracy, decimal=2)

# Test save / load
with tempfile.TemporaryDirectory() as tmp_dir:
with tempfile.TemporaryDirectory(prefix="test_multiclass_classifier_evaluator") as tmp_dir:
accuracy_evaluator.saveToLocal(f"{tmp_dir}/ev")
loaded_evaluator = RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev")
assert loaded_evaluator.getMetricName() == "accuracy"
6 changes: 3 additions & 3 deletions python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -69,7 +69,7 @@ def test_max_abs_scaler(self):

np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result)

with tempfile.TemporaryDirectory() as tmp_dir:
with tempfile.TemporaryDirectory(prefix="test_max_abs_scaler") as tmp_dir:
estimator_path = os.path.join(tmp_dir, "estimator")
scaler.saveToLocal(estimator_path)
loaded_scaler = MaxAbsScaler.loadFromLocal(estimator_path)
@@ -124,7 +124,7 @@ def test_standard_scaler(self):

np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result)

with tempfile.TemporaryDirectory() as tmp_dir:
with tempfile.TemporaryDirectory(prefix="test_standard_scaler") as tmp_dir:
estimator_path = os.path.join(tmp_dir, "estimator")
scaler.saveToLocal(estimator_path)
loaded_scaler = StandardScaler.loadFromLocal(estimator_path)
@@ -176,7 +176,7 @@ def test_array_assembler(self):
result2[1][1] = np.nan
np.testing.assert_allclose(result2, expected_result)

with tempfile.TemporaryDirectory() as tmp_dir:
with tempfile.TemporaryDirectory(prefix="test_array_assembler") as tmp_dir:
save_path = os.path.join(tmp_dir, "assembler")
assembler1.saveToLocal(save_path)
loaded_assembler = ArrayAssembler.loadFromLocal(save_path)
@@ -91,7 +91,7 @@ def test_pipeline(self):
pd.testing.assert_frame_equal(local_eval_dataset, local_eval_dataset_copy)
self._check_result(local_transform_result2, expected_predictions, expected_probabilities)

with tempfile.TemporaryDirectory() as tmp_dir:
with tempfile.TemporaryDirectory(prefix="test_pipeline") as tmp_dir:
pipeline_local_path = os.path.join(tmp_dir, "pipeline")
pipeline.saveToLocal(pipeline_local_path)
loaded_pipeline = Pipeline.loadFromLocal(pipeline_local_path)
2 changes: 1 addition & 1 deletion python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
@@ -229,7 +229,7 @@ def _verify_cv_saved_params(instance, loaded_instance):
assert instance.getEstimatorParamMaps() == loaded_instance.getEstimatorParamMaps()

# Test save / load
with tempfile.TemporaryDirectory() as tmp_dir:
with tempfile.TemporaryDirectory(prefix="test_crossvalidator_on_pipeline") as tmp_dir:
cv.saveToLocal(f"{tmp_dir}/cv")
loaded_cv = CrossValidator.loadFromLocal(f"{tmp_dir}/cv")

2 changes: 1 addition & 1 deletion python/pyspark/ml/tests/test_als.py
@@ -39,7 +39,7 @@ def test_ambiguous_column(self):
seed=42,
).fit(data)

with tempfile.TemporaryDirectory() as d:
with tempfile.TemporaryDirectory(prefix="test_ambiguous_column") as d:
model.write().overwrite().save(d)
loaded_model = ALSModel().load(d)

18 changes: 9 additions & 9 deletions python/pyspark/rdd.py
@@ -2970,7 +2970,7 @@ def saveAsNewAPIHadoopDataset(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="saveAsNewAPIHadoopDataset") as d:
... path = os.path.join(d, "new_hadoop_file")
...
... # Create the conf for writing
@@ -3059,7 +3059,7 @@ def saveAsNewAPIHadoopFile(

>>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="saveAsNewAPIHadoopFile") as d:
... path = os.path.join(d, "hadoop_file")
...
... # Write a temporary Hadoop file
@@ -3129,7 +3129,7 @@ def saveAsHadoopDataset(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="saveAsHadoopDataset") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Create the conf for writing
@@ -3224,7 +3224,7 @@ def saveAsHadoopFile(
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="saveAsHadoopFile") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Write a temporary Hadoop file
@@ -3290,7 +3290,7 @@ def saveAsSequenceFile(

Set the related classes

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="saveAsSequenceFile") as d:
... path = os.path.join(d, "sequence_file")
...
... # Write a temporary sequence file
@@ -3332,7 +3332,7 @@ def saveAsPickleFile(self, path: str, batchSize: int = 10) -> None:
--------
>>> import os
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="saveAsPickleFile") as d:
... path = os.path.join(d, "pickle_file")
...
... # Write a temporary pickled file
@@ -3374,7 +3374,7 @@ def saveAsTextFile(self, path: str, compressionCodecClass: Optional[str] = None)
>>> import tempfile
>>> from fileinput import input
>>> from glob import glob
>>> with tempfile.TemporaryDirectory() as d1:
>>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile1") as d1:
... path1 = os.path.join(d1, "text_file1")
...
... # Write a temporary text file
@@ -3386,7 +3386,7 @@ def saveAsTextFile(self, path: str, compressionCodecClass: Optional[str] = None)

Empty lines are tolerated when saving to text files.

>>> with tempfile.TemporaryDirectory() as d2:
>>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile2") as d2:
... path2 = os.path.join(d2, "text2_file2")
...
... # Write another temporary text file
@@ -3399,7 +3399,7 @@ def saveAsTextFile(self, path: str, compressionCodecClass: Optional[str] = None)
Using compressionCodecClass

>>> from fileinput import input, hook_compressed
>>> with tempfile.TemporaryDirectory() as d3:
>>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile3") as d3:
... path3 = os.path.join(d3, "text3")
... codec = "org.apache.hadoop.io.compress.GzipCodec"
...
8 changes: 4 additions & 4 deletions python/pyspark/sql/catalog.py
@@ -836,7 +836,7 @@ def createTable(
Creating an external table

>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="createTable") as d:
... _ = spark.catalog.createTable(
... "tbl2", schema=spark.range(1).schema, path=d, source='parquet')
>>> _ = spark.sql("DROP TABLE tbl2")
@@ -1119,7 +1119,7 @@ def refreshTable(self, tableName: str) -> None:
The example below caches a table, and then removes the data.

>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="refreshTable") as d:
... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
... _ = spark.sql(
... "CREATE TABLE tbl1 (col STRING) USING TEXT LOCATION '{}'".format(d))
@@ -1170,7 +1170,7 @@ def recoverPartitions(self, tableName: str) -> None:
the partitioned table. After that, it recovers the partitions.

>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="recoverPartitions") as d:
... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
... spark.range(1).selectExpr(
... "id as key", "id as value").write.partitionBy("key").mode("overwrite").save(d)
@@ -1209,7 +1209,7 @@ def refreshByPath(self, path: str) -> None:
The example below caches a table, and then removes the data.

>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="refreshByPath") as d:
... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
... _ = spark.sql(
... "CREATE TABLE tbl1 (col STRING) USING TEXT LOCATION '{}'".format(d))
6 changes: 3 additions & 3 deletions python/pyspark/sql/dataframe.py
@@ -586,7 +586,7 @@ def writeStream(self) -> DataStreamWriter:
>>> type(df.writeStream)
<class '...streaming.readwriter.DataStreamWriter'>

>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="writeStream") as d:
... # Create a table with Rate source.
... df.writeStream.toTable(
... "my_table", checkpointLocation=d)
@@ -1139,7 +1139,7 @@ def checkpoint(self, eager: bool = True) -> "DataFrame":
>>> import tempfile
>>> df = spark.createDataFrame([
... (14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="checkpoint") as d:
... spark.sparkContext.setCheckpointDir("/tmp/bb")
... df.checkpoint(False)
DataFrame[age: bigint, name: string]
@@ -6766,7 +6766,7 @@ def inputFiles(self) -> List[str]:
Examples
--------
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
>>> with tempfile.TemporaryDirectory(prefix="inputFiles") as d:
... # Write a single-row DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]