
[SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup #38624

Closed

Conversation

EnricoMi
Contributor

@EnricoMi EnricoMi commented Nov 11, 2022

What changes were proposed in this pull request?

Add an `applyInArrow` method to PySpark `groupBy` and `groupBy.cogroup` to allow for user functions that work on Arrow, similar to the existing `mapInArrow`.

Why are the changes needed?

PySpark allows transforming a `DataFrame` via the Pandas and Arrow APIs:

```
df.mapInArrow(map_arrow, schema="...")
df.mapInPandas(map_pandas, schema="...")
```

For `df.groupBy(...)` and `df.groupBy(...).cogroup(...)`, there is only a Pandas interface, no Arrow interface:

df.groupBy("id").applyInPandas(apply_pandas, schema="...")

Providing a pure Arrow interface allows user code to use **any** Arrow-based data framework, not only Pandas, e.g. Polars:

```
import polars
import pyarrow

def apply_polars(df: polars.DataFrame) -> polars.DataFrame:
    return df

def apply_arrow(table: pyarrow.Table) -> pyarrow.Table:
    df = polars.from_arrow(table)
    return apply_polars(df).to_arrow()

df.groupBy("id").applyInArrow(apply_arrow, schema="...")
```

Does this PR introduce any user-facing change?

This adds the method `applyInArrow` to PySpark `groupBy` and `groupBy.cogroup`.

How was this patch tested?

Tested with unit tests.

@AmplabJenkins

Can one of the admins verify this patch?

@EnricoMi EnricoMi force-pushed the branch-pyspark-grouped-apply-in-arrow branch from 8a4fdcd to 208ee90 on November 25, 2022
@github-actions github-actions bot added the BUILD label Nov 25, 2022
@EnricoMi
Contributor Author

EnricoMi commented Dec 2, 2022

@HyukjinKwon what do you think?

@goodwanghan

@EnricoMi @HyukjinKwon I think this is a critical feature that is missing from current PySpark. Can we consider merging this change?

@EnricoMi EnricoMi force-pushed the branch-pyspark-grouped-apply-in-arrow branch 2 times, most recently from c3e5647 to 89d4acc on March 6, 2023
@EnricoMi EnricoMi force-pushed the branch-pyspark-grouped-apply-in-arrow branch from 89d4acc to ac936c1 on March 13, 2023
@github-actions github-actions bot removed the BUILD label Mar 13, 2023
@EnricoMi
Contributor Author

CC @xinrong-meng

Contributor

@Kimahriman Kimahriman left a comment


This would definitely be useful to have! Left one doc typo comment and one question about iteration.

Comment on lines 172 to 188
```
batch_iter = [
    (batch, arrow_type)
    for batches, arrow_type in iterator  # tuple constructed in wrap_grouped_map_arrow_udf
    for batch in batches
]

if self._assign_cols_by_name:
    batch_iter = [
        (
            pa.RecordBatch.from_arrays(
                [batch.column(field.name) for field in arrow_type],
                names=[field.name for field in arrow_type],
            ),
            arrow_type,
        )
        for batch, arrow_type in batch_iter
    ]
```
Contributor

Are these list comprehensions going to materialize the entire result set before actually sending anything back to the JVM?

Contributor Author

Thanks for highlighting this, I have changed this to a generator.
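For illustration, the two list comprehensions above rewritten as generator expressions, so pairs are streamed instead of materialized (a sketch; the merged code may differ):

```
# Sketch of the generator-based fix: same pairs as before, but produced
# lazily so the whole result set is never held in memory at once.
batch_iter = (
    (batch, arrow_type)
    for batches, arrow_type in iterator  # tuple constructed in wrap_grouped_map_arrow_udf
    for batch in batches
)

if self._assign_cols_by_name:
    batch_iter = (
        (
            pa.RecordBatch.from_arrays(
                [batch.column(field.name) for field in arrow_type],
                names=[field.name for field in arrow_type],
            ),
            arrow_type,
        )
        for batch, arrow_type in batch_iter
    )
```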

@EnricoMi EnricoMi force-pushed the branch-pyspark-grouped-apply-in-arrow branch from 843b13a to cf86682 on June 30, 2023
@EnricoMi EnricoMi force-pushed the branch-pyspark-grouped-apply-in-arrow branch 4 times, most recently from 9705a70 to 839c50a on July 18, 2023
@EnricoMi
Contributor Author

@xinrong-meng @HyukjinKwon rebased with master and conflicts resolved

@EnricoMi
Contributor Author

@Kimahriman you mentioned this would fix a 2GB memory limit?

@Kimahriman
Contributor

Kimahriman commented Jul 18, 2023

> @Kimahriman you mentioned this would fix a 2GB memory limit?

Yeah, combined with the new setting `spark.sql.execution.arrow.useLargeVarTypes`, it should allow getting around the 2 GiB limit on a single string/binary column being returned from an `applyInPandas` function (by using `applyInArrow` instead).
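For reference, a hedged sketch of that combination (`spark.sql.execution.arrow.useLargeVarTypes` is the conf named above; the pass-through function is illustrative):

```
import pyarrow as pa

# Illustrative only: with large var types enabled, Arrow uses
# large_string / large_binary (64-bit offsets) on the Python side,
# so string/binary data is no longer capped by 32-bit offsets.
spark.conf.set("spark.sql.execution.arrow.useLargeVarTypes", "true")

def passthrough(table: pa.Table) -> pa.Table:
    return table

df.groupBy("id").applyInArrow(passthrough, schema=df.schema)
```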

@ion-elgreco

Looking forward to seeing this PR get merged :)

@igorghi

igorghi commented Aug 18, 2023

Any updates on getting this merged?

@ion-elgreco

@dongjoon-hyun @zhengruifeng @allisonwang-db @xinrong-meng @HyukjinKwon
Are there any updates on this PR? This would be a very useful feature for scaling other DataFrame libraries that use Arrow with Spark.

@HyukjinKwon
Member

qq, can't we work around this with `df.repartitionByExpression().mapInArrow()` for the groupBy case?
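(For context, such a workaround might look like the sketch below; in PySpark the call is `df.repartition(...)`, and `apply_arrow` is as defined in the PR description. Unlike `applyInArrow`, `mapInArrow` hands the function per-partition batch iterators, so it has to regroup by key itself:)

```
import pyarrow as pa
import pyarrow.compute as pc

def per_partition(batches):
    # One partition can hold several groups, and one group can span
    # several batches, so materialize the partition and split manually.
    table = pa.Table.from_batches(list(batches))
    for key in pc.unique(table.column("id")).to_pylist():
        group = table.filter(pc.equal(table.column("id"), pa.scalar(key)))
        yield from apply_arrow(group).to_batches()

df.repartition("id").mapInArrow(per_partition, schema="...")
```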

@HyukjinKwon
Member

I get that cogroup might not be possible that way, though. But we can just convert Pandas back to Arrow batches easily. Is this really required for some scenario? IIRC this is only useful for addressing nested types.
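(A sketch of what "convert Pandas back to Arrow" means in practice, reusing `apply_arrow` from the PR description; the round-trip is easy, though it can be lossy or slow for nested types, as noted:)

```
import pandas as pd
import pyarrow as pa

def apply_pandas(pdf: pd.DataFrame) -> pd.DataFrame:
    # Round-trip through Arrow behind the existing Pandas API.
    table = pa.Table.from_pandas(pdf)
    return apply_arrow(table).to_pandas()

df.groupBy("id").applyInPandas(apply_pandas, schema="...")
```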

dongjoon-hyun pushed a commit that referenced this pull request Dec 4, 2023
… both applyInArrows

### What changes were proposed in this pull request?

This PR is a followup of #38624 that documents both applyInArrows with a docstring fix.

### Why are the changes needed?

For end users to refer to the API documentation.

### Does this PR introduce _any_ user-facing change?

No, the main change has not been released yet.

### How was this patch tested?

Existing CI, and documentation build.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44139 from HyukjinKwon/SPARK-40559-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
HyukjinKwon added a commit that referenced this pull request Dec 4, 2023
…p in Spark Connect

### What changes were proposed in this pull request?

This PR implements the Spark Connect version of #38624.

### Why are the changes needed?

For feature parity.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new API for Python Spark Connect client.

### How was this patch tested?

Reused unittest and doctests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44146 from HyukjinKwon/connect-arrow-api.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
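(Usage-wise, nothing changes for the client: with Spark Connect the same call runs against a remote session. A sketch, assuming a Connect server at the given address and `apply_arrow` as defined in the PR description:)

```
from pyspark.sql import SparkSession

# Assumption: a Spark Connect server is listening at this URL.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

df = spark.createDataFrame([(1, 1.0), (2, 2.0)], ("id", "v"))
df.groupBy("id").applyInArrow(apply_arrow, schema="...")
```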
asl3 pushed a commit to asl3/spark that referenced this pull request Dec 5, 2023
asl3 pushed a commit to asl3/spark that referenced this pull request Dec 5, 2023
### What changes were proposed in this pull request?

This PR proposes to use `inspect.getfullargspec` instead of unimported `getfullargspec`. This PR is a followup of apache#38624.
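(Illustratively, the fix amounts to qualifying the call, e.g.:)

```
import inspect

argspec = inspect.getfullargspec(chained_func)  # signature was lost when wrapping it
```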

### Why are the changes needed?

To recover the CI.

It fails as below:

```
./python/pyspark/worker.py:749:19: F821 undefined name 'getfullargspec'
        argspec = getfullargspec(chained_func)  # signature was lost when wrapping it
                  ^
./python/pyspark/worker.py:757:19: F821 undefined name 'getfullargspec'
        argspec = getfullargspec(chained_func)  # signature was lost when wrapping it
```

https://github.com/apache/spark/actions/runs/7080907452/job/19269484124

It was caused by the logical conflict w/ apache@f5e4e84

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested via `linter-python`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#44141 from HyukjinKwon/SPARK-40559-followup2.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
asl3 pushed a commit to asl3/spark that referenced this pull request Dec 5, 2023
… both applyInArrows
asl3 pushed a commit to asl3/spark that referenced this pull request Dec 5, 2023
…p in Spark Connect
@ion-elgreco

@HyukjinKwon which Spark release will we see this feature in? :D

@EnricoMi
Contributor Author

EnricoMi commented Dec 5, 2023

That will be 4.0.0.

@EnricoMi EnricoMi deleted the branch-pyspark-grouped-apply-in-arrow branch December 5, 2023 19:58
@EnricoMi
Contributor Author

EnricoMi commented Dec 5, 2023

@HyukjinKwon thanks for merging!

@ion-elgreco

@EnricoMi do you know by any chance when that is targeted for?

@HyukjinKwon
Member

Around next June

dbatomic pushed a commit to dbatomic/spark that referenced this pull request Dec 11, 2023
dbatomic pushed a commit to dbatomic/spark that referenced this pull request Dec 11, 2023
dbatomic pushed a commit to dbatomic/spark that referenced this pull request Dec 11, 2023
… both applyInArrows
dbatomic pushed a commit to dbatomic/spark that referenced this pull request Dec 11, 2023
…p in Spark Connect
HyukjinKwon added a commit that referenced this pull request Dec 16, 2023
### What changes were proposed in this pull request?

This PR is a sort of followup of #38624 that proposes to rename the plan nodes for Python as below:

From:

```
package org.apache.spark.sql.execution.python

MapInBatchExec
├── MapInPandasExec
└── *PythonMapInArrowExec* (and *PythonMapInArrow*)

*FlatMapCoGroupsInPythonExec*
├── FlatMapCoGroupsInArrowExec
└── FlatMapCoGroupsInPandasExec

*FlatMapGroupsInPythonExec*
├── FlatMapGroupsInArrowExec
└── FlatMapGroupsInPandasExec
```

To:

```
package org.apache.spark.sql.execution.python

MapInBatchExec
├── MapInPandasExec
└── *MapInArrowExec* (and *MapInArrow*)

*FlatMapCoGroupsInBatchExec*
├── FlatMapCoGroupsInArrowExec
└── FlatMapCoGroupsInPandasExec

*FlatMapGroupsInBatchExec*
├── FlatMapGroupsInArrowExec
└── FlatMapGroupsInPandasExec
```

### Why are the changes needed?

To have consistent names for Python-related execution nodes.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44373 from HyukjinKwon/minor-arrow-rename.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request Apr 18, 2024
Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request Apr 18, 2024
Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request Jul 18, 2024
Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request Jul 30, 2024
Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request Aug 12, 2024
Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request Sep 3, 2024
Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request Sep 16, 2024
Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request Dec 9, 2024
Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request Dec 19, 2024
Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request Feb 27, 2025