Skip to content

Commit

Permalink
Merge branch 'main' into amey/checkpointing-deux
Browse files Browse the repository at this point in the history
  • Loading branch information
ameyc authored Aug 7, 2024
2 parents 7957cb1 + 57db66f commit eaaf07c
Show file tree
Hide file tree
Showing 14 changed files with 267 additions and 87 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
.vscode
.DS_Store
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ authors = [
"Amey Chaugule <amey@denormalized.io>",
]
edition = "2021"
homepage = "https://github.com/probably-nothing-labs/df-streams"
homepage = "https://github.com/probably-nothing-labs/denormalized"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/probably-nothing-labs/df-streams"
repository = "https://github.com/probably-nothing-labs/denormalized"
version = "0.1.0"
description = "Embeddable stream processing engine"

Expand Down Expand Up @@ -50,7 +50,7 @@ futures = "0.3"
tracing = "0.1.40"
tracing-log = "0.2.0"
tracing-subscriber = "0.3.18"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
tokio = { version = "1.36", features = ["macros", "rt", "sync", "rt-multi-thread"] }
async-trait = "0.1.81"
rdkafka = "0.36.2"
log = "^0.4"
Expand Down
41 changes: 30 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,42 @@
# Denormalized
<h1>
<a href="https://www.denormalized.io">
<img src="./docs/images/denormalized_dark.png" alt="Denormalized Logo" width="512">
</a>
</h1>

Denormalized is a fast embeddable stream processing engine built on Apache DataFusion.
It currently supports sourcing and sinking to kafka, windowed aggregations, and stream joins.

While this repo is very much a *work-in-progress*, we currently support windowed aggregations and joins on streams of data with a
connector available for Kafka.
This repo is still a *work-in-progress* and we are actively seeking design partners. If you have have a specific use-case you'd like to discuss please drop us a line via a github issue or email hello@denormalized.io.

## Building Denormalized
## Quickstart

Simply run `cargo build`
### Prerequisites
- Docker
- Rust/Cargo installed

## Running Examples
### Running an example
1. Start kafka in docker `docker run -p 9092:9092 --name kafka apache/kafka`
2. Start emitting some sample data: `cargo run --example emit_measurements`
3. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example emit_measurements`

See our [benchmarking repo](https://github.com/probably-nothing-labs/benchmarking) for local Kafka setup and data generation.
## More examples

With the data generation in place, run -
A more powerful example can be seen in our [kafka ridesharing example](./docs/kafka_rideshare_example.md)

`cargo run --example kafka_rideshare`
## Roadmap
- [x] Stream aggregation
- [x] Stream joins
- [ ] Checkpointing / restoration
- [ ] Session windows
- [ ] Stateful UDF API
- [ ] DuckDB support
- [ ] Reading/writing from Postgres
- [ ] Python bindings
- [ ] Typescript bindings
- [ ] UI

## Credits

Denormalized is built and maintained by [Denormalized Inc](www.denormalized.io) from San Francisco. Please drop in a line to
hello@denormalized.io or simply open up a GitHub Issue.
Denormalized is built and maintained by [Denormalized](https://www.denormalized.io) in San Francisco. Please drop in a line to
hello@denormalized.io or simply open up a GitHub Issue!
21 changes: 8 additions & 13 deletions crates/core/src/datasource/kafka/topic_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::fmt::{self, Debug};
use std::time::Duration;
use std::{any::Any, sync::Arc};

use arrow::json::LineDelimitedWriter;
use arrow_schema::SchemaRef;

use datafusion::catalog::Session;
Expand All @@ -22,6 +21,7 @@ use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;

use super::KafkaWriteConfig;
use crate::utils::row_encoder::{JsonRowEncoder, RowEncoder};

// Used to createa kafka source
pub struct TopicWriter(pub Arc<KafkaWriteConfig>);
Expand Down Expand Up @@ -110,21 +110,16 @@ impl DataSink for KafkaSink {
while let Some(batch) = data.next().await.transpose()? {
row_count += batch.num_rows();

if batch.num_rows() > 0 {
let buf = Vec::new();
let mut writer = LineDelimitedWriter::new(buf);
writer.write_batches(&[&batch])?;
writer.finish()?;
let buf = writer.into_inner();
let encoder = JsonRowEncoder {};
let rows = encoder.encode(&batch)?;

let record = FutureRecord::<[u8], _>::to(topic).payload(&buf);
for row in rows {
let record = FutureRecord::<[u8], _>::to(topic).payload(&row);
// .key(key.as_str()),

let _delivery_status = self
.producer
.send(record, Duration::from_secs(0))
.await
.expect("Message not delivered");
if let Err(msg) = self.producer.send(record, Duration::from_secs(0)).await {
tracing::error!("{}", msg.0);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[allow(dead_code)]
pub mod arrow_helpers;
mod default_optimizer_rules;
pub mod row_encoder;
pub mod serialize;

pub use default_optimizer_rules::get_default_optimizer_rules;
87 changes: 87 additions & 0 deletions crates/core/src/utils/row_encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use arrow::json::writer::{JsonFormat, Writer};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_common::Result;

pub trait RowEncoder {
fn encode(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>>;
}

#[derive(Debug, Default)]
// Formats json without any characting separating items.
pub struct NoDelimiter {}
impl JsonFormat for NoDelimiter {}
// writes rows as json without any character separating them
type JsonWriter<W> = Writer<W, NoDelimiter>;

pub struct JsonRowEncoder {}

impl JsonRowEncoder {
pub fn batch_to_json(&self, batch: &RecordBatch) -> Result<Vec<u8>> {
let buf = Vec::new();
let mut writer = JsonWriter::new(buf);
writer.write(batch)?;
writer.finish()?;
let buf = writer.into_inner();

Ok(buf)
}
}

impl RowEncoder for JsonRowEncoder {
fn encode(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>> {
if batch.num_rows() == 0 {
return Ok(vec![]);
}

let mut buffer = Vec::with_capacity(batch.num_rows());
for i in 0..batch.num_rows() {
let row = batch.slice(i, 1);
buffer.push(self.batch_to_json(&row)?);
}

Ok(buffer)
}
}

#[cfg(test)]
mod tests {
use super::{JsonRowEncoder, RowEncoder};

use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use std::sync::Arc;

#[test]
fn serialize_record_batch_to_json() {
// define a schema.
let schema = Arc::new(Schema::new(vec![
Field::new("col1", DataType::Utf8, false),
Field::new("col2", DataType::Int32, false),
]));

let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(Int32Array::from(vec![1, 10, 20, 100])),
],
)
.unwrap();

let encoder = JsonRowEncoder {};
let buf = encoder.encode(&batch).unwrap();

let res: Vec<&[u8]> = [
"{\"col1\":\"a\",\"col2\":1}",
"{\"col1\":\"b\",\"col2\":10}",
"{\"col1\":\"c\",\"col2\":20}",
"{\"col1\":\"d\",\"col2\":100}",
]
.iter()
.map(|v| v.as_bytes())
.collect::<_>();

assert_eq!(buf, res);
}
}
Binary file added docs/images/denormalized_dark.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/denormalized_logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
35 changes: 35 additions & 0 deletions docs/kafka_rideshare_example.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kafka Rideshare Example

This example application aggregates data across a more involved example setup.

### Configure Kafka Cluster

Clone our [docker compose files for running kafka](https://github.com/probably-nothing-labs/kafka-monitoring-stack-docker-compose). If you already have a different kafka cluster running, you can skip this step.
```sh
git clone git@github.com:probably-nothing-labs/kafka-monitoring-stack-docker-compose.git
cd kafka-monitoring-stack-docker-compose
docker compose -f denormalized-benchmark-cluster.yml up
```

This will spin up a 3 node kafka cluster in docker along with an instance of kafka-ui that can be viewed at http://localhost:8080/

### Generate some sample data to the kafka cluster

We wrote a [small rust tool](https://github.com/probably-nothing-labs/benchmarking) that will send fake traffic to the locally run rust program.
```sh
git clone git@github.com:probably-nothing-labs/benchmarking.git
cd benchmarking
cargo run -- -d 60 -a 1000
```

This will start a simulation for 60s and will create two topics: `driver-imu-data` and `trips` which should have around ~58k and ~500 messages accordingly.
There are several other knobs that can be tuned to change the amount of traffic which can be viewed with `cargo run -- --help`.
There are also several other knobs that are not exposes but can be changed in the [src/main.rs](https://github.com/probably-nothing-labs/benchmarking/blob/main/src/main.rs#L104-L108) file

### Run a Streaming Datafusion job

```sh
cargo run --example kafka_rideshare
```

Once everything is setup and one of the two streaming jobs is running, it is recommend to re-run the kafka data generation tool so that live data is produced. This is because watermark tracking of streaming data makes it difficult to properly aggregate older data that lives in the kafka topic.
2 changes: 2 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ tracing = { workspace = true }
futures = { workspace = true }
tracing-log = { workspace = true }
tracing-subscriber = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tempfile = { version = "3" }
rdkafka = { workspace = true }
rand = "0.8.5"
51 changes: 51 additions & 0 deletions examples/examples/emit_measurements.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use datafusion::error::Result;
use rdkafka::producer::FutureProducer;
use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureRecord;
use rdkafka::util::Timeout;

#[derive(Serialize, Deserialize)]
pub struct Measurment {
occurred_at_ms: u64,
temperature: f64,
}

/// docker run -p 9092:9092 --name kafka apache/kafka
#[tokio::main]
async fn main() -> Result<()> {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", String::from("localhost:9092"))
.set("message.timeout.ms", "60000")
.create()
.expect("Producer creation error");

let topic = "temperature".to_string();

loop {
let msg = serde_json::to_vec(&Measurment {
occurred_at_ms: get_timestamp_ms(),
temperature: rand::random::<f64>() * 115.0,
})
.unwrap();

producer
.send(
FutureRecord::<(), Vec<u8>>::to(topic.as_str()).payload(&msg),
Timeout::Never,
)
.await
.unwrap();

tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
}

fn get_timestamp_ms() -> u64 {
let now = SystemTime::now();
let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");

since_the_epoch.as_millis() as u64
}
46 changes: 46 additions & 0 deletions examples/examples/simple_aggregation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::time::Duration;

use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion_expr::{col, max, min};

use df_streams_core::context::Context;
use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
use df_streams_core::physical_plan::utils::time::TimestampUnit;

#[tokio::main]
async fn main() -> Result<()> {
let sample_event = r#"{"occurred_at_ms": 1715201766763, "temperature": 87.2}"#;

let bootstrap_servers = String::from("localhost:9092");

let ctx = Context::new()?;

let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

let source_topic = topic_builder
.with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
.with_encoding("json")?
.with_topic(String::from("temperature"))
.infer_schema_from_json(sample_event)?
.build_reader(ConnectionOpts::from([
("auto.offset.reset".to_string(), "earliest".to_string()),
("group.id".to_string(), "sample_pipeline".to_string()),
]))
.await?;

let ds = ctx.from_topic(source_topic).await?.streaming_window(
vec![],
vec![
min(col("temperature")).alias("min"),
max(col("temperature")).alias("max"),
avg(col("temperature")).alias("average"),
],
Duration::from_millis(1_000), // 5 second window
None,
)?;

ds.clone().print_stream().await?;

Ok(())
}
Loading

0 comments on commit eaaf07c

Please sign in to comment.