diff --git a/Cargo.lock b/Cargo.lock index e96c25c..affeaf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -814,7 +814,7 @@ dependencies = [ [[package]] name = "datafusion" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "ahash", "apache-avro", @@ -873,7 +873,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "arrow-schema", "async-trait", @@ -886,7 +886,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "ahash", "apache-avro", @@ -910,7 +910,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "tokio", ] @@ -918,7 +918,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "arrow", "chrono", @@ -938,7 +938,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "ahash", "arrow", @@ -959,7 +959,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "arrow", "datafusion-common", @@ -969,7 +969,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "arrow", "arrow-buffer", @@ -995,7 +995,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "ahash", "arrow", @@ -1015,7 +1015,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "ahash", "arrow", @@ -1028,7 +1028,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "arrow", "arrow-array", @@ -1049,7 +1049,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "datafusion-common", "datafusion-expr", @@ -1060,7 +1060,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "arrow", "async-trait", @@ -1079,7 +1079,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "ahash", "arrow", @@ -1110,7 +1110,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "ahash", "arrow", @@ -1123,7 +1123,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-functions-aggregate" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "ahash", "arrow", @@ -1138,7 +1138,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "datafusion-common", "datafusion-execution", @@ -1150,7 +1150,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "ahash", "arrow", @@ -1185,7 +1185,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" dependencies = [ "arrow", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index 994879a..d7ed65b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ description = "Embeddable stream processing engine" denormalized = { path = "crates/core" } denormalized-common = { path = "crates/common" } -datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "amey/patch-with-node-id" } +datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "main" } arrow = { version = "52.0.0", features = ["prettyprint"] } arrow-array = { version = "52.0.0", default-features = false, features = [ diff --git a/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs b/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs index f4b3eaa..3c7910a 100644 --- a/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs +++ b/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs @@ -8,7 +8,7 @@ use datafusion::physical_plan::ExecutionPlanProperties; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion::error::Result; -use crate::physical_plan::continuous::streaming_window::FranzStreamingWindowExec; +use crate::physical_plan::continuous::streaming_window::StreamingWindowExec; pub struct CoaslesceBeforeStreamingAggregate {} @@ -37,7 +37,7 @@ impl PhysicalOptimizerRule for CoaslesceBeforeStreamingAggregate { ) -> Result> { plan.transform(|original| { if let Some(streaming_aggr_exec) = - original.as_any().downcast_ref::() + original.as_any().downcast_ref::() { let input = streaming_aggr_exec.input(); let partitions = match input.output_partitioning() { @@ -68,18 +68,16 @@ impl PhysicalOptimizerRule for CoaslesceBeforeStreamingAggregate { ), )?) }; - Ok(Transformed::yes(Arc::new( - FranzStreamingWindowExec::try_new( - streaming_aggr_exec.mode, - streaming_aggr_exec.group_by.clone(), - streaming_aggr_exec.aggregate_expressions.clone(), - streaming_aggr_exec.filter_expressions.clone(), - coalesce_exec.clone(), - input.schema(), - streaming_aggr_exec.window_type, - streaming_aggr_exec.upstream_partitioning, - )?, - ))) + Ok(Transformed::yes(Arc::new(StreamingWindowExec::try_new( + streaming_aggr_exec.mode, + streaming_aggr_exec.group_by.clone(), + streaming_aggr_exec.aggregate_expressions.clone(), + streaming_aggr_exec.filter_expressions.clone(), + coalesce_exec.clone(), + input.schema(), + streaming_aggr_exec.window_type, + streaming_aggr_exec.upstream_partitioning, + )?))) } else { Ok(Transformed::no(original)) } diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs index 8cfb757..c0eceb3 100644 --- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs +++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs @@ -41,7 +41,7 @@ use crate::physical_plan::utils::time::RecordBatchWatermark; use super::{ add_window_columns_to_record_batch, add_window_columns_to_schema, create_group_accumulator, streaming_window::{ - get_windows_for_watermark, FranzStreamingWindowExec, FranzStreamingWindowType, + get_windows_for_watermark, PhysicalStreamingWindowType, StreamingWindowExec, }, GroupsAccumulatorItem, }; @@ -56,7 +56,7 @@ pub struct GroupedWindowAggStream { filter_expressions: Vec>>, latest_watermark: Arc>>, window_frames: BTreeMap, - window_type: FranzStreamingWindowType, + window_type: PhysicalStreamingWindowType, aggregation_mode: AggregateMode, group_by: PhysicalGroupBy, group_schema: Arc, @@ -71,11 +71,11 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { #[allow(dead_code)] impl GroupedWindowAggStream { pub fn new( - exec_operator: &FranzStreamingWindowExec, + exec_operator: &StreamingWindowExec, context: Arc, partition: usize, watermark: Arc>>, - window_type: FranzStreamingWindowType, + window_type: PhysicalStreamingWindowType, aggregation_mode: AggregateMode, ) -> Result { let agg_schema = Arc::clone(&exec_operator.schema); @@ -165,9 +165,9 @@ impl GroupedWindowAggStream { fn get_window_length(&mut self) -> Duration { match self.window_type { - FranzStreamingWindowType::Session(duration) => duration, - FranzStreamingWindowType::Sliding(duration, _) => duration, - FranzStreamingWindowType::Tumbling(duration) => duration, + PhysicalStreamingWindowType::Session(duration) => duration, + PhysicalStreamingWindowType::Sliding(duration, _) => duration, + PhysicalStreamingWindowType::Tumbling(duration) => duration, } } diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs index 13f08b5..ea7ffc8 100644 --- a/crates/core/src/physical_plan/continuous/streaming_window.rs +++ b/crates/core/src/physical_plan/continuous/streaming_window.rs @@ -1,5 +1,4 @@ use std::{ - alloc::System, any::Any, borrow::Cow, collections::BTreeMap, @@ -40,20 +39,15 @@ use datafusion::{ use futures::{Stream, StreamExt}; use tracing::debug; -use crate::{ - accumulators, - physical_plan::{ - continuous::grouped_window_agg_stream::GroupedWindowAggStream, - utils::{ - accumulators::{create_accumulators, AccumulatorItem}, - time::{system_time_from_epoch, RecordBatchWatermark}, - }, +use crate::physical_plan::{ + continuous::grouped_window_agg_stream::GroupedWindowAggStream, + utils::{ + accumulators::{create_accumulators, AccumulatorItem}, + time::{system_time_from_epoch, RecordBatchWatermark}, }, }; -type WatermarkMutex = Arc>>; - -pub struct FranzWindowFrame { +pub struct PartialWindowAggFrame { pub window_start_time: SystemTime, window_end_time: SystemTime, timestamp_column: String, @@ -65,7 +59,7 @@ pub struct FranzWindowFrame { baseline_metrics: BaselineMetrics, } -impl DisplayAs for FranzWindowFrame { +impl DisplayAs for PartialWindowAggFrame { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { @@ -87,7 +81,7 @@ use datafusion::common::Result; use super::{add_window_columns_to_record_batch, add_window_columns_to_schema, batch_filter}; -impl FranzWindowFrame { +impl PartialWindowAggFrame { pub fn new( window_start_time: SystemTime, window_end_time: SystemTime, @@ -190,14 +184,14 @@ impl FranzWindowFrame { } #[derive(Debug, Clone, Copy)] -pub enum FranzStreamingWindowType { +pub enum PhysicalStreamingWindowType { Session(Duration), Sliding(Duration, Duration), Tumbling(Duration), } #[derive(Debug)] -pub struct FranzStreamingWindowExec { +pub struct StreamingWindowExec { pub(crate) input: Arc, pub aggregate_expressions: Vec>, pub filter_expressions: Vec>>, @@ -210,11 +204,11 @@ pub struct FranzStreamingWindowExec { pub(crate) metrics: ExecutionPlanMetricsSet, cache: PlanProperties, pub mode: AggregateMode, - pub window_type: FranzStreamingWindowType, + pub window_type: PhysicalStreamingWindowType, pub upstream_partitioning: Option, } -impl FranzStreamingWindowExec { +impl StreamingWindowExec { /// Create a new execution plan for window aggregates /// pub fn try_new( @@ -224,7 +218,7 @@ impl FranzStreamingWindowExec { filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, - window_type: FranzStreamingWindowType, + window_type: PhysicalStreamingWindowType, upstream_partitioning: Option, ) -> Result { let schema = create_schema( @@ -236,7 +230,7 @@ impl FranzStreamingWindowExec { )?; let schema = Arc::new(schema); - FranzStreamingWindowExec::try_new_with_schema( + StreamingWindowExec::try_new_with_schema( mode, group_by, aggr_expr, @@ -257,7 +251,7 @@ impl FranzStreamingWindowExec { input: Arc, input_schema: SchemaRef, schema: SchemaRef, - window_type: FranzStreamingWindowType, + window_type: PhysicalStreamingWindowType, upstream_partitioning: Option, ) -> Result { if aggr_expr.len() != filter_expr.len() { @@ -299,7 +293,7 @@ impl FranzStreamingWindowExec { // construct a map from the input expression to the output expression of the Aggregation group by let projection_mapping = ProjectionMapping::try_new(group_by.expr(), &input.schema())?; - let cache = FranzStreamingWindowExec::compute_properties( + let cache = StreamingWindowExec::compute_properties( &input, Arc::new(add_window_columns_to_schema(schema.clone())), &projection_mapping, @@ -379,9 +373,9 @@ impl FranzStreamingWindowExec { } } -impl ExecutionPlan for FranzStreamingWindowExec { +impl ExecutionPlan for StreamingWindowExec { fn name(&self) -> &'static str { - "FranzWindowExec" + "StreamingWindowExec" } /// Return a reference to Any that can be used for downcasting @@ -405,7 +399,7 @@ impl ExecutionPlan for FranzStreamingWindowExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(FranzStreamingWindowExec::try_new( + Ok(Arc::new(StreamingWindowExec::try_new( self.mode, self.group_by.clone(), self.aggregate_expressions.clone(), @@ -439,7 +433,6 @@ impl ExecutionPlan for FranzStreamingWindowExec { context, partition, Duration::from_millis(100), - self.upstream_partitioning, )?)) } } else { @@ -518,11 +511,11 @@ impl ExecutionPlan for FranzStreamingWindowExec { } } -impl DisplayAs for FranzStreamingWindowExec { +impl DisplayAs for StreamingWindowExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "FranzStreamingWindowExec: mode={:?}", self.mode)?; + write!(f, "StreamingWindowExec: mode={:?}", self.mode)?; let g: Vec = if self.group_by.is_single() { self.group_by .expr() @@ -600,19 +593,19 @@ pub struct WindowAggStream { aggregate_expressions: Vec>>, filter_expressions: Vec>>, latest_watermark: Arc>>, - window_frames: BTreeMap, - window_type: FranzStreamingWindowType, + window_frames: BTreeMap, + window_type: PhysicalStreamingWindowType, aggregation_mode: AggregateMode, } #[allow(dead_code)] impl WindowAggStream { pub fn new( - exec_operator: &FranzStreamingWindowExec, + exec_operator: &StreamingWindowExec, context: Arc, partition: usize, watermark: Arc>>, - window_type: FranzStreamingWindowType, + window_type: PhysicalStreamingWindowType, aggregation_mode: AggregateMode, ) -> Result { let agg_schema = Arc::clone(&exec_operator.schema); @@ -698,9 +691,9 @@ impl WindowAggStream { fn get_window_length(&mut self) -> Duration { match self.window_type { - FranzStreamingWindowType::Session(duration) => duration, - FranzStreamingWindowType::Sliding(duration, _) => duration, - FranzStreamingWindowType::Tumbling(duration) => duration, + PhysicalStreamingWindowType::Session(duration) => duration, + PhysicalStreamingWindowType::Sliding(duration, _) => duration, + PhysicalStreamingWindowType::Tumbling(duration) => duration, } } @@ -711,7 +704,7 @@ impl WindowAggStream { for (start_time, end_time) in ranges { self.window_frames.entry(*start_time).or_insert({ let accumulators = create_accumulators(&self.exec_aggregate_expressions)?; - FranzWindowFrame::new( + PartialWindowAggFrame::new( *start_time, *end_time, "canonical_timestamp".to_string(), @@ -789,7 +782,7 @@ struct FullWindowAggFrame { aggregate_expressions: Vec>>, filter_expressions: Vec>>, schema: SchemaRef, - baseline_metrics: BaselineMetrics, + _baseline_metrics: BaselineMetrics, batches_accumulated: usize, } @@ -812,7 +805,7 @@ impl FullWindowAggFrame { aggregate_expressions, filter_expressions, schema: schema.clone(), - baseline_metrics: baseline_metrics, + _baseline_metrics: baseline_metrics, batches_accumulated: 0, } } @@ -829,39 +822,32 @@ impl FullWindowAggFrame { } fn evaluate(&mut self) -> Result { - let result = finalize_aggregation(&mut self.accumulators, &AggregateMode::Final).and_then( - |columns| RecordBatch::try_new(self.schema.clone(), columns).map_err(Into::into), - ); - result + finalize_aggregation(&mut self.accumulators, &AggregateMode::Final).and_then(|columns| { + RecordBatch::try_new(self.schema.clone(), columns).map_err(Into::into) + }) } } struct FullWindowAggStream { pub schema: SchemaRef, input: SendableRecordBatchStream, - //exec_operator: FranzStreamingWindowExec, baseline_metrics: BaselineMetrics, exec_aggregate_expressions: Vec>, aggregate_expressions: Vec>>, filter_expressions: Vec>>, cached_frames: BTreeMap, watermark: Option, // This stream needs to be run with only one partition in the Exec operator. - upstream_partitions: usize, - lateness_threshold: Duration, + _lateness_threshold: Duration, seen_windows: std::collections::HashSet, } impl FullWindowAggStream { pub fn try_new( - exec_operator: &FranzStreamingWindowExec, + exec_operator: &StreamingWindowExec, context: Arc, partition: usize, lateness_threshold: Duration, - upstream_partitioning: Option, ) -> Result { - debug!(">>>>>> FullWindowAggStream for partition {}", partition); let agg_schema = Arc::clone(&exec_operator.schema); - let agg_filter_expr = exec_operator.filter_expressions.clone(); - let baseline_metrics = BaselineMetrics::new(&exec_operator.metrics, partition); let input = exec_operator .input @@ -880,8 +866,7 @@ impl FullWindowAggStream { filter_expressions, cached_frames: BTreeMap::new(), watermark: None, - upstream_partitions: upstream_partitioning.map_or(1, |x| x), - lateness_threshold, + _lateness_threshold: lateness_threshold, seen_windows: std::collections::HashSet::::new(), }) } @@ -966,7 +951,7 @@ impl FullWindowAggStream { rb.remove_column(col_size - 1); rb.remove_column(col_size - 2); - let _ = frame.aggregate_batch(rb); + frame.aggregate_batch(rb); self.watermark = self .watermark @@ -1008,15 +993,15 @@ impl Stream for FullWindowAggStream { pub fn get_windows_for_watermark( watermark: &RecordBatchWatermark, - window_type: FranzStreamingWindowType, + window_type: PhysicalStreamingWindowType, ) -> Vec<(SystemTime, SystemTime)> { let start_time = watermark.min_timestamp; let end_time = watermark.max_timestamp; let mut window_ranges = Vec::new(); match window_type { - FranzStreamingWindowType::Session(_) => todo!(), - FranzStreamingWindowType::Sliding(window_length, slide) => { + PhysicalStreamingWindowType::Session(_) => todo!(), + PhysicalStreamingWindowType::Sliding(window_length, slide) => { let mut current_start = snap_to_window_start(start_time - window_length, window_length); while current_start <= end_time { let current_end = current_start + window_length; @@ -1029,7 +1014,7 @@ pub fn get_windows_for_watermark( current_start += slide; } } - FranzStreamingWindowType::Tumbling(window_length) => { + PhysicalStreamingWindowType::Tumbling(window_length) => { let mut current_start: SystemTime = snap_to_window_start(start_time, window_length); while current_start <= end_time { let current_end = current_start + window_length; diff --git a/crates/core/src/planner/streaming_window.rs b/crates/core/src/planner/streaming_window.rs index fd677f9..0d0edac 100644 --- a/crates/core/src/planner/streaming_window.rs +++ b/crates/core/src/planner/streaming_window.rs @@ -1,6 +1,4 @@ use async_trait::async_trait; -use datafusion::physical_plan::repartition::RepartitionExec; -use datafusion::physical_plan::Partitioning; use itertools::multiunzip; use std::sync::Arc; @@ -17,9 +15,9 @@ use datafusion::physical_planner::{ create_aggregate_expr_and_maybe_filter, ExtensionPlanner, PhysicalPlanner, }; -use crate::logical_plan::streaming_window::{self, StreamingWindowPlanNode, StreamingWindowType}; +use crate::logical_plan::streaming_window::{StreamingWindowPlanNode, StreamingWindowType}; use crate::physical_plan::continuous::streaming_window::{ - FranzStreamingWindowExec, FranzStreamingWindowType, + PhysicalStreamingWindowType, StreamingWindowExec, }; use datafusion::error::Result; @@ -124,16 +122,16 @@ impl ExtensionPlanner for StreamingWindowPlanner { multiunzip(agg_filter); let franz_window_type = match streaming_window_node.window_type { StreamingWindowType::Tumbling(length) => { - FranzStreamingWindowType::Tumbling(length) + PhysicalStreamingWindowType::Tumbling(length) } StreamingWindowType::Sliding(length, slide) => { - FranzStreamingWindowType::Sliding(length, slide) + PhysicalStreamingWindowType::Sliding(length, slide) } StreamingWindowType::Session(..) => todo!(), }; - let final_aggr = if streaming_window_node.aggregrate.group_expr.len() == 0 { - let partial_aggr = Arc::new(FranzStreamingWindowExec::try_new( + let final_aggr = if streaming_window_node.aggregrate.group_expr.is_empty() { + let partial_aggr = Arc::new(StreamingWindowExec::try_new( AggregateMode::Partial, groups.clone(), aggregates.clone(), @@ -143,7 +141,7 @@ impl ExtensionPlanner for StreamingWindowPlanner { franz_window_type, None, )?); - Arc::new(FranzStreamingWindowExec::try_new( + Arc::new(StreamingWindowExec::try_new( AggregateMode::Final, groups.clone(), aggregates.clone(), @@ -154,7 +152,7 @@ impl ExtensionPlanner for StreamingWindowPlanner { None, )?) } else { - Arc::new(FranzStreamingWindowExec::try_new( + Arc::new(StreamingWindowExec::try_new( AggregateMode::Single, groups.clone(), aggregates.clone(), diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs index 94e5976..749a2d4 100644 --- a/examples/examples/emit_measurements.rs +++ b/examples/examples/emit_measurements.rs @@ -18,10 +18,7 @@ async fn main() -> Result<()> { let mut tasks = tokio::task::JoinSet::new(); let producer: FutureProducer = ClientConfig::new() - .set( - "bootstrap.servers", - String::from("localhost:19092,localhost:29092,localhost:39092"), - ) + .set("bootstrap.servers", String::from("localhost:9092")) .set("message.timeout.ms", "100") .create() .expect("Producer creation error"); diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs index 1c24e29..b520a7b 100644 --- a/examples/examples/kafka_rideshare.rs +++ b/examples/examples/kafka_rideshare.rs @@ -49,7 +49,7 @@ async fn main() -> Result<()> { } }"#; - let bootstrap_servers = String::from("localhost:19092,localhost:29092,localhost:39092"); + let bootstrap_servers = String::from("localhost:9092"); let ctx = Context::new()?; diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 15f39e3..a0bb20f 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -20,7 +20,7 @@ async fn main() -> Result<()> { let sample_event = get_sample_json(); - let bootstrap_servers = String::from("localhost:19092,localhost:29092,localhost:39092"); + let bootstrap_servers = String::from("localhost:9092"); let ctx = Context::new()?; let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone()); @@ -40,7 +40,7 @@ async fn main() -> Result<()> { .from_topic(source_topic) .await? .window( - vec![], //vec![col("sensor_name")], + vec![col("sensor_name")], vec![ count(col("reading")).alias("count"), min(col("reading")).alias("min"),