From 367a3060a7e7cc7aa3e747cbba4f5a5529f68485 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Tue, 5 Nov 2024 17:42:24 -0800 Subject: [PATCH 1/4] Fixing the streaming join example --- examples/examples/stream_join.rs | 41 ++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/examples/examples/stream_join.rs b/examples/examples/stream_join.rs index ac7dae4..5c6c672 100644 --- a/examples/examples/stream_join.rs +++ b/examples/examples/stream_join.rs @@ -17,7 +17,10 @@ async fn main() -> Result<()> { let bootstrap_servers = String::from("localhost:9092"); - let ctx = Context::new()?; + let ctx = Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/stream-join-checkpoint-1")) + .await; + let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone()); let source_topic_builder = topic_builder @@ -29,7 +32,7 @@ async fn main() -> Result<()> { .clone() .with_topic(String::from("temperature")) .build_reader(ConnectionOpts::from([ - ("auto.offset.reset".to_string(), "earliest".to_string()), + ("auto.offset.reset".to_string(), "latest".to_string()), ("group.id".to_string(), "sample_pipeline".to_string()), ])) .await?; @@ -40,30 +43,38 @@ async fn main() -> Result<()> { .clone() .with_topic(String::from("humidity")) .build_reader(ConnectionOpts::from([ - ("auto.offset.reset".to_string(), "earliest".to_string()), + ("auto.offset.reset".to_string(), "latest".to_string()), ("group.id".to_string(), "sample_pipeline".to_string()), ])) .await?, ) - .await?; + .await? + .with_column("humidity_sensor", col("sensor_name"))? + .drop_columns(&["sensor_name"])? + .window( + vec![col("humidity_sensor")], + vec![avg(col("reading")).alias("avg_humidity")], + Duration::from_millis(1_000), + None, + )? + .with_column("humidity_window_start_time", col("window_start_time"))? + .with_column("humidity_window_end_time", col("window_end_time"))? + .drop_columns(&["window_start_time", "window_end_time"])?; let joined_ds = ctx .from_topic(temperature_topic) .await? + .window( + vec![col("sensor_name")], + vec![avg(col("reading")).alias("avg_temperature")], + Duration::from_millis(1_000), + None, + )? .join( humidity_ds, JoinType::Inner, - &["sensor_name"], - &["sensor_name"], - None, - )? - .window( - vec![], - vec![ - avg(col("temperature.reading")).alias("avg_temperature"), - avg(col("humidity.reading")).alias("avg_humidity"), - ], - Duration::from_millis(1_000), + &["sensor_name", "window_start_time"], + &["humidity_sensor", "humidity_window_start_time"], None, )?; From ce87b06bbb7456118b008ec06f5c25864b1d4c0e Mon Sep 17 00:00:00 2001 From: Matt Green Date: Wed, 6 Nov 2024 10:52:09 -0800 Subject: [PATCH 2/4] format --- py-denormalized/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-denormalized/src/lib.rs b/py-denormalized/src/lib.rs index ddb4734..2a00178 100644 --- a/py-denormalized/src/lib.rs +++ b/py-denormalized/src/lib.rs @@ -11,7 +11,7 @@ pub mod utils; pub(crate) struct TokioRuntime(tokio::runtime::Runtime); /// A Python module implemented in Rust. -#[pymodule(name="_internal")] +#[pymodule(name = "_internal")] fn _py_denormalized_internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; From a88c714be70f111e454589c25c6933c103fef161 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Wed, 6 Nov 2024 11:24:29 -0800 Subject: [PATCH 3/4] add drop_columns --- crates/core/src/datastream.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index 8b83ec4..42a8f4f 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -113,6 +113,15 @@ impl DataStream { }) } + pub fn drop_columns(self, columns: &[&str]) -> Result { + Ok(Self { + df: Arc::new(self.df.as_ref().clone().drop_columns(columns)?), + context: self.context.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), + }) + } + // Join two streams using the specified expression pub fn join_on( self, From 2d9e6dbf9f5e6fd848cda7c8a6471ea0c1d29389 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Thu, 7 Nov 2024 10:30:39 -0800 Subject: [PATCH 4/4] update python internal package name --- py-denormalized/pyproject.toml | 2 +- py-denormalized/python/denormalized/context.py | 3 +-- py-denormalized/python/denormalized/data_stream.py | 2 +- .../python/denormalized/datafusion/__init__.py | 2 +- .../python/denormalized/datafusion/catalog.py | 2 +- .../python/denormalized/datafusion/common.py | 2 +- .../python/denormalized/datafusion/context.py | 12 ++++++------ .../python/denormalized/datafusion/dataframe.py | 4 ++-- .../python/denormalized/datafusion/expr.py | 6 +++--- .../python/denormalized/datafusion/functions.py | 2 +- .../python/denormalized/datafusion/object_store.py | 2 +- .../python/denormalized/datafusion/record_batch.py | 2 +- .../python/denormalized/datafusion/udf.py | 2 +- .../python/denormalized/feast_data_stream.py | 2 +- py-denormalized/python/denormalized/utils.py | 2 +- py-denormalized/src/lib.rs | 2 +- 16 files changed, 24 insertions(+), 25 deletions(-) diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index 171eae2..c6c5baf 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -20,7 +20,7 @@ feast = ["feast"] [tool.maturin] python-source = "python" features = ["pyo3/extension-module"] -module-name = "denormalized._internal" +module-name = "denormalized._d_internal" [tool.rye] dev-dependencies = ["pip>=24.2", "ipython>=8.26.0", "pytest>=8.3.2"] diff --git a/py-denormalized/python/denormalized/context.py b/py-denormalized/python/denormalized/context.py index 462ce21..1b3c5df 100644 --- a/py-denormalized/python/denormalized/context.py +++ b/py-denormalized/python/denormalized/context.py @@ -1,4 +1,4 @@ -from denormalized._internal import PyContext +from denormalized._d_internal import PyContext from .data_stream import DataStream class Context: @@ -20,4 +20,3 @@ def from_topic(self, topic: str, sample_json: str, bootstrap_servers: str) -> Da ds = DataStream(py_ds) return ds - diff --git a/py-denormalized/python/denormalized/data_stream.py b/py-denormalized/python/denormalized/data_stream.py index 418c520..a2a5805 100644 --- a/py-denormalized/python/denormalized/data_stream.py +++ b/py-denormalized/python/denormalized/data_stream.py @@ -1,5 +1,5 @@ import pyarrow as pa -from denormalized._internal import PyDataStream +from denormalized._d_internal import PyDataStream from denormalized.datafusion import Expr from denormalized.utils import to_internal_expr, to_internal_exprs diff --git a/py-denormalized/python/denormalized/datafusion/__init__.py b/py-denormalized/python/denormalized/datafusion/__init__.py index 7419ad7..715a94d 100644 --- a/py-denormalized/python/denormalized/datafusion/__init__.py +++ b/py-denormalized/python/denormalized/datafusion/__init__.py @@ -36,7 +36,7 @@ from .catalog import Catalog, Database, Table # The following imports are okay to remain as opaque to the user. -from denormalized._internal import Config, LogicalPlan, ExecutionPlan, runtime +from denormalized._d_internal import Config, LogicalPlan, ExecutionPlan, runtime from .record_batch import RecordBatchStream, RecordBatch diff --git a/py-denormalized/python/denormalized/datafusion/catalog.py b/py-denormalized/python/denormalized/datafusion/catalog.py index d8c9092..faa5058 100644 --- a/py-denormalized/python/denormalized/datafusion/catalog.py +++ b/py-denormalized/python/denormalized/datafusion/catalog.py @@ -19,7 +19,7 @@ from __future__ import annotations -import denormalized._internal as df_internal +import denormalized._d_internal as df_internal from typing import TYPE_CHECKING diff --git a/py-denormalized/python/denormalized/datafusion/common.py b/py-denormalized/python/denormalized/datafusion/common.py index 73ed7c4..082b979 100644 --- a/py-denormalized/python/denormalized/datafusion/common.py +++ b/py-denormalized/python/denormalized/datafusion/common.py @@ -16,7 +16,7 @@ # under the License. """Common data types used throughout the DataFusion project.""" -from denormalized._internal import common as common_internal +from denormalized._d_internal import common as common_internal from enum import Enum # TODO these should all have proper wrapper classes diff --git a/py-denormalized/python/denormalized/datafusion/context.py b/py-denormalized/python/denormalized/datafusion/context.py index 19c0760..f3eed51 100644 --- a/py-denormalized/python/denormalized/datafusion/context.py +++ b/py-denormalized/python/denormalized/datafusion/context.py @@ -19,13 +19,13 @@ from __future__ import annotations -from denormalized._internal import SessionConfig as SessionConfigInternal -from denormalized._internal import RuntimeConfig as RuntimeConfigInternal -from denormalized._internal import SQLOptions as SQLOptionsInternal -from denormalized._internal import SessionContext as SessionContextInternal -from denormalized._internal import LogicalPlan, ExecutionPlan +from denormalized._d_internal import SessionConfig as SessionConfigInternal +from denormalized._d_internal import RuntimeConfig as RuntimeConfigInternal +from denormalized._d_internal import SQLOptions as SQLOptionsInternal +from denormalized._d_internal import SessionContext as SessionContextInternal +from denormalized._d_internal import LogicalPlan, ExecutionPlan -from denormalized._internal import AggregateUDF +from denormalized._d_internal import AggregateUDF from denormalized.datafusion.catalog import Catalog, Table from denormalized.datafusion.dataframe import DataFrame from denormalized.datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list diff --git a/py-denormalized/python/denormalized/datafusion/dataframe.py b/py-denormalized/python/denormalized/datafusion/dataframe.py index 4a50545..63b41e2 100644 --- a/py-denormalized/python/denormalized/datafusion/dataframe.py +++ b/py-denormalized/python/denormalized/datafusion/dataframe.py @@ -32,9 +32,9 @@ import pathlib from typing import Callable -from denormalized._internal import DataFrame as DataFrameInternal +from denormalized._d_internal import DataFrame as DataFrameInternal from denormalized.datafusion.expr import Expr, SortExpr, sort_or_default -from denormalized._internal import ( +from denormalized._d_internal import ( LogicalPlan, ExecutionPlan, ) diff --git a/py-denormalized/python/denormalized/datafusion/expr.py b/py-denormalized/python/denormalized/datafusion/expr.py index a858a66..69f2505 100644 --- a/py-denormalized/python/denormalized/datafusion/expr.py +++ b/py-denormalized/python/denormalized/datafusion/expr.py @@ -28,9 +28,9 @@ from denormalized.datafusion.common import DataTypeMap, NullTreatment, RexType from typing_extensions import deprecated -from denormalized._internal import LogicalPlan -from denormalized._internal import expr as expr_internal -from denormalized._internal import functions as functions_internal +from denormalized._d_internal import LogicalPlan +from denormalized._d_internal import expr as expr_internal +from denormalized._d_internal import functions as functions_internal # The following are imported from the internal representation. We may choose to # give these all proper wrappers, or to simply leave as is. These were added diff --git a/py-denormalized/python/denormalized/datafusion/functions.py b/py-denormalized/python/denormalized/datafusion/functions.py index 291c578..d564672 100644 --- a/py-denormalized/python/denormalized/datafusion/functions.py +++ b/py-denormalized/python/denormalized/datafusion/functions.py @@ -18,7 +18,7 @@ from __future__ import annotations -from denormalized._internal import functions as f +from denormalized._d_internal import functions as f from denormalized.datafusion.expr import ( CaseBuilder, Expr, diff --git a/py-denormalized/python/denormalized/datafusion/object_store.py b/py-denormalized/python/denormalized/datafusion/object_store.py index 3a3371e..54610c0 100644 --- a/py-denormalized/python/denormalized/datafusion/object_store.py +++ b/py-denormalized/python/denormalized/datafusion/object_store.py @@ -16,7 +16,7 @@ # under the License. """Object store functionality.""" -from denormalized._internal import object_store +from denormalized._d_internal import object_store AmazonS3 = object_store.AmazonS3 GoogleCloud = object_store.GoogleCloud diff --git a/py-denormalized/python/denormalized/datafusion/record_batch.py b/py-denormalized/python/denormalized/datafusion/record_batch.py index e0e436e..7f7b7ef 100644 --- a/py-denormalized/python/denormalized/datafusion/record_batch.py +++ b/py-denormalized/python/denormalized/datafusion/record_batch.py @@ -27,7 +27,7 @@ if TYPE_CHECKING: import pyarrow - import denormalized._internal as df_internal + import denormalized._d_internal as df_internal import typing_extensions diff --git a/py-denormalized/python/denormalized/datafusion/udf.py b/py-denormalized/python/denormalized/datafusion/udf.py index c1d45f9..c7f4d26 100644 --- a/py-denormalized/python/denormalized/datafusion/udf.py +++ b/py-denormalized/python/denormalized/datafusion/udf.py @@ -19,7 +19,7 @@ from __future__ import annotations -import denormalized._internal as df_internal +import denormalized._d_internal as df_internal from datafusion.expr import Expr from typing import Callable, TYPE_CHECKING, TypeVar from abc import ABCMeta, abstractmethod diff --git a/py-denormalized/python/denormalized/feast_data_stream.py b/py-denormalized/python/denormalized/feast_data_stream.py index 252c2bb..e289b8c 100644 --- a/py-denormalized/python/denormalized/feast_data_stream.py +++ b/py-denormalized/python/denormalized/feast_data_stream.py @@ -2,7 +2,7 @@ from typing import Any, TypeVar, Union, cast, get_type_hints import pyarrow as pa -from denormalized._internal import PyDataStream +from denormalized._d_internal import PyDataStream from denormalized.datafusion import Expr from feast import FeatureStore, Field from feast.data_source import PushMode diff --git a/py-denormalized/python/denormalized/utils.py b/py-denormalized/python/denormalized/utils.py index 13a5dbf..adb8e10 100644 --- a/py-denormalized/python/denormalized/utils.py +++ b/py-denormalized/python/denormalized/utils.py @@ -1,4 +1,4 @@ -from denormalized._internal import expr as internal_exprs +from denormalized._d_internal import expr as internal_exprs from denormalized.datafusion import Expr diff --git a/py-denormalized/src/lib.rs b/py-denormalized/src/lib.rs index 2a00178..230171b 100644 --- a/py-denormalized/src/lib.rs +++ b/py-denormalized/src/lib.rs @@ -11,7 +11,7 @@ pub mod utils; pub(crate) struct TokioRuntime(tokio::runtime::Runtime); /// A Python module implemented in Rust. -#[pymodule(name = "_internal")] +#[pymodule(name = "_d_internal")] fn _py_denormalized_internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?;