From d3a875f8b603d2c7429964a6f8959bd055616d34 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 12 Feb 2025 14:17:09 +0100 Subject: [PATCH] feat(object_store): random IP address selection (#7123) * feat(object_store): random IP address selection Closes #7117. * refactor: directly call stdlib w/o hyper-util --- object_store/src/client/dns.rs | 50 ++++++++++++++++++++++++++++++++++ object_store/src/client/mod.rs | 18 ++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 object_store/src/client/dns.rs diff --git a/object_store/src/client/dns.rs b/object_store/src/client/dns.rs new file mode 100644 index 000000000000..32e9291bac76 --- /dev/null +++ b/object_store/src/client/dns.rs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::net::ToSocketAddrs; + +use rand::prelude::SliceRandom; +use reqwest::dns::{Addrs, Name, Resolve, Resolving}; +use tokio::task::JoinSet; + +type DynErr = Box; + +#[derive(Debug)] +pub(crate) struct ShuffleResolver; + +impl Resolve for ShuffleResolver { + fn resolve(&self, name: Name) -> Resolving { + Box::pin(async move { + // use `JoinSet` to propagate cancelation + let mut tasks = JoinSet::new(); + tasks.spawn_blocking(move || { + let it = (name.as_str(), 0).to_socket_addrs()?; + let mut addrs = it.collect::>(); + + addrs.shuffle(&mut rand::rng()); + + Ok(Box::new(addrs.into_iter()) as Addrs) + }); + + tasks + .join_next() + .await + .expect("spawned on task") + .map_err(|err| Box::new(err) as DynErr)? + }) + } +} diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 1b7ce5aa7a78..629715955647 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -19,6 +19,8 @@ pub(crate) mod backoff; +mod dns; + #[cfg(test)] pub(crate) mod mock_server; @@ -110,6 +112,10 @@ pub enum ClientConfigKey { ProxyCaCertificate, /// List of hosts that bypass proxy ProxyExcludes, + /// Randomize order addresses that the DNS resolution yields. + /// + /// This will spread the connections accross more servers. + RandomizeAddresses, /// Request timeout /// /// The timeout is applied from when the request starts connecting until the @@ -137,6 +143,7 @@ impl AsRef for ClientConfigKey { Self::ProxyUrl => "proxy_url", Self::ProxyCaCertificate => "proxy_ca_certificate", Self::ProxyExcludes => "proxy_excludes", + Self::RandomizeAddresses => "randomize_addresses", Self::Timeout => "timeout", Self::UserAgent => "user_agent", } @@ -163,6 +170,7 @@ impl FromStr for ClientConfigKey { "proxy_url" => Ok(Self::ProxyUrl), "proxy_ca_certificate" => Ok(Self::ProxyCaCertificate), "proxy_excludes" => Ok(Self::ProxyExcludes), + "randomize_addresses" => Ok(Self::RandomizeAddresses), "timeout" => Ok(Self::Timeout), "user_agent" => Ok(Self::UserAgent), _ => Err(super::Error::UnknownConfigurationKey { @@ -245,6 +253,7 @@ pub struct ClientOptions { http2_max_frame_size: Option>, http1_only: ConfigValue, http2_only: ConfigValue, + randomize_addresses: ConfigValue, } impl Default for ClientOptions { @@ -280,6 +289,7 @@ impl Default for ClientOptions { // https://github.com/apache/arrow-rs/issues/5194 http1_only: true.into(), http2_only: Default::default(), + randomize_addresses: true.into(), } } } @@ -322,6 +332,9 @@ impl ClientOptions { ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()), ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate = Some(value.into()), ClientConfigKey::ProxyExcludes => self.proxy_excludes = Some(value.into()), + ClientConfigKey::RandomizeAddresses => { + self.randomize_addresses.parse(value); + } ClientConfigKey::Timeout => self.timeout = Some(ConfigValue::Deferred(value.into())), ClientConfigKey::UserAgent => { self.user_agent = Some(ConfigValue::Deferred(value.into())) @@ -358,6 +371,7 @@ impl ClientOptions { ClientConfigKey::ProxyUrl => self.proxy_url.clone(), ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate.clone(), ClientConfigKey::ProxyExcludes => self.proxy_excludes.clone(), + ClientConfigKey::RandomizeAddresses => Some(self.randomize_addresses.to_string()), ClientConfigKey::Timeout => self.timeout.as_ref().map(fmt_duration), ClientConfigKey::UserAgent => self .user_agent @@ -675,6 +689,10 @@ impl ClientOptions { // transparently decompress the body via the non-default `gzip` feature. builder = builder.no_gzip(); + if self.randomize_addresses.get()? { + builder = builder.dns_resolver(Arc::new(dns::ShuffleResolver)); + } + builder .https_only(!self.allow_http.get()?) .build()