Skip to content

Commit

Permalink
Qname router (#353)
Browse files Browse the repository at this point in the history
  • Loading branch information
Philip-NLnetLabs committed Oct 28, 2024
1 parent ec99e6b commit f13016e
Show file tree
Hide file tree
Showing 7 changed files with 780 additions and 1 deletion.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ required-features = ["zonefile", "net", "unstable-server-transport", "unstable-z
name = "ixfr-client"
required-features = ["zonefile", "net", "unstable-client-transport", "unstable-zonetree"]

[[example]]
name = "query-routing"
required-features = ["net", "unstable-client-transport", "unstable-server-transport", "tracing-subscriber"]

# This example is commented out because it is difficult, if not impossible,
# when including the sqlx dependency, to make the dependency tree compatible
# with both `cargo +nightly update -Z minimal versions` and the crate minimum
Expand Down
109 changes: 109 additions & 0 deletions examples/query-routing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use domain::base::Name;
use domain::net::client::protocol::{TcpConnect, UdpConnect};
use domain::net::client::request::RequestMessage;
use domain::net::client::{dgram_stream, redundant};
use domain::net::server::adapter::{
ClientTransportToSingleService, SingleServiceToService,
};
use domain::net::server::buf::VecBufSource;
use domain::net::server::dgram::DgramServer;
use domain::net::server::middleware::mandatory::MandatoryMiddlewareSvc;
use domain::net::server::qname_router::QnameRouter;
use domain::net::server::single_service::ReplyMessage;
use domain::net::server::stream::StreamServer;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::sync::Arc;
use std::vec::Vec;
use tokio::net::{TcpSocket, UdpSocket};
use tracing_subscriber::EnvFilter;

//----------- main() ---------------------------------------------------------

#[tokio::main(flavor = "multi_thread")]
async fn main() {
eprintln!("Test with commands such as:");
eprintln!(" dnsi query --server ::1 -p 8053 ietf.org");
eprintln!(" dnsi query --server ::1 -p 8053 nlnetlabs.nl");
eprintln!(" dnsi query --server ::1 -p 8053 google.com");
eprintln!("Enable tracing with 'RUST_LOG=trace' before the command");

// -----------------------------------------------------------------------
// Setup logging. You can override the log level by setting environment
// variable RUST_LOG, e.g. RUST_LOG=trace.
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_thread_ids(true)
.without_time()
.try_init()
.ok();

// Start building the query router plus upstreams.
let mut qr: QnameRouter<Vec<u8>, Vec<u8>, ReplyMessage> =
QnameRouter::new();

// Queries to the root go to 2606:4700:4700::1111 and 1.1.1.1.
let redun = example_redundant("2606:4700:4700::1111 ", "1.1.1.1").await;
let conn_service = ClientTransportToSingleService::new(redun);
qr.add(Name::root_slice(), conn_service);

// Queries to .com go to 2001:4860:4860::8888 and 8.8.8.8.
let redun = example_redundant("2001:4860:4860::8888", "8.8.8.8").await;
let conn_service = ClientTransportToSingleService::new(redun);
qr.add(Name::<Vec<u8>>::from_str("com").unwrap(), conn_service);

// Queries to .nl go to 2620:fe::9 and 9.9.9.9.
let redun = example_redundant("2620:fe::9", "9.9.9.9").await;
let conn_service = ClientTransportToSingleService::new(redun);
qr.add(Name::<Vec<u8>>::from_str("nl").unwrap(), conn_service);

let srv = SingleServiceToService::new(qr);
let srv = MandatoryMiddlewareSvc::<Vec<u8>, _, _>::new(srv);
let my_svc = Arc::new(srv);

let udpsocket = UdpSocket::bind("[::1]:8053").await.unwrap();
let buf = Arc::new(VecBufSource);
let srv = DgramServer::new(udpsocket, buf.clone(), my_svc.clone());
let udp_join_handle = tokio::spawn(async move { srv.run().await });

// -----------------------------------------------------------------------
// Run a DNS server on TCP port 8053 on ::1. Test it like so:
// dnsi query -t --server 127.0.0.1 -p 8053 google.com
let v6socket = TcpSocket::new_v6().unwrap();
v6socket.set_reuseaddr(true).unwrap();
v6socket.bind("[::1]:8053".parse().unwrap()).unwrap();
let v6listener = v6socket.listen(1024).unwrap();
let buf = Arc::new(VecBufSource);
let srv = StreamServer::new(v6listener, buf.clone(), my_svc.clone());
let tcp_join_handle = tokio::spawn(async move { srv.run().await });

// -----------------------------------------------------------------------
// Keep the services running in the background

udp_join_handle.await.unwrap();
tcp_join_handle.await.unwrap();
}

async fn example_redundant(
dst1: &str,
dst2: &str,
) -> redundant::Connection<RequestMessage<Vec<u8>>> {
let (redun, transport) = redundant::Connection::new();
tokio::spawn(transport.run());
let server_addr = SocketAddr::new(IpAddr::from_str(dst1).unwrap(), 53);
let udp_connect = UdpConnect::new(server_addr);
let tcp_connect = TcpConnect::new(server_addr);
let (conn, transport) =
dgram_stream::Connection::new(udp_connect, tcp_connect);
tokio::spawn(transport.run());
redun.add(Box::new(conn)).await.unwrap();
let server_addr = SocketAddr::new(IpAddr::from_str(dst2).unwrap(), 53);
let udp_connect = UdpConnect::new(server_addr);
let tcp_connect = TcpConnect::new(server_addr);
let (conn, transport) =
dgram_stream::Connection::new(udp_connect, tcp_connect);
tokio::spawn(transport.run());
redun.add(Box::new(conn)).await.unwrap();

redun
}
245 changes: 245 additions & 0 deletions src/net/server/adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
//! Service adapters.
//!
//! This module defines three adapters for [SingleService]. The first,
//! [ClientTransportToSingleService] implements [SingleService] for a
//! client transport ([SendRequest]).
//! The second one, [BoxClientTransportToSingleService],
//! implements [Service] for a boxed trait object of [SendRequest].
//! The third one, [SingleServiceToService] implements [Service] for
//! [SingleService].
#![warn(missing_docs)]
#![warn(clippy::missing_docs_in_private_items)]

use super::message::Request;
use super::service::{CallResult, Service, ServiceError, ServiceResult};
use super::single_service::{ComposeReply, SingleService};
use super::util::mk_error_response;
use crate::base::iana::{ExtendedErrorCode, OptRcode};
use crate::base::message_builder::AdditionalBuilder;
use crate::base::opt::ExtendedError;
use crate::base::StreamTarget;
use crate::dep::octseq::Octets;
use crate::net::client::request::{RequestMessage, SendRequest};
use futures_util::stream::{once, Once};
use std::boxed::Box;
use std::fmt::Debug;
use std::future::{ready, Future, Ready};
use std::marker::PhantomData;
use std::pin::Pin;
use std::string::ToString;
use std::vec::Vec;

/// Provide a [Service] trait for an object that implements [SingleService].
pub struct SingleServiceToService<RequestOcts, SVC, CR> {
/// Service that is wrapped by this object.
service: SVC,

/// Phantom field for RequestOcts and CR.
_phantom: PhantomData<(RequestOcts, CR)>,
}

impl<RequestOcts, SVC, CR> SingleServiceToService<RequestOcts, SVC, CR> {
/// Create a new [SingleServiceToService] object.
pub fn new(service: SVC) -> Self {
Self {
service,
_phantom: PhantomData,
}
}
}

impl<RequestOcts, SVC, CR> Service<RequestOcts>
for SingleServiceToService<RequestOcts, SVC, CR>
where
RequestOcts: Octets + Send + Sync,
SVC: SingleService<RequestOcts, CR>,
CR: ComposeReply + 'static,
{
type Target = Vec<u8>;
type Stream = Once<Ready<ServiceResult<Self::Target>>>;
type Future = Pin<Box<dyn Future<Output = Self::Stream> + Send>>;

fn call(&self, request: Request<RequestOcts>) -> Self::Future {
let fut = self.service.call(request);
let fut = async move {
let reply = match fut.await {
Ok(reply) => reply,
Err(_) => {
// Every error gets mapped to InternalError.
// Should we add an EDE here?
return once(ready(Err(ServiceError::InternalError)));
}
};
let abs = match reply.additional_builder_stream_target() {
Ok(reply) => reply,
Err(_) => {
// Every error gets mapped to InternalError.
// There is probably not much we could do here.
// The error results from a bad reply message.
return once(ready(Err(ServiceError::InternalError)));
}
};
once(ready(Ok(CallResult::new(abs))))
};
Box::pin(fut)
}
}

/// Provide a [SingleService] trait for an object that implements the
/// [SendRequest] trait.
pub struct ClientTransportToSingleService<SR, RequestOcts>
where
RequestOcts: AsRef<[u8]>,
SR: SendRequest<RequestMessage<RequestOcts>>,
{
/// The client transport to use.
conn: SR,

/// Phantom data for RequestOcts.
_phantom: PhantomData<RequestOcts>,
}

impl<SR, RequestOcts> ClientTransportToSingleService<SR, RequestOcts>
where
RequestOcts: AsRef<[u8]>,
SR: SendRequest<RequestMessage<RequestOcts>>,
{
/// Create a new [ClientTransportToSingleService] object.
pub fn new(conn: SR) -> Self {
Self {
conn,
_phantom: PhantomData,
}
}
}

impl<SR, RequestOcts, CR> SingleService<RequestOcts, CR>
for ClientTransportToSingleService<SR, RequestOcts>
where
RequestOcts: AsRef<[u8]> + Clone + Debug + Octets + Send + Sync,
SR: SendRequest<RequestMessage<RequestOcts>> + Sync,
CR: ComposeReply + Send + Sync + 'static,
{
fn call(
&self,
request: Request<RequestOcts>,
) -> Pin<Box<dyn Future<Output = Result<CR, ServiceError>> + Send + Sync>>
where
RequestOcts: AsRef<[u8]>,
{
// Prepare for an error. It is best to borrow request here.
let builder: AdditionalBuilder<StreamTarget<Vec<u8>>> =
mk_error_response(request.message(), OptRcode::SERVFAIL);

let req = match request.try_into() {
Ok(req) => req,
Err(_) => {
// Can this fail? Should the request be checked earlier.
// Just return ServFail.
return Box::pin(ready(Err(ServiceError::InternalError)));
}
};

let mut gr = self.conn.send_request(req);
let fut = async move {
match gr.get_response().await {
Ok(msg) => CR::from_message(&msg),
Err(e) => {
// The request failed. Create a ServFail response and
// add an EDE that describes the error.
let msg = builder.as_message();
let mut cr = CR::from_message(&msg).expect(
"CR should be able to handle an error response",
);
if let Ok(ede) = ExtendedError::<Vec<u8>>::new_with_str(
ExtendedErrorCode::OTHER,
&e.to_string(),
) {
cr.add_opt(&ede)
.expect("Adding an ede should not fail");
}
Ok(cr)
}
}
};
Box::pin(fut)
}
}

/// Implement the [SingleService] trait for a boxed [SendRequest] trait object.
pub struct BoxClientTransportToSingleService<RequestOcts>
where
RequestOcts: AsRef<[u8]>,
{
/// The client transport to use.
conn: Box<dyn SendRequest<RequestMessage<RequestOcts>> + Send + Sync>,

/// Phantom data for RequestOcts.
_phantom: PhantomData<RequestOcts>,
}

impl<RequestOcts> BoxClientTransportToSingleService<RequestOcts>
where
RequestOcts: AsRef<[u8]>,
{
/// Create a new [BoxClientTransportToSingleService] object.
pub fn new(
conn: Box<dyn SendRequest<RequestMessage<RequestOcts>> + Send + Sync>,
) -> Self {
Self {
conn,
_phantom: PhantomData,
}
}
}

impl<RequestOcts, CR> SingleService<RequestOcts, CR>
for BoxClientTransportToSingleService<RequestOcts>
where
RequestOcts: AsRef<[u8]> + Clone + Debug + Octets + Send + Sync,
CR: ComposeReply + Send + Sync + 'static,
{
fn call(
&self,
request: Request<RequestOcts>,
) -> Pin<Box<dyn Future<Output = Result<CR, ServiceError>> + Send + Sync>>
where
RequestOcts: AsRef<[u8]>,
{
// Prepare for an error. It is best to borrow request here.
let builder: AdditionalBuilder<StreamTarget<Vec<u8>>> =
mk_error_response(request.message(), OptRcode::SERVFAIL);

let Ok(req) = request.try_into() else {
// Can this fail? Should the request be checked earlier.
// Just return ServFail.
return Box::pin(ready(Err(ServiceError::InternalError)));
};

let mut gr = self.conn.send_request(req);
let fut = async move {
let msg = match gr.get_response().await {
Ok(msg) => msg,
Err(e) => {
// The request failed. Create a ServFail response and
// add an EDE that describes the error.
let msg = builder.as_message();
let mut cr = CR::from_message(&msg).expect(
"CR should be able to handle an error response",
);
if let Ok(ede) = ExtendedError::<Vec<u8>>::new_with_str(
ExtendedErrorCode::OTHER,
&e.to_string(),
) {
cr.add_opt(&ede)
.expect("Adding an ede should not fail");
}
return Ok(cr);
}
};
CR::from_message(&msg)
};
Box::pin(fut)
}
}
Loading

0 comments on commit f13016e

Please sign in to comment.