diff --git a/capture/src/capture.rs b/capture/src/capture.rs
index 37e2872..553727e 100644
--- a/capture/src/capture.rs
+++ b/capture/src/capture.rs
@@ -15,8 +15,8 @@ use metrics::counter;
 use time::OffsetDateTime;
 use tracing::instrument;
 
-use crate::billing_limits::QuotaResource;
 use crate::event::{Compression, ProcessingContext};
+use crate::limits::billing_limits::QuotaResource;
 use crate::prometheus::report_dropped_events;
 use crate::token::validate_token;
 use crate::{
diff --git a/capture/src/lib.rs b/capture/src/lib.rs
index eea915c..a2d1cbb 100644
--- a/capture/src/lib.rs
+++ b/capture/src/lib.rs
@@ -1,10 +1,10 @@
 pub mod api;
-pub mod billing_limits;
+
 pub mod capture;
 pub mod config;
 pub mod event;
 pub mod health;
-pub mod partition_limits;
+pub mod limits;
 pub mod prometheus;
 pub mod redis;
 pub mod router;
diff --git a/capture/src/billing_limits.rs b/capture/src/limits/billing_limits.rs
similarity index 99%
rename from capture/src/billing_limits.rs
rename to capture/src/limits/billing_limits.rs
index 9fa0fdd..973d037 100644
--- a/capture/src/billing_limits.rs
+++ b/capture/src/limits/billing_limits.rs
@@ -166,7 +166,7 @@ mod tests {
     use time::Duration;
 
     use crate::{
-        billing_limits::{BillingLimiter, QuotaResource},
+        limits::billing_limits::{BillingLimiter, QuotaResource},
         redis::MockRedisClient,
     };
 
diff --git a/capture/src/limits/mod.rs b/capture/src/limits/mod.rs
new file mode 100644
index 0000000..47f32d5
--- /dev/null
+++ b/capture/src/limits/mod.rs
@@ -0,0 +1,2 @@
+pub mod billing_limits;
+pub mod partition_limits;
diff --git a/capture/src/partition_limits.rs b/capture/src/limits/partition_limits.rs
similarity index 98%
rename from capture/src/partition_limits.rs
rename to capture/src/limits/partition_limits.rs
index 3866657..71808d0 100644
--- a/capture/src/partition_limits.rs
+++ b/capture/src/limits/partition_limits.rs
@@ -42,7 +42,7 @@ impl PartitionLimiter {
 
 #[cfg(test)]
 mod tests {
-    use crate::partition_limits::PartitionLimiter;
+    use crate::limits::partition_limits::PartitionLimiter;
     use std::num::NonZeroU32;
 
     #[tokio::test]
diff --git a/capture/src/router.rs b/capture/src/router.rs
index 6f2f044..11f84d1 100644
--- a/capture/src/router.rs
+++ b/capture/src/router.rs
@@ -9,10 +9,13 @@ use axum::{
 use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
 use tower_http::trace::TraceLayer;
 
+use crate::capture;
 use crate::health::HealthRegistry;
-use crate::{billing_limits::BillingLimiter, capture, redis::Client, sink, time::TimeSource};
-
+use crate::limits::billing_limits::BillingLimiter;
 use crate::prometheus::{setup_metrics_recorder, track_metrics};
+use crate::redis::Client;
+use crate::sink;
+use crate::time::TimeSource;
 
 #[derive(Clone)]
 pub struct State {
diff --git a/capture/src/server.rs b/capture/src/server.rs
index 32bafa8..36aa76b 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -4,12 +4,14 @@ use std::sync::Arc;
 
 use time::Duration;
 
-use crate::billing_limits::BillingLimiter;
 use crate::config::Config;
 use crate::health::{ComponentStatus, HealthRegistry};
-use crate::partition_limits::PartitionLimiter;
+use crate::limits::billing_limits::BillingLimiter;
+use crate::limits::partition_limits::PartitionLimiter;
 use crate::redis::RedisClient;
-use crate::{router, sink};
+use crate::router;
+use crate::sink::kafka::KafkaSink;
+use crate::sink::print;
 
 pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
 where
@@ -34,7 +36,7 @@ where
         router::router(
             crate::time::SystemTime {},
             liveness,
-            sink::PrintSink {},
+            print::PrintSink {},
             redis_client,
             billing,
             config.export_prometheus,
@@ -49,7 +51,7 @@ where
             config.burst_limit,
             config.overflow_forced_keys,
         );
-        let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
+        let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
             .expect("failed to start Kafka sink");
 
         router::router(
diff --git a/capture/src/sink.rs b/capture/src/sink/kafka.rs
similarity index 92%
rename from capture/src/sink.rs
rename to capture/src/sink/kafka.rs
index af83e20..8f367d7 100644
--- a/capture/src/sink.rs
+++ b/capture/src/sink/kafka.rs
@@ -2,11 +2,10 @@ use std::time::Duration;
 
 use async_trait::async_trait;
 use metrics::{absolute_counter, counter, gauge, histogram};
-use rdkafka::config::ClientConfig;
 use rdkafka::error::{KafkaError, RDKafkaErrorCode};
-use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
-use rdkafka::producer::{DeliveryFuture, Producer};
+use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
 use rdkafka::util::Timeout;
+use rdkafka::ClientConfig;
 use tokio::task::JoinSet;
 use tracing::log::{debug, error, info};
 use tracing::{info_span, instrument, Instrument};
@@ -15,38 +14,9 @@ use crate::api::CaptureError;
 use crate::config::KafkaConfig;
 use crate::event::ProcessedEvent;
 use crate::health::HealthHandle;
-use crate::partition_limits::PartitionLimiter;
+use crate::limits::partition_limits::PartitionLimiter;
 use crate::prometheus::report_dropped_events;
-
-#[async_trait]
-pub trait EventSink {
-    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
-    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
-}
-
-pub struct PrintSink {}
-
-#[async_trait]
-impl EventSink for PrintSink {
-    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
-        info!("single event: {:?}", event);
-        counter!("capture_events_ingested_total", 1);
-
-        Ok(())
-    }
-    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
-        let span = tracing::span!(tracing::Level::INFO, "batch of events");
-        let _enter = span.enter();
-
-        histogram!("capture_event_batch_size", events.len() as f64);
-        counter!("capture_events_ingested_total", events.len() as u64);
-        for event in events {
-            info!("event: {:?}", event);
-        }
-
-        Ok(())
-    }
-}
+use crate::sink::EventSink;
 
 struct KafkaContext {
     liveness: HealthHandle,
@@ -294,8 +264,9 @@ mod tests {
     use crate::config;
     use crate::event::ProcessedEvent;
     use crate::health::HealthRegistry;
-    use crate::partition_limits::PartitionLimiter;
-    use crate::sink::{EventSink, KafkaSink};
+    use crate::limits::partition_limits::PartitionLimiter;
+    use crate::sink::kafka::KafkaSink;
+    use crate::sink::EventSink;
     use crate::utils::uuid_v7;
     use rand::distributions::Alphanumeric;
     use rand::Rng;
diff --git a/capture/src/sink/mod.rs b/capture/src/sink/mod.rs
new file mode 100644
index 0000000..7183250
--- /dev/null
+++ b/capture/src/sink/mod.rs
@@ -0,0 +1,13 @@
+use async_trait::async_trait;
+
+use crate::api::CaptureError;
+use crate::event::ProcessedEvent;
+
+pub mod kafka;
+pub mod print;
+
+#[async_trait]
+pub trait EventSink {
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
+}
diff --git a/capture/src/sink/print.rs b/capture/src/sink/print.rs
new file mode 100644
index 0000000..449d00c
--- /dev/null
+++ b/capture/src/sink/print.rs
@@ -0,0 +1,31 @@
+use async_trait::async_trait;
+use metrics::{counter, histogram};
+use tracing::log::info;
+
+use crate::api::CaptureError;
+use crate::event::ProcessedEvent;
+use crate::sink::EventSink;
+
+pub struct PrintSink {}
+
+#[async_trait]
+impl EventSink for PrintSink {
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
+        info!("single event: {:?}", event);
+        counter!("capture_events_ingested_total", 1);
+
+        Ok(())
+    }
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
+        let span = tracing::span!(tracing::Level::INFO, "batch of events");
+        let _enter = span.enter();
+
+        histogram!("capture_event_batch_size", events.len() as f64);
+        counter!("capture_events_ingested_total", events.len() as u64);
+        for event in events {
+            info!("event: {:?}", event);
+        }
+
+        Ok(())
+    }
+}
diff --git a/capture/tests/django_compat.rs b/capture/tests/django_compat.rs
index d1d075b..a7791f2 100644
--- a/capture/tests/django_compat.rs
+++ b/capture/tests/django_compat.rs
@@ -5,9 +5,9 @@ use axum_test_helper::TestClient;
 use base64::engine::general_purpose;
 use base64::Engine;
 use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode};
-use capture::billing_limits::BillingLimiter;
 use capture::event::ProcessedEvent;
 use capture::health::HealthRegistry;
+use capture::limits::billing_limits::BillingLimiter;
 use capture::redis::MockRedisClient;
 use capture::router::router;
 use capture::sink::EventSink;