Commit
Merge branch 'master' of github-amirmor1:amirmor1/spark
amirmor1 committed Jan 11, 2025
2 parents e295e18 + 22cbb96 commit 727eb48
Showing 1,356 changed files with 5,573 additions and 2,031 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -499,7 +499,7 @@ jobs:
- >-
pyspark-sql, pyspark-resource, pyspark-testing
- >-
pyspark-core, pyspark-errors, pyspark-streaming
pyspark-core, pyspark-errors, pyspark-streaming, pyspark-logger
- >-
pyspark-mllib, pyspark-ml, pyspark-ml-connect
- >-
@@ -743,7 +743,7 @@ jobs:
python-version: '3.11'
- name: Install dependencies for Python CodeGen check
run: |
python3.11 -m pip install 'black==23.9.1' 'protobuf==5.29.1' 'mypy==1.8.0' 'mypy-protobuf==3.3.0'
python3.11 -m pip install 'black==23.12.1' 'protobuf==5.29.1' 'mypy==1.8.0' 'mypy-protobuf==3.3.0'
python3.11 -m pip list
- name: Python CodeGen check for branch-3.5
if: inputs.branch == 'branch-3.5'
@@ -1094,6 +1094,7 @@ jobs:
spark.sql.autoBroadcastJoinThreshold=-1
spark.sql.join.forceApplyShuffledHashJoin=true
- name: Run TPC-DS queries on collated data
if: inputs.branch != 'branch-3.5'
run: |
SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSCollationQueryTestSuite"
- name: Upload test results to report
@@ -1225,6 +1226,7 @@ jobs:
- name: Start Minikube
uses: medyagh/setup-minikube@v0.0.18
with:
kubernetes-version: "1.32.0"
# Github Action limit cpu:2, memory: 6947MB, limit to 2U6G for better resource statistic
cpus: 2
memory: 6144m
13 changes: 13 additions & 0 deletions .github/workflows/build_infra_images_cache.yml
@@ -122,6 +122,19 @@ jobs:
- name: Image digest (PySpark with old dependencies)
if: hashFiles('dev/spark-test-image/python-minimum/Dockerfile') != ''
run: echo ${{ steps.docker_build_pyspark_python_minimum.outputs.digest }}
- name: Build and push (PySpark PS with old dependencies)
if: hashFiles('dev/spark-test-image/python-ps-minimum/Dockerfile') != ''
id: docker_build_pyspark_python_ps_minimum
uses: docker/build-push-action@v6
with:
context: ./dev/spark-test-image/python-ps-minimum/
push: true
tags: ghcr.io/apache/spark/apache-spark-github-action-image-pyspark-python-ps-minimum-cache:${{ github.ref_name }}-static
cache-from: type=registry,ref=ghcr.io/apache/spark/apache-spark-github-action-image-pyspark-python-ps-minimum-cache:${{ github.ref_name }}
cache-to: type=registry,ref=ghcr.io/apache/spark/apache-spark-github-action-image-pyspark-python-ps-minimum-cache:${{ github.ref_name }},mode=max
- name: Image digest (PySpark PS with old dependencies)
if: hashFiles('dev/spark-test-image/python-ps-minimum/Dockerfile') != ''
run: echo ${{ steps.docker_build_pyspark_python_ps_minimum.outputs.digest }}
- name: Build and push (PySpark with PyPy 3.10)
if: hashFiles('dev/spark-test-image/pypy-310/Dockerfile') != ''
id: docker_build_pyspark_pypy_310
47 changes: 47 additions & 0 deletions .github/workflows/build_python_ps_minimum.yml
@@ -0,0 +1,47 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: "Build / Python-only (master, Python PS with old dependencies)"

on:
schedule:
- cron: '0 10 * * *'
workflow_dispatch:

jobs:
run-build:
permissions:
packages: write
name: Run
uses: ./.github/workflows/build_and_test.yml
if: github.repository == 'apache/spark'
with:
java: 17
branch: master
hadoop: hadoop3
envs: >-
{
"PYSPARK_IMAGE_TO_TEST": "python-ps-minimum",
"PYTHON_TO_TEST": "python3.9"
}
jobs: >-
{
"pyspark": "true",
"pyspark-pandas": "true"
}
2 changes: 1 addition & 1 deletion .github/workflows/pages.yml
@@ -62,7 +62,7 @@ jobs:
run: |
pip install 'sphinx==4.5.0' mkdocs 'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 markupsafe 'pyzmq<24.0.0' \
ipython ipython_genutils sphinx_plotly_directive 'numpy>=1.20.0' pyarrow 'pandas==2.2.3' 'plotly>=4.8' 'docutils<0.18.0' \
'flake8==3.9.0' 'mypy==1.8.0' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' 'black==23.9.1' \
'flake8==3.9.0' 'mypy==1.8.0' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' 'black==23.12.1' \
'pandas-stubs==1.2.0.53' 'grpcio==1.67.0' 'grpcio-status==1.67.0' 'protobuf==5.29.1' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' \
'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5'
- name: Install Ruby for documentation generation
4 changes: 4 additions & 0 deletions LICENSE-binary
@@ -286,6 +286,10 @@ io.netty:netty-transport-classes-kqueue
io.netty:netty-transport-native-epoll
io.netty:netty-transport-native-kqueue
io.netty:netty-transport-native-unix-common
io.vertx:vertx-auth-common
io.vertx:vertx-core
io.vertx:vertx-web-client
io.vertx:vertx-web-common
jakarta.inject:jakarta.inject-api
jakarta.validation:jakarta.validation-api
javax.jdo:jdo-api
@@ -22,6 +22,7 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -125,10 +126,11 @@ public static void deleteRecursively(File file, FilenameFilter filter) throws IO
private static void deleteRecursivelyUsingJavaIO(
File file,
FilenameFilter filter) throws IOException {
if (!file.exists()) return;
BasicFileAttributes fileAttributes =
Files.readAttributes(file.toPath(), BasicFileAttributes.class);
if (fileAttributes.isDirectory() && !isSymlink(file)) {
BasicFileAttributes fileAttributes = readFileAttributes(file);
// SPARK-50716: If the file attributes are null, that is, the file attributes cannot be read,
// or if the file does not exist and is not a broken symbolic link, then return directly.
if (fileAttributes == null || (!file.exists() && !fileAttributes.isSymbolicLink())) return;
if (fileAttributes.isDirectory()) {
IOException savedIOException = null;
for (File child : listFilesSafely(file, filter)) {
try {
@@ -143,8 +145,8 @@ private static void deleteRecursivelyUsingJavaIO(
}
}

// Delete file only when it's a normal file or an empty directory.
if (fileAttributes.isRegularFile() ||
// Delete file only when it's a normal file, a symbolic link, or an empty directory.
if (fileAttributes.isRegularFile() || fileAttributes.isSymbolicLink() ||
(fileAttributes.isDirectory() && listFilesSafely(file, null).length == 0)) {
boolean deleted = file.delete();
// Delete can also fail if the file simply did not exist.
@@ -154,6 +156,18 @@
}
}

/**
* Reads basic attributes of a given file, or returns null if an I/O error occurs.
*/
private static BasicFileAttributes readFileAttributes(File file) {
try {
return Files.readAttributes(
file.toPath(), BasicFileAttributes.class, LinkOption.NOFOLLOW_LINKS);
} catch (IOException e) {
return null;
}
}

private static void deleteRecursivelyUsingUnixNative(File file) throws IOException {
ProcessBuilder builder = new ProcessBuilder("rm", "-rf", file.getAbsolutePath());
Process process = null;
@@ -192,17 +206,6 @@ private static File[] listFilesSafely(File file, FilenameFilter filter) throws I
}
}

private static boolean isSymlink(File file) throws IOException {
Objects.requireNonNull(file);
File fileInCanonicalDir = null;
if (file.getParent() == null) {
fileInCanonicalDir = file;
} else {
fileInCanonicalDir = new File(file.getParentFile().getCanonicalFile(), file.getName());
}
return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
}

private static final Map<String, TimeUnit> timeSuffixes;

private static final Map<String, ByteUnit> byteSuffixes;
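The JavaUtils hunks above replace the canonical-path isSymlink check with a single attribute read that uses LinkOption.NOFOLLOW_LINKS, so a broken symbolic link (for which File.exists() returns false, because exists() follows links) still yields attributes and is deleted as a link rather than skipped. A minimal standalone sketch of that pattern, using hypothetical names and not the Spark implementation itself, might look like:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.attribute.BasicFileAttributes;

public class SymlinkAwareDelete {
  // Recursively delete a path without following symbolic links.
  public static void deleteRecursively(File file) throws IOException {
    BasicFileAttributes attrs;
    try {
      // NOFOLLOW_LINKS inspects the link itself, not its (possibly missing) target.
      attrs = Files.readAttributes(
          file.toPath(), BasicFileAttributes.class, LinkOption.NOFOLLOW_LINKS);
    } catch (IOException e) {
      return; // Attributes unreadable; nothing we can safely do.
    }
    // A broken symlink fails File.exists() (which follows links) but is still a link.
    if (!file.exists() && !attrs.isSymbolicLink()) return;
    // With NOFOLLOW_LINKS, a symlink to a directory does not report isDirectory(),
    // so we never recurse through links.
    if (attrs.isDirectory()) {
      File[] children = file.listFiles();
      if (children != null) {
        for (File child : children) {
          deleteRecursively(child);
        }
      }
    }
    // Regular files, symlinks, and now-empty directories are deleted directly;
    // deleting a symlink removes the link, never its target.
    if (!file.delete() && file.exists()) {
      throw new IOException("Failed to delete: " + file.getAbsolutePath());
    }
  }
}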
80 changes: 79 additions & 1 deletion common/utils/src/main/resources/error/error-conditions.json
@@ -1229,6 +1229,18 @@
},
"sqlState" : "4274K"
},
"DUPLICATE_ROUTINE_PARAMETER_NAMES" : {
"message" : [
"Found duplicate name(s) in the parameter list of the user-defined routine <routineName>: <names>."
],
"sqlState" : "42734"
},
"DUPLICATE_ROUTINE_RETURNS_COLUMNS" : {
"message" : [
"Found duplicate column(s) in the RETURNS clause column list of the user-defined routine <routineName>: <columns>."
],
"sqlState" : "42711"
},
"EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : {
"message" : [
"Previous node emitted a row with eventTime=<emittedRowEventTime> which is older than current_watermark_value=<currentWatermark>",
@@ -4146,6 +4158,18 @@
],
"sqlState" : "38000"
},
"RECURSIVE_CTE_IN_LEGACY_MODE" : {
"message" : [
"Recursive definitions cannot be used in legacy CTE precedence mode (spark.sql.legacy.ctePrecedencePolicy=LEGACY)."
],
"sqlState" : "42836"
},
"RECURSIVE_CTE_WHEN_INLINING_IS_FORCED" : {
"message" : [
"Recursive definitions cannot be used when CTE inlining is forced."
],
"sqlState" : "42836"
},
"RECURSIVE_PROTOBUF_SCHEMA" : {
"message" : [
"Found recursive reference in Protobuf schema, which can not be processed by Spark by default: <fieldDescriptor>. try setting the option `recursive.fields.max.depth` 1 to 10. Going beyond 10 levels of recursion is not allowed."
@@ -4683,6 +4707,12 @@
],
"sqlState" : "42P01"
},
"TABLE_VALUED_ARGUMENTS_NOT_YET_IMPLEMENTED_FOR_SQL_FUNCTIONS" : {
"message" : [
"Cannot <action> SQL user-defined function <functionName> with TABLE arguments because this functionality is not yet implemented."
],
"sqlState" : "0A000"
},
"TABLE_VALUED_FUNCTION_FAILED_TO_ANALYZE_IN_PYTHON" : {
"message" : [
"Failed to analyze the Python user defined table function: <msg>"
@@ -5815,6 +5845,54 @@
],
"sqlState" : "42K0E"
},
"USER_DEFINED_FUNCTIONS" : {
"message" : [
"User defined function is invalid:"
],
"subClass" : {
"CANNOT_CONTAIN_COMPLEX_FUNCTIONS" : {
"message" : [
"SQL scalar function cannot contain aggregate/window/generate functions: <queryText>"
]
},
"CANNOT_REPLACE_NON_SQL_UDF_WITH_SQL_UDF" : {
"message" : [
"Cannot replace the non-SQL function <name> with a SQL function."
]
},
"NOT_A_VALID_DEFAULT_EXPRESSION" : {
"message" : [
"The DEFAULT expression of `<functionName>`.`<parameterName>` is not supported because it contains a subquery."
]
},
"NOT_A_VALID_DEFAULT_PARAMETER_POSITION" : {
"message" : [
"In routine `<functionName>` parameter `<parameterName>` with DEFAULT must not be followed by parameter `<nextParameterName>` without DEFAULT."
]
},
"NOT_NULL_ON_FUNCTION_PARAMETERS" : {
"message" : [
"Cannot specify NOT NULL on function parameters: <input>"
]
},
"RETURN_COLUMN_COUNT_MISMATCH" : {
"message" : [
"The number of columns produced by the RETURN clause (num: `<outputSize>`) does not match the number of column names specified by the RETURNS clause (num: `<returnParamSize>`) of <name>."
]
},
"SQL_TABLE_UDF_BODY_MUST_BE_A_QUERY" : {
"message" : [
"SQL table function <name> body must be a query."
]
},
"SQL_TABLE_UDF_MISSING_COLUMN_NAMES" : {
"message" : [
"The relation returned by the query in the CREATE FUNCTION statement for <functionName> with RETURNS TABLE clause lacks explicit names for one or more output columns; please rewrite the function body to provide explicit column names or add column names to the RETURNS TABLE clause, and re-run the command."
]
}
},
"sqlState" : "42601"
},
"USER_RAISED_EXCEPTION" : {
"message" : [
"<errorMessage>"
@@ -5935,7 +6013,7 @@
},
"XML_ROW_TAG_MISSING" : {
"message" : [
"<rowTag> option is required for reading files in XML format."
"<rowTag> option is required for reading/writing files in XML format."
],
"sqlState" : "42KDF"
},
@@ -27,7 +27,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{agnosticEncoderFor, ProductEncoder}
import org.apache.spark.sql.connect.ConnectConversions._
import org.apache.spark.sql.connect.common.UdfUtils
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, UdfUtils}
import org.apache.spark.sql.expressions.SparkUserDefinedFunction
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.toExpr
@@ -502,18 +502,20 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
}

val outputEncoder = agnosticEncoderFor[U]
val stateEncoder = agnosticEncoderFor[S]
val nf = UDFAdaptors.flatMapGroupsWithStateWithMappedValues(func, valueMapFunc)

sparkSession.newDataset[U](outputEncoder) { builder =>
val groupMapBuilder = builder.getGroupMapBuilder
groupMapBuilder
.setInput(plan.getRoot)
.addAllGroupingExpressions(groupingExprs)
.setFunc(getUdf(nf, outputEncoder)(ivEncoder))
.setFunc(getUdf(nf, outputEncoder, stateEncoder)(ivEncoder))
.setIsMapGroupsWithState(isMapGroupWithState)
.setOutputMode(if (outputMode.isEmpty) OutputMode.Update.toString
else outputMode.get.toString)
.setTimeoutConf(timeoutConf.toString)
.setStateSchema(DataTypeProtoConverter.toConnectProtoType(stateEncoder.schema))

if (initialStateImpl != null) {
groupMapBuilder
@@ -533,6 +535,21 @@
udf.apply(inputEncoders.map(_ => col("*")): _*).expr.getCommonInlineUserDefinedFunction
}

private def getUdf[U: Encoder, S: Encoder](
nf: AnyRef,
outputEncoder: AgnosticEncoder[U],
stateEncoder: AgnosticEncoder[S])(
inEncoders: AgnosticEncoder[_]*): proto.CommonInlineUserDefinedFunction = {
// Apply keyAs changes by setting kEncoder
// Add the state encoder to the inputEncoders.
val inputEncoders = kEncoder +: stateEncoder +: inEncoders
val udf = SparkUserDefinedFunction(
function = nf,
inputEncoders = inputEncoders,
outputEncoder = outputEncoder)
udf.apply(inputEncoders.map(_ => col("*")): _*).expr.getCommonInlineUserDefinedFunction
}

/**
* We cannot deserialize a connect [[KeyValueGroupedDataset]] because of a class clash on the
* server side. We null out the instance for now.
@@ -73,13 +73,19 @@ object ColumnNodeToProtoConverter extends (ColumnNode => proto.Expression) {
.setColName(regex)
planId.foreach(b.setPlanId)

case UnresolvedFunction(functionName, arguments, isDistinct, isUserDefinedFunction, _, _) =>
// TODO(SPARK-49087) use internal namespace.
case UnresolvedFunction(
functionName,
arguments,
isDistinct,
isUserDefinedFunction,
isInternal,
_) =>
builder.getUnresolvedFunctionBuilder
.setFunctionName(functionName)
.setIsUserDefinedFunction(isUserDefinedFunction)
.setIsDistinct(isDistinct)
.addAllArguments(arguments.map(apply(_, e)).asJava)
.setIsInternal(isInternal)

case Alias(child, name, metadata, _) =>
val b = builder.getAliasBuilder.setExpr(apply(child, e))
@@ -156,6 +162,7 @@ object ColumnNodeToProtoConverter extends (ColumnNode => proto.Expression) {
case CaseWhenOtherwise(branches, otherwise, _) =>
val b = builder.getUnresolvedFunctionBuilder
.setFunctionName("when")
.setIsInternal(false)
branches.foreach { case (condition, value) =>
b.addArguments(apply(condition, e))
b.addArguments(apply(value, e))
(The remaining changed files are not rendered in this view.)
