From c9f47605efbbb658a1f03346340c6b7b9c2e184b Mon Sep 17 00:00:00 2001 From: Matt Green Date: Tue, 12 Nov 2024 17:30:56 -0800 Subject: [PATCH] Add example python udaf (#53) * Add example python udaf * Fixing the streaming join example (#54) * Fixing the streaming join example * format * add drop_columns * update python internal package name --------- Co-authored-by: Matt Green * Implement stream join in python (#51) * Implement stream join in python * update * fmt * clippy fmt * example works * fmt * Adding config option for checkpointing (#50) * Adding config option for checkpointing * Add maturin build step for ci (#52) * fix: correct python module name * Fixing the streaming join example (#54) * Fixing the streaming join example * format * add drop_columns * update python internal package name --------- Co-authored-by: Matt Green * merge with main * Adding config option for checkpointing * merge with main * Cargo fmt --------- Co-authored-by: Matt Green * Rm hardcoded checkpoint (#55) * Add dockerfile to run emit_measurements and kafka (#56) * Add dockerfile to run emit_measurements and kafka * Add ability to specify timestamp_column on kafka stream (#57) * Add example python udaf * Fixing the UDAF example * merge main --------- Co-authored-by: Amey Chaugule Co-authored-by: Amey Chaugule --- .gitignore | 2 + .../core/src/logical_plan/streaming_window.rs | 2 +- .../core/src/physical_plan/continuous/mod.rs | 2 +- py-denormalized/pyproject.toml | 11 +- .../python/examples/udaf_example.py | 82 +++++++ py-denormalized/requirements-dev.lock | 214 ++++++++++++++++++ 6 files changed, 305 insertions(+), 8 deletions(-) create mode 100644 py-denormalized/python/examples/udaf_example.py diff --git a/.gitignore b/.gitignore index a1ab9bc..dd1acb4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target .vscode .DS_Store +.ipynb_checkpoints/ +Untitled.ipynb diff --git a/crates/core/src/logical_plan/streaming_window.rs b/crates/core/src/logical_plan/streaming_window.rs index 8bb2a93..29ae2df 100644 --- a/crates/core/src/logical_plan/streaming_window.rs +++ b/crates/core/src/logical_plan/streaming_window.rs @@ -83,7 +83,7 @@ pub struct StreamingWindowSchema { impl StreamingWindowSchema { pub fn try_new(aggr_expr: Aggregate) -> Result { let inner_schema = aggr_expr.schema.inner().clone(); - let fields = inner_schema.flattened_fields().to_owned(); + let fields = inner_schema.fields(); let mut builder = SchemaBuilder::new(); diff --git a/crates/core/src/physical_plan/continuous/mod.rs b/crates/core/src/physical_plan/continuous/mod.rs index 664aada..5970890 100644 --- a/crates/core/src/physical_plan/continuous/mod.rs +++ b/crates/core/src/physical_plan/continuous/mod.rs @@ -40,7 +40,7 @@ pub(crate) fn create_group_accumulator( } fn add_window_columns_to_schema(schema: SchemaRef) -> Schema { - let fields = schema.flattened_fields().to_owned(); + let fields = schema.fields(); let mut builder = SchemaBuilder::new(); diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index 9af3a12..b76a343 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -5,13 +5,10 @@ build-backend = "maturin" [project] name = "denormalized" requires-python = ">=3.12" -classifiers = [ ] -dynamic = ["version"] # Version specified in py-denormalized/Cargo.toml +classifiers = [] +dynamic = ["version"] # Version specified in py-denormalized/Cargo.toml description = "" -dependencies = [ - "pyarrow>=17.0.0", - "datafusion>=40.1.0", -] +dependencies = ["pyarrow>=17.0.0", "datafusion>=40.1.0"] [project.optional-dependencies] tests = ["pytest"] @@ -30,6 +27,8 @@ dev-dependencies = [ "pytest>=8.3.2", "maturin>=1.7.4", "pyarrow-stubs>=17.11", + "pandas>=2.2.3", + "jupyterlab>=4.3.0", ] # Enable docstring linting using the google style guide diff --git a/py-denormalized/python/examples/udaf_example.py b/py-denormalized/python/examples/udaf_example.py new file mode 100644 index 0000000..724fdf0 --- /dev/null +++ b/py-denormalized/python/examples/udaf_example.py @@ -0,0 +1,82 @@ +"""stream_aggregate example.""" + +import json +import signal +import sys +from collections import Counter +from typing import List +import pyarrow as pa + +from denormalized import Context +from denormalized.datafusion import Accumulator, col +from denormalized.datafusion import functions as f +from denormalized.datafusion import udaf + + +def signal_handler(sig, frame): + sys.exit(0) + + +signal.signal(signal.SIGINT, signal_handler) + +bootstrap_server = "localhost:9092" + +sample_event = { + "occurred_at_ms": 100, + "sensor_name": "foo", + "reading": 0.0, +} + +class TotalValuesRead(Accumulator): + # Define the state type as a struct containing a map + acc_state_type = pa.struct([("counts", pa.map_(pa.string(), pa.int64()))]) + + def __init__(self): + self.counts = Counter() + + def update(self, values: pa.Array) -> None: + # Update counter with new values + if values is not None: + self.counts.update(values.to_pylist()) + + def merge(self, states: pa.Array) -> None: + # Merge multiple states into this accumulator + if states is None or len(states) == 0: + return + for state in states: + if state is not None: + counts_map = state.to_pylist()[0] # will always be one element struct + for k, v in counts_map["counts"]: + self.counts[k] += v + + def state(self) -> List[pa.Scalar]: + # Convert current state to Arrow array format + result = {"counts": dict(self.counts.items())} + return [pa.scalar(result, type=pa.struct([("counts", pa.map_(pa.string(), pa.int64()))]))] + + def evaluate(self) -> pa.Scalar: + return self.state()[0] + + +input_type = [pa.string()] +return_type = TotalValuesRead.acc_state_type +state_type = [TotalValuesRead.acc_state_type] +sample_udaf = udaf(TotalValuesRead, input_type, return_type, state_type, "stable") + + +def print_batch(rb: pa.RecordBatch): + if not len(rb): + return + print(rb) + +ctx = Context() +ds = ctx.from_topic("temperature", json.dumps(sample_event), bootstrap_server, "occurred_at_ms") + +ds = ds.window( + [], + [ + sample_udaf(col("sensor_name")).alias("count"), + ], + 2000, + None, +).sink(print_batch) diff --git a/py-denormalized/requirements-dev.lock b/py-denormalized/requirements-dev.lock index 92923c9..135f630 100644 --- a/py-denormalized/requirements-dev.lock +++ b/py-denormalized/requirements-dev.lock @@ -10,37 +10,172 @@ # universal: false -e file:. +anyio==4.6.2.post1 + # via httpx + # via jupyter-server +appnope==0.1.4 + # via ipykernel +argon2-cffi==23.1.0 + # via jupyter-server +argon2-cffi-bindings==21.2.0 + # via argon2-cffi +arrow==1.3.0 + # via isoduration asttokens==2.4.1 # via stack-data +async-lru==2.0.4 + # via jupyterlab +attrs==24.2.0 + # via jsonschema + # via referencing +babel==2.16.0 + # via jupyterlab-server +beautifulsoup4==4.12.3 + # via nbconvert +bleach==6.2.0 + # via nbconvert +certifi==2024.8.30 + # via httpcore + # via httpx + # via requests +cffi==1.17.1 + # via argon2-cffi-bindings +charset-normalizer==3.4.0 + # via requests +comm==0.2.2 + # via ipykernel datafusion==40.1.0 # via denormalized +debugpy==1.8.7 + # via ipykernel decorator==5.1.1 # via ipython +defusedxml==0.7.1 + # via nbconvert executing==2.0.1 # via stack-data +fastjsonschema==2.20.0 + # via nbformat +fqdn==1.5.1 + # via jsonschema +h11==0.14.0 + # via httpcore +httpcore==1.0.6 + # via httpx +httpx==0.27.2 + # via jupyterlab +idna==3.10 + # via anyio + # via httpx + # via jsonschema + # via requests iniconfig==2.0.0 # via pytest +ipykernel==6.29.5 + # via jupyterlab ipython==8.26.0 + # via ipykernel +isoduration==20.11.0 + # via jsonschema jedi==0.19.1 # via ipython +jinja2==3.1.4 + # via jupyter-server + # via jupyterlab + # via jupyterlab-server + # via nbconvert +json5==0.9.25 + # via jupyterlab-server +jsonpointer==3.0.0 + # via jsonschema +jsonschema==4.23.0 + # via jupyter-events + # via jupyterlab-server + # via nbformat +jsonschema-specifications==2024.10.1 + # via jsonschema +jupyter-client==8.6.3 + # via ipykernel + # via jupyter-server + # via nbclient +jupyter-core==5.7.2 + # via ipykernel + # via jupyter-client + # via jupyter-server + # via jupyterlab + # via nbclient + # via nbconvert + # via nbformat +jupyter-events==0.10.0 + # via jupyter-server +jupyter-lsp==2.2.5 + # via jupyterlab +jupyter-server==2.14.2 + # via jupyter-lsp + # via jupyterlab + # via jupyterlab-server + # via notebook-shim +jupyter-server-terminals==0.5.3 + # via jupyter-server +jupyterlab==4.3.0 +jupyterlab-pygments==0.3.0 + # via nbconvert +jupyterlab-server==2.27.3 + # via jupyterlab +markupsafe==3.0.2 + # via jinja2 + # via nbconvert matplotlib-inline==0.1.7 + # via ipykernel # via ipython +mistune==3.0.2 + # via nbconvert +nbclient==0.10.0 + # via nbconvert +nbconvert==7.16.4 + # via jupyter-server +nbformat==5.10.4 + # via jupyter-server + # via nbclient + # via nbconvert +nest-asyncio==1.6.0 + # via ipykernel +notebook-shim==0.2.4 + # via jupyterlab maturin==1.7.4 numpy==2.1.0 + # via pandas # via pyarrow +overrides==7.7.0 + # via jupyter-server packaging==24.1 + # via ipykernel + # via jupyter-server + # via jupyterlab + # via jupyterlab-server + # via nbconvert # via pytest +pandas==2.2.3 +pandocfilters==1.5.1 + # via nbconvert parso==0.8.4 # via jedi pexpect==4.9.0 # via ipython pip==24.2 +platformdirs==4.3.6 + # via jupyter-core pluggy==1.5.0 # via pytest +prometheus-client==0.21.0 + # via jupyter-server prompt-toolkit==3.0.47 # via ipython +psutil==6.1.0 + # via ipykernel ptyprocess==0.7.0 # via pexpect + # via terminado pure-eval==0.2.3 # via stack-data pyarrow==17.0.0 @@ -48,17 +183,96 @@ pyarrow==17.0.0 # via denormalized # via pyarrow-stubs pyarrow-stubs==17.11 +pycparser==2.22 + # via cffi pygments==2.18.0 # via ipython + # via nbconvert pytest==8.3.2 +python-dateutil==2.9.0.post0 + # via arrow + # via jupyter-client + # via pandas +python-json-logger==2.0.7 + # via jupyter-events +pytz==2024.2 + # via pandas +pyyaml==6.0.2 + # via jupyter-events +pyzmq==26.2.0 + # via ipykernel + # via jupyter-client + # via jupyter-server +referencing==0.35.1 + # via jsonschema + # via jsonschema-specifications + # via jupyter-events +requests==2.32.3 + # via jupyterlab-server +rfc3339-validator==0.1.4 + # via jsonschema + # via jupyter-events +rfc3986-validator==0.1.1 + # via jsonschema + # via jupyter-events +rpds-py==0.20.1 + # via jsonschema + # via referencing +send2trash==1.8.3 + # via jupyter-server +setuptools==75.3.0 + # via jupyterlab six==1.16.0 # via asttokens + # via python-dateutil + # via rfc3339-validator +sniffio==1.3.1 + # via anyio + # via httpx +soupsieve==2.6 + # via beautifulsoup4 stack-data==0.6.3 # via ipython +terminado==0.18.1 + # via jupyter-server + # via jupyter-server-terminals +tinycss2==1.4.0 + # via nbconvert +tornado==6.4.1 + # via ipykernel + # via jupyter-client + # via jupyter-server + # via jupyterlab + # via terminado traitlets==5.14.3 + # via comm + # via ipykernel # via ipython + # via jupyter-client + # via jupyter-core + # via jupyter-events + # via jupyter-server + # via jupyterlab # via matplotlib-inline + # via nbclient + # via nbconvert + # via nbformat +types-python-dateutil==2.9.0.20241003 + # via arrow typing-extensions==4.12.2 # via datafusion +tzdata==2024.2 + # via pandas +uri-template==1.3.0 + # via jsonschema +urllib3==2.2.3 + # via requests wcwidth==0.2.13 # via prompt-toolkit +webcolors==24.8.0 + # via jsonschema +webencodings==0.5.1 + # via bleach + # via tinycss2 +websocket-client==1.8.0 + # via jupyter-server