Fixing the streaming join example #54

Merged: 4 commits, Nov 7, 2024
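This PR reworks the streaming join example so the two sensor streams are aggregated before they are joined. Each stream is now windowed into one-second averages, the humidity side's key and window columns are renamed (humidity_sensor, humidity_window_start_time, humidity_window_end_time) so they do not collide with the temperature side, and the join matches on both the sensor key and the window start time. Supporting changes: a drop_columns helper on DataStream, a SlateDB checkpoint backend for the example's Context, Kafka consumers switched from earliest to latest offsets, and a rename of the Python extension module from denormalized._internal to denormalized._d_internal across pyproject.toml, the Python wrappers, and the pyo3 module declaration.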
9 changes: 9 additions & 0 deletions crates/core/src/datastream.rs
@@ -113,6 +113,15 @@ impl DataStream {
         })
     }
 
+    pub fn drop_columns(self, columns: &[&str]) -> Result<Self> {
+        Ok(Self {
+            df: Arc::new(self.df.as_ref().clone().drop_columns(columns)?),
+            context: self.context.clone(),
+            shutdown_tx: self.shutdown_tx.clone(),
+            shutdown_rx: self.shutdown_rx.clone(),
+        })
+    }
+
     // Join two streams using the specified expression
     pub fn join_on(
         self,
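The helper simply rebuilds the stream around DataFusion's DataFrame::drop_columns while keeping the same context and shutdown channels. A minimal sketch of the rename-then-drop pattern the example below uses it for (stream construction elided; humidity_ds stands for any already-built DataStream):

// Copy a column under a new name, then drop the original. This is the
// closest thing to a column rename that the DataStream API shown here offers.
let humidity_ds = humidity_ds
    .with_column("humidity_sensor", col("sensor_name"))?  // duplicate under a new name
    .drop_columns(&["sensor_name"])?;                      // remove the old column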
41 changes: 26 additions & 15 deletions examples/examples/stream_join.rs
@@ -17,7 +17,10 @@ async fn main() -> Result<()> {

     let bootstrap_servers = String::from("localhost:9092");
 
-    let ctx = Context::new()?;
+    let ctx = Context::new()?
+        .with_slatedb_backend(String::from("/tmp/checkpoints/stream-join-checkpoint-1"))
+        .await;
 
     let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
 
     let source_topic_builder = topic_builder
@@ -29,7 +32,7 @@ async fn main() -> Result<()> {
         .clone()
         .with_topic(String::from("temperature"))
         .build_reader(ConnectionOpts::from([
-            ("auto.offset.reset".to_string(), "earliest".to_string()),
+            ("auto.offset.reset".to_string(), "latest".to_string()),
             ("group.id".to_string(), "sample_pipeline".to_string()),
         ]))
         .await?;
@@ -40,30 +43,38 @@ async fn main() -> Result<()> {
             .clone()
             .with_topic(String::from("humidity"))
             .build_reader(ConnectionOpts::from([
-                ("auto.offset.reset".to_string(), "earliest".to_string()),
+                ("auto.offset.reset".to_string(), "latest".to_string()),
                 ("group.id".to_string(), "sample_pipeline".to_string()),
             ]))
             .await?,
     )
-    .await?;
+    .await?
+    .with_column("humidity_sensor", col("sensor_name"))?
+    .drop_columns(&["sensor_name"])?
+    .window(
+        vec![col("humidity_sensor")],
+        vec![avg(col("reading")).alias("avg_humidity")],
+        Duration::from_millis(1_000),
+        None,
+    )?
+    .with_column("humidity_window_start_time", col("window_start_time"))?
+    .with_column("humidity_window_end_time", col("window_end_time"))?
+    .drop_columns(&["window_start_time", "window_end_time"])?;

     let joined_ds = ctx
         .from_topic(temperature_topic)
         .await?
+        .window(
+            vec![col("sensor_name")],
+            vec![avg(col("reading")).alias("avg_temperature")],
+            Duration::from_millis(1_000),
+            None,
+        )?
         .join(
             humidity_ds,
             JoinType::Inner,
-            &["sensor_name"],
-            &["sensor_name"],
-            None,
-        )?
-        .window(
-            vec![],
-            vec![
-                avg(col("temperature.reading")).alias("avg_temperature"),
-                avg(col("humidity.reading")).alias("avg_humidity"),
-            ],
-            Duration::from_millis(1_000),
+            &["sensor_name", "window_start_time"],
+            &["humidity_sensor", "humidity_window_start_time"],
             None,
         )?;

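The substance of the fix: joining the two raw streams on sensor_name alone paired every temperature reading with every humidity reading for that sensor. The example now pre-aggregates each stream into one-second windows and joins on both the sensor key and the window start time, so each output row pairs one temperature window with the matching humidity window. A sketch of consuming the result, assuming the print_stream sink used by the repository's other examples:

// Hedged sketch: print each joined row (one avg_temperature and one
// avg_humidity per sensor per one-second window) to stdout.
// `print_stream` is assumed from the repo's other examples, not this diff.
joined_ds.clone().print_stream().await?;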
2 changes: 1 addition & 1 deletion py-denormalized/pyproject.toml
@@ -20,7 +20,7 @@ feast = ["feast"]
 [tool.maturin]
 python-source = "python"
 features = ["pyo3/extension-module"]
-module-name = "denormalized._internal"
+module-name = "denormalized._d_internal"
 
 [tool.rye]
 dev-dependencies = ["pip>=24.2", "ipython>=8.26.0", "pytest>=8.3.2"]
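Note that module-name must stay in sync with the #[pymodule(name = ...)] attribute in py-denormalized/src/lib.rs, updated at the end of this PR, and with the denormalized._d_internal imports in the files below; a mismatch would build the extension under one name while the Python wrappers import the other.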
3 changes: 1 addition & 2 deletions py-denormalized/python/denormalized/context.py
@@ -1,4 +1,4 @@
-from denormalized._internal import PyContext
+from denormalized._d_internal import PyContext
 from .data_stream import DataStream
 
 class Context:
@@ -20,4 +20,3 @@ def from_topic(self, topic: str, sample_json: str, bootstrap_servers: str) -> DataStream:
         ds = DataStream(py_ds)
 
         return ds
-
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/data_stream.py
@@ -1,5 +1,5 @@
 import pyarrow as pa
-from denormalized._internal import PyDataStream
+from denormalized._d_internal import PyDataStream
 from denormalized.datafusion import Expr
 from denormalized.utils import to_internal_expr, to_internal_exprs
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/datafusion/__init__.py
@@ -36,7 +36,7 @@
 from .catalog import Catalog, Database, Table
 
 # The following imports are okay to remain as opaque to the user.
-from denormalized._internal import Config, LogicalPlan, ExecutionPlan, runtime
+from denormalized._d_internal import Config, LogicalPlan, ExecutionPlan, runtime
 
 from .record_batch import RecordBatchStream, RecordBatch
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/datafusion/catalog.py
@@ -19,7 +19,7 @@

 from __future__ import annotations
 
-import denormalized._internal as df_internal
+import denormalized._d_internal as df_internal
 
 from typing import TYPE_CHECKING

2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/datafusion/common.py
@@ -16,7 +16,7 @@
 # under the License.
 """Common data types used throughout the DataFusion project."""
 
-from denormalized._internal import common as common_internal
+from denormalized._d_internal import common as common_internal
 from enum import Enum
 
 # TODO these should all have proper wrapper classes
12 changes: 6 additions & 6 deletions py-denormalized/python/denormalized/datafusion/context.py
@@ -19,13 +19,13 @@

 from __future__ import annotations
 
-from denormalized._internal import SessionConfig as SessionConfigInternal
-from denormalized._internal import RuntimeConfig as RuntimeConfigInternal
-from denormalized._internal import SQLOptions as SQLOptionsInternal
-from denormalized._internal import SessionContext as SessionContextInternal
-from denormalized._internal import LogicalPlan, ExecutionPlan
+from denormalized._d_internal import SessionConfig as SessionConfigInternal
+from denormalized._d_internal import RuntimeConfig as RuntimeConfigInternal
+from denormalized._d_internal import SQLOptions as SQLOptionsInternal
+from denormalized._d_internal import SessionContext as SessionContextInternal
+from denormalized._d_internal import LogicalPlan, ExecutionPlan
 
-from denormalized._internal import AggregateUDF
+from denormalized._d_internal import AggregateUDF
 from denormalized.datafusion.catalog import Catalog, Table
 from denormalized.datafusion.dataframe import DataFrame
 from denormalized.datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list
4 changes: 2 additions & 2 deletions py-denormalized/python/denormalized/datafusion/dataframe.py
@@ -32,9 +32,9 @@
 import pathlib
 from typing import Callable
 
-from denormalized._internal import DataFrame as DataFrameInternal
+from denormalized._d_internal import DataFrame as DataFrameInternal
 from denormalized.datafusion.expr import Expr, SortExpr, sort_or_default
-from denormalized._internal import (
+from denormalized._d_internal import (
     LogicalPlan,
     ExecutionPlan,
 )
6 changes: 3 additions & 3 deletions py-denormalized/python/denormalized/datafusion/expr.py
@@ -28,9 +28,9 @@
 from denormalized.datafusion.common import DataTypeMap, NullTreatment, RexType
 from typing_extensions import deprecated
 
-from denormalized._internal import LogicalPlan
-from denormalized._internal import expr as expr_internal
-from denormalized._internal import functions as functions_internal
+from denormalized._d_internal import LogicalPlan
+from denormalized._d_internal import expr as expr_internal
+from denormalized._d_internal import functions as functions_internal
 
 # The following are imported from the internal representation. We may choose to
 # give these all proper wrappers, or to simply leave as is. These were added
@@ -18,7 +18,7 @@

 from __future__ import annotations
 
-from denormalized._internal import functions as f
+from denormalized._d_internal import functions as f
 from denormalized.datafusion.expr import (
     CaseBuilder,
     Expr,
@@ -16,7 +16,7 @@
 # under the License.
 """Object store functionality."""
 
-from denormalized._internal import object_store
+from denormalized._d_internal import object_store
 
 AmazonS3 = object_store.AmazonS3
 GoogleCloud = object_store.GoogleCloud
@@ -27,7 +27,7 @@

 if TYPE_CHECKING:
     import pyarrow
-    import denormalized._internal as df_internal
+    import denormalized._d_internal as df_internal
     import typing_extensions


2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/datafusion/udf.py
@@ -19,7 +19,7 @@

 from __future__ import annotations
 
-import denormalized._internal as df_internal
+import denormalized._d_internal as df_internal
 from datafusion.expr import Expr
 from typing import Callable, TYPE_CHECKING, TypeVar
 from abc import ABCMeta, abstractmethod
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/feast_data_stream.py
@@ -2,7 +2,7 @@
 from typing import Any, TypeVar, Union, cast, get_type_hints
 
 import pyarrow as pa
-from denormalized._internal import PyDataStream
+from denormalized._d_internal import PyDataStream
 from denormalized.datafusion import Expr
 from feast import FeatureStore, Field
 from feast.data_source import PushMode
2 changes: 1 addition & 1 deletion py-denormalized/python/denormalized/utils.py
@@ -1,4 +1,4 @@
-from denormalized._internal import expr as internal_exprs
+from denormalized._d_internal import expr as internal_exprs
 from denormalized.datafusion import Expr


2 changes: 1 addition & 1 deletion py-denormalized/src/lib.rs
@@ -11,7 +11,7 @@ pub mod utils;
 pub(crate) struct TokioRuntime(tokio::runtime::Runtime);
 
 /// A Python module implemented in Rust.
-#[pymodule(name="_internal")]
+#[pymodule(name = "_d_internal")]
 fn _py_denormalized_internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<datastream::PyDataStream>()?;
     m.add_class::<context::PyContext>()?;
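For context, a minimal self-contained sketch of a pyo3 module registered under an overridden name, the mechanism this diff relies on (pyo3 0.21+ Bound API assumed; the body is illustrative, while the real module registers PyDataStream, PyContext, and friends):

use pyo3::prelude::*;

// Built by maturin, this becomes importable as `denormalized._d_internal`
// because the attribute below overrides the function's name.
#[pymodule(name = "_d_internal")]
fn _py_denormalized_internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Stand-in registration; the real module adds its #[pyclass] types here.
    m.add("__doc__", "Rust-backed internals for the denormalized package")?;
    Ok(())
}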