Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
fix demo in readme (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist authored Mar 15, 2022
1 parent 7b336a9 commit e24d59c
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Simple usage:
```python
import datafusion
from datafusion import functions as f
from datafusion import col
import pyarrow

# create a context
Expand All @@ -54,8 +55,8 @@ df = ctx.create_dataframe([[batch]])

# create a new statement
df = df.select(
f.col("a") + f.col("b"),
f.col("a") - f.col("b"),
col("a") + col("b"),
col("a") - col("b"),
)

# execute and collect the first (and only) batch
Expand All @@ -68,31 +69,35 @@ assert result.column(1) == pyarrow.array([-3, -3, -3])
### UDFs

```python
from datafusion import udf

def is_null(array: pyarrow.Array) -> pyarrow.Array:
return array.is_null()

udf = f.udf(is_null, [pyarrow.int64()], pyarrow.bool_())
is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')

df = df.select(is_null_arr(col("a")))

result = df.collect()

df = df.select(udf(f.col("a")))
assert result.column(0) == pyarrow.array([False] * 3)
```

### UDAF

```python
import pyarrow
import pyarrow.compute
from datafusion import udaf, Accumulator


class Accumulator:
class MyAccumulator(Accumulator):
"""
Interface of a user-defined accumulation.
"""
def __init__(self):
self._sum = pyarrow.scalar(0.0)

def to_scalars(self) -> [pyarrow.Scalar]:
return [self._sum]

def update(self, values: pyarrow.Array) -> None:
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())
Expand All @@ -101,18 +106,25 @@ class Accumulator:
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py())

def state(self) -> pyarrow.Array:
return pyarrow.array([self._sum.as_py()])

def evaluate(self) -> pyarrow.Scalar:
return self._sum


df = ...
df = ctx.create_dataframe([[batch]])

udaf = f.udaf(Accumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()])
my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable')

df = df.aggregate(
[],
[udaf(f.col("a"))]
[my_udaf(col("a"))]
)

result = df.collect()[0]

assert result.column(0) == pyarrow.array([6.0])
```

## How to install (from pip)
Expand Down

0 comments on commit e24d59c

Please sign in to comment.