Skip to content

Commit

Permalink
perf(runtime): add dedicated thread for request connect
Browse files Browse the repository at this point in the history
  • Loading branch information
j-mendez committed Dec 30, 2024
1 parent 264e18c commit 6b61a14
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 12 deletions.
14 changes: 8 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion spider/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "spider"
version = "2.22.16"
version = "2.22.19"
authors = [
"j-mendez <jeff@spider.cloud>"
]
Expand Down Expand Up @@ -68,6 +68,7 @@ sysinfo = { version = "0.33", default-features = false, features = ["system"], o
sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite" ], optional = true }
h2 = "0.4"
tower = { version = "0.5", features = ["limit"]}
pin-project-lite = "0.2"

[dependencies.spider_chrome]
version = "2"
Expand All @@ -78,6 +79,9 @@ features = [
"stream"
]

[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2"

[target.'cfg(all(not(windows), not(target_os = "android"), not(target_env = "musl")))'.dependencies]
tikv-jemallocator = { version = "0.6", optional = true }

Expand Down
187 changes: 187 additions & 0 deletions spider/src/utils/connect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
sync::atomic::AtomicBool,
task::{Context, Poll},
};
use tokio::{
select,
sync::{mpsc::error::SendError, OnceCell},
};
use tower::{BoxError, Layer, Service};

/// Sending half of the unbounded channel that feeds the dedicated connect runtime.
///
/// The cell is filled by `init_background_runtime`. Every boxed future sent
/// through it is received on the background thread and spawned onto that
/// thread's tokio runtime.
static CONNECT_THREAD_POOL: OnceCell<
tokio::sync::mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
> = OnceCell::const_new();

/// Whether background-thread connect handling is enabled.
///
/// Starts out `true` and is switched to `false` by `init_background_runtime`
/// when the background thread or its runtime cannot be created.
static BACKGROUND_THREAD_CONNECT_ENABLED: AtomicBool = AtomicBool::new(true);

/// Report whether connects should be routed through the background runtime.
///
/// This reads the enabled flag only: the flag starts as `true` and is cleared
/// when creating the background thread or its runtime fails.
pub(crate) fn background_connect_threading() -> bool {
    use std::sync::atomic::Ordering;

    let enabled = BACKGROUND_THREAD_CONNECT_ENABLED.load(Ordering::Relaxed);
    enabled
}

/// Init a background thread for request connect handling.
pub(crate) fn init_background_runtime() {
let _ = CONNECT_THREAD_POOL.set({
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let builder = std::thread::Builder::new();

if let Err(_) = builder.spawn(move || {
match tokio::runtime::Builder::new_multi_thread()
.thread_name("connect-background-pool-thread")
.worker_threads(num_cpus::get() as usize)
.on_thread_start(move || {
#[cfg(target_os = "linux")]
unsafe {
if libc::nice(10) == -1 && *libc::__errno_location() != 0 {
let error = std::io::Error::last_os_error();
log::error!("failed to set threadpool niceness: {}", error);
}
}
})
.enable_all()
.build()
{
Ok(rt) => {
rt.block_on(async move {
while let Some(work) = rx.recv().await {
tokio::task::spawn(work);
}
});
}
_ => {
BACKGROUND_THREAD_CONNECT_ENABLED
.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
}) {
let _ = tx.downgrade();
BACKGROUND_THREAD_CONNECT_ENABLED.store(false, std::sync::atomic::Ordering::Relaxed);
};

tx
});
}

/// Tower layer that wraps a service in a `BackgroundProcessor`.
///
/// The wrapped service's response futures are paired with a oneshot channel
/// and sent to the background runtime for processing.
///
/// The layer carries no state, so it is `Copy` and can be built with either
/// `BackgroundProcessorLayer::new()` or `Default::default()`.
#[derive(Copy, Clone, Default)]
pub struct BackgroundProcessorLayer;

impl BackgroundProcessorLayer {
/// A new background proccess layer shortcut.
pub fn new() -> Self {
Self
}
}
impl<S> Layer<S> for BackgroundProcessorLayer {
    type Service = BackgroundProcessor<S>;

    /// Wrap `service` in a `BackgroundProcessor`.
    fn layer(&self, service: S) -> Self::Service {
        let wrapped = BackgroundProcessor::new(service);
        wrapped
    }
}

impl std::fmt::Debug for BackgroundProcessorLayer {
    /// Writes the bare type name; the layer has no fields to show.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("BackgroundProcessorLayer")
    }
}

/// Send to the background runtime.
fn send_to_background_runtime(future: impl Future<Output = ()> + Send + 'static) {
let tx = CONNECT_THREAD_POOL.get().expect(
"background runtime should be initialized by calling init_background_runtime before use",
);

if let Err(SendError(_)) = tx.send(Box::pin(future)) {
log::error!("Failed to send future - background connect runtime channel is closed. Abandoning task.");
}
}

/// Tower service that hands the wrapped service's response futures to the
/// background runtime, pairing each one with a oneshot channel that carries
/// the result back to the caller.
#[derive(Clone, Debug)]
pub struct BackgroundProcessor<S> {
    // The wrapped service whose futures are processed in the background.
    inner: S,
}

impl<S> BackgroundProcessor<S> {
/// Setup a new connect background processor.
pub fn new(inner: S) -> Self {
BackgroundProcessor { inner }
}
}

impl<S, Request> Service<Request> for BackgroundProcessor<S>
where
S: Service<Request>,
S::Response: Send + 'static,
S::Error: Into<BoxError> + Send,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = BoxError;
type Future = BackgroundResponseFuture<S::Response>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
match self.inner.poll_ready(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)),
}
}

fn call(&mut self, req: Request) -> Self::Future {
let response = self.inner.call(req);
let (mut tx, rx) = tokio::sync::oneshot::channel();

let future = async move {
select! {
_ = tx.closed() => (),
result = response => {
let _ = tx.send(result.map_err(Into::into));
}
}
};

send_to_background_runtime(future);
BackgroundResponseFuture::new(rx)
}
}

pin_project! {
#[derive(Debug)]
/// Future returned by `BackgroundProcessor::call`.
///
/// It wraps the receiving half of the oneshot channel over which the
/// background task sends the inner service's result.
pub struct BackgroundResponseFuture<S> {
#[pin]
rx: tokio::sync::oneshot::Receiver<Result<S, BoxError>>,
}
}

impl<S> BackgroundResponseFuture<S> {
    /// Build the response future from the receiving half of the oneshot channel.
    pub(crate) fn new(rx: tokio::sync::oneshot::Receiver<Result<S, BoxError>>) -> Self {
        Self { rx }
    }
}

impl<S> Future for BackgroundResponseFuture<S>
where
    S: Send + 'static,
{
    type Output = Result<S, BoxError>;

    /// Poll the oneshot receiver.
    ///
    /// A received value is returned as-is. If the sender was dropped without
    /// sending, the receive error is boxed and returned as the error.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.project().rx.poll(cx).map(|received| match received {
            Ok(outcome) => outcome,
            Err(closed) => Err(Box::new(closed) as BoxError),
        })
    }
}
2 changes: 2 additions & 0 deletions spider/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/// Absolute path domain handling.
pub mod abs;
/// Connect layer for reqwest.
pub mod connect;
/// Utils to modify the HTTP header.
pub mod header_utils;
/// String interner.
Expand Down
15 changes: 15 additions & 0 deletions spider/src/website.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,12 @@ impl Website {
_ => client,
};

let client = if crate::utils::connect::background_connect_threading() {
client.connector_layer(crate::utils::connect::BackgroundProcessorLayer::new())
} else {
client
};

let client = match self.configuration.concurrency_limit {
Some(limit) => {
client.connector_layer(tower::limit::concurrency::ConcurrencyLimitLayer::new(limit))
Expand Down Expand Up @@ -1121,6 +1127,13 @@ impl Website {
};

let client = self.configure_http_client_cookies(client);

let client = if crate::utils::connect::background_connect_threading() {
client.connector_layer(crate::utils::connect::BackgroundProcessorLayer::new())
} else {
client
};

let client = match self.configuration.concurrency_limit {
Some(limit) => {
client.connector_layer(tower::limit::concurrency::ConcurrencyLimitLayer::new(limit))
Expand Down Expand Up @@ -1416,6 +1429,7 @@ impl Website {
async fn setup(&mut self) -> (Client, Option<(Arc<AtomicI8>, tokio::task::JoinHandle<()>)>) {
self.determine_limits();
self.setup_disk();
crate::utils::connect::init_background_runtime();

if self.status != CrawlStatus::Active {
self.clear_all().await;
Expand All @@ -1437,6 +1451,7 @@ impl Website {
async fn setup(&mut self) -> (Client, Option<(Arc<AtomicI8>, tokio::task::JoinHandle<()>)>) {
self.determine_limits();
self.setup_disk();
crate::utils::connect::init_background_runtime();

if self.status != CrawlStatus::Active {
self.clear_all().await;
Expand Down
2 changes: 1 addition & 1 deletion spider_chrome/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "spider_chrome"
version = "2.22.16"
version = "2.22.19"
rust-version = "1.70"
authors = [
"j-mendez <jeff@spider.cloud>"
Expand Down
2 changes: 1 addition & 1 deletion spider_cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "spider_cli"
version = "2.22.16"
version = "2.22.19"
authors = [
"j-mendez <jeff@spider.cloud>"
]
Expand Down
2 changes: 1 addition & 1 deletion spider_transformations/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "spider_transformations"
version = "2.22.16"
version = "2.22.19"
authors = [
"j-mendez <jeff@spider.cloud>"
]
Expand Down
2 changes: 1 addition & 1 deletion spider_utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "spider_utils"
version = "2.22.16"
version = "2.22.19"
authors = [
"j-mendez <jeff@spider.cloud>"
]
Expand Down
2 changes: 1 addition & 1 deletion spider_worker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "spider_worker"
version = "2.22.16"
version = "2.22.19"
authors = [
"j-mendez <jeff@spider.cloud>"
]
Expand Down

0 comments on commit 6b61a14

Please sign in to comment.