Skip to content

Commit

Permalink
feat(object_store): random IP address selection
Browse files Browse the repository at this point in the history
Closes apache#7117.
  • Loading branch information
crepererum committed Feb 12, 2025
1 parent a85fc03 commit 60732d4
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
4 changes: 3 additions & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ walkdir = { version = "2", optional = true }
# Cloud storage support
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
hyper = { version = "1.2", default-features = false, optional = true }
hyper-util = { version = "0.1.10", default-features = false, optional = true, features = ["client", "client-legacy", "tokio"] }
quick-xml = { version = "0.37.0", features = ["serialize", "overlapped-lists"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, optional = true }
Expand All @@ -54,6 +55,7 @@ reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true }
tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"] }
tower-service = { version = "0.3", default-features = false, optional = true }
md-5 = { version = "0.10.6", default-features = false, optional = true }
httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true }

Expand All @@ -62,7 +64,7 @@ nix = { version = "0.29.0", features = ["fs"] }

[features]
default = ["fs"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
cloud = ["serde", "serde_json", "tower-service", "quick-xml", "hyper", "hyper-util", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud", "httparse"]
fs = ["walkdir"]
gcp = ["cloud", "rustls-pemfile"]
Expand Down
59 changes: 59 additions & 0 deletions object_store/src/client/dns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::str::FromStr;

use hyper_util::client::legacy::connect::dns::{
GaiResolver as HyperGaiResolver, Name as HyperName,
};
use rand::prelude::SliceRandom;
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
use tower_service::Service;

type DynErr = Box<dyn std::error::Error + Send + Sync>;

#[derive(Debug)]
pub(crate) struct ShuffleResolver(HyperGaiResolver);

impl Default for ShuffleResolver {
fn default() -> Self {
Self(HyperGaiResolver::new())
}
}

impl Resolve for ShuffleResolver {
fn resolve(&self, name: Name) -> Resolving {
let inner = self.0.clone();

Box::pin(async move {
let mut inner = inner;

// convert name reqwest -> hyper
let name = HyperName::from_str(name.as_str()).map_err(|err| Box::new(err) as DynErr)?;

let mut addr = inner
.call(name)
.await
.map_err(|err| Box::new(err) as DynErr)?
.collect::<Vec<_>>();

addr.shuffle(&mut rand::rng());

Ok(Box::new(addr.into_iter()) as Addrs)
})
}
}
18 changes: 18 additions & 0 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
pub(crate) mod backoff;

mod dns;

#[cfg(test)]
pub(crate) mod mock_server;

Expand Down Expand Up @@ -110,6 +112,10 @@ pub enum ClientConfigKey {
ProxyCaCertificate,
/// List of hosts that bypass proxy
ProxyExcludes,
/// Randomize order addresses that the DNS resolution yields.
///
/// This will spread the connections accross more servers.
RandomizeAddresses,
/// Request timeout
///
/// The timeout is applied from when the request starts connecting until the
Expand Down Expand Up @@ -137,6 +143,7 @@ impl AsRef<str> for ClientConfigKey {
Self::ProxyUrl => "proxy_url",
Self::ProxyCaCertificate => "proxy_ca_certificate",
Self::ProxyExcludes => "proxy_excludes",
Self::RandomizeAddresses => "randomize_addresses",
Self::Timeout => "timeout",
Self::UserAgent => "user_agent",
}
Expand All @@ -163,6 +170,7 @@ impl FromStr for ClientConfigKey {
"proxy_url" => Ok(Self::ProxyUrl),
"proxy_ca_certificate" => Ok(Self::ProxyCaCertificate),
"proxy_excludes" => Ok(Self::ProxyExcludes),
"randomize_addresses" => Ok(Self::RandomizeAddresses),
"timeout" => Ok(Self::Timeout),
"user_agent" => Ok(Self::UserAgent),
_ => Err(super::Error::UnknownConfigurationKey {
Expand Down Expand Up @@ -245,6 +253,7 @@ pub struct ClientOptions {
http2_max_frame_size: Option<ConfigValue<u32>>,
http1_only: ConfigValue<bool>,
http2_only: ConfigValue<bool>,
randomize_addresses: ConfigValue<bool>,
}

impl Default for ClientOptions {
Expand Down Expand Up @@ -280,6 +289,7 @@ impl Default for ClientOptions {
// https://github.com/apache/arrow-rs/issues/5194
http1_only: true.into(),
http2_only: Default::default(),
randomize_addresses: true.into(),
}
}
}
Expand Down Expand Up @@ -322,6 +332,9 @@ impl ClientOptions {
ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()),
ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate = Some(value.into()),
ClientConfigKey::ProxyExcludes => self.proxy_excludes = Some(value.into()),
ClientConfigKey::RandomizeAddresses => {
self.randomize_addresses.parse(value);
}
ClientConfigKey::Timeout => self.timeout = Some(ConfigValue::Deferred(value.into())),
ClientConfigKey::UserAgent => {
self.user_agent = Some(ConfigValue::Deferred(value.into()))
Expand Down Expand Up @@ -358,6 +371,7 @@ impl ClientOptions {
ClientConfigKey::ProxyUrl => self.proxy_url.clone(),
ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate.clone(),
ClientConfigKey::ProxyExcludes => self.proxy_excludes.clone(),
ClientConfigKey::RandomizeAddresses => Some(self.randomize_addresses.to_string()),
ClientConfigKey::Timeout => self.timeout.as_ref().map(fmt_duration),
ClientConfigKey::UserAgent => self
.user_agent
Expand Down Expand Up @@ -675,6 +689,10 @@ impl ClientOptions {
// transparently decompress the body via the non-default `gzip` feature.
builder = builder.no_gzip();

if self.randomize_addresses.get()? {
builder = builder.dns_resolver(Arc::new(dns::ShuffleResolver::default()));
}

builder
.https_only(!self.allow_http.get()?)
.build()
Expand Down

0 comments on commit 60732d4

Please sign in to comment.