From 632a318f971ab8686f30c3242f2e94f6aaaeb756 Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Mon, 25 Nov 2024 11:57:52 -0800
Subject: [PATCH] Adding Kafka Timestamp support when no ts column is provided (#63)

* Adding Kafka Timestamp support when no ts column is provided

* feat: add default kafka timestamp behavior to python API

* update lock file

* format python

* update macosx runner

---------

Co-authored-by: Matt Green
---
 .github/workflows/python.yml                   |  2 +-
 .../core/src/datasource/kafka/kafka_config.rs  | 16 ++---
 .../src/datasource/kafka/kafka_stream_read.rs  | 41 +++++++++---
 .../continuous/streaming_window.rs             | 62 ++++++++++---------
 py-denormalized/pyproject.toml                 |  1 +
 .../python/denormalized/context.py             | 39 +++++-------
 py-denormalized/src/context.rs                 | 14 ++++-
 py-denormalized/uv.lock                        |  2 +
 8 files changed, 102 insertions(+), 75 deletions(-)

diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index f59a614..3023d6b 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -119,7 +119,7 @@ jobs:
     strategy:
       matrix:
         platform:
-          - runner: macos-12
+          - runner: macos-14
             target: x86_64
           - runner: macos-14
             target: aarch64
diff --git a/crates/core/src/datasource/kafka/kafka_config.rs b/crates/core/src/datasource/kafka/kafka_config.rs
index 6559cb9..84b8dbf 100644
--- a/crates/core/src/datasource/kafka/kafka_config.rs
+++ b/crates/core/src/datasource/kafka/kafka_config.rs
@@ -37,8 +37,8 @@ pub struct KafkaReadConfig {
     pub encoding: StreamEncoding,
     pub order: Vec>,
     pub partition_count: i32,
-    pub timestamp_column: String,
-    pub timestamp_unit: TimestampUnit,
+    pub timestamp_column: Option<String>,
+    pub timestamp_unit: Option<TimestampUnit>,
     pub kafka_connection_opts: ConnectionOpts,
 }
 
@@ -232,17 +232,9 @@ impl KafkaTopicBuilder {
             .as_ref()
             .ok_or_else(|| DenormalizedError::KafkaConfig("encoding required".to_string()))?;
 
-        let timestamp_column = self
-            .timestamp_column
-            .as_ref()
-            .ok_or_else(|| DenormalizedError::KafkaConfig("timestamp_column required".to_string()))?
-            .clone();
+        let timestamp_column = self.timestamp_column.clone();
 
-        let timestamp_unit = self
-            .timestamp_unit
-            .as_ref()
-            .ok_or_else(|| DenormalizedError::KafkaConfig("timestamp_unit required".to_string()))?
-            .clone();
+        let timestamp_unit = self.timestamp_unit.clone();
 
         let mut kafka_connection_opts = ConnectionOpts::new();
         for (key, value) in opts.into_iter() {
diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs
index caed655..7dcdf94 100644
--- a/crates/core/src/datasource/kafka/kafka_stream_read.rs
+++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs
@@ -3,7 +3,9 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use arrow::datatypes::TimestampMillisecondType;
-use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, StringArray, StructArray};
+use arrow_array::{
+    Array, ArrayRef, Int64Array, PrimitiveArray, RecordBatch, StringArray, StructArray,
+};
 use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
 use crossbeam::channel;
 use denormalized_orchestrator::channel_manager::{create_channel, get_sender, take_receiver};
@@ -146,8 +148,17 @@ impl PartitionStream for KafkaStreamRead {
         let mut builder = RecordBatchReceiverStreamBuilder::new(self.config.schema.clone(), 1);
         let tx = builder.tx();
         let canonical_schema = self.config.schema.clone();
-        let timestamp_column: String = self.config.timestamp_column.clone();
-        let timestamp_unit = self.config.timestamp_unit.clone();
+        let use_kafka_timestamps = self.config.timestamp_column.is_none();
+        let timestamp_column = self
+            .config
+            .timestamp_column
+            .clone()
+            .unwrap_or(String::from(""));
+        let timestamp_unit = self
+            .config
+            .timestamp_unit
+            .clone()
+            .unwrap_or(crate::prelude::TimestampUnit::Int64Millis);
         let batch_timeout: Duration = Duration::from_millis(100);
         let mut decoder = self.config.build_decoder();
 
@@ -180,10 +191,13 @@ impl PartitionStream for KafkaStreamRead {
 
             let mut offsets_read: HashMap = HashMap::new();
 
+            let mut timestamps: Vec<i64> = Vec::new();
             loop {
                 match tokio::time::timeout(batch_timeout, consumer.recv()).await {
                     Ok(Ok(m)) => {
                         let payload = m.payload().expect("Message payload is empty");
+                        let ts = m.timestamp().to_millis().unwrap_or(-1);
+                        timestamps.push(ts);
                         decoder.push_to_buffer(payload.to_owned());
                         offsets_read
                             .entry(m.partition())
@@ -207,12 +221,21 @@ impl PartitionStream for KafkaStreamRead {
 
             if !offsets_read.is_empty() {
                 let record_batch = decoder.to_record_batch().unwrap();
-                let ts_column = record_batch
-                    .column_by_name(timestamp_column.as_str())
-                    .map(|ts_col| {
-                        Arc::new(array_to_timestamp_array(ts_col, timestamp_unit.clone()))
-                    })
-                    .unwrap();
+
+                let kafka_ts_array = Int64Array::from_iter_values(timestamps);
+                let ts_column_array: Arc<dyn Array> = Arc::new(kafka_ts_array);
+                let mut ts_column = Arc::new(array_to_timestamp_array(
+                    ts_column_array.as_ref(),
+                    timestamp_unit.clone(),
+                ));
+
+                // A timestamp column was provided, so use it instead of the Kafka message timestamps. TODO: this is a hack for now; it needs a little cleanup.
+                if !use_kafka_timestamps {
+                    let arr = record_batch
+                        .column_by_name(timestamp_column.as_str())
+                        .unwrap();
+                    ts_column = Arc::new(array_to_timestamp_array(arr, timestamp_unit.clone()));
+                };
 
                 let binary_vec = Vec::from_iter(
                     std::iter::repeat(String::from("no_barrier")).take(ts_column.len()),
diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs
index c6ea378..49d8e27 100644
--- a/crates/core/src/physical_plan/continuous/streaming_window.rs
+++ b/crates/core/src/physical_plan/continuous/streaming_window.rs
@@ -736,7 +736,6 @@ impl WindowAggStream {
 
         let mut watermark_lock: std::sync::MutexGuard> =
             self.latest_watermark.lock().unwrap();
-        debug!("latest watermark currently is {:?}", *watermark_lock);
         if let Some(current_watermark) = *watermark_lock {
             if current_watermark <= watermark.min_timestamp {
                 *watermark_lock = Some(watermark.min_timestamp)
@@ -987,34 +986,41 @@ impl FullWindowAggStream {
             if self.seen_windows.contains(&start_time)
                 && !self.cached_frames.contains_key(&start_time)
             {
-                panic!("we are reopening a window already seen.")
+                debug!(
+                    "received late data for window with start time {:?}. dropping it.",
+                    start_time
+                );
+                Ok(RecordBatch::new_empty(Arc::new(
+                    add_window_columns_to_schema(self.schema.clone()),
+                )))
+            } else {
+                let frame = self.cached_frames.entry(start_time).or_insert({
+                    FullWindowAggFrame::new(
+                        start_time,
+                        batch_end_time.unwrap(),
+                        &self.exec_aggregate_expressions,
+                        self.aggregate_expressions.clone(),
+                        self.filter_expressions.clone(),
+                        self.schema.clone(),
+                        self.baseline_metrics.clone(),
+                    )
+                });
+
+                self.seen_windows.insert(start_time);
+
+                //last two columns are timestamp columns, so remove them before pushing them onto a frame.
+                let col_size = rb.num_columns();
+                rb.remove_column(col_size - 1);
+                rb.remove_column(col_size - 2);
+
+                frame.aggregate_batch(rb);
+
+                self.watermark = self
+                    .watermark
+                    .map_or(Some(start_time), |w| Some(w.max(start_time)));
+
+                self.finalize_windows()
             }
-            let frame = self.cached_frames.entry(start_time).or_insert({
-                FullWindowAggFrame::new(
-                    start_time,
-                    batch_end_time.unwrap(),
-                    &self.exec_aggregate_expressions,
-                    self.aggregate_expressions.clone(),
-                    self.filter_expressions.clone(),
-                    self.schema.clone(),
-                    self.baseline_metrics.clone(),
-                )
-            });
-
-            self.seen_windows.insert(start_time);
-
-            //last two columns are timestamp columns, so remove them before pushing them onto a frame.
-            let col_size = rb.num_columns();
-            rb.remove_column(col_size - 1);
-            rb.remove_column(col_size - 2);
-
-            frame.aggregate_batch(rb);
-
-            self.watermark = self
-                .watermark
-                .map_or(Some(start_time), |w| Some(w.max(start_time)));
-
-            self.finalize_windows()
         } else {
             Ok(RecordBatch::new_empty(Arc::new(
                 add_window_columns_to_schema(self.schema.clone()),
diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml
index b8fb1af..eafbee2 100644
--- a/py-denormalized/pyproject.toml
+++ b/py-denormalized/pyproject.toml
@@ -11,6 +11,7 @@ description = "Embeddable stream processing engine"
 dependencies = [
     "pyarrow>=17.0.0",
     "datafusion>=40.1.0",
+    "pip>=24.3.1",
 ]
 
 [project.optional-dependencies]
diff --git a/py-denormalized/python/denormalized/context.py b/py-denormalized/python/denormalized/context.py
index 202b292..66ec3d1 100644
--- a/py-denormalized/python/denormalized/context.py
+++ b/py-denormalized/python/denormalized/context.py
@@ -5,30 +5,27 @@ class Context:
     """A context manager for handling data stream operations.
 
-    This class provides an interface for creating and managing data streams,
-    particularly for working with Kafka topics and stream processing.
-
-    Attributes:
-        ctx: Internal PyContext instance managing Rust-side operations
+    This class provides functionality to create and manage data streams
+    from various sources like Kafka topics.
     """
 
     def __init__(self) -> None:
-        """Initialize a new Context instance."""
+        """Initializes a new Context instance with PyContext."""
         self.ctx = PyContext()
 
     def __repr__(self):
-        """Return a string representation of the Context object.
+        """Returns the string representation of the PyContext object.
 
         Returns:
-            str: A detailed string representation of the context
+            str: String representation of the underlying PyContext.
         """
         return self.ctx.__repr__()
 
     def __str__(self):
-        """Return a readable string description of the Context object.
+        """Returns the string representation of the PyContext object.
 
         Returns:
-            str: A human-readable string description
+            str: String representation of the underlying PyContext.
         """
         return self.ctx.__str__()
 
@@ -37,31 +34,27 @@ def from_topic(
         topic: str,
         sample_json: str,
         bootstrap_servers: str,
-        timestamp_column: str,
+        timestamp_column: str | None = None,
         group_id: str = "default_group",
     ) -> DataStream:
-        """Create a new DataStream from a Kafka topic.
+        """Creates a new DataStream from a Kafka topic.
 
         Args:
-            topic: Name of the Kafka topic to consume from
-            sample_json: Sample JSON string representing the expected message format
-            bootstrap_servers: Comma-separated list of Kafka broker addresses
-            timestamp_column: Column name containing event timestamps
-            group_id: Kafka consumer group ID (defaults to "default_group")
+            topic: The name of the Kafka topic to consume from.
+            sample_json: A sample JSON string representing the expected message format.
+            bootstrap_servers: Comma-separated list of Kafka broker addresses.
+            timestamp_column: Optional column name containing message timestamps.
+            group_id: Kafka consumer group ID, defaults to "default_group".
 
         Returns:
-            DataStream: A new DataStream instance configured for the specified topic
-
-        Raises:
-            ValueError: If the topic name is empty or invalid
-            ConnectionError: If unable to connect to Kafka brokers
+            DataStream: A new DataStream instance connected to the specified topic.
         """
         py_ds = self.ctx.from_topic(
             topic,
             sample_json,
             bootstrap_servers,
-            timestamp_column,
             group_id,
+            timestamp_column,
         )
         ds = DataStream(py_ds)
         return ds
diff --git a/py-denormalized/src/context.rs b/py-denormalized/src/context.rs
index cee7a88..78eccee 100644
--- a/py-denormalized/src/context.rs
+++ b/py-denormalized/src/context.rs
@@ -71,14 +71,21 @@ impl PyContext {
         Ok("PyContext".to_string())
     }
 
+    #[pyo3(signature = (
+        topic,
+        sample_json,
+        bootstrap_servers,
+        group_id,
+        timestamp_column = None
+    ))]
     pub fn from_topic(
         &self,
         py: Python,
         topic: String,
         sample_json: String,
         bootstrap_servers: String,
-        timestamp_column: String,
         group_id: String,
+        timestamp_column: Option<String>,
     ) -> PyResult {
         let context = self.context.clone();
         let rt = &get_tokio_runtime(py).0;
@@ -86,8 +93,11 @@ impl PyContext {
         rt.spawn(async move {
             let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
 
+            if let Some(ts_col) = timestamp_column {
+                topic_builder.with_timestamp(ts_col, TimestampUnit::Int64Millis);
+            }
+
             let source_topic = topic_builder
-                .with_timestamp(timestamp_column, TimestampUnit::Int64Millis)
                 .with_encoding("json")?
                 .with_topic(topic)
                 .infer_schema_from_json(sample_json.as_str())?
diff --git a/py-denormalized/uv.lock b/py-denormalized/uv.lock
index 7503e9d..1291b8e 100644
--- a/py-denormalized/uv.lock
+++ b/py-denormalized/uv.lock
@@ -374,6 +374,7 @@ version = "0.0.9"
 source = { editable = "." }
 dependencies = [
     { name = "datafusion" },
+    { name = "pip" },
     { name = "pyarrow" },
 ]
 
@@ -401,6 +402,7 @@ docs = [
 requires-dist = [
     { name = "datafusion", specifier = ">=40.1.0" },
     { name = "feast", marker = "extra == 'feast'" },
+    { name = "pip", specifier = ">=24.3.1" },
    { name = "pyarrow", specifier = ">=17.0.0" },
 ]
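
Note for reviewers: a minimal usage sketch of the Python API as changed by this patch. The topic name, sample event, and broker address below are illustrative assumptions, not part of the patch, and the import assumes Context is exposed at the top of the denormalized package. With timestamp_column omitted, from_topic now stamps rows with the Kafka message timestamps; passing a column name keeps the previous event-time behavior.

import json

from denormalized import Context  # assumes the package re-exports Context

ctx = Context()

# Hypothetical sample event used to infer the stream's schema.
sample_event = json.dumps({"sensor_name": "front_door", "reading": 21.5})

# No timestamp_column given: rows fall back to the Kafka message timestamps.
kafka_ts_stream = ctx.from_topic(
    "sensor_readings",      # hypothetical topic
    sample_event,
    "localhost:9092",       # hypothetical broker address
)

# Explicit timestamp_column: uses the named event-time field, as before.
event_ts_stream = ctx.from_topic(
    "sensor_readings",
    json.dumps({"sensor_name": "front_door", "reading": 21.5, "event_ms": 1732500000000}),
    "localhost:9092",
    timestamp_column="event_ms",
)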