Skip to content

Commit

Permalink
allow wildcard topics for routeviews kafka client
Browse files Browse the repository at this point in the history
  • Loading branch information
digizeph committed Oct 27, 2024
1 parent cfaa312 commit abf103e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ name = "bench_main"
harness = false

[dev-dependencies]
regex = "1"
anyhow = "1"
bgpkit-broker = "0.7.0-beta.5"
kafka = "0.10.0"
Expand Down
39 changes: 33 additions & 6 deletions examples/real-time-routeviews-kafka-openbmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,41 @@ use bgpkit_parser::parser::bmp::messages::BmpMessageBody;
use bgpkit_parser::Elementor;
pub use bgpkit_parser::{parse_bmp_msg, parse_openbmp_header};
use bytes::Bytes;
use kafka::client::KafkaClient;
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use kafka::error::Error as KafkaError;
use log::{error, info};
use std::thread::sleep;
use std::time::Duration;

fn consume_and_print(group: String, topic: String, brokers: Vec<String>) -> Result<(), KafkaError> {
let mut con = Consumer::from_hosts(brokers)
.with_topic(topic)
fn get_matching_topics(client: &mut KafkaClient, pattern: &str) -> Vec<String> {
let re = regex::Regex::new(pattern).expect("Invalid regex pattern");
client.load_metadata_all().expect("Failed to load metadata");
client
.topics()
.iter()
.filter(|t| re.is_match(t.name())) // Adjust pattern matching as necessary
.map(|t| t.name().to_string())
.collect()
}

fn consume_and_print(
group: String,
pattern: String,
brokers: Vec<String>,
) -> Result<(), KafkaError> {
let mut client = KafkaClient::new(brokers);
client
.load_metadata_all()
.expect("Failed to connect to Kafka");
let topics = get_matching_topics(&mut client, pattern.as_str());
dbg!(&topics);

let mut builder = Consumer::from_client(client);
for topic in topics {
builder = builder.with_topic(topic);
}
let mut con = builder
.with_group(group)
.with_fetch_max_bytes_per_partition(100_000)
.with_retry_max_bytes_limit(1_000_000)
Expand Down Expand Up @@ -59,11 +85,12 @@ fn consume_and_print(group: String, topic: String, brokers: Vec<String>) -> Resu
}

pub fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
tracing_subscriber::fmt::init();

let broker = "stream.routeviews.org:9092".to_owned();
let topic = "routeviews.amsix.34968.bmp_raw".to_owned();
// "routeviews.amsix.61955.bmp_raw"
let pattern = r#"routeviews\.amsix\..*\.bmp_raw"#.to_owned();
let group = "bgpkit-parser-example".to_owned();

consume_and_print(group, topic, vec![broker]).unwrap();
consume_and_print(group, pattern, vec![broker]).unwrap();
}

0 comments on commit abf103e

Please sign in to comment.