From cd72f0c3690e06f08c56daa063a448639cf0fe75 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Fri, 2 Aug 2024 16:22:08 -0700 Subject: [PATCH 1/3] Emit one kafka message per row in a recordbatch --- Cargo.toml | 4 +- .../core/src/datasource/kafka/topic_writer.rs | 21 ++--- crates/core/src/utils/mod.rs | 1 + crates/core/src/utils/row_encoder.rs | 90 +++++++++++++++++++ 4 files changed, 101 insertions(+), 15 deletions(-) create mode 100644 crates/core/src/utils/row_encoder.rs diff --git a/Cargo.toml b/Cargo.toml index 2c4d3e0..1e83231 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,10 @@ authors = [ "Amey Chaugule ", ] edition = "2021" -homepage = "https://github.com/probably-nothing-labs/df-streams" +homepage = "https://github.com/probably-nothing-labs/denormalized" license = "Apache-2.0" readme = "README.md" -repository = "https://github.com/probably-nothing-labs/df-streams" +repository = "https://github.com/probably-nothing-labs/denormalized" version = "0.1.0" description = "Embeddable stream processing engine" diff --git a/crates/core/src/datasource/kafka/topic_writer.rs b/crates/core/src/datasource/kafka/topic_writer.rs index 9d330da..70a8f6c 100644 --- a/crates/core/src/datasource/kafka/topic_writer.rs +++ b/crates/core/src/datasource/kafka/topic_writer.rs @@ -4,7 +4,6 @@ use std::fmt::{self, Debug}; use std::time::Duration; use std::{any::Any, sync::Arc}; -use arrow::json::LineDelimitedWriter; use arrow_schema::SchemaRef; use datafusion::catalog::Session; @@ -22,6 +21,7 @@ use rdkafka::producer::FutureProducer; use rdkafka::producer::FutureRecord; use super::KafkaWriteConfig; +use crate::utils::row_encoder::{JsonRowEncoder, RowEncoder}; // Used to createa kafka source pub struct TopicWriter(pub Arc); @@ -110,21 +110,16 @@ impl DataSink for KafkaSink { while let Some(batch) = data.next().await.transpose()? { row_count += batch.num_rows(); - if batch.num_rows() > 0 { - let buf = Vec::new(); - let mut writer = LineDelimitedWriter::new(buf); - writer.write_batches(&[&batch])?; - writer.finish()?; - let buf = writer.into_inner(); + let encoder = JsonRowEncoder {}; + let rows = encoder.encode(&batch)?; - let record = FutureRecord::<[u8], _>::to(topic).payload(&buf); + for row in rows { + let record = FutureRecord::<[u8], _>::to(topic).payload(&row); // .key(key.as_str()), - let _delivery_status = self - .producer - .send(record, Duration::from_secs(0)) - .await - .expect("Message not delivered"); + if let Err(msg) = self.producer.send(record, Duration::from_secs(0)).await { + tracing::error!("{}", msg.0); + } } } diff --git a/crates/core/src/utils/mod.rs b/crates/core/src/utils/mod.rs index f77961d..ce4f0b7 100644 --- a/crates/core/src/utils/mod.rs +++ b/crates/core/src/utils/mod.rs @@ -2,5 +2,6 @@ pub mod arrow_helpers; mod default_optimizer_rules; pub mod serialize; +pub mod row_encoder; pub use default_optimizer_rules::get_default_optimizer_rules; diff --git a/crates/core/src/utils/row_encoder.rs b/crates/core/src/utils/row_encoder.rs new file mode 100644 index 0000000..2144865 --- /dev/null +++ b/crates/core/src/utils/row_encoder.rs @@ -0,0 +1,90 @@ +use arrow::json::writer::{JsonFormat, Writer}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion_common::Result; + +pub trait RowEncoder { + fn encode(&self, batch: &RecordBatch) -> Result>>; +} + +#[derive(Debug, Default)] +// Formats json without any characting separating items. +pub struct NoDelimiter {} +impl JsonFormat for NoDelimiter {} +// writes rows as json without any character separating them +type JsonWriter = Writer; + +pub struct JsonRowEncoder {} + +impl JsonRowEncoder { + pub fn batch_to_json(&self, batch: &RecordBatch) -> Result> { + let buf = Vec::new(); + let mut writer = JsonWriter::new(buf); + writer.write(batch)?; + writer.finish()?; + let buf = writer.into_inner(); + + Ok(buf) + } +} + +impl RowEncoder for JsonRowEncoder { + fn encode(&self, batch: &RecordBatch) -> Result>> { + if batch.num_rows() == 0 { + return Ok(vec![]); + } + + // BufWriter uses a buffer size of 8KB + // We therefore double this and flush once we have more than 8KB + let mut buffer = Vec::with_capacity(batch.num_rows()); + + for i in 0..batch.num_rows() { + let row = batch.slice(i, 1); + buffer.push(self.batch_to_json(&row)?); + } + + Ok(buffer) + } +} + +#[cfg(test)] +mod tests { + use super::{JsonRowEncoder, RowEncoder}; + + use datafusion::arrow::array::{Int32Array, StringArray}; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::arrow::record_batch::RecordBatch; + use std::sync::Arc; + + #[test] + fn serialize_record_batch_to_json() { + // define a schema. + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Utf8, false), + Field::new("col2", DataType::Int32, false), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec!["a", "b", "c", "d"])), + Arc::new(Int32Array::from(vec![1, 10, 20, 100])), + ], + ) + .unwrap(); + + let encoder = JsonRowEncoder {}; + let buf = encoder.encode(&batch).unwrap(); + + let res: Vec<&[u8]> = vec![ + "{\"col1\":\"a\",\"col2\":1}", + "{\"col1\":\"b\",\"col2\":10}", + "{\"col1\":\"c\",\"col2\":20}", + "{\"col1\":\"d\",\"col2\":100}", + ] + .iter() + .map(|v| v.as_bytes()) + .collect::<_>(); + + assert_eq!(buf, res); + } +} From 203f25ae1e35756c3d6d443bb031d6755e435114 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Fri, 2 Aug 2024 16:59:59 -0700 Subject: [PATCH 2/3] fix formatting --- crates/core/src/utils/mod.rs | 2 +- crates/core/src/utils/row_encoder.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/utils/mod.rs b/crates/core/src/utils/mod.rs index ce4f0b7..73cef59 100644 --- a/crates/core/src/utils/mod.rs +++ b/crates/core/src/utils/mod.rs @@ -1,7 +1,7 @@ #[allow(dead_code)] pub mod arrow_helpers; mod default_optimizer_rules; -pub mod serialize; pub mod row_encoder; +pub mod serialize; pub use default_optimizer_rules::get_default_optimizer_rules; diff --git a/crates/core/src/utils/row_encoder.rs b/crates/core/src/utils/row_encoder.rs index 2144865..49ba86a 100644 --- a/crates/core/src/utils/row_encoder.rs +++ b/crates/core/src/utils/row_encoder.rs @@ -75,7 +75,7 @@ mod tests { let encoder = JsonRowEncoder {}; let buf = encoder.encode(&batch).unwrap(); - let res: Vec<&[u8]> = vec![ + let res: Vec<&[u8]> = [ "{\"col1\":\"a\",\"col2\":1}", "{\"col1\":\"b\",\"col2\":10}", "{\"col1\":\"c\",\"col2\":20}", From 3edfee4add13ccd8fa3a642c47b402d51e6794d1 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Mon, 5 Aug 2024 11:44:59 -0700 Subject: [PATCH 3/3] remove comment --- crates/core/src/utils/row_encoder.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/core/src/utils/row_encoder.rs b/crates/core/src/utils/row_encoder.rs index 49ba86a..fea5a8f 100644 --- a/crates/core/src/utils/row_encoder.rs +++ b/crates/core/src/utils/row_encoder.rs @@ -33,10 +33,7 @@ impl RowEncoder for JsonRowEncoder { return Ok(vec![]); } - // BufWriter uses a buffer size of 8KB - // We therefore double this and flush once we have more than 8KB let mut buffer = Vec::with_capacity(batch.num_rows()); - for i in 0..batch.num_rows() { let row = batch.slice(i, 1); buffer.push(self.batch_to_json(&row)?);