Skip to content

Commit

Permalink
Remove Franz branding (#33)
Browse files Browse the repository at this point in the history
* Remove Franz branding

* Rust fmt

* rv kafka settings

* Add the groupby back in
  • Loading branch information
ameyc authored Sep 2, 2024
1 parent 8743b28 commit 4f0bb85
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 115 deletions.
38 changes: 19 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ description = "Embeddable stream processing engine"
denormalized = { path = "crates/core" }
denormalized-common = { path = "crates/common" }

datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "amey/patch-with-node-id" }
datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "main" }

arrow = { version = "52.0.0", features = ["prettyprint"] }
arrow-array = { version = "52.0.0", default-features = false, features = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::error::Result;

use crate::physical_plan::continuous::streaming_window::FranzStreamingWindowExec;
use crate::physical_plan::continuous::streaming_window::StreamingWindowExec;

pub struct CoaslesceBeforeStreamingAggregate {}

Expand Down Expand Up @@ -37,7 +37,7 @@ impl PhysicalOptimizerRule for CoaslesceBeforeStreamingAggregate {
) -> Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
plan.transform(|original| {
if let Some(streaming_aggr_exec) =
original.as_any().downcast_ref::<FranzStreamingWindowExec>()
original.as_any().downcast_ref::<StreamingWindowExec>()
{
let input = streaming_aggr_exec.input();
let partitions = match input.output_partitioning() {
Expand Down Expand Up @@ -68,18 +68,16 @@ impl PhysicalOptimizerRule for CoaslesceBeforeStreamingAggregate {
),
)?)
};
Ok(Transformed::yes(Arc::new(
FranzStreamingWindowExec::try_new(
streaming_aggr_exec.mode,
streaming_aggr_exec.group_by.clone(),
streaming_aggr_exec.aggregate_expressions.clone(),
streaming_aggr_exec.filter_expressions.clone(),
coalesce_exec.clone(),
input.schema(),
streaming_aggr_exec.window_type,
streaming_aggr_exec.upstream_partitioning,
)?,
)))
Ok(Transformed::yes(Arc::new(StreamingWindowExec::try_new(
streaming_aggr_exec.mode,
streaming_aggr_exec.group_by.clone(),
streaming_aggr_exec.aggregate_expressions.clone(),
streaming_aggr_exec.filter_expressions.clone(),
coalesce_exec.clone(),
input.schema(),
streaming_aggr_exec.window_type,
streaming_aggr_exec.upstream_partitioning,
)?)))
} else {
Ok(Transformed::no(original))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::physical_plan::utils::time::RecordBatchWatermark;
use super::{
add_window_columns_to_record_batch, add_window_columns_to_schema, create_group_accumulator,
streaming_window::{
get_windows_for_watermark, FranzStreamingWindowExec, FranzStreamingWindowType,
get_windows_for_watermark, PhysicalStreamingWindowType, StreamingWindowExec,
},
GroupsAccumulatorItem,
};
Expand All @@ -56,7 +56,7 @@ pub struct GroupedWindowAggStream {
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
latest_watermark: Arc<Mutex<Option<SystemTime>>>,
window_frames: BTreeMap<SystemTime, GroupedAggWindowFrame>,
window_type: FranzStreamingWindowType,
window_type: PhysicalStreamingWindowType,
aggregation_mode: AggregateMode,
group_by: PhysicalGroupBy,
group_schema: Arc<Schema>,
Expand All @@ -71,11 +71,11 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
#[allow(dead_code)]
impl GroupedWindowAggStream {
pub fn new(
exec_operator: &FranzStreamingWindowExec,
exec_operator: &StreamingWindowExec,
context: Arc<TaskContext>,
partition: usize,
watermark: Arc<Mutex<Option<SystemTime>>>,
window_type: FranzStreamingWindowType,
window_type: PhysicalStreamingWindowType,
aggregation_mode: AggregateMode,
) -> Result<Self> {
let agg_schema = Arc::clone(&exec_operator.schema);
Expand Down Expand Up @@ -165,9 +165,9 @@ impl GroupedWindowAggStream {

fn get_window_length(&mut self) -> Duration {
match self.window_type {
FranzStreamingWindowType::Session(duration) => duration,
FranzStreamingWindowType::Sliding(duration, _) => duration,
FranzStreamingWindowType::Tumbling(duration) => duration,
PhysicalStreamingWindowType::Session(duration) => duration,
PhysicalStreamingWindowType::Sliding(duration, _) => duration,
PhysicalStreamingWindowType::Tumbling(duration) => duration,
}
}

Expand Down
Loading

0 comments on commit 4f0bb85

Please sign in to comment.