From c8a8e3cf4dcb3f24dd96c92bac056daf6e0dae91 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Wed, 4 Sep 2024 15:35:23 -0700 Subject: [PATCH 1/8] Saving progress --- Cargo.lock | 160 ++++++--- Cargo.toml | 3 +- crates/channels/Cargo.toml | 13 + crates/channels/src/channel_manager/mod.rs | 51 +++ crates/channels/src/lib.rs | 1 + crates/core/Cargo.toml | 1 + .../src/datasource/kafka/kafka_stream_read.rs | 46 ++- .../core/src/datasource/kafka/topic_reader.rs | 5 +- crates/core/src/datastream.rs | 33 +- crates/core/src/lib.rs | 1 + crates/core/src/orchestrator/mod.rs | 1 + crates/core/src/orchestrator/orchestrator.rs | 17 + .../continuous/streaming_window.rs | 19 + crates/core/src/physical_plan/mod.rs | 1 + crates/core/src/physical_plan/stream_table.rs | 325 ++++++++++++++++++ examples/examples/kafka_rideshare.rs | 9 +- examples/examples/simple_aggregation.rs | 4 +- 17 files changed, 631 insertions(+), 59 deletions(-) create mode 100644 crates/channels/Cargo.toml create mode 100644 crates/channels/src/channel_manager/mod.rs create mode 100644 crates/channels/src/lib.rs create mode 100644 crates/core/src/orchestrator/mod.rs create mode 100644 crates/core/src/orchestrator/orchestrator.rs create mode 100644 crates/core/src/physical_plan/stream_table.rs diff --git a/Cargo.lock b/Cargo.lock index affeaf4..6650ba0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,13 +424,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.81" +version = "0.1.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" +checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -495,7 +495,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -748,6 +748,56 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -814,7 +864,7 @@ dependencies = [ [[package]] name = "datafusion" version = "41.0.0" -source = 
"git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "ahash", "apache-avro", @@ -873,7 +923,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "arrow-schema", "async-trait", @@ -886,7 +936,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "ahash", "apache-avro", @@ -910,7 +960,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "tokio", ] @@ -918,7 +968,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "arrow", "chrono", @@ -938,7 +988,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "ahash", "arrow", @@ -959,7 +1009,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "arrow", "datafusion-common", @@ -969,7 +1019,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "arrow", "arrow-buffer", @@ -995,7 +1045,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = 
"git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "ahash", "arrow", @@ -1015,7 +1065,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "ahash", "arrow", @@ -1028,7 +1078,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "arrow", "arrow-array", @@ -1049,7 +1099,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "datafusion-common", "datafusion-expr", @@ -1060,7 +1110,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "arrow", "async-trait", @@ -1079,7 +1129,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "ahash", "arrow", @@ -1110,7 +1160,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "ahash", "arrow", @@ -1123,7 +1173,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-functions-aggregate" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "ahash", "arrow", @@ -1138,7 +1188,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "datafusion-common", 
"datafusion-execution", @@ -1150,7 +1200,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "ahash", "arrow", @@ -1185,7 +1235,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#14d9f85164d2b088656e3e79b20383d4e024cbec" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323" dependencies = [ "arrow", "arrow-array", @@ -1206,7 +1256,7 @@ checksum = "4e018fccbeeb50ff26562ece792ed06659b9c2dae79ece77c4456bb10d9bf79b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -1226,6 +1276,7 @@ dependencies = [ "chrono", "datafusion", "delegate", + "denormalized-channels", "denormalized-common", "futures", "half", @@ -1240,6 +1291,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "denormalized-channels" +version = "0.0.1" +dependencies = [ + "anyhow", + "crossbeam", + "datafusion", + "log", + "once_cell", + "parking_lot", + "tokio", +] + [[package]] name = "denormalized-common" version = "0.0.1" @@ -1441,7 +1505,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -1593,9 +1657,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c" +checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", "hashbrown", @@ -2051,9 +2115,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.3" +version = "0.36.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" +checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" dependencies = [ "memchr", ] @@ -2327,7 +2391,7 @@ dependencies = [ "proc-macro2", "pyo3-macros-backend", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2340,7 +2404,7 @@ dependencies = [ "proc-macro2", "pyo3-build-config", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2492,9 +2556,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustc_version" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" dependencies = [ "semver", ] @@ -2568,7 +2632,7 @@ checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2676,7 +2740,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2710,7 +2774,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.76", + "syn 
2.0.77", ] [[package]] @@ -2723,7 +2787,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2745,9 +2809,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.76" +version = "2.0.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525" +checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" dependencies = [ "proc-macro2", "quote", @@ -2790,7 +2854,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2840,9 +2904,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.3" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", @@ -2859,7 +2923,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2911,7 +2975,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2976,7 +3040,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -3107,7 +3171,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", "wasm-bindgen-shared", ] @@ -3129,7 +3193,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3308,7 +3372,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d7ed65b..29308f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,9 @@ description = "Embeddable stream processing engine" [workspace.dependencies] denormalized = { path = "crates/core" } denormalized-common = { path = "crates/common" } +denormalized-channels = { path = "crates/channels" } -datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "main" } +datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "amey/patch-with-node-id" } arrow = { version = "52.0.0", features = ["prettyprint"] } arrow-array = { version = "52.0.0", default-features = false, features = [ diff --git a/crates/channels/Cargo.toml b/crates/channels/Cargo.toml new file mode 100644 index 0000000..a3cedb5 --- /dev/null +++ b/crates/channels/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "denormalized-channels" +version = { workspace = true } +edition = { workspace = true } + +[dependencies] +anyhow = "1.0.86" +crossbeam = "0.8.4" +datafusion = { workspace = true } +log.workspace = true +once_cell = "1.19.0" +parking_lot = "0.12.3" +tokio.workspace = true diff --git a/crates/channels/src/channel_manager/mod.rs b/crates/channels/src/channel_manager/mod.rs new file mode 100644 index 0000000..ee028b7 --- 
/dev/null
+++ b/crates/channels/src/channel_manager/mod.rs
@@ -0,0 +1,51 @@
+use crossbeam::channel;
+use log::debug;
+use once_cell::sync::Lazy;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+type Message = String;
+
+struct ChannelPair {
+    sender: channel::Sender<Message>,
+    receiver: Option<channel::Receiver<Message>>,
+}
+
+struct Channels {
+    channels: HashMap<String, ChannelPair>,
+}
+
+static GLOBAL_CHANNELS: Lazy<Arc<RwLock<Channels>>> = Lazy::new(|| {
+    Arc::new(RwLock::new(Channels {
+        channels: HashMap::new(),
+    }))
+});
+
+// Function to create a new channel
+pub fn create_channel(id: &str, buffer: usize) {
+    debug!("create request for channel {} with size {}", id, buffer);
+    let mut channels = GLOBAL_CHANNELS.write();
+    let (tx, rx) = channel::unbounded();
+
+    channels.channels.insert(
+        id.to_string(),
+        ChannelPair {
+            sender: tx,
+            receiver: Some(rx),
+        },
+    );
+}
+
+pub fn get_sender(id: &str) -> Option<channel::Sender<Message>> {
+    let channels = GLOBAL_CHANNELS.read();
+    channels.channels.get(id).map(|pair| pair.sender.clone())
+}
+
+pub fn take_receiver(id: &str) -> Option<channel::Receiver<Message>> {
+    let mut channels = GLOBAL_CHANNELS.write();
+    channels
+        .channels
+        .get_mut(id)
+        .and_then(|pair| pair.receiver.take())
+}
diff --git a/crates/channels/src/lib.rs b/crates/channels/src/lib.rs
new file mode 100644
index 0000000..9644df2
--- /dev/null
+++ b/crates/channels/src/lib.rs
@@ -0,0 +1 @@
+pub mod channel_manager;
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 38dbfcb..3289863 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -5,6 +5,7 @@ edition = { workspace = true }

 [dependencies]
 denormalized-common = { workspace = true }
+denormalized-channels = { workspace = true }

 datafusion = { workspace = true }
diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs
index 72721d7..56f75a5 100644
--- a/crates/core/src/datasource/kafka/kafka_stream_read.rs
+++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs
@@ -5,11 +5,14 @@ use std::time::Duration;
 use arrow::datatypes::TimestampMillisecondType;
 use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, StringArray, StructArray};
 use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
+use denormalized_channels::channel_manager::{create_channel, get_sender};
+use log::{debug, error};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
-use tracing::{debug, error};
+//use tracing::{debug, error};

 use crate::config_extensions::denormalized_config::DenormalizedConfig;
+use crate::physical_plan::stream_table::PartitionStreamExt;
 use crate::physical_plan::utils::time::array_to_timestamp_array;
 use crate::state_backend::rocksdb_backend::get_global_rocksdb;
 use crate::utils::arrow_helpers::json_records_to_arrow_record_batch;
@@ -24,9 +27,21 @@ use rdkafka::{ClientConfig, Message, Timestamp, TopicPartitionList};

 use super::KafkaReadConfig;

+#[derive(Clone)]
 pub struct KafkaStreamRead {
     pub config: Arc<KafkaReadConfig>,
     pub assigned_partitions: Vec<i32>,
+    pub exec_node_id: Option<usize>,
+}
+
+impl KafkaStreamRead {
+    pub fn with_node_id(self, node_id: Option<usize>) -> KafkaStreamRead {
+        Self {
+            config: self.config.clone(),
+            assigned_partitions: self.assigned_partitions.clone(),
+            exec_node_id: node_id,
+        }
+    }
 }

 #[derive(Debug, Serialize, Deserialize)]
@@ -123,9 +138,25 @@ impl PartitionStream for KafkaStreamRead {
         let timestamp_unit = self.config.timestamp_unit.clone();
         let batch_timeout = Duration::from_millis(100);

+        let partition_tag = self
+            .assigned_partitions
+            .iter()
+            .map(|&num| num.to_string())
+            .collect::<Vec<String>>()
+            .join("_");
+
+        let node_id = self.exec_node_id.unwrap();
+        let channel_tag = format!("{}_{}", node_id, partition_tag);
+        create_channel(channel_tag.as_str(), 10);
         builder.spawn(async move {
             let mut epoch = 0;
+            let orchestrator_sender = get_sender("orchestrator");
             loop {
+                if epoch == 0 {
+                    let msg = format!("Registering {}", channel_tag);
+                    debug!("sending {} to orchestrator", msg);
+                    orchestrator_sender.as_ref().unwrap().send(msg).unwrap();
+                }
                 let mut last_offsets = HashMap::new();
                 if let Some(backend) = &state_backend {
                     if let Some(offsets) = backend
@@ -202,7 +233,7 @@ impl PartitionStream for KafkaStreamRead {
                     }
                 }

-                debug!("Batch size {}", batch.len());
+                //debug!("Batch size {}", batch.len());

                 if !batch.is_empty() {
                     let record_batch: RecordBatch =
@@ -277,3 +308,14 @@ impl PartitionStream for KafkaStreamRead {
         builder.build()
     }
 }
+
+// Implement this for KafkaStreamRead
+impl PartitionStreamExt for KafkaStreamRead {
+    fn requires_node_id(&self) -> bool {
+        true
+    }
+
+    fn as_partition_with_node_id(&self) -> Option<&KafkaStreamRead> {
+        Some(self)
+    }
+}
diff --git a/crates/core/src/datasource/kafka/topic_reader.rs b/crates/core/src/datasource/kafka/topic_reader.rs
index 884dd14..6eca1b4 100644
--- a/crates/core/src/datasource/kafka/topic_reader.rs
+++ b/crates/core/src/datasource/kafka/topic_reader.rs
@@ -9,6 +9,8 @@ use datafusion::logical_expr::{Expr, TableType};
 use datafusion::physical_expr::{expressions, LexOrdering, PhysicalSortExpr};
 use datafusion::physical_plan::{streaming::StreamingTableExec, ExecutionPlan};

+use crate::physical_plan::stream_table::DenormalizedStreamingTableExec;
+
 use super::{KafkaReadConfig, KafkaStreamRead};

 // Used to create a kafka source
@@ -37,11 +39,12 @@ impl TopicReader {
             let read_stream = Arc::new(KafkaStreamRead {
                 config: self.0.clone(),
                 assigned_partitions: vec![part],
+                exec_node_id: None,
             });
             partition_streams.push(read_stream as _);
         }

-        Ok(Arc::new(StreamingTableExec::try_new(
+        Ok(Arc::new(DenormalizedStreamingTableExec::try_new(
             self.0.schema.clone(),
             partition_streams,
             projection,
diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs
index 25db7fb..7c704f0 100644
--- a/crates/core/src/datastream.rs
+++ b/crates/core/src/datastream.rs
@@ -1,4 +1,7 @@
 use datafusion::logical_expr::LogicalPlan;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::ExecutionPlanProperties;
+use denormalized_channels::channel_manager::take_receiver;
 use futures::StreamExt;
 use std::{sync::Arc, time::Duration};

@@ -14,8 +17,11 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use crate::context::Context;
 use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
 use crate::logical_plan::StreamingLogicalPlanBuilder;
+use crate::orchestrator;
+use crate::orchestrator::orchestrator::Orchestrator;
 use crate::physical_plan::utils::time::TimestampUnit;

+use denormalized_channels::channel_manager::create_channel;
 use denormalized_common::error::Result;

 /// The primary interface for building a streaming job
@@ -128,7 +134,19 @@ impl DataStream {

     /// execute the stream and print the results to stdout.
     /// Mainly used for development and debugging
-    pub async fn print_stream(self) -> Result<()> {
+    pub async fn print_stream(&self) -> Result<()> {
+        println!("entered print stream.");
+
+        let plan = self.df.as_ref().clone().create_physical_plan().await?;
+        let node_ids = extract_node_ids_and_partitions(&plan);
+        let max_buffer_size = node_ids
+            .iter()
+            .map(|x| x.1)
+            .fold(0, |sum, partition_count| sum + partition_count);
+        println!("creating orchestrator channel");
+        let orchestrator = Orchestrator {};
+        tokio::task::spawn_blocking(move || orchestrator.run(max_buffer_size));
+        println!("started the orchestrator");
         let mut stream: SendableRecordBatchStream =
             self.df.as_ref().clone().execute_stream().await?;
         loop {
@@ -232,3 +250,16 @@ impl Joinable for DataStream {
         plan
     }
 }
+
+fn extract_node_ids_and_partitions(plan: &Arc<dyn ExecutionPlan>) -> Vec<(Option<usize>, usize)> {
+    let node_id = plan.node_id();
+    let partitions = plan.output_partitioning().partition_count();
+    let mut traversals: Vec<(Option<usize>, usize)> = vec![];
+
+    for child in plan.children() {
+        let mut traversal = extract_node_ids_and_partitions(child);
+        traversals.append(&mut traversal);
+    }
+    traversals.push((node_id, partitions));
+    traversals
+}
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index b915289..5d005e5 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -4,6 +4,7 @@ pub mod context;
 pub mod datasource;
 pub mod datastream;
 pub mod logical_plan;
+pub mod orchestrator;
 pub mod physical_optimizer;
 pub mod physical_plan;
 pub mod planner;
diff --git a/crates/core/src/orchestrator/mod.rs b/crates/core/src/orchestrator/mod.rs
new file mode 100644
index 0000000..7b53598
--- /dev/null
+++ b/crates/core/src/orchestrator/mod.rs
@@ -0,0 +1 @@
+pub mod orchestrator;
diff --git a/crates/core/src/orchestrator/orchestrator.rs b/crates/core/src/orchestrator/orchestrator.rs
new file mode 100644
index 0000000..ed8f9f0
--- /dev/null
+++ b/crates/core/src/orchestrator/orchestrator.rs
@@ -0,0 +1,17 @@
+use denormalized_channels::channel_manager::{create_channel, take_receiver};
+use log::{debug, info};
+
+pub struct Orchestrator {}
+
+impl Orchestrator {
+    pub fn run(&self, managed_tasks: usize) {
+        info!("creating a channel");
+        create_channel("orchestrator", managed_tasks);
+        let receiver = take_receiver("orchestrator");
+        info!("receiver is {:?}", receiver);
+        loop {
+            let msg = receiver.as_ref().unwrap().recv().unwrap();
+            info!("Received from blocking channel: {}", msg);
+        }
+    }
+}
diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs
index ea7ffc8..093596a 100644
--- a/crates/core/src/physical_plan/continuous/streaming_window.rs
+++ b/crates/core/src/physical_plan/continuous/streaming_window.rs
@@ -509,6 +509,25 @@ impl ExecutionPlan for StreamingWindowExec {
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         Ok(None)
     }
+
+    fn with_node_id(self: Arc<Self>, _node_id: usize) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let cache = self.properties().clone().with_node_id(_node_id);
+        let new_exec = StreamingWindowExec {
+            input: self.input.clone(),
+            aggregate_expressions: self.aggregate_expressions.clone(),
+            filter_expressions: self.filter_expressions.clone(),
+            group_by: self.group_by.clone(),
+            schema: self.schema.clone(),
+            input_schema: self.input_schema.clone(),
+            watermark: Arc::new(Mutex::new(None)),
+            metrics: ExecutionPlanMetricsSet::new(),
+            cache,
+            mode: self.mode,
+            window_type: self.window_type,
+            upstream_partitioning: self.upstream_partitioning,
+        };
+        Ok(Some(Arc::new(new_exec)))
+    }
 }

 impl DisplayAs for StreamingWindowExec {
diff --git a/crates/core/src/physical_plan/mod.rs b/crates/core/src/physical_plan/mod.rs
index 44831a7..1e46239 100644
--- a/crates/core/src/physical_plan/mod.rs
+++ b/crates/core/src/physical_plan/mod.rs
@@ -1,2 +1,3 @@
 pub mod continuous;
+pub mod stream_table;
 pub mod utils;
diff --git a/crates/core/src/physical_plan/stream_table.rs b/crates/core/src/physical_plan/stream_table.rs
new file mode 100644
index 0000000..52feaf3
--- /dev/null
+++ b/crates/core/src/physical_plan/stream_table.rs
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Generic plans for deferred execution: [`StreamingTableExec`] and [`PartitionStream`]
+
+use std::any::Any;
+use std::sync::Arc;
+
+//use super::{DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties};
+//use crate::display::{display_orderings, ProjectSchemaDisplay};
+//use crate::stream::RecordBatchStreamAdapter;
+//use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+use arrow::datatypes::SchemaRef;
+use arrow_schema::Schema;
+use datafusion::{
+    common::{internal_err, plan_err, Result},
+    physical_expr::EquivalenceProperties,
+    physical_plan::{
+        display::{display_orderings, ProjectSchemaDisplay},
+        limit::LimitStream,
+        metrics::{BaselineMetrics, MetricsSet},
+        stream::RecordBatchStreamAdapter,
+        streaming::PartitionStream,
+        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
+    },
+};
+//use datafusion_execution::TaskContext;
+//use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+
+//use crate::limit::LimitStream;
+//use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use async_trait::async_trait;
+use datafusion::{
+    execution::{SendableRecordBatchStream, TaskContext},
+    physical_expr::LexOrdering,
+    physical_plan::{metrics::ExecutionPlanMetricsSet, PlanProperties},
+};
+use futures::stream::StreamExt;
+use log::debug;
+
+use crate::datasource::kafka::KafkaStreamRead;
+
+pub trait PartitionStreamExt: PartitionStream {
+    fn requires_node_id(&self) -> bool {
+        false
+    }
+
+    fn as_partition_with_node_id(&self) -> Option<&KafkaStreamRead> {
+        None
+    }
+}
+
+/// An [`ExecutionPlan`] for one or more [`PartitionStream`]s.
+///
+/// If your source can be represented as one or more [`PartitionStream`]s, you can
+/// use this struct to implement [`ExecutionPlan`].
+pub struct DenormalizedStreamingTableExec {
+    partitions: Vec<Arc<dyn PartitionStreamExt>>,
+    projection: Option<Arc<[usize]>>,
+    projected_schema: SchemaRef,
+    projected_output_ordering: Vec<LexOrdering>,
+    infinite: bool,
+    limit: Option<usize>,
+    cache: PlanProperties,
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl DenormalizedStreamingTableExec {
+    /// Try to create a new [`StreamingTableExec`] returning an error if the schema is incorrect
+    pub fn try_new(
+        schema: SchemaRef,
+        partitions: Vec<Arc<dyn PartitionStreamExt>>,
+        projection: Option<&Vec<usize>>,
+        projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
+        infinite: bool,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        for x in partitions.iter() {
+            let partition_schema = x.schema();
+            if !schema.eq(partition_schema) {
+                debug!(
+                    "Target schema does not match with partition schema. \
+                        Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
+                );
+                return plan_err!("Mismatch between schema and batches");
+            }
+        }
+
+        let projected_schema = match projection {
+            Some(p) => Arc::new(schema.project(p)?),
+            None => schema,
+        };
+        let projected_output_ordering = projected_output_ordering.into_iter().collect::<Vec<_>>();
+        let cache = Self::compute_properties(
+            Arc::clone(&projected_schema),
+            &projected_output_ordering,
+            &partitions,
+            infinite,
+        );
+        Ok(Self {
+            partitions,
+            projected_schema,
+            projection: projection.cloned().map(Into::into),
+            projected_output_ordering,
+            infinite,
+            limit,
+            cache,
+            metrics: ExecutionPlanMetricsSet::new(),
+        })
+    }
+
+    pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStreamExt>> {
+        &self.partitions
+    }
+
+    pub fn partition_schema(&self) -> &SchemaRef {
+        self.partitions[0].schema()
+    }
+
+    pub fn projection(&self) -> &Option<Arc<[usize]>> {
+        &self.projection
+    }
+
+    pub fn projected_schema(&self) -> &Schema {
+        &self.projected_schema
+    }
+
+    pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
+        self.projected_output_ordering.clone()
+    }
+
+    pub fn is_infinite(&self) -> bool {
+        self.infinite
+    }
+
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
+    fn compute_properties(
+        schema: SchemaRef,
+        orderings: &[LexOrdering],
+        partitions: &[Arc<dyn PartitionStreamExt>],
+        is_infinite: bool,
+    ) -> PlanProperties {
+        // Calculate equivalence properties:
+        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
+
+        // Get output partitioning:
+        let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
+
+        // Determine execution mode:
+        let mode = if is_infinite {
+            ExecutionMode::Unbounded
+        } else {
+            ExecutionMode::Bounded
+        };
+
+        PlanProperties::new(eq_properties, output_partitioning, mode)
+    }
+}
+
+impl std::fmt::Debug for DenormalizedStreamingTableExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
+    }
+}
+
+impl DisplayAs for DenormalizedStreamingTableExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "DenormalizedStreamingTableExec: partition_sizes={:?}",
+                    self.partitions.len(),
+                )?;
+                if !self.projected_schema.fields().is_empty() {
+                    write!(
+                        f,
+                        ", projection={}",
+                        ProjectSchemaDisplay(&self.projected_schema)
+                    )?;
+                }
+                if self.infinite {
+                    write!(f, ", infinite_source=true")?;
+                }
+                if let Some(fetch) = self.limit {
+                    write!(f, ", fetch={fetch}")?;
+                }
+
+                display_orderings(f, &self.projected_output_ordering)?;
+
+                Ok(())
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for DenormalizedStreamingTableExec {
+    fn name(&self) -> &'static str {
+        "DenormalizedStreamingTableExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    fn fetch(&self) -> Option<usize> {
+        self.limit
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            internal_err!("Children cannot be replaced in {self:?}")
+        }
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        ctx: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = self.partitions[partition].execute(ctx);
+        let projected_stream = match self.projection.clone() {
+            Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.projected_schema),
+                stream.map(move |x| {
+                    x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
+                }),
+            )),
+            None => stream,
+        };
+        Ok(match self.limit {
+            None => projected_stream,
+            Some(fetch) => {
+                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+                Box::pin(LimitStream::new(
+                    projected_stream,
+                    0,
+                    Some(fetch),
+                    baseline_metrics,
+                ))
+            }
+        })
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
+        Some(Arc::new(DenormalizedStreamingTableExec {
+            partitions: self.partitions.clone(),
+            projection: self.projection.clone(),
+            projected_schema: Arc::clone(&self.projected_schema),
+            projected_output_ordering: self.projected_output_ordering.clone(),
+            infinite: self.infinite,
+            limit,
+            cache: self.cache.clone(),
+            metrics: self.metrics.clone(),
+        }))
+    }
+
+    fn with_node_id(self: Arc<Self>, _node_id: usize) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let new_partitions: Vec<Arc<dyn PartitionStreamExt>> = self
+            .partitions
+            .iter()
+            .map(|partition| {
+                if partition.requires_node_id() {
+                    if let Some(kafka_stream) = partition.as_partition_with_node_id() {
+                        let new_stream = kafka_stream.clone().with_node_id(Some(_node_id));
+                        Arc::new(new_stream) as Arc<dyn PartitionStreamExt>
+                    } else {
+                        Arc::clone(partition)
+                    }
+                } else {
+                    Arc::clone(partition)
+                }
+            })
+            .collect();
+        let mut new_plan = DenormalizedStreamingTableExec {
+            partitions: new_partitions,
+            projection: self.projection.clone(),
+            projected_schema: Arc::clone(&self.projected_schema),
+            projected_output_ordering: self.projected_output_ordering.clone(),
+            infinite: self.infinite,
+            limit: self.limit,
+            cache: self.cache.clone(),
+            metrics: self.metrics.clone(),
+        };
+        let new_props = new_plan.cache.clone().with_node_id(_node_id);
+        new_plan.cache = new_props;
+        Ok(Some(Arc::new(new_plan)))
+    }
+}
diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs
index b520a7b..83c92ac 100644
--- a/examples/examples/kafka_rideshare.rs
+++ b/examples/examples/kafka_rideshare.rs
@@ -49,7 +49,7 @@ async fn main() -> Result<()> {
         }
     }"#;

-    let bootstrap_servers = String::from("localhost:9092");
+    let bootstrap_servers = String::from("localhost:19092,localhost:29092,localhost:39092");

     let ctx = Context::new()?;

@@ -79,9 +79,10 @@ async fn main() -> Result<()> {
     )?;

     // ds.clone().print_stream().await?;
-
-    ds.sink_kafka(bootstrap_servers.clone(), String::from("out_topic"))
-        .await?;
+    //ds.clone().create_orchestrator_thread().await?;
+    ds.print_physical_plan().await?;
+    //ds.sink_kafka(bootstrap_servers.clone(), String::from("out_topic"))
+    //    .await?;

     Ok(())
 }
diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index a0bb20f..f0d45da 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -15,12 +15,12 @@ use denormalized_examples::get_sample_json;
 #[tokio::main]
 async fn main() -> Result<()> {
     env_logger::builder()
-        .filter_level(log::LevelFilter::Debug)
+        .filter_level(log::LevelFilter::Info)
         .init();

     let sample_event = get_sample_json();

-    let bootstrap_servers = String::from("localhost:9092");
+    let bootstrap_servers = String::from("localhost:19092,localhost:29092,localhost:39092");
     let ctx = Context::new()?;
     let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

From a8cf5957684825b650d51417b4567143e8f8e7c7 Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Fri, 6 Sep 2024 13:36:18 -0700
Subject: [PATCH 2/8] Adding orchestrator base

---
 Cargo.lock                                    | 28 ++++++------
 Cargo.toml                                    |  2 +-
 crates/core/Cargo.toml                        |  2 +-
 .../src/datasource/kafka/kafka_stream_read.rs | 31 ++++++-------
 .../core/src/datasource/kafka/topic_reader.rs |  2 +-
 crates/core/src/datastream.rs                 | 30 ++++++-------
 crates/core/src/lib.rs                        |  1 -
 crates/core/src/orchestrator/mod.rs           |  1 -
 crates/core/src/orchestrator/orchestrator.rs  | 17 -------
 crates/{channels => orchestrator}/Cargo.toml  |  2 +-
 .../src/channel_manager/mod.rs                | 10 ++---
 crates/{channels => orchestrator}/src/lib.rs  |  1 +
 crates/orchestrator/src/orchestrator.rs       | 44 +++++++++++++++++++
 examples/examples/simple_aggregation.rs       |  2 +-
 14 files changed, 96 insertions(+), 77 deletions(-)
 delete mode 100644 crates/core/src/orchestrator/mod.rs
 delete mode 100644 crates/core/src/orchestrator/orchestrator.rs
 rename crates/{channels => orchestrator}/Cargo.toml (87%)
 rename crates/{channels => orchestrator}/src/channel_manager/mod.rs (76%)
 rename crates/{channels => orchestrator}/src/lib.rs (53%)
 create mode 100644 crates/orchestrator/src/orchestrator.rs

diff --git a/Cargo.lock b/Cargo.lock
index 6650ba0..52f6633 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1276,8 +1276,8 @@ dependencies = [
  "chrono",
  "datafusion",
  "delegate",
- "denormalized-channels",
"denormalized-common", + "denormalized-orchestrator", "futures", "half", "hashbrown", @@ -1291,19 +1291,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "denormalized-channels" -version = "0.0.1" -dependencies = [ - "anyhow", - "crossbeam", - "datafusion", - "log", - "once_cell", - "parking_lot", - "tokio", -] - [[package]] name = "denormalized-common" version = "0.0.1" @@ -1337,6 +1324,19 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "denormalized-orchestrator" +version = "0.0.1" +dependencies = [ + "anyhow", + "crossbeam", + "datafusion", + "log", + "once_cell", + "parking_lot", + "tokio", +] + [[package]] name = "denormalized-python" version = "0.0.1" diff --git a/Cargo.toml b/Cargo.toml index 29308f9..8802e0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ description = "Embeddable stream processing engine" [workspace.dependencies] denormalized = { path = "crates/core" } denormalized-common = { path = "crates/common" } -denormalized-channels = { path = "crates/channels" } +denormalized-orchestrator = { path = "crates/orchestrator" } datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "amey/patch-with-node-id" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 3289863..95091bb 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -5,7 +5,7 @@ edition = { workspace = true } [dependencies] denormalized-common = { workspace = true } -denormalized-channels = { workspace = true } +denormalized-orchestrator = { workspace = true } datafusion = { workspace = true } diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs index 56f75a5..792c5b0 100644 --- a/crates/core/src/datasource/kafka/kafka_stream_read.rs +++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs @@ -5,7 +5,8 @@ use std::time::Duration; use arrow::datatypes::TimestampMillisecondType; use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, StringArray, StructArray}; use arrow_schema::{DataType, Field, SchemaRef, TimeUnit}; -use denormalized_channels::channel_manager::{create_channel, get_sender}; +use denormalized_orchestrator::channel_manager::{create_channel, get_sender}; +use denormalized_orchestrator::orchestrator::{self, OrchestrationMessage}; use log::{debug, error}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -137,26 +138,20 @@ impl PartitionStream for KafkaStreamRead { let timestamp_column: String = self.config.timestamp_column.clone(); let timestamp_unit = self.config.timestamp_unit.clone(); let batch_timeout = Duration::from_millis(100); - - let partition_tag = self - .assigned_partitions - .iter() - .map(|&num| num.to_string()) - .collect::>() - .join("_"); - - let node_id = self.exec_node_id.unwrap(); - let channel_tag = format!("{}_{}", node_id, partition_tag); - create_channel(channel_tag.as_str(), 10); + let mut channel_tag = String::from(""); + if orchestrator::SHOULD_CHECKPOINT { + let node_id = self.exec_node_id.unwrap(); + channel_tag = format!("{}_{}", node_id, partition_tag); + create_channel(channel_tag.as_str(), 10); + } builder.spawn(async move { let mut epoch = 0; - let orchestrator_sender = get_sender("orchestrator"); + if orchestrator::SHOULD_CHECKPOINT { + let orchestrator_sender = get_sender("orchestrator"); + let msg = OrchestrationMessage::RegisterStream(channel_tag.clone()); + orchestrator_sender.as_ref().unwrap().send(msg).unwrap(); + } loop { - if epoch == 0 { - let msg = format!("Registering 
{}", channel_tag); - debug!("sending {} to orchestrator", msg); - orchestrator_sender.as_ref().unwrap().send(msg).unwrap(); - } let mut last_offsets = HashMap::new(); if let Some(backend) = &state_backend { if let Some(offsets) = backend diff --git a/crates/core/src/datasource/kafka/topic_reader.rs b/crates/core/src/datasource/kafka/topic_reader.rs index 6eca1b4..66d72fe 100644 --- a/crates/core/src/datasource/kafka/topic_reader.rs +++ b/crates/core/src/datasource/kafka/topic_reader.rs @@ -7,7 +7,7 @@ use datafusion::common::{not_impl_err, plan_err, Result}; use datafusion::datasource::TableProvider; use datafusion::logical_expr::{Expr, TableType}; use datafusion::physical_expr::{expressions, LexOrdering, PhysicalSortExpr}; -use datafusion::physical_plan::{streaming::StreamingTableExec, ExecutionPlan}; +use datafusion::physical_plan::ExecutionPlan; use crate::physical_plan::stream_table::DenormalizedStreamingTableExec; diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index 7c704f0..8c2e411 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -1,7 +1,8 @@ +use datafusion::common::runtime::SpawnedTask; use datafusion::logical_expr::LogicalPlan; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::ExecutionPlanProperties; -use denormalized_channels::channel_manager::take_receiver; +use denormalized_orchestrator::orchestrator; use futures::StreamExt; use std::{sync::Arc, time::Duration}; @@ -17,11 +18,9 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan; use crate::context::Context; use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder}; use crate::logical_plan::StreamingLogicalPlanBuilder; -use crate::orchestrator; -use crate::orchestrator::orchestrator::Orchestrator; use crate::physical_plan::utils::time::TimestampUnit; +use denormalized_orchestrator::orchestrator::Orchestrator; -use denormalized_channels::channel_manager::create_channel; use denormalized_common::error::Result; /// The primary interface for building a streaming job @@ -135,18 +134,17 @@ impl DataStream { /// execute the stream and print the results to stdout. 
     /// Mainly used for development and debugging
     pub async fn print_stream(&self) -> Result<()> {
-        println!("entered print stream.");
-
-        let plan = self.df.as_ref().clone().create_physical_plan().await?;
-        let node_ids = extract_node_ids_and_partitions(&plan);
-        let max_buffer_size = node_ids
-            .iter()
-            .map(|x| x.1)
-            .fold(0, |sum, partition_count| sum + partition_count);
-        println!("creating orchestrator channel");
-        let orchestrator = Orchestrator {};
-        tokio::task::spawn_blocking(move || orchestrator.run(max_buffer_size));
-        println!("started the orchestrator");
+        if orchestrator::SHOULD_CHECKPOINT {
+            let plan = self.df.as_ref().clone().create_physical_plan().await?;
+            let node_ids = extract_node_ids_and_partitions(&plan);
+            let max_buffer_size = node_ids
+                .iter()
+                .map(|x| x.1)
+                .fold(0, |sum, partition_count| sum + partition_count);
+            let mut orchestrator = Orchestrator::default();
+            SpawnedTask::spawn_blocking(move || orchestrator.run(max_buffer_size));
+        }
+
         let mut stream: SendableRecordBatchStream =
             self.df.as_ref().clone().execute_stream().await?;
         loop {
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 5d005e5..b915289 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -4,7 +4,6 @@ pub mod context;
 pub mod datasource;
 pub mod datastream;
 pub mod logical_plan;
-pub mod orchestrator;
 pub mod physical_optimizer;
 pub mod physical_plan;
 pub mod planner;
diff --git a/crates/core/src/orchestrator/mod.rs b/crates/core/src/orchestrator/mod.rs
deleted file mode 100644
index 7b53598..0000000
--- a/crates/core/src/orchestrator/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod orchestrator;
diff --git a/crates/core/src/orchestrator/orchestrator.rs b/crates/core/src/orchestrator/orchestrator.rs
deleted file mode 100644
index ed8f9f0..0000000
--- a/crates/core/src/orchestrator/orchestrator.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-use denormalized_channels::channel_manager::{create_channel, take_receiver};
-use log::{debug, info};
-
-pub struct Orchestrator {}
-
-impl Orchestrator {
-    pub fn run(&self, managed_tasks: usize) {
-        info!("creating a channel");
-        create_channel("orchestrator", managed_tasks);
-        let receiver = take_receiver("orchestrator");
-        info!("receiver is {:?}", receiver);
-        loop {
-            let msg = receiver.as_ref().unwrap().recv().unwrap();
-            info!("Received from blocking channel: {}", msg);
-        }
-    }
-}
diff --git a/crates/channels/Cargo.toml b/crates/orchestrator/Cargo.toml
similarity index 87%
rename from crates/channels/Cargo.toml
rename to crates/orchestrator/Cargo.toml
index a3cedb5..10c6180 100644
--- a/crates/channels/Cargo.toml
+++ b/crates/orchestrator/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "denormalized-channels"
+name = "denormalized-orchestrator"
 version = { workspace = true }
 edition = { workspace = true }
diff --git a/crates/channels/src/channel_manager/mod.rs b/crates/orchestrator/src/channel_manager/mod.rs
similarity index 76%
rename from crates/channels/src/channel_manager/mod.rs
rename to crates/orchestrator/src/channel_manager/mod.rs
index ee028b7..f1a16c6 100644
--- a/crates/channels/src/channel_manager/mod.rs
+++ b/crates/orchestrator/src/channel_manager/mod.rs
@@ -5,11 +5,11 @@ use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::sync::Arc;

-type Message = String;
+use crate::orchestrator::OrchestrationMessage;

 struct ChannelPair {
-    sender: channel::Sender<Message>,
-    receiver: Option<channel::Receiver<Message>>,
+    sender: channel::Sender<OrchestrationMessage>,
+    receiver: Option<channel::Receiver<OrchestrationMessage>>,
 }

 struct Channels {
     channels: HashMap<String, ChannelPair>,
 }
@@ -37,12 +37,12 @@ pub fn create_channel(id: &str, buffer: usize) {
 }

-pub fn get_sender(id: &str) -> Option<channel::Sender<Message>> {
+pub fn get_sender(id: &str) -> Option<channel::Sender<OrchestrationMessage>> {
     let channels = GLOBAL_CHANNELS.read();
     channels.channels.get(id).map(|pair| pair.sender.clone())
 }

-pub fn take_receiver(id: &str) -> Option<channel::Receiver<Message>> {
+pub fn take_receiver(id: &str) -> Option<channel::Receiver<OrchestrationMessage>> {
     let mut channels = GLOBAL_CHANNELS.write();
     channels
         .channels
         .get_mut(id)
         .and_then(|pair| pair.receiver.take())
diff --git a/crates/channels/src/lib.rs b/crates/orchestrator/src/lib.rs
similarity index 53%
rename from crates/channels/src/lib.rs
rename to crates/orchestrator/src/lib.rs
index 9644df2..fb8b8ce 100644
--- a/crates/channels/src/lib.rs
+++ b/crates/orchestrator/src/lib.rs
@@ -1 +1,2 @@
 pub mod channel_manager;
+pub mod orchestrator;
diff --git a/crates/orchestrator/src/orchestrator.rs b/crates/orchestrator/src/orchestrator.rs
new file mode 100644
index 0000000..e125b63
--- /dev/null
+++ b/crates/orchestrator/src/orchestrator.rs
@@ -0,0 +1,44 @@
+use std::collections::HashMap;
+
+use crate::channel_manager::{create_channel, get_sender, take_receiver};
+use crossbeam::channel;
+use log::{debug, info};
+
+#[derive(Clone, Debug)]
+pub enum OrchestrationMessage {
+    RegisterStream(String),
+    CheckpointBarrier(String),
+    CheckpointComplete(String),
+}
+
+pub struct Orchestrator {
+    senders: HashMap<String, channel::Sender<OrchestrationMessage>>,
+}
+
+pub const SHOULD_CHECKPOINT: bool = false; // THIS WILL BE MOVED INTO CONFIG
+
+impl Orchestrator {
+    pub fn default() -> Self {
+        Self {
+            senders: HashMap::new(),
+        }
+    }
+
+    pub fn run(&mut self, managed_tasks: usize) {
+        info!("Orchestrator started.");
+        create_channel("orchestrator", managed_tasks);
+        let receiver = take_receiver("orchestrator");
+        loop {
+            let msg: OrchestrationMessage = receiver.as_ref().unwrap().recv().unwrap();
+            match msg {
+                OrchestrationMessage::RegisterStream(stream_id) => {
+                    debug!("registering stream {}", stream_id);
+                    let sender = get_sender(&stream_id).unwrap();
+                    self.senders.insert(stream_id, sender);
+                }
+                OrchestrationMessage::CheckpointBarrier(_) => todo!(),
+                OrchestrationMessage::CheckpointComplete(_) => todo!(),
+            }
+        }
+    }
+}
diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index f0d45da..7185b8d 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -15,7 +15,7 @@ use denormalized_examples::get_sample_json;
 #[tokio::main]
 async fn main() -> Result<()> {
     env_logger::builder()
-        .filter_level(log::LevelFilter::Info)
+        .filter_level(log::LevelFilter::Debug)
         .init();

     let sample_event = get_sample_json();

From 33fd49a8e55b45e4a501a2cfedbf0ffe80029e2c Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Fri, 6 Sep 2024 13:47:52 -0700
Subject: [PATCH 3/8] Fmt changes

---
 crates/core/src/datastream.rs           | 2 +-
 crates/orchestrator/src/orchestrator.rs | 7 +------
 examples/examples/kafka_rideshare.rs    | 2 +-
 examples/examples/simple_aggregation.rs | 2 +-
 4 files changed, 4 insertions(+), 9 deletions(-)

diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs
index 8c2e411..1ef15de 100644
--- a/crates/core/src/datastream.rs
+++ b/crates/core/src/datastream.rs
@@ -140,7 +140,7 @@ impl DataStream {
             let max_buffer_size = node_ids
                 .iter()
                 .map(|x| x.1)
-                .fold(0, |sum, partition_count| sum + partition_count);
+                .sum::<usize>();
             let mut orchestrator = Orchestrator::default();
             SpawnedTask::spawn_blocking(move || orchestrator.run(max_buffer_size));
         }
diff --git a/crates/orchestrator/src/orchestrator.rs b/crates/orchestrator/src/orchestrator.rs
index e125b63..3ada4ea 100644
--- a/crates/orchestrator/src/orchestrator.rs
+++ b/crates/orchestrator/src/orchestrator.rs
@@ -11,6 +11,7 @@ pub enum OrchestrationMessage {
     CheckpointComplete(String),
 }

+#[derive(Default)]
 pub struct Orchestrator {
     senders: HashMap<String, channel::Sender<OrchestrationMessage>>,
 }
@@ -18,12 +19,6 @@ pub struct Orchestrator {
 pub const SHOULD_CHECKPOINT: bool = false; // THIS WILL BE MOVED INTO CONFIG

 impl Orchestrator {
-    pub fn default() -> Self {
-        Self {
-            senders: HashMap::new(),
-        }
-    }
-
     pub fn run(&mut self, managed_tasks: usize) {
         info!("Orchestrator started.");
         create_channel("orchestrator", managed_tasks);
diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs
index 83c92ac..82a1cb4 100644
--- a/examples/examples/kafka_rideshare.rs
+++ b/examples/examples/kafka_rideshare.rs
@@ -49,7 +49,7 @@ async fn main() -> Result<()> {
         }
     }"#;

-    let bootstrap_servers = String::from("localhost:19092,localhost:29092,localhost:39092");
+    let bootstrap_servers = String::from("localhost:9092");

     let ctx = Context::new()?;

diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index 7185b8d..a0bb20f 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -20,7 +20,7 @@ async fn main() -> Result<()> {

     let sample_event = get_sample_json();

-    let bootstrap_servers = String::from("localhost:19092,localhost:29092,localhost:39092");
+    let bootstrap_servers = String::from("localhost:9092");
     let ctx = Context::new()?;
     let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

From 74d5764d978ddcf3a92ef879f06cb837375056cd Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Fri, 6 Sep 2024 13:53:21 -0700
Subject: [PATCH 4/8] rv examples

---
 examples/examples/kafka_rideshare.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs
index 82a1cb4..6270ef0 100644
--- a/examples/examples/kafka_rideshare.rs
+++ b/examples/examples/kafka_rideshare.rs
@@ -79,10 +79,8 @@ async fn main() -> Result<()> {
     )?;

     // ds.clone().print_stream().await?;
-    //ds.clone().create_orchestrator_thread().await?;
-    ds.print_physical_plan().await?;
-    //ds.sink_kafka(bootstrap_servers.clone(), String::from("out_topic"))
-    //    .await?;
+    ds.sink_kafka(bootstrap_servers.clone(), String::from("out_topic"))
+        .await?;

     Ok(())
 }

From eaec3a6eff5c57d379d500516b394bbc05f09c15 Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Fri, 6 Sep 2024 13:53:59 -0700
Subject: [PATCH 5/8] rv examples

---
 examples/examples/kafka_rideshare.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs
index 6270ef0..b520a7b 100644
--- a/examples/examples/kafka_rideshare.rs
+++ b/examples/examples/kafka_rideshare.rs
@@ -79,6 +79,7 @@ async fn main() -> Result<()> {
     )?;

     // ds.clone().print_stream().await?;
+
     ds.sink_kafka(bootstrap_servers.clone(), String::from("out_topic"))
         .await?;

From c09b0ec5bd7d7f8a905ec3b5ad7f81bed4cdcb3f Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Fri, 6 Sep 2024 14:33:05 -0700
Subject: [PATCH 6/8] respond to PR comments

---
 Cargo.lock                     | 1 -
 crates/orchestrator/Cargo.toml | 1 -
 2 files changed, 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 52f6633..35b22bd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1328,7 +1328,6 @@
 name = "denormalized-orchestrator"
 version = "0.0.1"
 dependencies = [
- "anyhow",
 "crossbeam",
 "datafusion",
 "log",
diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml
index 10c6180..ba7c0af 100644
--- a/crates/orchestrator/Cargo.toml
+++ b/crates/orchestrator/Cargo.toml
@@ -4,7 +4,6 @@ version = { workspace = true }
 edition = { workspace = true }
 
 [dependencies]
-anyhow = "1.0.86"
 crossbeam = "0.8.4"
 datafusion = { workspace = true }
 log.workspace = true

From d4213c7629c9463e64b86c26c88e97eb13c02c81 Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Fri, 6 Sep 2024 14:34:03 -0700
Subject: [PATCH 7/8] use main branch for datafusion

---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index 8802e0b..774c472 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,7 +20,7 @@ denormalized = { path = "crates/core" }
 denormalized-common = { path = "crates/common" }
 denormalized-orchestrator = { path = "crates/orchestrator" }
 
-datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "amey/patch-with-node-id" }
+datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "main" }
 
 arrow = { version = "52.0.0", features = ["prettyprint"] }
 arrow-array = { version = "52.0.0", default-features = false, features = [

From 9a2391d0b749929741828306a40dbb31af310a1f Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Fri, 6 Sep 2024 14:41:51 -0700
Subject: [PATCH 8/8] rust fmt

---
 Cargo.lock                    | 38 +++++++++++++++++------------------
 crates/core/src/datastream.rs |  5 +----
 2 files changed, 20 insertions(+), 23 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 35b22bd..dc4c914 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -864,7 +864,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "ahash",
  "apache-avro",
@@ -923,7 +923,7 @@
 [[package]]
 name = "datafusion-catalog"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "arrow-schema",
  "async-trait",
@@ -936,7 +936,7 @@
 [[package]]
 name = "datafusion-common"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "ahash",
  "apache-avro",
@@ -960,7 +960,7 @@
 [[package]]
 name = "datafusion-common-runtime"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "tokio",
 ]
@@ -968,7 +968,7 @@
 [[package]]
 name = "datafusion-execution"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "arrow",
  "chrono",
@@ -988,7 +988,7 @@
 [[package]]
 name = "datafusion-expr"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "ahash",
  "arrow",
@@ -1009,7 +1009,7 @@
 [[package]]
 name = "datafusion-expr-common"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -1019,7 +1019,7 @@
 [[package]]
 name = "datafusion-functions"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "arrow",
  "arrow-buffer",
@@ -1045,7 +1045,7 @@
 [[package]]
 name = "datafusion-functions-aggregate"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "ahash",
  "arrow",
@@ -1065,7 +1065,7 @@
 [[package]]
 name = "datafusion-functions-aggregate-common"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "ahash",
  "arrow",
@@ -1078,7 +1078,7 @@
 [[package]]
 name = "datafusion-functions-nested"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "arrow",
  "arrow-array",
@@ -1099,7 +1099,7 @@
 [[package]]
 name = "datafusion-functions-window"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "datafusion-common",
  "datafusion-expr",
@@ -1110,7 +1110,7 @@
 [[package]]
 name = "datafusion-optimizer"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "arrow",
  "async-trait",
@@ -1129,7 +1129,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "ahash",
  "arrow",
@@ -1160,7 +1160,7 @@
 [[package]]
 name = "datafusion-physical-expr-common"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "ahash",
  "arrow",
@@ -1173,7 +1173,7 @@
 [[package]]
 name = "datafusion-physical-expr-functions-aggregate"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "ahash",
  "arrow",
@@ -1188,7 +1188,7 @@
 [[package]]
 name = "datafusion-physical-optimizer"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "datafusion-common",
  "datafusion-execution",
@@ -1200,7 +1200,7 @@
 [[package]]
 name = "datafusion-physical-plan"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "ahash",
  "arrow",
@@ -1235,7 +1235,7 @@
 [[package]]
 name = "datafusion-sql"
 version = "41.0.0"
-source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=amey/patch-with-node-id#8ca759f18a606b91672ebc346192507d94202323"
+source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#2e125836d4e75c1d07adf30d3f13e70c3afc1416"
 dependencies = [
  "arrow",
  "arrow-array",
diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs
index 1ef15de..69a8397 100644
--- a/crates/core/src/datastream.rs
+++ b/crates/core/src/datastream.rs
@@ -137,10 +137,7 @@ impl DataStream {
         if orchestrator::SHOULD_CHECKPOINT {
             let plan = self.df.as_ref().clone().create_physical_plan().await?;
             let node_ids = extract_node_ids_and_partitions(&plan);
-            let max_buffer_size = node_ids
-                .iter()
-                .map(|x| x.1)
-                .sum::<usize>();
+            let max_buffer_size = node_ids.iter().map(|x| x.1).sum::<usize>();
             let mut orchestrator = Orchestrator::default();
             SpawnedTask::spawn_blocking(move || orchestrator.run(max_buffer_size));
         }
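
Note on patches 3 and 8 above: `#[derive(Default)]` generates the same constructor the removed hand-written `default()` provided (an empty senders map), and the rustfmt change collapses the partition-count sum onto one line without changing its value. Below is a minimal, self-contained sketch of both, for reference only; the `std::sync::mpsc::Sender` alias, the sample `(node_id, partitions)` values, and the `main` harness are illustrative assumptions, not code from this repository.

use std::collections::HashMap;
use std::sync::mpsc::Sender;

// Simplified stand-in for the patch's OrchestrationMessage enum.
#[allow(dead_code)]
pub enum OrchestrationMessage {
    CheckpointComplete(String),
}

// #[derive(Default)] expands to the same thing the deleted method did:
// `Orchestrator { senders: HashMap::new() }`, since HashMap::default()
// is an empty map.
#[derive(Default)]
pub struct Orchestrator {
    senders: HashMap<String, Sender<OrchestrationMessage>>,
}

fn main() {
    let orchestrator = Orchestrator::default();
    assert!(orchestrator.senders.is_empty());

    // The datastream.rs hunk sizes the checkpoint buffer as the total
    // partition count across plan nodes, i.e. the sum of the `.1` of
    // each (node_id, partitions) pair. Sample values are made up.
    let node_ids: Vec<(usize, usize)> = vec![(0, 4), (1, 8), (2, 4)];
    let max_buffer_size = node_ids.iter().map(|x| x.1).sum::<usize>();
    assert_eq!(max_buffer_size, 16);
}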