From 60732d484fb8a4df96851c769c30339051783790 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 12 Feb 2025 12:22:43 +0100 Subject: [PATCH] feat(object_store): random IP address selection Closes #7117. --- object_store/Cargo.toml | 4 ++- object_store/src/client/dns.rs | 59 ++++++++++++++++++++++++++++++++++ object_store/src/client/mod.rs | 18 +++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 object_store/src/client/dns.rs diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 168d2eb6ae39..853bff38262a 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -46,6 +46,7 @@ walkdir = { version = "2", optional = true } # Cloud storage support base64 = { version = "0.22", default-features = false, features = ["std"], optional = true } hyper = { version = "1.2", default-features = false, optional = true } +hyper-util = { version = "0.1.10", default-features = false, optional = true, features = ["client", "client-legacy", "tokio"] } quick-xml = { version = "0.37.0", features = ["serialize", "overlapped-lists"], optional = true } serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } serde_json = { version = "1.0", default-features = false, optional = true } @@ -54,6 +55,7 @@ reqwest = { version = "0.12", default-features = false, features = ["rustls-tls- ring = { version = "0.17", default-features = false, features = ["std"], optional = true } rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true } tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"] } +tower-service = { version = "0.3", default-features = false, optional = true } md-5 = { version = "0.10.6", default-features = false, optional = true } httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } @@ -62,7 +64,7 @@ nix = { version = "0.29.0", features = ["fs"] } [features] default = ["fs"] -cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"] +cloud = ["serde", "serde_json", "tower-service", "quick-xml", "hyper", "hyper-util", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"] azure = ["cloud", "httparse"] fs = ["walkdir"] gcp = ["cloud", "rustls-pemfile"] diff --git a/object_store/src/client/dns.rs b/object_store/src/client/dns.rs new file mode 100644 index 000000000000..942b33619bc1 --- /dev/null +++ b/object_store/src/client/dns.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::str::FromStr; + +use hyper_util::client::legacy::connect::dns::{ + GaiResolver as HyperGaiResolver, Name as HyperName, +}; +use rand::prelude::SliceRandom; +use reqwest::dns::{Addrs, Name, Resolve, Resolving}; +use tower_service::Service; + +type DynErr = Box; + +#[derive(Debug)] +pub(crate) struct ShuffleResolver(HyperGaiResolver); + +impl Default for ShuffleResolver { + fn default() -> Self { + Self(HyperGaiResolver::new()) + } +} + +impl Resolve for ShuffleResolver { + fn resolve(&self, name: Name) -> Resolving { + let inner = self.0.clone(); + + Box::pin(async move { + let mut inner = inner; + + // convert name reqwest -> hyper + let name = HyperName::from_str(name.as_str()).map_err(|err| Box::new(err) as DynErr)?; + + let mut addr = inner + .call(name) + .await + .map_err(|err| Box::new(err) as DynErr)? + .collect::>(); + + addr.shuffle(&mut rand::rng()); + + Ok(Box::new(addr.into_iter()) as Addrs) + }) + } +} diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 1b7ce5aa7a78..ae211a7a413e 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -19,6 +19,8 @@ pub(crate) mod backoff; +mod dns; + #[cfg(test)] pub(crate) mod mock_server; @@ -110,6 +112,10 @@ pub enum ClientConfigKey { ProxyCaCertificate, /// List of hosts that bypass proxy ProxyExcludes, + /// Randomize order addresses that the DNS resolution yields. + /// + /// This will spread the connections accross more servers. + RandomizeAddresses, /// Request timeout /// /// The timeout is applied from when the request starts connecting until the @@ -137,6 +143,7 @@ impl AsRef for ClientConfigKey { Self::ProxyUrl => "proxy_url", Self::ProxyCaCertificate => "proxy_ca_certificate", Self::ProxyExcludes => "proxy_excludes", + Self::RandomizeAddresses => "randomize_addresses", Self::Timeout => "timeout", Self::UserAgent => "user_agent", } @@ -163,6 +170,7 @@ impl FromStr for ClientConfigKey { "proxy_url" => Ok(Self::ProxyUrl), "proxy_ca_certificate" => Ok(Self::ProxyCaCertificate), "proxy_excludes" => Ok(Self::ProxyExcludes), + "randomize_addresses" => Ok(Self::RandomizeAddresses), "timeout" => Ok(Self::Timeout), "user_agent" => Ok(Self::UserAgent), _ => Err(super::Error::UnknownConfigurationKey { @@ -245,6 +253,7 @@ pub struct ClientOptions { http2_max_frame_size: Option>, http1_only: ConfigValue, http2_only: ConfigValue, + randomize_addresses: ConfigValue, } impl Default for ClientOptions { @@ -280,6 +289,7 @@ impl Default for ClientOptions { // https://github.com/apache/arrow-rs/issues/5194 http1_only: true.into(), http2_only: Default::default(), + randomize_addresses: true.into(), } } } @@ -322,6 +332,9 @@ impl ClientOptions { ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()), ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate = Some(value.into()), ClientConfigKey::ProxyExcludes => self.proxy_excludes = Some(value.into()), + ClientConfigKey::RandomizeAddresses => { + self.randomize_addresses.parse(value); + } ClientConfigKey::Timeout => self.timeout = Some(ConfigValue::Deferred(value.into())), ClientConfigKey::UserAgent => { self.user_agent = Some(ConfigValue::Deferred(value.into())) @@ -358,6 +371,7 @@ impl ClientOptions { ClientConfigKey::ProxyUrl => self.proxy_url.clone(), ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate.clone(), ClientConfigKey::ProxyExcludes => self.proxy_excludes.clone(), + ClientConfigKey::RandomizeAddresses => Some(self.randomize_addresses.to_string()), ClientConfigKey::Timeout => self.timeout.as_ref().map(fmt_duration), ClientConfigKey::UserAgent => self .user_agent @@ -675,6 +689,10 @@ impl ClientOptions { // transparently decompress the body via the non-default `gzip` feature. builder = builder.no_gzip(); + if self.randomize_addresses.get()? { + builder = builder.dns_resolver(Arc::new(dns::ShuffleResolver::default())); + } + builder .https_only(!self.allow_http.get()?) .build()