From f13016e299f35bd4ceca0bba85312eba6961dba6 Mon Sep 17 00:00:00 2001 From: Philip Homburg Date: Mon, 28 Oct 2024 13:51:57 +0100 Subject: [PATCH] Qname router (#353) --- Cargo.toml | 4 + examples/query-routing.rs | 109 ++++++++++++++ src/net/server/adapter.rs | 245 +++++++++++++++++++++++++++++++ src/net/server/message.rs | 67 ++++++++- src/net/server/mod.rs | 52 +++++++ src/net/server/qname_router.rs | 119 +++++++++++++++ src/net/server/single_service.rs | 185 +++++++++++++++++++++++ 7 files changed, 780 insertions(+), 1 deletion(-) create mode 100644 examples/query-routing.rs create mode 100644 src/net/server/adapter.rs create mode 100644 src/net/server/qname_router.rs create mode 100644 src/net/server/single_service.rs diff --git a/Cargo.toml b/Cargo.toml index a9b938811..b1bda94ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,6 +132,10 @@ required-features = ["zonefile", "net", "unstable-server-transport", "unstable-z name = "ixfr-client" required-features = ["zonefile", "net", "unstable-client-transport", "unstable-zonetree"] +[[example]] +name = "query-routing" +required-features = ["net", "unstable-client-transport", "unstable-server-transport", "tracing-subscriber"] + # This example is commented out because it is difficult, if not impossible, # when including the sqlx dependency, to make the dependency tree compatible # with both `cargo +nightly update -Z minimal versions` and the crate minimum diff --git a/examples/query-routing.rs b/examples/query-routing.rs new file mode 100644 index 000000000..a0b829133 --- /dev/null +++ b/examples/query-routing.rs @@ -0,0 +1,109 @@ +use domain::base::Name; +use domain::net::client::protocol::{TcpConnect, UdpConnect}; +use domain::net::client::request::RequestMessage; +use domain::net::client::{dgram_stream, redundant}; +use domain::net::server::adapter::{ + ClientTransportToSingleService, SingleServiceToService, +}; +use domain::net::server::buf::VecBufSource; +use domain::net::server::dgram::DgramServer; +use domain::net::server::middleware::mandatory::MandatoryMiddlewareSvc; +use domain::net::server::qname_router::QnameRouter; +use domain::net::server::single_service::ReplyMessage; +use domain::net::server::stream::StreamServer; +use std::net::{IpAddr, SocketAddr}; +use std::str::FromStr; +use std::sync::Arc; +use std::vec::Vec; +use tokio::net::{TcpSocket, UdpSocket}; +use tracing_subscriber::EnvFilter; + +//----------- main() --------------------------------------------------------- + +#[tokio::main(flavor = "multi_thread")] +async fn main() { + eprintln!("Test with commands such as:"); + eprintln!(" dnsi query --server ::1 -p 8053 ietf.org"); + eprintln!(" dnsi query --server ::1 -p 8053 nlnetlabs.nl"); + eprintln!(" dnsi query --server ::1 -p 8053 google.com"); + eprintln!("Enable tracing with 'RUST_LOG=trace' before the command"); + + // ----------------------------------------------------------------------- + // Setup logging. You can override the log level by setting environment + // variable RUST_LOG, e.g. RUST_LOG=trace. + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .with_thread_ids(true) + .without_time() + .try_init() + .ok(); + + // Start building the query router plus upstreams. + let mut qr: QnameRouter, Vec, ReplyMessage> = + QnameRouter::new(); + + // Queries to the root go to 2606:4700:4700::1111 and 1.1.1.1. + let redun = example_redundant("2606:4700:4700::1111 ", "1.1.1.1").await; + let conn_service = ClientTransportToSingleService::new(redun); + qr.add(Name::root_slice(), conn_service); + + // Queries to .com go to 2001:4860:4860::8888 and 8.8.8.8. + let redun = example_redundant("2001:4860:4860::8888", "8.8.8.8").await; + let conn_service = ClientTransportToSingleService::new(redun); + qr.add(Name::>::from_str("com").unwrap(), conn_service); + + // Queries to .nl go to 2620:fe::9 and 9.9.9.9. + let redun = example_redundant("2620:fe::9", "9.9.9.9").await; + let conn_service = ClientTransportToSingleService::new(redun); + qr.add(Name::>::from_str("nl").unwrap(), conn_service); + + let srv = SingleServiceToService::new(qr); + let srv = MandatoryMiddlewareSvc::, _, _>::new(srv); + let my_svc = Arc::new(srv); + + let udpsocket = UdpSocket::bind("[::1]:8053").await.unwrap(); + let buf = Arc::new(VecBufSource); + let srv = DgramServer::new(udpsocket, buf.clone(), my_svc.clone()); + let udp_join_handle = tokio::spawn(async move { srv.run().await }); + + // ----------------------------------------------------------------------- + // Run a DNS server on TCP port 8053 on ::1. Test it like so: + // dnsi query -t --server 127.0.0.1 -p 8053 google.com + let v6socket = TcpSocket::new_v6().unwrap(); + v6socket.set_reuseaddr(true).unwrap(); + v6socket.bind("[::1]:8053".parse().unwrap()).unwrap(); + let v6listener = v6socket.listen(1024).unwrap(); + let buf = Arc::new(VecBufSource); + let srv = StreamServer::new(v6listener, buf.clone(), my_svc.clone()); + let tcp_join_handle = tokio::spawn(async move { srv.run().await }); + + // ----------------------------------------------------------------------- + // Keep the services running in the background + + udp_join_handle.await.unwrap(); + tcp_join_handle.await.unwrap(); +} + +async fn example_redundant( + dst1: &str, + dst2: &str, +) -> redundant::Connection>> { + let (redun, transport) = redundant::Connection::new(); + tokio::spawn(transport.run()); + let server_addr = SocketAddr::new(IpAddr::from_str(dst1).unwrap(), 53); + let udp_connect = UdpConnect::new(server_addr); + let tcp_connect = TcpConnect::new(server_addr); + let (conn, transport) = + dgram_stream::Connection::new(udp_connect, tcp_connect); + tokio::spawn(transport.run()); + redun.add(Box::new(conn)).await.unwrap(); + let server_addr = SocketAddr::new(IpAddr::from_str(dst2).unwrap(), 53); + let udp_connect = UdpConnect::new(server_addr); + let tcp_connect = TcpConnect::new(server_addr); + let (conn, transport) = + dgram_stream::Connection::new(udp_connect, tcp_connect); + tokio::spawn(transport.run()); + redun.add(Box::new(conn)).await.unwrap(); + + redun +} diff --git a/src/net/server/adapter.rs b/src/net/server/adapter.rs new file mode 100644 index 000000000..a28c1d80d --- /dev/null +++ b/src/net/server/adapter.rs @@ -0,0 +1,245 @@ +//! Service adapters. +//! +//! This module defines three adapters for [SingleService]. The first, +//! [ClientTransportToSingleService] implements [SingleService] for a +//! client transport ([SendRequest]). +//! The second one, [BoxClientTransportToSingleService], +//! implements [Service] for a boxed trait object of [SendRequest]. +//! The third one, [SingleServiceToService] implements [Service] for +//! [SingleService]. + +#![warn(missing_docs)] +#![warn(clippy::missing_docs_in_private_items)] + +use super::message::Request; +use super::service::{CallResult, Service, ServiceError, ServiceResult}; +use super::single_service::{ComposeReply, SingleService}; +use super::util::mk_error_response; +use crate::base::iana::{ExtendedErrorCode, OptRcode}; +use crate::base::message_builder::AdditionalBuilder; +use crate::base::opt::ExtendedError; +use crate::base::StreamTarget; +use crate::dep::octseq::Octets; +use crate::net::client::request::{RequestMessage, SendRequest}; +use futures_util::stream::{once, Once}; +use std::boxed::Box; +use std::fmt::Debug; +use std::future::{ready, Future, Ready}; +use std::marker::PhantomData; +use std::pin::Pin; +use std::string::ToString; +use std::vec::Vec; + +/// Provide a [Service] trait for an object that implements [SingleService]. +pub struct SingleServiceToService { + /// Service that is wrapped by this object. + service: SVC, + + /// Phantom field for RequestOcts and CR. + _phantom: PhantomData<(RequestOcts, CR)>, +} + +impl SingleServiceToService { + /// Create a new [SingleServiceToService] object. + pub fn new(service: SVC) -> Self { + Self { + service, + _phantom: PhantomData, + } + } +} + +impl Service + for SingleServiceToService +where + RequestOcts: Octets + Send + Sync, + SVC: SingleService, + CR: ComposeReply + 'static, +{ + type Target = Vec; + type Stream = Once>>; + type Future = Pin + Send>>; + + fn call(&self, request: Request) -> Self::Future { + let fut = self.service.call(request); + let fut = async move { + let reply = match fut.await { + Ok(reply) => reply, + Err(_) => { + // Every error gets mapped to InternalError. + // Should we add an EDE here? + return once(ready(Err(ServiceError::InternalError))); + } + }; + let abs = match reply.additional_builder_stream_target() { + Ok(reply) => reply, + Err(_) => { + // Every error gets mapped to InternalError. + // There is probably not much we could do here. + // The error results from a bad reply message. + return once(ready(Err(ServiceError::InternalError))); + } + }; + once(ready(Ok(CallResult::new(abs)))) + }; + Box::pin(fut) + } +} + +/// Provide a [SingleService] trait for an object that implements the +/// [SendRequest] trait. +pub struct ClientTransportToSingleService +where + RequestOcts: AsRef<[u8]>, + SR: SendRequest>, +{ + /// The client transport to use. + conn: SR, + + /// Phantom data for RequestOcts. + _phantom: PhantomData, +} + +impl ClientTransportToSingleService +where + RequestOcts: AsRef<[u8]>, + SR: SendRequest>, +{ + /// Create a new [ClientTransportToSingleService] object. + pub fn new(conn: SR) -> Self { + Self { + conn, + _phantom: PhantomData, + } + } +} + +impl SingleService + for ClientTransportToSingleService +where + RequestOcts: AsRef<[u8]> + Clone + Debug + Octets + Send + Sync, + SR: SendRequest> + Sync, + CR: ComposeReply + Send + Sync + 'static, +{ + fn call( + &self, + request: Request, + ) -> Pin> + Send + Sync>> + where + RequestOcts: AsRef<[u8]>, + { + // Prepare for an error. It is best to borrow request here. + let builder: AdditionalBuilder>> = + mk_error_response(request.message(), OptRcode::SERVFAIL); + + let req = match request.try_into() { + Ok(req) => req, + Err(_) => { + // Can this fail? Should the request be checked earlier. + // Just return ServFail. + return Box::pin(ready(Err(ServiceError::InternalError))); + } + }; + + let mut gr = self.conn.send_request(req); + let fut = async move { + match gr.get_response().await { + Ok(msg) => CR::from_message(&msg), + Err(e) => { + // The request failed. Create a ServFail response and + // add an EDE that describes the error. + let msg = builder.as_message(); + let mut cr = CR::from_message(&msg).expect( + "CR should be able to handle an error response", + ); + if let Ok(ede) = ExtendedError::>::new_with_str( + ExtendedErrorCode::OTHER, + &e.to_string(), + ) { + cr.add_opt(&ede) + .expect("Adding an ede should not fail"); + } + Ok(cr) + } + } + }; + Box::pin(fut) + } +} + +/// Implement the [SingleService] trait for a boxed [SendRequest] trait object. +pub struct BoxClientTransportToSingleService +where + RequestOcts: AsRef<[u8]>, +{ + /// The client transport to use. + conn: Box> + Send + Sync>, + + /// Phantom data for RequestOcts. + _phantom: PhantomData, +} + +impl BoxClientTransportToSingleService +where + RequestOcts: AsRef<[u8]>, +{ + /// Create a new [BoxClientTransportToSingleService] object. + pub fn new( + conn: Box> + Send + Sync>, + ) -> Self { + Self { + conn, + _phantom: PhantomData, + } + } +} + +impl SingleService + for BoxClientTransportToSingleService +where + RequestOcts: AsRef<[u8]> + Clone + Debug + Octets + Send + Sync, + CR: ComposeReply + Send + Sync + 'static, +{ + fn call( + &self, + request: Request, + ) -> Pin> + Send + Sync>> + where + RequestOcts: AsRef<[u8]>, + { + // Prepare for an error. It is best to borrow request here. + let builder: AdditionalBuilder>> = + mk_error_response(request.message(), OptRcode::SERVFAIL); + + let Ok(req) = request.try_into() else { + // Can this fail? Should the request be checked earlier. + // Just return ServFail. + return Box::pin(ready(Err(ServiceError::InternalError))); + }; + + let mut gr = self.conn.send_request(req); + let fut = async move { + let msg = match gr.get_response().await { + Ok(msg) => msg, + Err(e) => { + // The request failed. Create a ServFail response and + // add an EDE that describes the error. + let msg = builder.as_message(); + let mut cr = CR::from_message(&msg).expect( + "CR should be able to handle an error response", + ); + if let Ok(ede) = ExtendedError::>::new_with_str( + ExtendedErrorCode::OTHER, + &e.to_string(), + ) { + cr.add_opt(&ede) + .expect("Adding an ede should not fail"); + } + return Ok(cr); + } + }; + CR::from_message(&msg) + }; + Box::pin(fut) + } +} diff --git a/src/net/server/message.rs b/src/net/server/message.rs index 1ac11e3b1..197ab0718 100644 --- a/src/net/server/message.rs +++ b/src/net/server/message.rs @@ -1,11 +1,22 @@ //! Support for working with DNS messages in servers. + +#![warn(missing_docs)] +#![warn(clippy::missing_docs_in_private_items)] + +use bytes::Bytes; use core::time::Duration; +use std::fmt::Debug; use std::sync::{Arc, Mutex}; +use std::vec::Vec; use tokio::time::Instant; -use crate::base::Message; +use crate::base::opt::AllOptData; +use crate::base::{Message, Name}; +use crate::dep::octseq::Octets; +use crate::net::client::request; +use crate::net::client::request::{ComposeRequest, RequestMessage}; //------------ UdpTransportContext ------------------------------------------- @@ -284,3 +295,57 @@ where } } } + +//--- TryFrom> for RequestMessage> + +impl TryFrom> + for RequestMessage +{ + type Error = request::Error; + + fn try_from(req: Request) -> Result { + // Copy the ECS option from the message. This is just an example, + // there should be a separate plugin that deals with ECS. + + // We want the ECS options in Bytes. No clue how to do this. Just + // convert the message to Bytes and use that. + let mut extra_opts: Vec>> = vec![]; + + let bytes = Bytes::copy_from_slice(req.message.as_slice()); + let bytes_msg = Message::from_octets(bytes)?; + if let Some(optrec) = bytes_msg.opt() { + for opt in optrec.opt().iter::>() { + let opt = opt?; + if let AllOptData::ClientSubnet(_ecs) = opt { + extra_opts.push(opt); + } + } + } + + // We need to make a copy of message. Somehow we can't use the + // message in the Arc directly. + let set_do = dnssec_ok(&req.message); + let msg = Message::from_octets(req.message.as_octets().clone())?; + let mut reqmsg = RequestMessage::new(msg)?; + + // Copy DO bit + if set_do { + reqmsg.set_dnssec_ok(true); + } + + // Copy options + for opt in &extra_opts { + reqmsg.add_opt(opt)?; + } + Ok(reqmsg) + } +} + +/// Return whether the DO flag is set. This should move to Message. +fn dnssec_ok(msg: &Message) -> bool { + if let Some(opt) = msg.opt() { + opt.dnssec_ok() + } else { + false + } +} diff --git a/src/net/server/mod.rs b/src/net/server/mod.rs index 3842e3f6c..0cf38e232 100644 --- a/src/net/server/mod.rs +++ b/src/net/server/mod.rs @@ -13,6 +13,15 @@ //! DNS server that answers requests based on the application logic you //! specify. //! +//! In addtion, this module provides a less complex service interface called +//! [SingleService][`single_service::SingleService`]. This interface supports +//! only a single response per request. +//! In other words, it does not support the AXFR and IXFR requests. +//! Adaptors are available to connect SingleServer to [`Service`] and to the +//! [Client][`crate::net::client`] transports. See the +//! Section [Single Service][crate::net::server#single-service] for +//! more details. +//! //! # Architecture //! //! A layered stack of components is responsible for handling incoming @@ -217,6 +226,46 @@ //! https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html //! [`tokio::net::UdpSocket`]: //! https://docs.rs/tokio/latest/tokio/net/struct.UdpSocket.html +//! +//! # Single Service +//! +//! The [SingleService][single_service::SingleService] trait has a single +//! method [call()][single_service::SingleService::call()] that takes a +//! [Request][message::Request] and returns a Future that results in +//! either an error or a reply. +//! To assist building reply messages there is the trait +//! [ComposeReply][single_service::ComposeReply]. +//! The [ComposeReply][single_service::ComposeReply] trait is implemented by +//! [ReplyMessage][single_service::ReplyMessage] +//! +//! To assist in connecting [SingleService][single_service::SingleService] +//! to the rest of the ecosystem, there are three adapters: +//! 1) The first adapter, +//! [SingleServiceToService][adapter::SingleServiceToService] implements +//! [Service][service::Service] for +//! [SingleService][single_service::SingleService]. This allows any +//! object that implements [SingleService][single_service::SingleService] +//! to connect to a place where [Service][service::Service] is required. +//! 2) The second adapter, +//! [ClientTransportToSingleService][adapter::ClientTransportToSingleService] +//! implements [SingleService][single_service::SingleService] for an +//! object that implements +//! [SendRequest][crate::net::client::request::SendRequest]. This +//! allows any [Client][crate::net::client] transport connection to be +//! used as a [SingleService][single_service::SingleService]. +//! 3) The third adapter, +//! [BoxClientTransportToSingleService][adapter::BoxClientTransportToSingleService] +//! is similar to the second one, except that it implements +//! [SingleService][single_service::SingleService] for a boxed +//! [SendRequest][crate::net::client::request::SendRequest] trait object. +//! +//! This module provides a simple query router called +//! [QnameRouter][qname_router::QnameRouter]. This router uses the query +//! name to decide with upstream [SingleService][single_service::SingleService] +//! has to handle the request. +//! This router is deliberately kept very simple. It is assumed that +//! applications that need more complex routers implement them themselves +//! in the application. #![cfg(feature = "unstable-server-transport")] #![cfg_attr(docsrs, doc(cfg(feature = "unstable-server-transport")))] @@ -224,6 +273,7 @@ mod connection; pub use connection::Config as ConnectionConfig; +pub mod adapter; pub mod batcher; pub mod buf; pub mod dgram; @@ -231,7 +281,9 @@ pub mod error; pub mod message; pub mod metrics; pub mod middleware; +pub mod qname_router; pub mod service; +pub mod single_service; pub mod sock; pub mod stream; pub mod util; diff --git a/src/net/server/qname_router.rs b/src/net/server/qname_router.rs new file mode 100644 index 000000000..749b8f7a5 --- /dev/null +++ b/src/net/server/qname_router.rs @@ -0,0 +1,119 @@ +//! This module provides an example query router using the Qname field. + +#![warn(missing_docs)] +#![warn(clippy::missing_docs_in_private_items)] + +use super::message::Request; +use super::service::ServiceError; +use super::single_service::{ComposeReply, SingleService}; +use super::util::mk_error_response; +use crate::base::iana::{ExtendedErrorCode, OptRcode}; +use crate::base::message_builder::AdditionalBuilder; +use crate::base::opt::ExtendedError; +use crate::base::StreamTarget; +use crate::base::{Name, ToName}; +use crate::dep::octseq::{EmptyBuilder, FromBuilder, Octets, OctetsBuilder}; +use std::boxed::Box; +use std::convert::Infallible; +use std::future::{ready, Future}; +use std::pin::Pin; +use std::vec::Vec; +use tracing::trace; + +/// A service that routes requests to other services based on the Qname in the +/// request. +pub struct QnameRouter { + /// List of names and services for routing requests. + list: Vec>, +} + +/// Element in the name space for the Qname router. +struct Element { + /// Name to match for this element. + name: Name, + + /// Service to call for this element. + service: Box + Send + Sync>, +} + +impl QnameRouter { + /// Create a new empty router. + pub fn new() -> Self { + Self { list: Vec::new() } + } + + /// Add a name and service to the router. + pub fn add(&mut self, name: TN, service: SVC) + where + Octs: FromBuilder, + ::Builder: + EmptyBuilder + OctetsBuilder, + TN: ToName, + RequestOcts: Send + Sync, + SVC: SingleService + Send + Sync + 'static, + { + let el = Element { + name: name.to_name(), + service: Box::new(service), + }; + self.list.push(el); + } +} + +impl Default for QnameRouter { + fn default() -> Self { + Self::new() + } +} + +impl SingleService + for QnameRouter +where + Octs: AsRef<[u8]>, + RequestOcts: Send + Sync, + CR: ComposeReply + Send + Sync + 'static, +{ + fn call( + &self, + request: Request, + ) -> Pin> + Send + Sync>> + where + RequestOcts: AsRef<[u8]> + Octets, + { + let question = request + .message() + .question() + .into_iter() + .next() + .expect("the caller need to make sure that there is question") + .expect("the caller need to make sure that the question can be parsed") + ; + let name = question.qname(); + let el = match self + .list + .iter() + .filter(|l| name.ends_with(&l.name)) + .max_by_key(|l| l.name.label_count()) + { + Some(el) => el, + None => { + // We can't find a suitable upstream. Generate a SERVFAIL + // reply with an EDE. + let builder: AdditionalBuilder>> = + mk_error_response(request.message(), OptRcode::SERVFAIL); + let msg = builder.as_message(); + let mut cr = CR::from_message(&msg) + .expect("CR should handle an error response"); + if let Ok(ede) = ExtendedError::>::new_with_str( + ExtendedErrorCode::OTHER, + "No upstream for request", + ) { + cr.add_opt(&ede).expect("Adding an ede should not fail"); + } + return Box::pin(ready(Ok(cr))); + } + }; + trace!("Routing request to '{}'", el.name); + el.service.call(request.clone()) + } +} diff --git a/src/net/server/single_service.rs b/src/net/server/single_service.rs new file mode 100644 index 000000000..64ef0aeff --- /dev/null +++ b/src/net/server/single_service.rs @@ -0,0 +1,185 @@ +//! This module provides the as simple service interface for services that +//! provide (at most) a single response. +//! +//! The simple service is represented by the trait [SingleService]. +//! Additionally, this module provide a new trait [ComposeReply] that +//! helps generating reply messages and [ReplyMessage] an implementation of +//! ComposeReply. + +#![warn(missing_docs)] +#![warn(clippy::missing_docs_in_private_items)] + +use super::message::Request; +use super::service::ServiceError; +use crate::base::message_builder::AdditionalBuilder; +use crate::base::opt::{AllOptData, ComposeOptData, LongOptData, OptRecord}; +use crate::base::{ + Message, MessageBuilder, Rtype, StreamTarget, UnknownRecordData, +}; +use crate::dep::octseq::Octets; +use std::boxed::Box; +use std::future::Future; +use std::pin::Pin; +use std::vec::Vec; + +/// Trait for a service that results in a single response. +pub trait SingleService { + /// Call the service with a request message. + /// + /// The service returns a boxed future. + fn call( + &self, + request: Request, + ) -> Pin> + Send + Sync>> + where + RequestOcts: AsRef<[u8]> + Octets; +} + +/// Trait for creating a reply message. +pub trait ComposeReply { + /// Start a reply from an existing message. + fn from_message(msg: &Message) -> Result + where + Octs: AsRef<[u8]>, + Self: Sized; + + /// Add an EDNS option. + fn add_opt( + &mut self, + opt: &impl ComposeOptData, + ) -> Result<(), LongOptData>; + + /// Return the reply message as an AdditionalBuilder with a StreamTarget. + fn additional_builder_stream_target( + &self, + ) -> Result>>, ServiceError>; +} + +/// Record changes to a Message for generating a reply message. +#[derive(Debug)] +pub struct ReplyMessage { + /// Field to store the underlying Message. + msg: Message>, + + /// The OPT record to add if required. + opt: Option>>, +} + +impl ReplyMessage { + /// Add an option that is to be included in the final message. + fn add_opt_impl(&mut self, opt: &impl ComposeOptData) { + self.opt_mut().push(opt).expect("push should not fail"); + } + + /// Returns a mutable reference to the OPT record. + /// + /// Adds one if necessary. + fn opt_mut(&mut self) -> &mut OptRecord> { + self.opt.get_or_insert_with(Default::default) + } +} + +impl ComposeReply for ReplyMessage { + fn from_message(msg: &Message) -> Result + where + Octs: AsRef<[u8]>, + { + let vec = msg.as_slice().to_vec(); + let msg = Message::from_octets(vec) + .expect("creating a Message from a Message should not fail"); + let mut repl = Self { msg, opt: None }; + + // As an example, copy any ECS option from the message. + // though this should be done in a separate ECS plugin. + let msg = repl.msg.clone(); + if let Some(optrec) = msg.opt() { + // Copy opt header. + let opt = repl.opt_mut(); + opt.set_udp_payload_size(optrec.udp_payload_size()); + //opt.set_version(optrec.version()); + opt.set_dnssec_ok(optrec.dnssec_ok()); + + for opt in optrec.opt().iter::>() { + let opt = opt?; + if let AllOptData::ClientSubnet(_ecs) = opt { + repl.add_opt_impl(&opt); + } + if let AllOptData::ExtendedError(ref _ede) = opt { + repl.add_opt_impl(&opt); + } + } + } + Ok(repl) + } + + fn add_opt( + &mut self, + opt: &impl ComposeOptData, + ) -> Result<(), LongOptData> { + self.add_opt_impl(opt); + Ok(()) + } + + fn additional_builder_stream_target( + &self, + ) -> Result>>, ServiceError> { + let source = &self.msg; + + let mut target = MessageBuilder::from_target( + StreamTarget::>::new(Default::default()) + .expect("new StreamTarget should not fail"), + ) + .expect("new MessageBuilder should not fail"); + + let header = source.header(); + *target.header_mut() = header; + + let source = source.question(); + let mut target = target.additional().builder().question(); + for rr in source { + let rr = rr?; + target.push(rr).expect("push should not fail"); + } + let mut source = source.answer()?; + let mut target = target.answer(); + for rr in &mut source { + let rr = rr?; + let rr = rr + .into_record::>()? + .expect("UnknownRecordData should not fail"); + target.push(rr).expect("push should not fail"); + } + + let mut source = source + .next_section()? + .expect("authority section should be present"); + let mut target = target.authority(); + for rr in &mut source { + let rr = rr?; + let rr = rr + .into_record::>()? + .expect("UnknownRecordData should not fail"); + target.push(rr).expect("push should not fail"); + } + + let source = source + .next_section()? + .expect("additional section should be present"); + let mut target = target.additional(); + for rr in source { + let rr = rr?; + if rr.rtype() == Rtype::OPT { + } else { + let rr = rr + .into_record::>()? + .expect("UnknownRecordData should not fail"); + target.push(rr).expect("push should not fail"); + } + } + if let Some(opt) = self.opt.as_ref() { + target.push(opt.as_record()).expect("push should not fail"); + } + + Ok(target) + } +}