diff --git a/dev/connect-check-protos.py b/dev/connect-check-protos.py index 513938f8d4f8e..ffc74d7b16082 100755 --- a/dev/connect-check-protos.py +++ b/dev/connect-check-protos.py @@ -45,7 +45,7 @@ def run_cmd(cmd): def check_connect_protos(): print("Start checking the generated codes in pyspark-connect.") - with tempfile.TemporaryDirectory() as tmp: + with tempfile.TemporaryDirectory(prefix="check_connect_protos") as tmp: run_cmd(f"{SPARK_HOME}/dev/connect-gen-protos.sh {tmp}") result = filecmp.dircmp( f"{SPARK_HOME}/python/pyspark/sql/connect/proto/", diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 1f2b3263245ab..a5a68d7797816 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -174,7 +174,7 @@ def dump(self, value: T, f: BinaryIO) -> None: Write a pickled representation of `b` to the open temp file. - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="dump") as d: ... path = os.path.join(d, "test.txt") ... with open(path, "wb") as f: ... b.dump(b.value, f) @@ -215,7 +215,7 @@ def load_from_path(self, path: str) -> T: Read the pickled representation of value from temp file. - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="load_from_path") as d: ... path = os.path.join(d, "test.txt") ... with open(path, "wb") as f: ... b.dump(b.value, f) @@ -250,7 +250,7 @@ def load(self, file: BinaryIO) -> T: Read the pickled representation of value from the open temp file. - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="load") as d: ... path = os.path.join(d, "test.txt") ... with open(path, "wb") as f: ... b.dump(b.value, f) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 19d3608c3825b..bcc9fbf935bac 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -899,7 +899,7 @@ def pickleFile(self, name: str, minPartitions: Optional[int] = None) -> RDD[Any] -------- >>> import os >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="pickleFile") as d: ... # Write a temporary pickled file ... path1 = os.path.join(d, "pickled1") ... sc.parallelize(range(10)).saveAsPickleFile(path1, 3) @@ -962,7 +962,7 @@ def textFile( -------- >>> import os >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="textFile") as d: ... path1 = os.path.join(d, "text1") ... path2 = os.path.join(d, "text2") ... @@ -1052,7 +1052,7 @@ def wholeTextFiles( -------- >>> import os >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="wholeTextFiles") as d: ... # Write a temporary text file ... with open(os.path.join(d, "1.txt"), "w") as f: ... _ = f.write("123") @@ -1107,7 +1107,7 @@ def binaryFiles(self, path: str, minPartitions: Optional[int] = None) -> RDD[Tup -------- >>> import os >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="binaryFiles") as d: ... # Write a temporary binary file ... with open(os.path.join(d, "1.bin"), "wb") as f1: ... _ = f1.write(b"binary data I") @@ -1156,7 +1156,7 @@ def binaryRecords(self, path: str, recordLength: int) -> RDD[bytes]: -------- >>> import os >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="binaryRecords") as d: ... # Write a temporary file ... 
with open(os.path.join(d, "1.bin"), "w") as f: ... for i in range(3): @@ -1247,7 +1247,7 @@ def sequenceFile( >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat" - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="sequenceFile") as d: ... path = os.path.join(d, "hadoop_file") ... ... # Write a temporary Hadoop file @@ -1345,7 +1345,7 @@ def newAPIHadoopFile( >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text" - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="newAPIHadoopFile") as d: ... path = os.path.join(d, "new_hadoop_file") ... ... # Write a temporary Hadoop file @@ -1437,7 +1437,7 @@ def newAPIHadoopRDD( >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text" - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="newAPIHadoopRDD") as d: ... path = os.path.join(d, "new_hadoop_file") ... ... # Create the conf for writing @@ -1544,7 +1544,7 @@ def hadoopFile( >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text" - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="hadoopFile") as d: ... path = os.path.join(d, "old_hadoop_file") ... ... # Write a temporary Hadoop file @@ -1634,7 +1634,7 @@ def hadoopRDD( >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text" - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="hadoopRDD") as d: ... path = os.path.join(d, "old_hadoop_file") ... ... # Create the conf for writing @@ -1694,7 +1694,7 @@ def union(self, rdds: List[RDD[T]]) -> RDD[T]: -------- >>> import os >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="union") as d: ... # generate a text RDD ... with open(os.path.join(d, "union-text.txt"), "w") as f: ... _ = f.write("Hello") @@ -1860,7 +1860,7 @@ def addFile(self, path: str, recursive: bool = False) -> None: >>> import tempfile >>> from pyspark import SparkFiles - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="addFile") as d: ... path1 = os.path.join(d, "test1.txt") ... with open(path1, "w") as f: ... _ = f.write("100") @@ -1984,7 +1984,7 @@ def addArchive(self, path: str) -> None: >>> import zipfile >>> from pyspark import SparkFiles - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="addArchive") as d: ... path = os.path.join(d, "test.txt") ... with open(path, "w") as f: ... _ = f.write("100") diff --git a/python/pyspark/files.py b/python/pyspark/files.py index 8044cf48a3f5d..92130389d9753 100644 --- a/python/pyspark/files.py +++ b/python/pyspark/files.py @@ -73,7 +73,7 @@ def get(cls, filename: str) -> str: >>> import tempfile >>> from pyspark import SparkFiles - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="get") as d: ... path1 = os.path.join(d, "test.txt") ... with open(path1, "w") as f: ... 
_ = f.write("100") diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py b/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py index 26eb4230df35d..db9a298048085 100644 --- a/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py +++ b/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py @@ -132,7 +132,7 @@ def test_multi_classes_logistic_regression(self): self._check_result(local_transform_result, expected_predictions, expected_probabilities) def test_save_load(self): - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_save_load") as tmp_dir: estimator = LORV2(maxIter=2, numTrainWorkers=2, learningRate=0.001) local_path = os.path.join(tmp_dir, "estimator") estimator.saveToLocal(local_path) diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_evaluation.py b/python/pyspark/ml/tests/connect/test_legacy_mode_evaluation.py index 6a6d6b183d1b5..ae01031ff462c 100644 --- a/python/pyspark/ml/tests/connect/test_legacy_mode_evaluation.py +++ b/python/pyspark/ml/tests/connect/test_legacy_mode_evaluation.py @@ -87,7 +87,7 @@ def test_regressor_evaluator(self): np.testing.assert_almost_equal(r2_local, expected_r2) # Test save / load - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_regressor_evaluator") as tmp_dir: r2_evaluator.saveToLocal(f"{tmp_dir}/ev") loaded_evaluator = RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev") assert loaded_evaluator.getMetricName() == "r2" @@ -133,7 +133,7 @@ def test_binary_classifier_evaluator(self): np.testing.assert_almost_equal(auprc_local, expected_auprc, decimal=2) # Test save / load - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_binary_classifier_evaluator") as tmp_dir: auprc_evaluator.saveToLocal(f"{tmp_dir}/ev") loaded_evaluator = RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev") assert loaded_evaluator.getMetricName() == "areaUnderPR" @@ -170,7 +170,7 @@ def test_multiclass_classifier_evaluator(self): np.testing.assert_almost_equal(accuracy_local, expected_accuracy, decimal=2) # Test save / load - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_multiclass_classifier_evaluator") as tmp_dir: accuracy_evaluator.saveToLocal(f"{tmp_dir}/ev") loaded_evaluator = RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev") assert loaded_evaluator.getMetricName() == "accuracy" diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py index 8e07879d8a268..9565b3a09a5bc 100644 --- a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py +++ b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py @@ -69,7 +69,7 @@ def test_max_abs_scaler(self): np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result) - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_max_abs_scaler") as tmp_dir: estimator_path = os.path.join(tmp_dir, "estimator") scaler.saveToLocal(estimator_path) loaded_scaler = MaxAbsScaler.loadFromLocal(estimator_path) @@ -124,7 +124,7 @@ def test_standard_scaler(self): np.testing.assert_allclose(list(local_transform_result.scaled_features), expected_result) - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_standard_scaler") as tmp_dir: estimator_path = os.path.join(tmp_dir, "estimator") 
scaler.saveToLocal(estimator_path) loaded_scaler = StandardScaler.loadFromLocal(estimator_path) @@ -176,7 +176,7 @@ def test_array_assembler(self): result2[1][1] = np.nan np.testing.assert_allclose(result2, expected_result) - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_array_assembler") as tmp_dir: save_path = os.path.join(tmp_dir, "assembler") assembler1.saveToLocal(save_path) loaded_assembler = ArrayAssembler.loadFromLocal(save_path) diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py b/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py index 0fd0fd63ffcef..104aff17e0b27 100644 --- a/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py +++ b/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py @@ -91,7 +91,7 @@ def test_pipeline(self): pd.testing.assert_frame_equal(local_eval_dataset, local_eval_dataset_copy) self._check_result(local_transform_result2, expected_predictions, expected_probabilities) - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_pipeline") as tmp_dir: pipeline_local_path = os.path.join(tmp_dir, "pipeline") pipeline.saveToLocal(pipeline_local_path) loaded_pipeline = Pipeline.loadFromLocal(pipeline_local_path) diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py index 69e33a36d4f3b..7f26788c137f8 100644 --- a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py +++ b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py @@ -229,7 +229,7 @@ def _verify_cv_saved_params(instance, loaded_instance): assert instance.getEstimatorParamMaps() == loaded_instance.getEstimatorParamMaps() # Test save / load - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_crossvalidator_on_pipeline") as tmp_dir: cv.saveToLocal(f"{tmp_dir}/cv") loaded_cv = CrossValidator.loadFromLocal(f"{tmp_dir}/cv") diff --git a/python/pyspark/ml/tests/test_als.py b/python/pyspark/ml/tests/test_als.py index 8eec0d937768a..3027b3ab9fd6a 100644 --- a/python/pyspark/ml/tests/test_als.py +++ b/python/pyspark/ml/tests/test_als.py @@ -39,7 +39,7 @@ def test_ambiguous_column(self): seed=42, ).fit(data) - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_ambiguous_column") as d: model.write().overwrite().save(d) loaded_model = ALSModel().load(d) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index bbc3432980ecb..cb5f1746b1183 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2970,7 +2970,7 @@ def saveAsNewAPIHadoopDataset( >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text" - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="saveAsNewAPIHadoopDataset") as d: ... path = os.path.join(d, "new_hadoop_file") ... ... # Create the conf for writing @@ -3059,7 +3059,7 @@ def saveAsNewAPIHadoopFile( >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat" - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="saveAsNewAPIHadoopFile") as d: ... path = os.path.join(d, "hadoop_file") ... ... 
# Write a temporary Hadoop file @@ -3129,7 +3129,7 @@ def saveAsHadoopDataset( >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text" - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="saveAsHadoopDataset") as d: ... path = os.path.join(d, "old_hadoop_file") ... ... # Create the conf for writing @@ -3224,7 +3224,7 @@ def saveAsHadoopFile( >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text" - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="saveAsHadoopFile") as d: ... path = os.path.join(d, "old_hadoop_file") ... ... # Write a temporary Hadoop file @@ -3290,7 +3290,7 @@ def saveAsSequenceFile( Set the related classes - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="saveAsSequenceFile") as d: ... path = os.path.join(d, "sequence_file") ... ... # Write a temporary sequence file @@ -3332,7 +3332,7 @@ def saveAsPickleFile(self, path: str, batchSize: int = 10) -> None: -------- >>> import os >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="saveAsPickleFile") as d: ... path = os.path.join(d, "pickle_file") ... ... # Write a temporary pickled file @@ -3374,7 +3374,7 @@ def saveAsTextFile(self, path: str, compressionCodecClass: Optional[str] = None) >>> import tempfile >>> from fileinput import input >>> from glob import glob - >>> with tempfile.TemporaryDirectory() as d1: + >>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile1") as d1: ... path1 = os.path.join(d1, "text_file1") ... ... # Write a temporary text file @@ -3386,7 +3386,7 @@ def saveAsTextFile(self, path: str, compressionCodecClass: Optional[str] = None) Empty lines are tolerated when saving to text files. - >>> with tempfile.TemporaryDirectory() as d2: + >>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile2") as d2: ... path2 = os.path.join(d2, "text2_file2") ... ... # Write another temporary text file @@ -3399,7 +3399,7 @@ def saveAsTextFile(self, path: str, compressionCodecClass: Optional[str] = None) Using compressionCodecClass >>> from fileinput import input, hook_compressed - >>> with tempfile.TemporaryDirectory() as d3: + >>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile3") as d3: ... path3 = os.path.join(d3, "text3") ... codec = "org.apache.hadoop.io.compress.GzipCodec" ... diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 6595659a4daea..d70bd89baedab 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -836,7 +836,7 @@ def createTable( Creating an external table >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="createTable") as d: ... _ = spark.catalog.createTable( ... "tbl2", schema=spark.range(1).schema, path=d, source='parquet') >>> _ = spark.sql("DROP TABLE tbl2") @@ -1119,7 +1119,7 @@ def refreshTable(self, tableName: str) -> None: The example below caches a table, and then removes the data. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="refreshTable") as d: ... _ = spark.sql("DROP TABLE IF EXISTS tbl1") ... _ = spark.sql( ... "CREATE TABLE tbl1 (col STRING) USING TEXT LOCATION '{}'".format(d)) @@ -1170,7 +1170,7 @@ def recoverPartitions(self, tableName: str) -> None: the partitioned table. After that, it recovers the partitions. 
>>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="recoverPartitions") as d: ... _ = spark.sql("DROP TABLE IF EXISTS tbl1") ... spark.range(1).selectExpr( ... "id as key", "id as value").write.partitionBy("key").mode("overwrite").save(d) @@ -1209,7 +1209,7 @@ def refreshByPath(self, path: str) -> None: The example below caches a table, and then removes the data. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="refreshByPath") as d: ... _ = spark.sql("DROP TABLE IF EXISTS tbl1") ... _ = spark.sql( ... "CREATE TABLE tbl1 (col STRING) USING TEXT LOCATION '{}'".format(d)) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index dad031fc2625c..5c5c263b8156e 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -586,7 +586,7 @@ def writeStream(self) -> DataStreamWriter: >>> type(df.writeStream) - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="writeStream") as d: ... # Create a table with Rate source. ... df.writeStream.toTable( ... "my_table", checkpointLocation=d) @@ -1139,7 +1139,7 @@ def checkpoint(self, eager: bool = True) -> "DataFrame": >>> import tempfile >>> df = spark.createDataFrame([ ... (14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="checkpoint") as d: ... spark.sparkContext.setCheckpointDir("/tmp/bb") ... df.checkpoint(False) DataFrame[age: bigint, name: string] @@ -6766,7 +6766,7 @@ def inputFiles(self) -> List[str]: Examples -------- >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="inputFiles") as d: ... # Write a single-row DataFrame into a JSON file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] diff --git a/python/pyspark/sql/protobuf/functions.py b/python/pyspark/sql/protobuf/functions.py index acb1a17efbd65..5a99aed55f744 100644 --- a/python/pyspark/sql/protobuf/functions.py +++ b/python/pyspark/sql/protobuf/functions.py @@ -94,7 +94,7 @@ def from_protobuf( ... '26F746F33') >>> # Writing a protobuf description into a file, generated by using >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file - >>> with tempfile.TemporaryDirectory() as tmp_dir: + >>> with tempfile.TemporaryDirectory(prefix="from_protobuf") as tmp_dir: ... desc_file_path = "%s/pyspark_test.desc" % tmp_dir ... with open(desc_file_path, "wb") as f: ... _ = f.write(bytearray.fromhex(desc_hex)) @@ -224,7 +224,7 @@ def to_protobuf( ... '26F746F33') >>> # Writing a protobuf description into a file, generated by using >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file - >>> with tempfile.TemporaryDirectory() as tmp_dir: + >>> with tempfile.TemporaryDirectory(prefix="to_protobuf") as tmp_dir: ... desc_file_path = "%s/pyspark_test.desc" % tmp_dir ... with open(desc_file_path, "wb") as f: ... _ = f.write(bytearray.fromhex(desc_hex)) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index db9220fc48bb3..9eb5d99dfa4c8 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -96,7 +96,7 @@ def format(self, source: str) -> "DataFrameReader": Write a DataFrame into a JSON file and read it back. 
>>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="format") as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] @@ -139,7 +139,7 @@ def schema(self, schema: Union[StructType, str]) -> "DataFrameReader": Specify the schema with reading a CSV file. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="schema") as d: ... spark.read.schema("col0 INT, col1 DOUBLE").format("csv").load(d).printSchema() root |-- col0: integer (nullable = true) @@ -187,7 +187,7 @@ def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameReader": Specify the option 'nullValue' with reading a CSV file. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="option") as d: ... # Write a DataFrame into a CSV file ... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}]) ... df.write.mode("overwrite").format("csv").save(d) @@ -231,7 +231,7 @@ def options(self, **options: "OptionalPrimitiveType") -> "DataFrameReader": Specify the option 'nullValue' and 'header' with reading a CSV file. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="options") as d: ... # Write a DataFrame into a CSV file with a header. ... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}]) ... df.write.option("header", True).mode("overwrite").format("csv").save(d) @@ -283,7 +283,7 @@ def load( Load a CSV file with format, schema and options specified. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="load") as d: ... # Write a DataFrame into a CSV file with a header ... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}]) ... df.write.option("header", True).mode("overwrite").format("csv").save(d) @@ -383,7 +383,7 @@ def json( Example 1: Write a DataFrame into a JSON file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="json1") as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin"}] @@ -399,8 +399,8 @@ def json( Example 2: Read JSON from multiple files in a directory - >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d1, tempfile.TemporaryDirectory() as d2: + >>> from tempfile import TemporaryDirectory + >>> with TemporaryDirectory(prefix="json2") as d1, TemporaryDirectory(prefix="json3") as d2: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{"age": 30, "name": "Bob"}] @@ -421,7 +421,7 @@ def json( Example 3: Read JSON with a custom schema >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="json4") as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{"age": 30, "name": "Bob"}] @@ -564,7 +564,7 @@ def parquet(self, *paths: str, **options: "OptionalPrimitiveType") -> "DataFrame Write a DataFrame into a Parquet file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="parquet1") as d: ... # Write a DataFrame into a Parquet file. ... df.write.mode("overwrite").format("parquet").save(d) ... 
@@ -580,7 +580,7 @@ def parquet(self, *paths: str, **options: "OptionalPrimitiveType") -> "DataFrame Read a Parquet file with a specific column. - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="parquet2") as d: ... df.write.mode("overwrite").format("parquet").save(d) ... ... # Read the Parquet file with only the 'name' column. @@ -595,15 +595,16 @@ def parquet(self, *paths: str, **options: "OptionalPrimitiveType") -> "DataFrame Read multiple Parquet files and merge schema. - >>> with tempfile.TemporaryDirectory() as d1, tempfile.TemporaryDirectory() as d2: - ... df.write.mode("overwrite").format("parquet").save(d1) - ... df2.write.mode("overwrite").format("parquet").save(d2) + >>> with tempfile.TemporaryDirectory(prefix="parquet3") as d1: + ... with tempfile.TemporaryDirectory(prefix="parquet4") as d2: + ... df.write.mode("overwrite").format("parquet").save(d1) + ... df2.write.mode("overwrite").format("parquet").save(d2) ... - ... spark.read.option( - ... "mergeSchema", "true" - ... ).parquet(d1, d2).select( - ... "name", "age", "height" - ... ).orderBy("name", "age").show() + ... spark.read.option( + ... "mergeSchema", "true" + ... ).parquet(d1, d2).select( + ... "name", "age", "height" + ... ).orderBy("name", "age").show() +-----+----+------+ | name| age|height| +-----+----+------+ @@ -675,7 +676,7 @@ def text( Write a DataFrame into a text file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="text") as d: ... # Write a DataFrame into a text file ... df = spark.createDataFrame([("a",), ("b",), ("c",)], schema=["alphabets"]) ... df.write.mode("overwrite").format("text").save(d) @@ -775,7 +776,7 @@ def csv( Write a DataFrame into a CSV file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="csv") as d: ... # Write a DataFrame into a CSV file ... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}]) ... df.write.mode("overwrite").format("csv").save(d) @@ -911,7 +912,7 @@ def xml( Write a DataFrame into a XML file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="xml") as d: ... # Write a DataFrame into a XML file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] @@ -1015,7 +1016,7 @@ def orc( Write a DataFrame into a ORC file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="orc") as d: ... # Write a DataFrame into a ORC file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] @@ -1201,7 +1202,7 @@ def mode(self, saveMode: Optional[str]) -> "DataFrameWriter": Raise an error when writing to an existing path. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="mode1") as d: ... spark.createDataFrame( ... [{"age": 80, "name": "Xinrong Meng"}] ... ).write.mode("error").format("parquet").save(d) # doctest: +SKIP @@ -1211,7 +1212,7 @@ def mode(self, saveMode: Optional[str]) -> "DataFrameWriter": Write a Parquet file back with various options, and read it back. - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="mode2") as d: ... # Overwrite the path with a new Parquet file ... spark.createDataFrame( ... 
[{"age": 100, "name": "Hyukjin Kwon"}] @@ -1263,7 +1264,7 @@ def format(self, source: str) -> "DataFrameWriter": Write a DataFrame into a Parquet file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="format") as d: ... # Write a DataFrame into a Parquet file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] @@ -1304,7 +1305,7 @@ def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameWriter": Specify the option 'nullValue' with writing a CSV file. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="option") as d: ... # Write a DataFrame into a CSV file with 'nullValue' option set to 'Hyukjin Kwon'. ... df = spark.createDataFrame([(100, None)], "age INT, name STRING") ... df.write.option("nullValue", "Hyukjin Kwon").mode("overwrite").format("csv").save(d) @@ -1353,7 +1354,7 @@ def options(self, **options: "OptionalPrimitiveType") -> "DataFrameWriter": ... StructField("name",StringType(),True), ... ]) >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="options") as d: ... # Write a DataFrame into a CSV file with 'nullValue' option set to 'Hyukjin Kwon', ... # and 'header' option set to `True`. ... df = spark.createDataFrame([(100, None)], schema=schema) @@ -1402,7 +1403,7 @@ def partitionBy(self, *cols: Union[str, List[str]]) -> "DataFrameWriter": >>> import tempfile >>> import os - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="partitionBy") as d: ... # Write a DataFrame into a Parquet file in a partitioned manner. ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}, {"age": 120, "name": "Ruifeng Zheng"}] @@ -1656,7 +1657,7 @@ def save( Write a DataFrame into a JSON file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="save") as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] @@ -1850,7 +1851,7 @@ def json( Write a DataFrame into a JSON file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="json") as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] @@ -1918,7 +1919,7 @@ def parquet( Write a DataFrame into a Parquet file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="parquet") as d: ... # Write a DataFrame into a Parquet file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] @@ -1973,7 +1974,7 @@ def text( Write a DataFrame into a text file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="text") as d: ... # Write a DataFrame into a text file ... df = spark.createDataFrame([("a",), ("b",), ("c",)], schema=["alphabets"]) ... df.write.mode("overwrite").text(d) @@ -2046,7 +2047,7 @@ def csv( Write a DataFrame into a CSV file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="csv") as d: ... # Write a DataFrame into a CSV file ... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}]) ... 
df.write.csv(d, mode="overwrite") @@ -2129,7 +2130,7 @@ def xml( Write a DataFrame into a XML file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="xml") as d: ... # Write a DataFrame into a XML file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] @@ -2203,7 +2204,7 @@ def orc( Write a DataFrame into a ORC file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="orc") as d: ... # Write a DataFrame into a ORC file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 0a19921f7286a..c70a28f58ecaa 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -1748,7 +1748,7 @@ def read(self) -> DataFrameReader: Write a DataFrame into a JSON file and read it back. >>> import tempfile - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="read") as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py index 01441ee77ac16..41a83355ab6ca 100644 --- a/python/pyspark/sql/streaming/readwriter.py +++ b/python/pyspark/sql/streaming/readwriter.py @@ -109,7 +109,7 @@ def format(self, source: str) -> "DataStreamReader": >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="format") as d: ... # Write a temporary text file to read it. ... spark.createDataFrame( ... [("hello",), ("this",)]).write.mode("overwrite").format("text").save(d) @@ -156,7 +156,7 @@ def schema(self, schema: Union[StructType, str]) -> "DataStreamReader": >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="schema") as d: ... # Start a streaming query to read the CSV file. ... spark.readStream.schema("col0 INT, col1 STRING").format("csv").load(d).printSchema() root @@ -280,7 +280,7 @@ def load( >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="load") as d: ... # Write a temporary JSON file to read it. ... spark.createDataFrame( ... [(100, "Hyukjin Kwon"),], ["age", "name"] @@ -375,7 +375,7 @@ def json( >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="json") as d: ... # Write a temporary JSON file to read it. ... spark.createDataFrame( ... [(100, "Hyukjin Kwon"),], ["age", "name"] @@ -448,7 +448,7 @@ def orc( >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="orc") as d: ... # Write a temporary ORC file to read it. ... spark.range(10).write.mode("overwrite").format("orc").save(d) ... @@ -507,7 +507,7 @@ def parquet( >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="parquet") as d: ... # Write a temporary Parquet file to read it. ... spark.range(10).write.mode("overwrite").format("parquet").save(d) ... @@ -577,7 +577,7 @@ def text( >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="text") as d: ... 
# Write a temporary text file to read it. ... spark.createDataFrame( ... [("hello",), ("this",)]).write.mode("overwrite").format("text").save(d) @@ -673,7 +673,7 @@ def csv( >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="csv") as d: ... # Write a temporary text file to read it. ... spark.createDataFrame([(1, "2"),]).write.mode("overwrite").format("csv").save(d) ... @@ -781,7 +781,7 @@ def xml( >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="xml") as d: ... # Write a DataFrame into a XML file ... spark.createDataFrame( ... [{"age": 100, "name": "Hyukjin Kwon"}] @@ -852,7 +852,7 @@ def table(self, tableName: str) -> "DataFrame": >>> import tempfile >>> import time >>> _ = spark.sql("DROP TABLE IF EXISTS my_table") - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="table") as d: ... # Create a table with Rate source. ... q1 = spark.readStream.format("rate").load().writeStream.toTable( ... "my_table", checkpointLocation=d) @@ -984,12 +984,13 @@ def format(self, source: str) -> "DataStreamWriter": >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d, tempfile.TemporaryDirectory() as cp: - ... df = spark.readStream.format("rate").load() - ... q = df.writeStream.format("csv").option("checkpointLocation", cp).start(d) - ... time.sleep(5) - ... q.stop() - ... spark.read.schema("timestamp TIMESTAMP, value STRING").csv(d).show() + >>> with tempfile.TemporaryDirectory(prefix="format1") as d: + ... with tempfile.TemporaryDirectory(prefix="format2") as cp: + ... df = spark.readStream.format("rate").load() + ... q = df.writeStream.format("csv").option("checkpointLocation", cp).start(d) + ... time.sleep(5) + ... q.stop() + ... spark.read.schema("timestamp TIMESTAMP, value STRING").csv(d).show() +...---------+-----+ |...timestamp|value| +...---------+-----+ @@ -1104,13 +1105,14 @@ def partitionBy(self, *cols: str) -> "DataStreamWriter": # type: ignore[misc] >>> import tempfile >>> import time - >>> with tempfile.TemporaryDirectory() as d, tempfile.TemporaryDirectory() as cp: - ... df = spark.readStream.format("rate").option("rowsPerSecond", 10).load() - ... q = df.writeStream.partitionBy( - ... "timestamp").format("parquet").option("checkpointLocation", cp).start(d) - ... time.sleep(5) - ... q.stop() - ... spark.read.schema(df.schema).parquet(d).show() + >>> with tempfile.TemporaryDirectory(prefix="partitionBy1") as d: + ... with tempfile.TemporaryDirectory(prefix="partitionBy2") as cp: + ... df = spark.readStream.format("rate").option("rowsPerSecond", 10).load() + ... q = df.writeStream.partitionBy( + ... "timestamp").format("parquet").option("checkpointLocation", cp).start(d) + ... time.sleep(5) + ... q.stop() + ... spark.read.schema(df.schema).parquet(d).show() +...---------+-----+ |...timestamp|value| +...---------+-----+ @@ -1703,7 +1705,7 @@ def toTable( >>> import tempfile >>> import time >>> _ = spark.sql("DROP TABLE IF EXISTS my_table2") - >>> with tempfile.TemporaryDirectory() as d: + >>> with tempfile.TemporaryDirectory(prefix="toTable") as d: ... # Create a table with Rate source. ... q = spark.readStream.format("rate").option( ... 
"rowsPerSecond", 10).load().writeStream.toTable( diff --git a/python/pyspark/sql/tests/connect/client/test_artifact.py b/python/pyspark/sql/tests/connect/client/test_artifact.py index eca6e7de76349..f1cbf637b92a0 100644 --- a/python/pyspark/sql/tests/connect/client/test_artifact.py +++ b/python/pyspark/sql/tests/connect/client/test_artifact.py @@ -34,7 +34,7 @@ class ArtifactTestsMixin: def check_add_pyfile(self, spark_session): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="check_add_pyfile") as d: pyfile_path = os.path.join(d, "my_pyfile.py") with open(pyfile_path, "w") as f: f.write("my_func = lambda: 10") @@ -60,7 +60,7 @@ def test_add_pyfile(self): ) def test_artifacts_cannot_be_overwritten(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_artifacts_cannot_be_overwritten") as d: pyfile_path = os.path.join(d, "my_pyfile.py") with open(pyfile_path, "w+") as f: f.write("my_func = lambda: 10") @@ -79,7 +79,7 @@ def test_artifacts_cannot_be_overwritten(self): self.spark.addArtifacts(pyfile_path, pyfile=True) def check_add_zipped_package(self, spark_session): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="check_add_zipped_package") as d: package_path = os.path.join(d, "my_zipfile") os.mkdir(package_path) pyfile_path = os.path.join(package_path, "__init__.py") @@ -108,7 +108,7 @@ def test_add_zipped_package(self): ) def check_add_archive(self, spark_session): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="check_add_archive") as d: archive_path = os.path.join(d, "my_archive") os.mkdir(archive_path) pyfile_path = os.path.join(archive_path, "my_file.txt") @@ -144,7 +144,7 @@ def test_add_archive(self): ) def check_add_file(self, spark_session): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="check_add_file") as d: file_path = os.path.join(d, "my_file.txt") with open(file_path, "w") as f: f.write("Hello world!!") @@ -393,8 +393,8 @@ def test_single_chunked_and_chunked_artifact(self): self.assertEqual(artifact2.data.data, data) def test_copy_from_local_to_fs(self): - with tempfile.TemporaryDirectory() as d: - with tempfile.TemporaryDirectory() as d2: + with tempfile.TemporaryDirectory(prefix="test_copy_from_local_to_fs1") as d: + with tempfile.TemporaryDirectory(prefix="test_copy_from_local_to_fs2") as d2: file_path = os.path.join(d, "file1") dest_path = os.path.join(d2, "file1_dest") file_content = "test_copy_from_local_to_FS" @@ -417,7 +417,7 @@ def test_cache_artifact(self): self.assertEqual(self.artifact_manager.is_cached_artifact(expected_hash), True) def test_add_not_existing_artifact(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_add_not_existing_artifact") as d: with self.assertRaises(FileNotFoundError): self.artifact_manager.add_artifacts( os.path.join(d, "not_existing"), file=True, pyfile=False, archive=False diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index 3f5484f48764d..88c4754029b6c 100755 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -291,7 +291,7 @@ def test_simple_read(self): self.assertEqual(len(data.index), 10) def test_json(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_json") as d: # Write a DataFrame into a JSON file 
self.spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}]).write.mode( "overwrite" @@ -376,7 +376,7 @@ def test_xml(self): def test_parquet(self): # SPARK-41445: Implement DataFrameReader.parquet - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_parquet") as d: # Write a DataFrame into a JSON file self.spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}]).write.mode( "overwrite" @@ -388,7 +388,7 @@ def test_parquet(self): def test_text(self): # SPARK-41849: Implement DataFrameReader.text - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_text") as d: # Write a DataFrame into a text file self.spark.createDataFrame( [{"name": "Sandeep Singh"}, {"name": "Hyukjin Kwon"}] @@ -398,7 +398,7 @@ def test_text(self): def test_csv(self): # SPARK-42011: Implement DataFrameReader.csv - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_csv") as d: # Write a DataFrame into a text file self.spark.createDataFrame( [{"name": "Sandeep Singh"}, {"name": "Hyukjin Kwon"}] @@ -409,7 +409,7 @@ def test_csv(self): def test_multi_paths(self): # SPARK-42041: DataFrameReader should support list of paths - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_multi_paths1") as d: text_files = [] for i in range(0, 3): text_file = f"{d}/text-{i}.text" @@ -421,7 +421,7 @@ def test_multi_paths(self): self.spark.read.text(text_files).collect(), ) - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_multi_paths2") as d: json_files = [] for i in range(0, 5): json_file = f"{d}/json-{i}.json" @@ -435,7 +435,7 @@ def test_multi_paths(self): def test_orc(self): # SPARK-42012: Implement DataFrameReader.orc - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_orc") as d: # Write a DataFrame into a text file self.spark.createDataFrame( [{"name": "Sandeep Singh"}, {"name": "Hyukjin Kwon"}] @@ -2474,7 +2474,7 @@ def test_subtract(self): ) def test_write_operations(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_write_operations") as d: df = self.connect.range(50) df.write.mode("overwrite").format("csv").save(d) @@ -2483,7 +2483,7 @@ def test_write_operations(self): cd = ndf.collect() self.assertEqual(set(df.collect()), set(cd)) - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_write_operations") as d: df = self.connect.range(50) df.write.mode("overwrite").csv(d, lineSep="|") @@ -3120,7 +3120,7 @@ def test_simple_udt(self): def test_simple_udt_from_read(self): from pyspark.ml.linalg import Matrices, Vectors - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_simple_udt_from_read") as d: path1 = f"{d}/df1.parquet" self.spark.createDataFrame( [(i % 3, PythonOnlyPoint(float(i), float(i))) for i in range(10)], diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py index 31486feae156a..abfacdbbf059b 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming.py +++ b/python/pyspark/sql/tests/streaming/test_streaming.py @@ -358,7 +358,7 @@ def test_streaming_read_from_table(self): ) def test_streaming_write_to_table(self): - with self.table("output_table"), tempfile.TemporaryDirectory() as tmpdir: + with self.table("output_table"), tempfile.TemporaryDirectory(prefix="to_table") as tmpdir: df = 
self.spark.readStream.format("rate").option("rowsPerSecond", 10).load() q = df.writeStream.toTable("output_table", format="parquet", checkpointLocation=tmpdir) self.assertTrue(q.isActive) diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py index 278fbbb2ba510..bc6bfdd2759f2 100644 --- a/python/pyspark/sql/tests/test_catalog.py +++ b/python/pyspark/sql/tests/test_catalog.py @@ -477,7 +477,7 @@ def test_refresh_table(self): import tempfile spark = self.spark - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_refresh_table") as tmp_dir: with self.table("my_tab"): spark.sql( "CREATE TABLE my_tab (col STRING) USING TEXT LOCATION '{}'".format(tmp_dir) diff --git a/python/pyspark/sql/tests/test_python_datasource.py b/python/pyspark/sql/tests/test_python_datasource.py index 6ba4b68b02bac..343f8f2483270 100644 --- a/python/pyspark/sql/tests/test_python_datasource.py +++ b/python/pyspark/sql/tests/test_python_datasource.py @@ -329,14 +329,14 @@ def test_custom_json_data_source_write(self): self.spark.dataSource.register(data_source) input_path = os.path.join(SPARK_HOME, "python/test_support/sql/people.json") df = self.spark.read.json(input_path) - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_custom_json_data_source_write") as d: df.write.format("my-json").mode("append").save(d) assertDataFrameEqual(self.spark.read.json(d), self.spark.read.json(input_path)) def test_custom_json_data_source_commit(self): data_source = self._get_test_json_data_source() self.spark.dataSource.register(data_source) - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_custom_json_data_source_commit") as d: self.spark.range(0, 5, 1, 3).write.format("my-json").mode("append").save(d) with open(os.path.join(d, "_success.txt"), "r") as file: text = file.read() @@ -345,7 +345,7 @@ def test_custom_json_data_source_commit(self): def test_custom_json_data_source_abort(self): data_source = self._get_test_json_data_source() self.spark.dataSource.register(data_source) - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_custom_json_data_source_abort") as d: with self.assertRaises(PythonException): self.spark.range(0, 8, 1, 3).write.format("my-json").mode("append").save(d) with open(os.path.join(d, "_failed.txt"), "r") as file: diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 99684a20af41c..557b4daa85508 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -82,7 +82,7 @@ def test_udf_profiler(self): finally: sys.stdout = old_stdout - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_udf_profiler") as d: self.sc.dump_profiles(d) for i, udf_name in enumerate(["add1", "add2", "add1", "add2"]): @@ -185,7 +185,7 @@ def test_perf_profiler_udf(self): with self.trap_stdout() as io_all: self.spark.profile.show(type="perf") - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_perf_profiler_udf") as d: self.spark.profile.dump(d, type="perf") for id in self.profile_results: diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py index 41321f556ac66..3a17f7013bd6f 100644 --- a/python/pyspark/sql/tests/test_udtf.py +++ b/python/pyspark/sql/tests/test_udtf.py @@ -408,7 +408,7 @@ def terminate(self): ) def 
test_udtf_cleanup_with_exception_in_eval(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_udtf_cleanup_with_exception_in_eval") as d: path = os.path.join(d, "file.txt") @udtf(returnType="x: int") @@ -437,7 +437,9 @@ def cleanup(self): self.assertEqual(data, "cleanup") def test_udtf_cleanup_with_exception_in_terminate(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory( + prefix="test_udtf_cleanup_with_exception_in_terminate" + ) as d: path = os.path.join(d, "file.txt") @udtf(returnType="x: int") @@ -942,7 +944,7 @@ def upper(s: str): ) def test_udtf_pickle_error(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_udtf_pickle_error") as d: file = os.path.join(d, "file.txt") file_obj = open(file, "w") @@ -1715,7 +1717,7 @@ def _add_pyfile(self, path): self.sc.addPyFile(path) def test_udtf_with_analyze_using_pyfile(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_udtf_with_analyze_using_pyfile") as d: pyfile_path = os.path.join(d, "my_pyfile.py") with open(pyfile_path, "w") as f: f.write("my_func = lambda: 'col1'") @@ -1752,7 +1754,7 @@ def terminate(self): assertDataFrameEqual(df, [Row(col1=10), Row(col1=100)]) def test_udtf_with_analyze_using_zipped_package(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_udtf_with_analyze_using_zipped_package") as d: package_path = os.path.join(d, "my_zipfile") os.mkdir(package_path) pyfile_path = os.path.join(package_path, "__init__.py") @@ -1795,7 +1797,7 @@ def _add_archive(self, path): self.sc.addArchive(path) def test_udtf_with_analyze_using_archive(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_udtf_with_analyze_using_archive") as d: archive_path = os.path.join(d, "my_archive") os.mkdir(archive_path) pyfile_path = os.path.join(archive_path, "my_file.txt") @@ -1842,7 +1844,7 @@ def _add_file(self, path): self.sc.addFile(path) def test_udtf_with_analyze_using_file(self): - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_udtf_with_analyze_using_file") as d: file_path = os.path.join(d, "my_file.txt") with open(file_path, "w") as f: f.write("col1") diff --git a/python/pyspark/tests/test_install_spark.py b/python/pyspark/tests/test_install_spark.py index c4188034a1057..dee28af9a407d 100644 --- a/python/pyspark/tests/test_install_spark.py +++ b/python/pyspark/tests/test_install_spark.py @@ -34,7 +34,7 @@ def test_install_spark(self): # the Spark distribution. 
spark_version, hadoop_version, hive_version = checked_versions("3.0.1", "3", "2.3") - with tempfile.TemporaryDirectory() as tmp_dir: + with tempfile.TemporaryDirectory(prefix="test_install_spark") as tmp_dir: install_spark( dest=tmp_dir, spark_version=spark_version, diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py index a7d3fff7887c3..f0abdd03e2437 100644 --- a/python/pyspark/tests/test_memory_profiler.py +++ b/python/pyspark/tests/test_memory_profiler.py @@ -106,7 +106,7 @@ def test_memory_profiler(self): self.sc.show_profiles() self.assertTrue("plus_one" in fake_out.getvalue()) - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_memory_profiler") as d: self.sc.dump_profiles(d) self.assertTrue(f"udf_{id}_memory.txt" in os.listdir(d)) @@ -235,7 +235,7 @@ def test_memory_profiler_udf(self): with self.trap_stdout() as io_all: self.spark.profile.show(type="memory") - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_memory_profiler_udf") as d: self.spark.profile.dump(d, type="memory") for id in self.profile_results: diff --git a/python/pyspark/tests/test_profiler.py b/python/pyspark/tests/test_profiler.py index a12bc99c54aec..09470bd47f7e5 100644 --- a/python/pyspark/tests/test_profiler.py +++ b/python/pyspark/tests/test_profiler.py @@ -54,7 +54,7 @@ def test_profiler(self): self.assertTrue("heavy_foo" in io.getvalue()) sys.stdout = old_stdout - with tempfile.TemporaryDirectory() as d: + with tempfile.TemporaryDirectory(prefix="test_profiler") as d: self.sc.dump_profiles(d) self.assertTrue("rdd_%d.pstats" % id in os.listdir(d)) diff --git a/python/pyspark/tests/test_shuffle.py b/python/pyspark/tests/test_shuffle.py index 4fb73607a2e0b..f40bf5ea9d666 100644 --- a/python/pyspark/tests/test_shuffle.py +++ b/python/pyspark/tests/test_shuffle.py @@ -16,7 +16,7 @@ # import random import unittest -import tempfile +from tempfile import TemporaryDirectory import os from py4j.protocol import Py4JJavaError @@ -67,16 +67,16 @@ def test_shuffle_data_with_multiple_locations(self): # SPARK-39179: Test shuffle of data with multiple location also check # shuffle locations get randomized - with tempfile.TemporaryDirectory() as tempdir1, tempfile.TemporaryDirectory() as tempdir2: + with TemporaryDirectory(prefix="shf1") as d1, TemporaryDirectory(prefix="shf2") as d2: original = os.environ.get("SPARK_LOCAL_DIRS", None) - os.environ["SPARK_LOCAL_DIRS"] = tempdir1 + "," + tempdir2 + os.environ["SPARK_LOCAL_DIRS"] = d1 + "," + d2 try: index_of_tempdir1 = [False, False] for idx in range(10): m = ExternalMerger(self.agg, 20) - if m.localdirs[0].startswith(tempdir1): + if m.localdirs[0].startswith(d1): index_of_tempdir1[0] = True - elif m.localdirs[1].startswith(tempdir1): + elif m.localdirs[1].startswith(d1): index_of_tempdir1[1] = True m.mergeValues(self.data) self.assertTrue(m.spills >= 1) diff --git a/python/pyspark/util.py b/python/pyspark/util.py index 4a828d6bfc947..ec9c2489b41e4 100644 --- a/python/pyspark/util.py +++ b/python/pyspark/util.py @@ -124,7 +124,7 @@ def try_simplify_traceback(tb: TracebackType) -> Optional[TracebackType]: >>> import sys >>> import traceback >>> import tempfile - >>> with tempfile.TemporaryDirectory() as tmp_dir: + >>> with tempfile.TemporaryDirectory(prefix="try_simplify_traceback") as tmp_dir: ... with open("%s/dummy_module.py" % tmp_dir, "w") as f: ... _ = f.write( ... 'def raise_stop_iteration():\\n'
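
For context, a minimal runnable sketch of the pattern these hunks apply: passing a descriptive prefix to tempfile.TemporaryDirectory() so that any directory that leaks onto disk can be traced back to the doctest or test that created it. The helper name run_in_named_tempdir and the sample prefix below are illustrative only, not part of PySpark.

    import os
    import tempfile

    def run_in_named_tempdir(prefix: str) -> None:
        # With a prefix, a leftover directory looks like /tmp/test_save_load_ab12cd34
        # instead of an anonymous /tmp/tmpab12cd34, which makes stale temp dirs
        # easy to attribute to the test that produced them.
        with tempfile.TemporaryDirectory(prefix=prefix) as d:
            path = os.path.join(d, "data.txt")
            with open(path, "w") as f:
                _ = f.write("100")
            print(d)  # directory name starts with the given prefix

    if __name__ == "__main__":
        run_in_named_tempdir("test_save_load")

The prefix keyword only changes the generated directory's name; creation and automatic cleanup on exiting the with block behave exactly as before.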