Skip to content

Commit

Permalink
add dataframe passthrough methods to datastream
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Aug 8, 2024
1 parent e407d6b commit 56e2dd1
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 11 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ serde.workspace = true
rocksdb = "0.22.0"
bincode = "1.3.3"
half = "2.4.1"
delegate = "0.12.0"
19 changes: 18 additions & 1 deletion crates/core/src/datastream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ pub struct DataStream {
}

impl DataStream {
pub fn filter(&self, predicate: Expr) -> Result<Self> {
let (session_state, plan) = self.df.as_ref().clone().into_parts();

let plan = LogicalPlanBuilder::from(plan).filter(predicate)?.build()?;

Ok(Self {
df: Arc::new(DataFrame::new(session_state, plan)),
context: self.context.clone(),
})
}

// drop_columns, sync, columns: &[&str]
// count

pub fn streaming_window(
&self,
group_expr: Vec<Expr>,
Expand Down Expand Up @@ -46,7 +60,10 @@ impl DataStream {
if batch.num_rows() > 0 {
println!(
"{}",
arrow::util::pretty::pretty_format_batches(&[batch]).unwrap()
datafusion::common::arrow::util::pretty::pretty_format_batches(&[
batch
])
.unwrap()
);
}
}
Expand Down
27 changes: 17 additions & 10 deletions examples/examples/simple_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion::logical_expr::lit;
use datafusion::logical_expr::{col, max, min};

use df_streams_core::context::Context;
Expand Down Expand Up @@ -29,16 +30,22 @@ async fn main() -> Result<()> {
]))
.await?;

let ds = ctx.from_topic(source_topic).await?.streaming_window(
vec![],
vec![
min(col("temperature")).alias("min"),
max(col("temperature")).alias("max"),
avg(col("temperature")).alias("average"),
],
Duration::from_millis(1_000), // 5 second window
None,
)?;
let ds = ctx
.from_topic(source_topic)
.await?
.streaming_window(
vec![],
vec![
min(col("temperature")).alias("min"),
max(col("temperature")).alias("max"),
avg(col("temperature")).alias("average"),
],
Duration::from_millis(1_000), // 5 second window
None,
)?
.filter(col("max").gt(lit(114)))?;

println!("{}", ds.df.logical_plan().display_indent());

ds.clone().print_stream().await?;

Expand Down

0 comments on commit 56e2dd1

Please sign in to comment.