From 240be38dbd49b28b59b63190ef5bc418643e3fc0 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 11 Mar 2023 16:36:14 +0100 Subject: [PATCH] migrate to a real connection pool for the sqlite connections --- Cargo.lock | 134 ++++++++++++++++++++++++++++++++--- Cargo.toml | 3 +- src/config.rs | 4 ++ src/storage/archive_index.rs | 99 ++++++++------------------ src/storage/mod.rs | 20 +++++- src/storage/sqlite_pool.rs | 76 ++++++++++++++++++++ 6 files changed, 253 insertions(+), 83 deletions(-) create mode 100644 src/storage/sqlite_pool.rs diff --git a/Cargo.lock b/Cargo.lock index 492ce81b5..7158c2440 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -727,6 +727,12 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "byteorder" version = "1.4.3" @@ -776,6 +782,37 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "camino" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c530edf18f37068ac2d977409ed5cd50d53d73bc653c7647b48eb78976ac9ae2" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cast" version = "0.3.0" @@ -1426,11 +1463,11 @@ 
dependencies = [ "kuchiki", "log", "lol_html", - "lru", "memmap2", "mime", "mime_guess", "mockito", + "moka", "num_cpus", "once_cell", "path-slash", @@ -1441,6 +1478,7 @@ dependencies = [ "prometheus", "r2d2", "r2d2_postgres", + "r2d2_sqlite", "rand 0.8.5", "rayon", "regex", @@ -1556,6 +1594,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "extend" version = "0.1.2" @@ -2395,6 +2442,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "globset" version = "0.4.10" @@ -3042,15 +3095,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "lru" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17" -dependencies = [ - "hashbrown 0.13.2", -] - [[package]] name = "mac" version = "0.1.1" @@ -3212,6 +3256,28 @@ dependencies = [ "similar", ] +[[package]] +name = "moka" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b6446f16d504e3d575df79cabb11bfbe9f24b17e9562d964a815db7b28ae3ec" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "num_cpus", + "once_cell", + "parking_lot 0.12.1", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -3933,6 +3999,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + [[package]] name = "quick-error" version = "2.0.1" @@ -3969,6 +4046,16 @@ dependencies = [ "r2d2", ] +[[package]] +name = "r2d2_sqlite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4f5d0337e99cd5cacd91ffc326c6cc9d8078def459df560c4f9bf9ba4a51034" +dependencies = [ + "r2d2", + "rusqlite", +] + [[package]] name = "rand" version = "0.7.3" @@ -4735,6 +4822,21 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.8" @@ -4920,6 +5022,12 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tar" version = "0.4.38" @@ -5393,6 +5501,12 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "triomphe" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" + [[package]] name = "try-lock" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index e52fdd7b7..8e2870ea6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ semver = { version = "1.0.4", features = ["serde"] } slug = "0.1.1" r2d2 = "0.8" 
r2d2_postgres = "0.18" +r2d2_sqlite = "0.21.0" url = { version = "2.1.1", features = ["serde"] } docsrs-metadata = { path = "crates/metadata" } anyhow = { version = "1.0.42", features = ["backtrace"]} @@ -70,7 +71,7 @@ serde_cbor = "0.11.1" getrandom = "0.2.1" itertools = { version = "0.10.5", optional = true} rusqlite = { version = "0.28.0", features = ["bundled"] } -lru = "0.9.0" +moka = { version ="0.10.0", default-features = false, features = ["sync"]} # Async tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] } diff --git a/src/config.rs b/src/config.rs index 107016d70..d87a510e2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,6 +14,9 @@ pub struct Config { pub(crate) max_pool_size: u32, pub(crate) min_pool_idle: u32, + // local pool for sqlite connections + pub(crate) max_sqlite_pool_size: u64, + // Storage params pub(crate) storage_backend: StorageKind, @@ -126,6 +129,7 @@ impl Config { database_url: require_env("DOCSRS_DATABASE_URL")?, max_pool_size: env("DOCSRS_MAX_POOL_SIZE", 90)?, + max_sqlite_pool_size: env("DOCSRS_MAX_SQLITE_POOL_SIZE", 500)?, min_pool_idle: env("DOCSRS_MIN_POOL_IDLE", 10)?, storage_backend: env("DOCSRS_STORAGE_BACKEND", StorageKind::Database)?, diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs index a9b364af0..fb1d5bf41 100644 --- a/src/storage/archive_index.rs +++ b/src/storage/archive_index.rs @@ -1,41 +1,16 @@ use crate::error::Result; use crate::storage::{compression::CompressionAlgorithm, FileRange}; use anyhow::{bail, Context as _}; -use lru::LruCache; use memmap2::MmapOptions; -use rusqlite::{Connection, OpenFlags, OptionalExtension}; +use rusqlite::{Connection, OptionalExtension}; use serde::de::DeserializeSeed; use serde::de::{IgnoredAny, MapAccess, Visitor}; use serde::{Deserialize, Deserializer, Serialize}; -use std::num::NonZeroUsize; -use std::{ - cell::RefCell, - collections::HashMap, - fmt, fs, - fs::File, - io, - io::Read, - path::{Path, PathBuf}, -}; +use 
std::{collections::HashMap, fmt, fs, fs::File, io, io::Read, path::Path}; -static SQLITE_FILE_HEADER: &[u8] = b"SQLite format 3\0"; +use super::sqlite_pool::SqliteConnectionPool; -thread_local! { - // local SQLite connection cache. - // `rusqlite::Connection` is not `Sync`, so we need to keep this by thread. - // Parallel connections to the same SQLite file are handled by SQLite itself. - // - // Alternative would be to have this cache global, but to prevent using - // the same connection from multiple threads at once. - // - // The better solution probably depends on the request pattern: are we - // typically having many requests to a small group of crates? - // Or are the requests more spread over many crates the there wouldn't be - // many conflicts on the connection? - static SQLITE_CONNECTIONS: RefCell> = RefCell::new( - LruCache::new(NonZeroUsize::new(32).unwrap()) - ); -} +static SQLITE_FILE_HEADER: &[u8] = b"SQLite format 3\0"; #[derive(Deserialize, Serialize)] pub(crate) struct FileInfo { @@ -211,35 +186,6 @@ fn find_in_slice(bytes: &[u8], search_for: &str) -> Result> { .deserialize(&mut deserializer)?) } -/// try to open an index file as SQLite -/// Uses a thread-local cache of open connections to the index files. -/// Will test the connection before returning it, and attempt to -/// reconnect if the test fails. -fn with_sqlite_connection, F: Fn(&Connection) -> Result>( - path: P, - f: F, -) -> Result { - let path = path.as_ref().to_owned(); - SQLITE_CONNECTIONS.with(|connections| { - let mut connections = connections.borrow_mut(); - - if let Some(conn) = connections.get(&path) { - if conn.execute("SELECT 1", []).is_ok() { - return f(conn); - } - } - - let conn = Connection::open_with_flags( - &path, - OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX, - )?; - - // we're using `get_or_insert` to save the second lookup receiving the - // reference into the cache, after having pushed the entry. 
- f(connections.get_or_insert(path, || conn)) - }) -} - fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result> { let mut stmt = conn.prepare( " @@ -288,9 +234,10 @@ fn is_sqlite_file>(archive_index_path: P) -> Result { pub(crate) fn find_in_file>( archive_index_path: P, search_for: &str, + pool: &SqliteConnectionPool, ) -> Result> { if is_sqlite_file(&archive_index_path)? { - with_sqlite_connection(archive_index_path, |connection| { + pool.with_connection(archive_index_path, |connection| { find_in_sqlite_index(connection, search_for) }) } else { @@ -381,15 +328,23 @@ mod tests { assert!(!is_sqlite_file(&cbor_index_file).unwrap()); - let fi = find_in_file(cbor_index_file.path(), "testfile1") - .unwrap() - .unwrap(); + let fi = find_in_file( + cbor_index_file.path(), + "testfile1", + &SqliteConnectionPool::default(), + ) + .unwrap() + .unwrap(); assert_eq!(fi.range, FileRange::new(39, 459)); assert_eq!(fi.compression, CompressionAlgorithm::Bzip2); - assert!(find_in_file(cbor_index_file.path(), "some_other_file") - .unwrap() - .is_none()); + assert!(find_in_file( + cbor_index_file.path(), + "some_other_file", + &SqliteConnectionPool::default(), + ) + .unwrap() + .is_none()); } #[test] @@ -400,14 +355,20 @@ mod tests { create(&mut tf, &tempfile).unwrap(); assert!(is_sqlite_file(&tempfile).unwrap()); - let fi = find_in_file(&tempfile, "testfile1").unwrap().unwrap(); + let fi = find_in_file(&tempfile, "testfile1", &SqliteConnectionPool::default()) + .unwrap() + .unwrap(); assert_eq!(fi.range, FileRange::new(39, 459)); assert_eq!(fi.compression, CompressionAlgorithm::Bzip2); - assert!(find_in_file(&tempfile, "some_other_file") - .unwrap() - .is_none()); + assert!(find_in_file( + &tempfile, + "some_other_file", + &SqliteConnectionPool::default(), + ) + .unwrap() + .is_none()); } #[test] diff --git a/src/storage/mod.rs b/src/storage/mod.rs index d06a84590..a65c51d58 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -2,10 +2,12 @@ mod 
archive_index; mod compression; mod database; mod s3; +mod sqlite_pool; pub use self::compression::{compress, decompress, CompressionAlgorithm, CompressionAlgorithms}; use self::database::DatabaseBackend; use self::s3::S3Backend; +use self::sqlite_pool::SqliteConnectionPool; use crate::error::Result; use crate::web::metrics::RenderingTimesRecorder; use crate::{db::Pool, Config, Metrics}; @@ -13,6 +15,7 @@ use anyhow::{anyhow, ensure}; use chrono::{DateTime, Utc}; use path_slash::PathExt; use std::io::BufReader; +use std::num::NonZeroU64; use std::{ collections::{HashMap, HashSet}, ffi::OsStr, @@ -113,6 +116,7 @@ enum StorageBackend { pub struct Storage { backend: StorageBackend, config: Arc, + sqlite_pool: SqliteConnectionPool, } impl Storage { @@ -123,6 +127,10 @@ impl Storage { runtime: Arc, ) -> Result { Ok(Storage { + sqlite_pool: SqliteConnectionPool::new( + NonZeroU64::new(config.max_sqlite_pool_size) + .ok_or_else(|| anyhow!("invalid sqlite pool size"))?, + ), config: config.clone(), backend: match config.storage_backend { StorageKind::Database => { @@ -229,7 +237,9 @@ impl Storage { pub(crate) fn exists_in_archive(&self, archive_path: &str, path: &str) -> Result { match self.get_index_filename(archive_path) { - Ok(index_filename) => Ok(archive_index::find_in_file(index_filename, path)?.is_some()), + Ok(index_filename) => { + Ok(archive_index::find_in_file(index_filename, path, &self.sqlite_pool)?.is_some()) + } Err(err) => { if err.downcast_ref::().is_some() { Ok(false) @@ -306,8 +316,12 @@ impl Storage { if let Some(ref mut t) = fetch_time { t.step("find path in index"); } - let info = archive_index::find_in_file(self.get_index_filename(archive_path)?, path)? - .ok_or(PathNotFoundError)?; + let info = archive_index::find_in_file( + self.get_index_filename(archive_path)?, + path, + &self.sqlite_pool, + )? 
+ .ok_or(PathNotFoundError)?; if let Some(t) = fetch_time { t.step("range request"); diff --git a/src/storage/sqlite_pool.rs b/src/storage/sqlite_pool.rs new file mode 100644 index 000000000..2bfe3ad3e --- /dev/null +++ b/src/storage/sqlite_pool.rs @@ -0,0 +1,76 @@ +use anyhow::Result; +use moka::sync::Cache; +use r2d2_sqlite::SqliteConnectionManager; +use rusqlite::{Connection, OpenFlags}; +use std::{ + num::NonZeroU64, + path::{Path, PathBuf}, + time::Duration, +}; + +static MAX_IDLE_TIME: Duration = Duration::from_secs(10 * 60); +static MAX_LIFE_TIME: Duration = Duration::from_secs(60 * 60); + +/// SQLite connection pool. +/// +/// Typical connection pools handle many connections to a single database, +/// while this one handles some connections to many databases. +/// +/// The more connections we keep alive, the more open files we have, +/// so you might need to tweak this limit based on the max open files +/// on your system. +/// +/// We open the databases in readonly mode. +/// We are using an additional connection pool per database so that parallel requests +/// can be efficiently answered. Because of this the actual max connection count +/// might be higher than the given max_connections. +/// +/// We keep a minimum of one connection per database, for one hour. +/// Any additional connections will be dropped after 10 minutes of inactivity. 
+#[derive(Clone)] +pub(crate) struct SqliteConnectionPool { + pools: Cache>, +} + +impl Default for SqliteConnectionPool { + fn default() -> Self { + Self::new(NonZeroU64::new(10).unwrap()) + } +} + +impl SqliteConnectionPool { + pub(crate) fn new(max_connections: NonZeroU64) -> Self { + Self { + pools: Cache::builder() + .max_capacity(max_connections.get()) + .time_to_idle(MAX_LIFE_TIME) + .build(), + } + } + + pub(crate) fn with_connection, F: Fn(&Connection) -> Result>( + &self, + path: P, + f: F, + ) -> Result { + let path = path.as_ref().to_owned(); + + let pool = self + .pools + .entry(path.clone()) + .or_insert_with(|| { + let manager = SqliteConnectionManager::file(path) + .with_flags(OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX); + r2d2::Pool::builder() + .min_idle(Some(1)) + .max_lifetime(Some(MAX_LIFE_TIME)) + .idle_timeout(Some(MAX_IDLE_TIME)) + .max_size(10) + .build_unchecked(manager) + }) + .into_value(); + + let conn = pool.get()?; + f(&conn) + } +}