
udf_apply_feature_dataframe UDF in executor? #458

Open
soxofaan opened this issue Jun 20, 2023 · 3 comments
@soxofaan (Member)

(I stumbled on this issue while working on #437 / Open-EO/openeo-python-driver#197)

#251 / #262 added parallelized UDF execution on vector cubes (through the udf_apply_feature_dataframe and udf_apply_udf_data entrypoints), as documented at https://github.com/Open-EO/openeo-geopyspark-driver/blob/1f0ad56cc749d9f3ade315a85f39f1200f74168c/docs/vectorcube-run_udf.md . The idea was to get parallelization and executor isolation automatically by using pyspark.pandas with apply.

However, it seems that a pyspark.pandas apply callback does not run on the executors, but just on the driver.

An example snippet to illustrate:

import openeo
import openeo.processes
connection = openeo.connect("openeo.vito.be").authenticate_oidc()
cube = connection.load_collection(
    "TERRASCOPE_S2_TOC_V2",
    temporal_extent=["2023-03-01", "2023-03-20"],
    bands=["B02"],
)
geometries = {"type": "Polygon", "coordinates": [[[3.68, 51.04], [3.69, 51.04], [3.69, 51.05], [3.68, 51.05], [3.68, 51.04]]]}
aggregates = cube.aggregate_spatial(geometries=geometries, reducer="mean")
udf_code = """
import pandas as pd
import pyspark

def udf_apply_feature_dataframe(df: pd.DataFrame):
    # Executor detection based on pyspark.SparkContext._assert_on_driver
    in_executor = (pyspark.TaskContext.get() is not None)
    raise ValueError(f"{in_executor=}")
"""
processed = openeo.processes.run_udf(data=aggregates, udf=udf_code, runtime="Python")
connection.download(processed, outputfile="tmp.json")

This fails with: Internal: Server error: ValueError('in_executor=False'), indicating that the callback did not run in an executor.

@soxofaan (Member, Author)

For comparison, here is UDF usage with openeo apply, where the UDF does run in an executor:

s2_cube = connection.load_collection(
    "TERRASCOPE_S2_TOC_V2",
    spatial_extent={"west": 4.00, "south": 51.00, "east": 4.01, "north": 51.01},
    temporal_extent=["2022-03-01", "2022-03-31"],
    bands=["B02"]
)
udf = openeo.UDF("""
import pyspark
from openeo.udf import XarrayDataCube

def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:
    # Executor detection based on pyspark.SparkContext._assert_on_driver
    in_executor = (pyspark.TaskContext.get() is not None)
    raise ValueError(f"{in_executor=}")
""")
rescaled = s2_cube.apply(process=udf)
rescaled.download("udf-in-executor-apply_datacube-tmp.nc")

This fails with: [500] Internal: Server error: UDF Exception during Spark execution: ... ValueError: in_executor=True, indicating that the UDF did run in an executor.

@jdries (Contributor)

jdries commented Jan 28, 2025

Note that pyspark.pandas performs schema inference by running the apply callback on the first rows.
To avoid this, we would need to add a return type hint to the callback.

jdries added a commit that referenced this issue Jan 29, 2025
@jdries (Contributor)

jdries commented Jan 29, 2025

I'm switching the implementation over to plain RDDs, which do not require dealing with output schemas. Conversion to pandas structures is now done inside the callback.

jdries added a commit that referenced this issue Jan 29, 2025
jdries added a commit that referenced this issue Jan 30, 2025
jdries added a commit that referenced this issue Jan 30, 2025