Skip to content

Commit

Permalink
feat(object_store): random IP address selection (apache#7123)
Browse files Browse the repository at this point in the history
* feat(object_store): random IP address selection

Closes apache#7117.

* refactor: directly call stdlib w/o hyper-util
  • Loading branch information
crepererum authored Feb 12, 2025
1 parent ef7d753 commit d3a875f
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 0 deletions.
50 changes: 50 additions & 0 deletions object_store/src/client/dns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::net::ToSocketAddrs;

use rand::prelude::SliceRandom;
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
use tokio::task::JoinSet;

/// Boxed, thread-safe error type used to erase the concrete error of a failed lookup.
type DynErr = Box<dyn std::error::Error + Send + Sync>;

/// DNS resolver that returns the resolved socket addresses in random order.
///
/// Resolution itself is delegated to the standard library
/// ([`ToSocketAddrs`]); this type only shuffles the result so that
/// connections are spread over all addresses a host name resolves to.
#[derive(Debug)]
pub(crate) struct ShuffleResolver;

impl Resolve for ShuffleResolver {
    /// Resolves `name` through the standard library and yields the resulting
    /// addresses in a freshly shuffled order.
    ///
    /// The lookup is blocking, so it runs on tokio's blocking pool.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an error if the lookup fails or if the
    /// blocking task cannot be joined.
    fn resolve(&self, name: Name) -> Resolving {
        let lookup = async move {
            // The task lives in a `JoinSet` (rather than behind a bare join
            // handle) so that cancelling this future is propagated to it.
            let mut pending = JoinSet::new();
            pending.spawn_blocking(move || -> Result<Addrs, DynErr> {
                // `ToSocketAddrs` needs a host/port pair; `0` is passed as the port.
                // NOTE(review): presumably reqwest substitutes the real port — confirm.
                let mut resolved: Vec<_> = (name.as_str(), 0).to_socket_addrs()?.collect();
                resolved.shuffle(&mut rand::rng());

                let shuffled: Addrs = Box::new(resolved.into_iter());
                Ok(shuffled)
            });

            // Exactly one task was spawned above, so the set cannot be empty here.
            match pending.join_next().await.expect("spawned on task") {
                Ok(outcome) => outcome,
                Err(join_err) => Err(Box::new(join_err) as DynErr),
            }
        };

        Box::pin(lookup)
    }
}
18 changes: 18 additions & 0 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
pub(crate) mod backoff;

mod dns;

#[cfg(test)]
pub(crate) mod mock_server;

Expand Down Expand Up @@ -110,6 +112,10 @@ pub enum ClientConfigKey {
ProxyCaCertificate,
/// List of hosts that bypass proxy
ProxyExcludes,
/// Randomize the order of the addresses that DNS resolution yields.
///
/// This will spread the connections across more servers.
RandomizeAddresses,
/// Request timeout
///
/// The timeout is applied from when the request starts connecting until the
Expand Down Expand Up @@ -137,6 +143,7 @@ impl AsRef<str> for ClientConfigKey {
Self::ProxyUrl => "proxy_url",
Self::ProxyCaCertificate => "proxy_ca_certificate",
Self::ProxyExcludes => "proxy_excludes",
Self::RandomizeAddresses => "randomize_addresses",
Self::Timeout => "timeout",
Self::UserAgent => "user_agent",
}
Expand All @@ -163,6 +170,7 @@ impl FromStr for ClientConfigKey {
"proxy_url" => Ok(Self::ProxyUrl),
"proxy_ca_certificate" => Ok(Self::ProxyCaCertificate),
"proxy_excludes" => Ok(Self::ProxyExcludes),
"randomize_addresses" => Ok(Self::RandomizeAddresses),
"timeout" => Ok(Self::Timeout),
"user_agent" => Ok(Self::UserAgent),
_ => Err(super::Error::UnknownConfigurationKey {
Expand Down Expand Up @@ -245,6 +253,7 @@ pub struct ClientOptions {
http2_max_frame_size: Option<ConfigValue<u32>>,
http1_only: ConfigValue<bool>,
http2_only: ConfigValue<bool>,
randomize_addresses: ConfigValue<bool>,
}

impl Default for ClientOptions {
Expand Down Expand Up @@ -280,6 +289,7 @@ impl Default for ClientOptions {
// https://github.com/apache/arrow-rs/issues/5194
http1_only: true.into(),
http2_only: Default::default(),
randomize_addresses: true.into(),
}
}
}
Expand Down Expand Up @@ -322,6 +332,9 @@ impl ClientOptions {
ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()),
ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate = Some(value.into()),
ClientConfigKey::ProxyExcludes => self.proxy_excludes = Some(value.into()),
ClientConfigKey::RandomizeAddresses => {
self.randomize_addresses.parse(value);
}
ClientConfigKey::Timeout => self.timeout = Some(ConfigValue::Deferred(value.into())),
ClientConfigKey::UserAgent => {
self.user_agent = Some(ConfigValue::Deferred(value.into()))
Expand Down Expand Up @@ -358,6 +371,7 @@ impl ClientOptions {
ClientConfigKey::ProxyUrl => self.proxy_url.clone(),
ClientConfigKey::ProxyCaCertificate => self.proxy_ca_certificate.clone(),
ClientConfigKey::ProxyExcludes => self.proxy_excludes.clone(),
ClientConfigKey::RandomizeAddresses => Some(self.randomize_addresses.to_string()),
ClientConfigKey::Timeout => self.timeout.as_ref().map(fmt_duration),
ClientConfigKey::UserAgent => self
.user_agent
Expand Down Expand Up @@ -675,6 +689,10 @@ impl ClientOptions {
// transparently decompress the body via the non-default `gzip` feature.
builder = builder.no_gzip();

if self.randomize_addresses.get()? {
builder = builder.dns_resolver(Arc::new(dns::ShuffleResolver));
}

builder
.https_only(!self.allow_http.get()?)
.build()
Expand Down

0 comments on commit d3a875f

Please sign in to comment.