Adding Kafka Timestamp support when no ts column is provided (#63)
* Adding Kafka Timestamp support when no ts column is provided

* feat: add default kafka timestamp behavior to python API

* update lock file

* format python

* update macosx runner

---------

Co-authored-by: Matt Green <emgeee@users.noreply.github.com>
ameyc and emgeee authored Nov 25, 2024
1 parent f652689 commit 632a318
Showing 8 changed files with 102 additions and 75 deletions.
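
For context, the user-facing effect of this change: when no timestamp column is named, the stream now falls back to the per-message Kafka timestamps instead of requiring one. A minimal Python sketch of the new default, assuming Context is exported from the top-level denormalized package and using placeholder topic and broker values:

    from denormalized import Context

    # Payload carries no timestamp field; the Kafka message timestamp is used instead.
    sample_event = '{"user": "alice", "score": 1}'

    ds = Context().from_topic(
        topic="events",                      # placeholder topic
        sample_json=sample_event,
        bootstrap_servers="localhost:9092",  # placeholder broker
        # timestamp_column omitted -> defaults to the Kafka message timestamp
    )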
2 changes: 1 addition & 1 deletion .github/workflows/python.yml
@@ -119,7 +119,7 @@ jobs:
strategy:
matrix:
platform:
- runner: macos-12
- runner: macos-14
target: x86_64
- runner: macos-14
target: aarch64
16 changes: 4 additions & 12 deletions crates/core/src/datasource/kafka/kafka_config.rs
@@ -37,8 +37,8 @@ pub struct KafkaReadConfig {
pub encoding: StreamEncoding,
pub order: Vec<Vec<SortExpr>>,
pub partition_count: i32,
pub timestamp_column: String,
pub timestamp_unit: TimestampUnit,
pub timestamp_column: Option<String>,
pub timestamp_unit: Option<TimestampUnit>,

pub kafka_connection_opts: ConnectionOpts,
}
@@ -232,17 +232,9 @@ impl KafkaTopicBuilder {
.as_ref()
.ok_or_else(|| DenormalizedError::KafkaConfig("encoding required".to_string()))?;

let timestamp_column = self
.timestamp_column
.as_ref()
.ok_or_else(|| DenormalizedError::KafkaConfig("timestamp_column required".to_string()))?
.clone();
let timestamp_column = self.timestamp_column.clone();

let timestamp_unit = self
.timestamp_unit
.as_ref()
.ok_or_else(|| DenormalizedError::KafkaConfig("timestamp_unit required".to_string()))?
.clone();
let timestamp_unit = self.timestamp_unit.clone();

let mut kafka_connection_opts = ConnectionOpts::new();
for (key, value) in opts.into_iter() {
41 changes: 32 additions & 9 deletions crates/core/src/datasource/kafka/kafka_stream_read.rs
@@ -3,7 +3,9 @@ use std::sync::Arc;
use std::time::Duration;

use arrow::datatypes::TimestampMillisecondType;
use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, StringArray, StructArray};
use arrow_array::{
Array, ArrayRef, Int64Array, PrimitiveArray, RecordBatch, StringArray, StructArray,
};
use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
use crossbeam::channel;
use denormalized_orchestrator::channel_manager::{create_channel, get_sender, take_receiver};
@@ -146,8 +148,17 @@ impl PartitionStream for KafkaStreamRead {
let mut builder = RecordBatchReceiverStreamBuilder::new(self.config.schema.clone(), 1);
let tx = builder.tx();
let canonical_schema = self.config.schema.clone();
let timestamp_column: String = self.config.timestamp_column.clone();
let timestamp_unit = self.config.timestamp_unit.clone();
let use_kafka_timestamps = self.config.timestamp_column.is_none();
let timestamp_column = self
.config
.timestamp_column
.clone()
.unwrap_or(String::from(""));
let timestamp_unit = self
.config
.timestamp_unit
.clone()
.unwrap_or(crate::prelude::TimestampUnit::Int64Millis);
let batch_timeout: Duration = Duration::from_millis(100);
let mut decoder = self.config.build_decoder();

@@ -180,10 +191,13 @@ impl PartitionStream for KafkaStreamRead {

let mut offsets_read: HashMap<i32, i64> = HashMap::new();

let mut timestamps: Vec<i64> = Vec::new();
loop {
match tokio::time::timeout(batch_timeout, consumer.recv()).await {
Ok(Ok(m)) => {
let payload = m.payload().expect("Message payload is empty");
let ts = m.timestamp().to_millis().unwrap_or(-1);
timestamps.push(ts);
decoder.push_to_buffer(payload.to_owned());
offsets_read
.entry(m.partition())
@@ -207,12 +221,21 @@

if !offsets_read.is_empty() {
let record_batch = decoder.to_record_batch().unwrap();
let ts_column = record_batch
.column_by_name(timestamp_column.as_str())
.map(|ts_col| {
Arc::new(array_to_timestamp_array(ts_col, timestamp_unit.clone()))
})
.unwrap();

let kafka_ts_array = Int64Array::from_iter_values(timestamps);
let ts_column_array: Arc<dyn Array> = Arc::new(kafka_ts_array);
let mut ts_column = Arc::new(array_to_timestamp_array(
ts_column_array.as_ref(),
timestamp_unit.clone(),
));

// Timestamp column was provided. TODO: this code is a hack for now; we need a little cleanup here.
if !use_kafka_timestamps {
let arr = record_batch
.column_by_name(timestamp_column.as_str())
.unwrap();
ts_column = Arc::new(array_to_timestamp_array(arr, timestamp_unit.clone()));
};

let binary_vec = Vec::from_iter(
std::iter::repeat(String::from("no_barrier")).take(ts_column.len()),
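
To illustrate the fallback outside the Rust reader above: when no timestamp column is configured, the batch's timestamp column is built from the per-message Kafka timestamps (milliseconds since epoch) rather than from a payload field. A rough pyarrow sketch of that idea — the helper and the appended column name are hypothetical, not part of this commit:

    import pyarrow as pa

    def with_timestamp_column(batch: pa.RecordBatch,
                              kafka_ts_millis: list[int],
                              timestamp_column: str | None) -> pa.RecordBatch:
        if timestamp_column is None:
            # No configured column: reinterpret the Kafka timestamps as epoch millis.
            ts = pa.array(kafka_ts_millis, type=pa.int64()).cast(pa.timestamp("ms"))
        else:
            # A payload column was configured: convert it as before.
            ts = batch.column(timestamp_column).cast(pa.timestamp("ms"))
        return pa.RecordBatch.from_arrays(
            batch.columns + [ts], names=batch.schema.names + ["_streaming_ts"]
        )

    # Two messages whose payloads carry no timestamp field.
    batch = pa.RecordBatch.from_pydict({"user": ["a", "b"], "score": [1, 2]})
    print(with_timestamp_column(batch, [1732500000000, 1732500000100], None))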
62 changes: 34 additions & 28 deletions crates/core/src/physical_plan/continuous/streaming_window.rs
@@ -736,7 +736,6 @@ impl WindowAggStream {
let mut watermark_lock: std::sync::MutexGuard<Option<SystemTime>> =
self.latest_watermark.lock().unwrap();

debug!("latest watermark currently is {:?}", *watermark_lock);
if let Some(current_watermark) = *watermark_lock {
if current_watermark <= watermark.min_timestamp {
*watermark_lock = Some(watermark.min_timestamp)
@@ -987,34 +986,41 @@ impl FullWindowAggStream {
if self.seen_windows.contains(&start_time)
&& !self.cached_frames.contains_key(&start_time)
{
panic!("we are reopening a window already seen.")
debug!(
"received late data for window with start time {:?}. dropping it.",
start_time
);
Ok(RecordBatch::new_empty(Arc::new(
add_window_columns_to_schema(self.schema.clone()),
)))
} else {
let frame = self.cached_frames.entry(start_time).or_insert({
FullWindowAggFrame::new(
start_time,
batch_end_time.unwrap(),
&self.exec_aggregate_expressions,
self.aggregate_expressions.clone(),
self.filter_expressions.clone(),
self.schema.clone(),
self.baseline_metrics.clone(),
)
});

self.seen_windows.insert(start_time);

//last two columns are timestamp columns, so remove them before pushing them onto a frame.
let col_size = rb.num_columns();
rb.remove_column(col_size - 1);
rb.remove_column(col_size - 2);

frame.aggregate_batch(rb);

self.watermark = self
.watermark
.map_or(Some(start_time), |w| Some(w.max(start_time)));

self.finalize_windows()
}
let frame = self.cached_frames.entry(start_time).or_insert({
FullWindowAggFrame::new(
start_time,
batch_end_time.unwrap(),
&self.exec_aggregate_expressions,
self.aggregate_expressions.clone(),
self.filter_expressions.clone(),
self.schema.clone(),
self.baseline_metrics.clone(),
)
});

self.seen_windows.insert(start_time);

//last two columns are timestamp columns, so remove them before pushing them onto a frame.
let col_size = rb.num_columns();
rb.remove_column(col_size - 1);
rb.remove_column(col_size - 2);

frame.aggregate_batch(rb);

self.watermark = self
.watermark
.map_or(Some(start_time), |w| Some(w.max(start_time)));

self.finalize_windows()
} else {
Ok(RecordBatch::new_empty(Arc::new(
add_window_columns_to_schema(self.schema.clone()),
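
The other behavioral change in this file: a batch that maps to a window that was already seen but whose frame has been finalized and evicted is treated as late data — it is now logged and dropped (an empty batch is returned) instead of panicking. A tiny Python sketch of that guard, with illustrative names:

    def accept_batch(start_time, seen_windows: set, open_frames: dict, batch) -> bool:
        # Window already finalized and evicted -> late data; drop it.
        if start_time in seen_windows and start_time not in open_frames:
            return False
        open_frames.setdefault(start_time, []).append(batch)
        seen_windows.add(start_time)
        return True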
1 change: 1 addition & 0 deletions py-denormalized/pyproject.toml
@@ -11,6 +11,7 @@ description = "Embeddable stream processing engine"
dependencies = [
"pyarrow>=17.0.0",
"datafusion>=40.1.0",
"pip>=24.3.1",
]

[project.optional-dependencies]
39 changes: 16 additions & 23 deletions py-denormalized/python/denormalized/context.py
@@ -5,30 +5,27 @@
class Context:
"""A context manager for handling data stream operations.
This class provides an interface for creating and managing data streams,
particularly for working with Kafka topics and stream processing.
Attributes:
ctx: Internal PyContext instance managing Rust-side operations
This class provides functionality to create and manage data streams
from various sources like Kafka topics.
"""

def __init__(self) -> None:
"""Initialize a new Context instance."""
"""Initializes a new Context instance with PyContext."""
self.ctx = PyContext()

def __repr__(self):
"""Return a string representation of the Context object.
"""Returns the string representation of the PyContext object.
Returns:
str: A detailed string representation of the context
str: String representation of the underlying PyContext.
"""
return self.ctx.__repr__()

def __str__(self):
"""Return a readable string description of the Context object.
"""Returns the string representation of the PyContext object.
Returns:
str: A human-readable string description
str: String representation of the underlying PyContext.
"""
return self.ctx.__str__()

@@ -37,31 +34,27 @@ def from_topic(
topic: str,
sample_json: str,
bootstrap_servers: str,
timestamp_column: str,
timestamp_column: str | None = None,
group_id: str = "default_group",
) -> DataStream:
"""Create a new DataStream from a Kafka topic.
"""Creates a new DataStream from a Kafka topic.
Args:
topic: Name of the Kafka topic to consume from
sample_json: Sample JSON string representing the expected message format
bootstrap_servers: Comma-separated list of Kafka broker addresses
timestamp_column: Column name containing event timestamps
group_id: Kafka consumer group ID (defaults to "default_group")
topic: The name of the Kafka topic to consume from.
sample_json: A sample JSON string representing the expected message format.
bootstrap_servers: Comma-separated list of Kafka broker addresses.
timestamp_column: Optional column name containing message timestamps.
group_id: Kafka consumer group ID, defaults to "default_group".
Returns:
DataStream: A new DataStream instance configured for the specified topic
Raises:
ValueError: If the topic name is empty or invalid
ConnectionError: If unable to connect to Kafka brokers
DataStream: A new DataStream instance connected to the specified topic.
"""
py_ds = self.ctx.from_topic(
topic,
sample_json,
bootstrap_servers,
timestamp_column,
group_id,
timestamp_column,
)
ds = DataStream(py_ds)
return ds
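
As a usage sketch of both call forms after this change (the column, topic, and broker names below are placeholders):

    from denormalized import Context

    ctx = Context()

    # Explicit event-time column carried in the message payload (unchanged behavior).
    ds = ctx.from_topic(
        "events",
        '{"event_time": 1732500000000, "user": "alice"}',
        "localhost:9092",
        timestamp_column="event_time",
    )

    # No timestamp_column: the per-message Kafka timestamp is used instead.
    ds = ctx.from_topic("events", '{"user": "alice"}', "localhost:9092")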
14 changes: 12 additions & 2 deletions py-denormalized/src/context.rs
@@ -71,23 +71,33 @@ impl PyContext {
Ok("PyContext".to_string())
}

#[pyo3(signature = (
topic,
sample_json,
bootstrap_servers,
group_id,
timestamp_column = None
))]
pub fn from_topic(
&self,
py: Python,
topic: String,
sample_json: String,
bootstrap_servers: String,
timestamp_column: String,
group_id: String,
timestamp_column: Option<String>,
) -> PyResult<PyDataStream> {
let context = self.context.clone();
let rt = &get_tokio_runtime(py).0;
let fut: JoinHandle<denormalized::common::error::Result<DataStream>> =
rt.spawn(async move {
let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

if let Some(ts_col) = timestamp_column {
topic_builder.with_timestamp(ts_col, TimestampUnit::Int64Millis);
}

let source_topic = topic_builder
.with_timestamp(timestamp_column, TimestampUnit::Int64Millis)
.with_encoding("json")?
.with_topic(topic)
.infer_schema_from_json(sample_json.as_str())?
2 changes: 2 additions & 0 deletions py-denormalized/uv.lock
