Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
carve out the sink and limits sub-modules
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Dec 12, 2023
1 parent 332c3d5 commit 6f33e64
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 49 deletions.
2 changes: 1 addition & 1 deletion capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use metrics::counter;
use time::OffsetDateTime;
use tracing::instrument;

use crate::billing_limits::QuotaResource;
use crate::event::{Compression, ProcessingContext};
use crate::limits::billing_limits::QuotaResource;
use crate::prometheus::report_dropped_events;
use crate::token::validate_token;
use crate::{
Expand Down
4 changes: 2 additions & 2 deletions capture/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
pub mod api;
pub mod billing_limits;

pub mod capture;
pub mod config;
pub mod event;
pub mod health;
pub mod partition_limits;
pub mod limits;
pub mod prometheus;
pub mod redis;
pub mod router;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ mod tests {
use time::Duration;

use crate::{
billing_limits::{BillingLimiter, QuotaResource},
limits::billing_limits::{BillingLimiter, QuotaResource},
redis::MockRedisClient,
};

Expand Down
2 changes: 2 additions & 0 deletions capture/src/limits/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod billing_limits;
pub mod partition_limits;
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl PartitionLimiter {

#[cfg(test)]
mod tests {
use crate::partition_limits::PartitionLimiter;
use crate::limits::partition_limits::PartitionLimiter;
use std::num::NonZeroU32;

#[tokio::test]
Expand Down
7 changes: 5 additions & 2 deletions capture/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ use axum::{
use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::capture;
use crate::health::HealthRegistry;
use crate::{billing_limits::BillingLimiter, capture, redis::Client, sink, time::TimeSource};

use crate::limits::billing_limits::BillingLimiter;
use crate::prometheus::{setup_metrics_recorder, track_metrics};
use crate::redis::Client;
use crate::sink;
use crate::time::TimeSource;

#[derive(Clone)]
pub struct State {
Expand Down
12 changes: 7 additions & 5 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use std::sync::Arc;

use time::Duration;

use crate::billing_limits::BillingLimiter;
use crate::config::Config;
use crate::health::{ComponentStatus, HealthRegistry};
use crate::partition_limits::PartitionLimiter;
use crate::limits::billing_limits::BillingLimiter;
use crate::limits::partition_limits::PartitionLimiter;
use crate::redis::RedisClient;
use crate::{router, sink};
use crate::router;
use crate::sink::kafka::KafkaSink;
use crate::sink::print;

pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
where
Expand All @@ -34,7 +36,7 @@ where
router::router(
crate::time::SystemTime {},
liveness,
sink::PrintSink {},
print::PrintSink {},
redis_client,
billing,
config.export_prometheus,
Expand All @@ -49,7 +51,7 @@ where
config.burst_limit,
config.overflow_forced_keys,
);
let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

router::router(
Expand Down
43 changes: 7 additions & 36 deletions capture/src/sink.rs → capture/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use std::time::Duration;

use async_trait::async_trait;
use metrics::{absolute_counter, counter, gauge, histogram};
use rdkafka::config::ClientConfig;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
use rdkafka::producer::{DeliveryFuture, Producer};
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};
Expand All @@ -15,38 +14,9 @@ use crate::api::CaptureError;
use crate::config::KafkaConfig;
use crate::event::ProcessedEvent;
use crate::health::HealthHandle;
use crate::partition_limits::PartitionLimiter;
use crate::limits::partition_limits::PartitionLimiter;
use crate::prometheus::report_dropped_events;

#[async_trait]
pub trait EventSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
}

pub struct PrintSink {}

#[async_trait]
impl EventSink for PrintSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
info!("single event: {:?}", event);
counter!("capture_events_ingested_total", 1);

Ok(())
}
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let span = tracing::span!(tracing::Level::INFO, "batch of events");
let _enter = span.enter();

histogram!("capture_event_batch_size", events.len() as f64);
counter!("capture_events_ingested_total", events.len() as u64);
for event in events {
info!("event: {:?}", event);
}

Ok(())
}
}
use crate::sink::EventSink;

struct KafkaContext {
liveness: HealthHandle,
Expand Down Expand Up @@ -294,8 +264,9 @@ mod tests {
use crate::config;
use crate::event::ProcessedEvent;
use crate::health::HealthRegistry;
use crate::partition_limits::PartitionLimiter;
use crate::sink::{EventSink, KafkaSink};
use crate::limits::partition_limits::PartitionLimiter;
use crate::sink::kafka::KafkaSink;
use crate::sink::EventSink;
use crate::utils::uuid_v7;
use rand::distributions::Alphanumeric;
use rand::Rng;
Expand Down
13 changes: 13 additions & 0 deletions capture/src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use async_trait::async_trait;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;

pub mod kafka;
pub mod print;

#[async_trait]
pub trait EventSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
}
31 changes: 31 additions & 0 deletions capture/src/sink/print.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use async_trait::async_trait;
use metrics::{counter, histogram};
use tracing::log::info;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;
use crate::sink::EventSink;

pub struct PrintSink {}

#[async_trait]
impl EventSink for PrintSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
info!("single event: {:?}", event);
counter!("capture_events_ingested_total", 1);

Ok(())
}
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let span = tracing::span!(tracing::Level::INFO, "batch of events");
let _enter = span.enter();

histogram!("capture_event_batch_size", events.len() as f64);
counter!("capture_events_ingested_total", events.len() as u64);
for event in events {
info!("event: {:?}", event);
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion capture/tests/django_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use axum_test_helper::TestClient;
use base64::engine::general_purpose;
use base64::Engine;
use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode};
use capture::billing_limits::BillingLimiter;
use capture::event::ProcessedEvent;
use capture::health::HealthRegistry;
use capture::limits::billing_limits::BillingLimiter;
use capture::redis::MockRedisClient;
use capture::router::router;
use capture::sink::EventSink;
Expand Down

0 comments on commit 6f33e64

Please sign in to comment.