From 9d2d9f11c77cc790f1e13a4f47ae82c2ada0a560 Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Tue, 7 Dec 2021 10:57:05 -0800 Subject: [PATCH] support for creating zettacache with multiple disks (#32) Add support for creating zettacache with multiple disks. --- cmd/zfs_object_agent/Cargo.lock | 8 + cmd/zfs_object_agent/server/src/main.rs | 16 +- cmd/zfs_object_agent/util/src/btreemap_ext.rs | 9 + cmd/zfs_object_agent/util/src/lib.rs | 2 + cmd/zfs_object_agent/zcdb/src/main.rs | 20 +- cmd/zfs_object_agent/zettacache/Cargo.toml | 2 + .../zettacache/src/base_types.rs | 38 +- .../zettacache/src/block_access.rs | 218 ++++--- .../zettacache/src/block_allocator.rs | 576 ++++++++---------- .../zettacache/src/block_based_log.rs | 24 +- .../zettacache/src/extent_allocator.rs | 210 ++++--- cmd/zfs_object_agent/zettacache/src/index.rs | 6 +- .../zettacache/src/space_map.rs | 75 +-- .../zettacache/src/zcachedb.rs | 13 +- .../zettacache/src/zettacache.rs | 497 +++++++++------ cmd/zfs_object_agent/zettaobject/src/init.rs | 8 +- cmd/zfs_object_agent/zoa/src/lib.rs | 6 +- 17 files changed, 937 insertions(+), 791 deletions(-) create mode 100644 cmd/zfs_object_agent/util/src/btreemap_ext.rs diff --git a/cmd/zfs_object_agent/Cargo.lock b/cmd/zfs_object_agent/Cargo.lock index 32fbde752be7..f183029ecef0 100644 --- a/cmd/zfs_object_agent/Cargo.lock +++ b/cmd/zfs_object_agent/Cargo.lock @@ -348,6 +348,12 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "bimap" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50ae17cabbc8a38a1e3e4c1a6a664e9a09672dc14d0896fa8d865d3a5a446b07" + [[package]] name = "bincode" version = "1.3.3" @@ -2695,6 +2701,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-stream", + "bimap", "bincode", "bytes", "chrono", @@ -2709,6 +2716,7 @@ dependencies = [ "more-asserts", "nix", "num", + "rand", "roaring", "seahash", "serde 1.0.130", diff --git a/cmd/zfs_object_agent/server/src/main.rs b/cmd/zfs_object_agent/server/src/main.rs index 153613cce26a..b2a5f2afe0cc 100644 --- a/cmd/zfs_object_agent/server/src/main.rs +++ b/cmd/zfs_object_agent/server/src/main.rs @@ -38,12 +38,14 @@ fn main() { .takes_value(true), ) .arg( - Arg::with_name("cache-file") + Arg::with_name("cache-device") .short("c") - .long("cache-file") - .value_name("FILE") + .long("cache-device") + .value_name("PATH") .help("File/device to use for ZettaCache") - .takes_value(true), + .takes_value(true) + .multiple(true) + .number_of_values(1), ) .arg( Arg::with_name("config-file") @@ -143,7 +145,9 @@ fn main() { } _ => { let socket_dir = matches.value_of("socket-dir").unwrap(); - let cache_path = matches.value_of("cache-file"); + let cache_paths = matches + .values_of("cache-device") + .map_or(Vec::new(), |values| values.collect()); util::setup_logging( matches.occurrences_of("verbosity"), @@ -184,7 +188,7 @@ fn main() { // trace!() can be used indiscriminately. 
trace!("logging level TRACE enabled"); - zettaobject::init::start(socket_dir, cache_path); + zettaobject::init::start(socket_dir, cache_paths); } } } diff --git a/cmd/zfs_object_agent/util/src/btreemap_ext.rs b/cmd/zfs_object_agent/util/src/btreemap_ext.rs new file mode 100644 index 000000000000..ca1112b84dfa --- /dev/null +++ b/cmd/zfs_object_agent/util/src/btreemap_ext.rs @@ -0,0 +1,9 @@ +use std::collections::BTreeMap; + +/// Iterate over the values in the BTreeMap, starting with `start`, and looping +/// around such that all entries will be visited once. +pub fn iter_wrapping(map: &BTreeMap, start: K) -> impl Iterator { + map.range(start..) + .chain(map.iter().take_while(move |(&k, _v)| k != start)) + .map(|(_k, v)| v) +} diff --git a/cmd/zfs_object_agent/util/src/lib.rs b/cmd/zfs_object_agent/util/src/lib.rs index 684b8f30c716..85b0c0d94b0a 100644 --- a/cmd/zfs_object_agent/util/src/lib.rs +++ b/cmd/zfs_object_agent/util/src/lib.rs @@ -4,6 +4,7 @@ #![warn(clippy::cast_sign_loss)] mod bitmap_range_iterator; +mod btreemap_ext; mod die; mod from64; mod lock_set; @@ -15,6 +16,7 @@ mod tunable; mod vec_ext; pub use bitmap_range_iterator::BitmapRangeIterator; +pub use btreemap_ext::iter_wrapping; pub use die::maybe_die_with; pub use from64::From64; pub use lock_set::LockSet; diff --git a/cmd/zfs_object_agent/zcdb/src/main.rs b/cmd/zfs_object_agent/zcdb/src/main.rs index 5d4a8ea2bf6f..d97104ab244c 100644 --- a/cmd/zfs_object_agent/zcdb/src/main.rs +++ b/cmd/zfs_object_agent/zcdb/src/main.rs @@ -30,9 +30,14 @@ async fn main() { .about("ZFS ZettaCache Debugger") .version(GIT_VERSION) .arg( - Arg::with_name("device") - .help("ZettaCache Device") - .required(true), + Arg::with_name("cache-device") + .short("c") + .long("cache-device") + .value_name("PATH") + .help("File/device to use for ZettaCache") + .takes_value(true) + .multiple(true) + .number_of_values(1), ) .subcommand( SubCommand::with_name("dump-structures") @@ -71,7 +76,7 @@ async fn main() { .subcommand(SubCommand::with_name("space-usage").about("dump space usage statistics")) .get_matches(); - let device = matches.value_of("device").unwrap(); + let cache_paths = matches.values_of("cache-device").unwrap().collect(); match matches.subcommand() { ("dump-structures", Some(subcommand_matches)) => { ZettaCacheDBCommand::issue_command( @@ -82,7 +87,7 @@ async fn main() { .operation_log_raw(subcommand_matches.is_present("operation-log-raw")) .index_log_raw(subcommand_matches.is_present("index-log-raw")), ), - device, + cache_paths, ) .await; } @@ -91,12 +96,13 @@ async fn main() { ZettaCacheDBCommand::DumpSlabs( DumpSlabsOptions::default().verbosity(subcommand_matches.occurrences_of("v")), ), - device, + cache_paths, ) .await; } ("space-usage", Some(_)) => { - ZettaCacheDBCommand::issue_command(ZettaCacheDBCommand::DumpSpaceUsage, device).await; + ZettaCacheDBCommand::issue_command(ZettaCacheDBCommand::DumpSpaceUsage, cache_paths) + .await; } _ => { matches.usage(); diff --git a/cmd/zfs_object_agent/zettacache/Cargo.toml b/cmd/zfs_object_agent/zettacache/Cargo.toml index 3ecfe2b590e5..40edacfd2007 100644 --- a/cmd/zfs_object_agent/zettacache/Cargo.toml +++ b/cmd/zfs_object_agent/zettacache/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" [dependencies] anyhow = "1.0" async-stream = "0.3.0" +bimap = "0.6.1" bincode = "1.3.2" bytes = "1.0" chrono = "0.4" @@ -23,6 +24,7 @@ metered = "0.8" more-asserts = "0.2.1" nix = "0.23.0" num = "0.4.0" +rand = "0.8.3" roaring = "0.7.0" seahash = "4.1.0" serde = { version = "1.0.125", features = ["derive"] } diff 
--git a/cmd/zfs_object_agent/zettacache/src/base_types.rs b/cmd/zfs_object_agent/zettacache/src/base_types.rs index b9b5e1cd1643..72dafa846f26 100644 --- a/cmd/zfs_object_agent/zettacache/src/base_types.rs +++ b/cmd/zfs_object_agent/zettacache/src/base_types.rs @@ -1,6 +1,7 @@ use more_asserts::*; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use std::borrow::Borrow; use std::fmt::*; use std::ops::Add; use std::ops::Sub; @@ -34,15 +35,19 @@ impl BlockId { } } +#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] +pub struct DiskId(pub u16); + #[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] pub struct DiskLocation { - // note: will need to add disk ID to support multiple disks + pub disk: DiskId, pub offset: u64, } impl Add for DiskLocation { type Output = DiskLocation; fn add(self, rhs: u64) -> Self::Output { DiskLocation { + disk: self.disk, offset: self.offset + rhs, } } @@ -53,6 +58,14 @@ impl Add for DiskLocation { self + rhs as u64 } } +impl Sub for DiskLocation { + type Output = u64; + + fn sub(self, rhs: DiskLocation) -> Self::Output { + assert_eq!(self.disk, rhs.disk); + self.offset - rhs.offset + } +} #[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] pub struct Extent { @@ -61,6 +74,15 @@ pub struct Extent { } impl Extent { + pub fn new(disk: DiskId, offset: u64, size: u64) -> Extent { + assert_eq!(offset % 512, 0, "offset {} is not 512-aligned", offset); + assert_eq!(size % 512, 0, "size {} is not 512-aligned", size); + Extent { + location: DiskLocation { disk, offset }, + size, + } + } + pub fn range(&self, relative_offset: u64, size: u64) -> Extent { assert_ge!(self.size, relative_offset + size); Extent { @@ -68,6 +90,20 @@ impl Extent { size, } } + + /// returns true if `sub` is entirely contained within this extent + pub fn contains(&self, sub: &Extent) -> bool { + sub.location.disk == self.location.disk + && sub.location.offset >= self.location.offset + && sub.location.offset + sub.size <= self.location.offset + self.size + } +} + +/// This allows Extents to be compared by their `location`s, ignoring the `size`s. 
+impl Borrow for Extent { + fn borrow(&self) -> &DiskLocation { + &self.location + } } #[derive(Serialize, Deserialize, Default, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] diff --git a/cmd/zfs_object_agent/zettacache/src/block_access.rs b/cmd/zfs_object_agent/zettacache/src/block_access.rs index f7f2ee239d24..87e9b345295e 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_access.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_access.rs @@ -1,30 +1,29 @@ +use crate::base_types::DiskId; use crate::base_types::DiskLocation; use crate::base_types::Extent; -use anyhow::{anyhow, Result}; +use anyhow::anyhow; +use anyhow::Context; +use anyhow::Result; use bincode::Options; use lazy_static::lazy_static; use libc::c_void; use log::*; -use metered::common::*; -use metered::hdr_histogram::AtomicHdrHistogram; -use metered::metered; -use metered::time_source::StdInstantMicros; use nix::errno::Errno; use nix::sys::stat::SFlag; use num::Num; use num::NumCast; use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; +use std::fmt::Debug; +use std::fmt::Display; use std::io::Read; use std::io::Write; use std::os::unix::prelude::AsRawFd; -use std::sync::Arc; +use std::os::unix::prelude::OpenOptionsExt; use std::time::Instant; use tokio::fs::File; -use tokio::fs::OpenOptions; -use tokio::sync::OwnedSemaphorePermit; use tokio::sync::Semaphore; -use tokio::task::JoinHandle; use util::get_tunable; use util::AlignedBytes; use util::AlignedVec; @@ -32,9 +31,8 @@ use util::From64; lazy_static! { static ref MIN_SECTOR_SIZE: usize = get_tunable("min_sector_size", 512); - static ref DISK_WRITE_MAX_QUEUE_DEPTH: usize = get_tunable("disk_write_max_queue_depth", 32); - static ref DISK_WRITE_METADATA_MAX_QUEUE_DEPTH: usize = - get_tunable("disk_write_metadata_max_queue_depth", 32); + pub static ref DISK_WRITE_MAX_QUEUE_DEPTH: usize = + get_tunable("disk_write_max_queue_depth", 32); static ref DISK_READ_MAX_QUEUE_DEPTH: usize = get_tunable("disk_read_max_queue_depth", 64); } @@ -48,14 +46,19 @@ struct BlockHeader { #[derive(Debug)] pub struct BlockAccess { - disk: File, + sector_size: usize, + disks: Vec, + readonly: bool, +} + +#[derive(Debug)] +pub struct Disk { + file: File, readonly: bool, size: u64, sector_size: usize, - metrics: BlockAccessMetrics, outstanding_reads: Semaphore, - outstanding_data_writes: Arc, - outstanding_metadata_writes: Arc, + outstanding_writes: Semaphore, } #[derive(Serialize, Deserialize, Debug)] @@ -70,8 +73,6 @@ pub enum EncodeType { Bincode, } -pub struct WritePermit(OwnedSemaphorePermit); - // Generate ioctl function nix::ioctl_read!(ioctl_blkgetsize64, 0x12u8, 114u8, u64); nix::ioctl_read_bad!(ioctl_blksszget, 0x1268, usize); @@ -81,20 +82,21 @@ const CUSTOM_OFLAGS: i32 = libc::O_DIRECT; #[cfg(not(target_os = "linux"))] const CUSTOM_OFLAGS: i32 = 0; -// XXX this is very thread intensive. On Linux, we can use "glommio" to use -// io_uring for much lower overheads. Or SPDK (which can use io_uring or nvme -// hardware directly). -#[metered(registry=BlockAccessMetrics)] -impl BlockAccess { - pub async fn new(disk_path: &str, readonly: bool) -> BlockAccess { - let disk = OpenOptions::new() - .read(true) - .write(!readonly) - .custom_flags(CUSTOM_OFLAGS) - .open(disk_path) - .await - .unwrap(); - let stat = nix::sys::stat::fstat(disk.as_raw_fd()).unwrap(); +impl Disk { + pub fn new(disk_path: &str, readonly: bool) -> Disk { + // Note: using std file open so that this func can be non-async. 
+ // Although this is blocking from a tokio thread, it's used + // infrequently, and we're already blocking from the ioctls below. + let file = tokio::fs::File::from_std( + std::fs::OpenOptions::new() + .read(true) + .write(!readonly) + .custom_flags(CUSTOM_OFLAGS) + .open(disk_path) + .with_context(|| format!("opening disk '{}'", disk_path)) + .unwrap(), + ); + let stat = nix::sys::stat::fstat(file.as_raw_fd()).unwrap(); trace!("stat: {:?}", stat); let mode = SFlag::from_bits_truncate(stat.st_mode); let sector_size; @@ -103,13 +105,13 @@ impl BlockAccess { size = unsafe { let mut cap: u64 = 0; let cap_ptr = &mut cap as *mut u64; - ioctl_blkgetsize64(disk.as_raw_fd(), cap_ptr).unwrap(); + ioctl_blkgetsize64(file.as_raw_fd(), cap_ptr).unwrap(); cap }; sector_size = unsafe { let mut ssz: usize = 0; let ssz_ptr = &mut ssz as *mut usize; - ioctl_blksszget(disk.as_raw_fd(), ssz_ptr).unwrap(); + ioctl_blksszget(file.as_raw_fd(), ssz_ptr).unwrap(); ssz }; } else if mode.contains(SFlag::S_IFREG) { @@ -118,43 +120,68 @@ impl BlockAccess { } else { panic!("{}: invalid file type {:?}", disk_path, mode); } - - let this = BlockAccess { - disk, + let this = Disk { + file, readonly, size, sector_size, - metrics: Default::default(), outstanding_reads: Semaphore::new(*DISK_READ_MAX_QUEUE_DEPTH), - outstanding_data_writes: Arc::new(Semaphore::new(*DISK_WRITE_MAX_QUEUE_DEPTH)), - outstanding_metadata_writes: Arc::new(Semaphore::new( - *DISK_WRITE_METADATA_MAX_QUEUE_DEPTH, - )), + outstanding_writes: Semaphore::new(*DISK_WRITE_MAX_QUEUE_DEPTH), }; info!("opening cache file {}: {:?}", disk_path, this); this } +} + +// XXX this is very thread intensive. On Linux, we can use "glommio" to use +// io_uring for much lower overheads. Or SPDK (which can use io_uring or nvme +// hardware directly). +impl BlockAccess { + pub fn new(disks: Vec, readonly: bool) -> Self { + let sector_size = disks + .iter() + .reduce(|a, b| { + assert_eq!(a.sector_size, b.sector_size); + a + }) + .unwrap() + .sector_size; + BlockAccess { + sector_size, + disks, + readonly, + } + } + + /// Note: In the future we'll support device removal in which case the + /// DiskId's will probably not be sequential. By using this accessor we + /// need not assume anything about the values inside the DiskId's. 
+ pub fn disks(&self) -> impl Iterator { + (0..u16::try_from(self.disks.len()).unwrap()).map(DiskId) + } - pub fn size(&self) -> u64 { - self.size + fn disk(&self, disk: DiskId) -> &Disk { + &self.disks[disk.0 as usize] } - pub fn dump_metrics(&self) { - debug!("metrics: {:#?}", self.metrics); + pub fn disk_size(&self, disk: DiskId) -> u64 { + self.disk(disk).size + } + + pub fn total_capacity(&self) -> u64 { + self.disks().map(|disk| self.disk_size(disk)).sum() } // offset and length must be sector-aligned - #[measure(type = ResponseTime)] - #[measure(InFlight)] - #[measure(Throughput)] - #[measure(HitCount)] pub async fn read_raw(&self, extent: Extent) -> AlignedBytes { - assert_eq!(extent.size, self.round_up_to_sector(extent.size)); - let fd = self.disk.as_raw_fd(); + self.verify_aligned(extent.location.offset); + self.verify_aligned(extent.size); + let disk = self.disk(extent.location.disk); + let fd = disk.file.as_raw_fd(); let sector_size = self.sector_size; let begin = Instant::now(); - let _permit = self.outstanding_reads.acquire().await.unwrap(); + let _permit = disk.outstanding_reads.acquire().await.unwrap(); let bytes = tokio::task::spawn_blocking(move || { let mut v = AlignedVec::with_capacity(usize::from64(extent.size), sector_size); // By using the unsafe libc::pread() instead of @@ -183,83 +210,31 @@ impl BlockAccess { bytes } - // Acquire a permit to write later. This should be used only for data - // writes. See the comment in write_raw() for details. - pub async fn acquire_write(&self) -> WritePermit { + // location.offset and bytes.len() must be sector-aligned. However, + // bytes.alignment() need not be the sector size (it will be copied if not). + pub async fn write_raw(&self, location: DiskLocation, mut bytes: AlignedBytes) { assert!( !self.readonly, "attempting zettacache write in readonly mode" ); - WritePermit( - self.outstanding_data_writes - .clone() - .acquire_owned() - .await - .unwrap(), - ) - } - - // offset and data.len() must be sector-aligned - // maybe this should take Bytes? - #[measure(type = ResponseTime)] - #[measure(InFlight)] - #[measure(Throughput)] - #[measure(HitCount)] - pub async fn write_raw(&self, location: DiskLocation, bytes: AlignedBytes) { - assert!( - !self.readonly, - "attempting zettacache write in readonly mode" - ); - // We need a different semaphore for metadata writes, so that - // outstanding data write permits can't starve/deadlock metadata writes. - // We may block on locks (e.g. waiting on the ZettaCacheState lock) - // while holding a data write permit, but we can't while holding a - // metadata write permit. - let permit = WritePermit( - self.outstanding_metadata_writes - .clone() - .acquire_owned() - .await - .unwrap(), - ); - self.write_raw_permit(permit, location, bytes) - .await - .unwrap(); - } - - // offset and data.len() must be sector-aligned - // maybe this should take Bytes? 
- #[measure(type = ResponseTime)] - #[measure(InFlight)] - #[measure(Throughput)] - #[measure(HitCount)] - pub fn write_raw_permit( - &self, - permit: WritePermit, - location: DiskLocation, - mut bytes: AlignedBytes, - ) -> JoinHandle<()> { - assert!( - !self.readonly, - "attempting zettacache write in readonly mode" - ); - // directio requires the pointer to be sector-aligned - let fd = self.disk.as_raw_fd(); + let disk = self.disk(location.disk); + let fd = disk.file.as_raw_fd(); let length = bytes.len(); let offset = location.offset; let alignment = bytes.alignment(); - assert_eq!(offset, self.round_up_to_sector(offset)); - assert_eq!(length, self.round_up_to_sector(length)); + self.verify_aligned(offset); + self.verify_aligned(length); + // directio requires the pointer to be sector-aligned if alignment != self.round_up_to_sector(alignment) { // XXX copying, this happens for AlignedBytes created from a plain Bytes bytes = AlignedBytes::copy_from_slice(&bytes, self.sector_size) } assert_eq!(bytes.as_ptr() as usize % self.sector_size, 0); let begin = Instant::now(); + let _permit = disk.outstanding_writes.acquire().await.unwrap(); tokio::task::spawn_blocking(move || { nix::sys::uio::pwrite(fd, &bytes, i64::try_from(offset).unwrap()).unwrap(); - drop(permit); trace!( "write({:?} len={}) returned in {}us", location, @@ -267,6 +242,8 @@ impl BlockAccess { begin.elapsed().as_micros() ); }) + .await + .unwrap(); } pub fn round_up_to_sector(&self, n: N) -> N { @@ -274,6 +251,17 @@ impl BlockAccess { (n + sector_size - N::one()) / sector_size * sector_size } + pub fn verify_aligned(&self, n: N) { + let sector_size: N = NumCast::from(self.sector_size).unwrap(); + assert_eq!( + n % sector_size, + N::zero(), + "{} is not sector-aligned ({})", + n, + sector_size + ); + } + // XXX ideally this would return a sector-aligned address, so it can be used directly for a directio write pub fn chunk_to_raw(&self, encoding: EncodeType, struct_obj: &T) -> AlignedBytes { let (payload, compression) = match encoding { diff --git a/cmd/zfs_object_agent/zettacache/src/block_allocator.rs b/cmd/zfs_object_agent/zettacache/src/block_allocator.rs index df24ed3510af..6b7a2a567a3f 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_allocator.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_allocator.rs @@ -3,14 +3,17 @@ use crate::extent_allocator::{ExtentAllocator, ExtentAllocatorBuilder}; use crate::space_map::{SpaceMap, SpaceMapEntry, SpaceMapPhys}; use crate::zettacache::DEFAULT_SLAB_SIZE; use crate::{base_types::*, DumpSlabsOptions}; +use bimap::BiBTreeMap; use lazy_static::lazy_static; use log::*; use more_asserts::*; +use rand::seq::SliceRandom; +use rand::thread_rng; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use std::cmp::{self, min}; use std::collections::{BTreeMap, BTreeSet, HashSet}; -use std::ops::Bound::*; +use std::ops::{Add, Bound::*, Sub}; use std::sync::Arc; use std::time::Instant; use std::{fmt, mem}; @@ -37,6 +40,20 @@ impl SlabId { } } +impl Add for SlabId { + type Output = SlabId; + fn add(self, rhs: u64) -> SlabId { + SlabId(self.0 + rhs) + } +} + +impl Sub for SlabId { + type Output = u64; + fn sub(self, rhs: SlabId) -> u64 { + self.0 - rhs.0 + } +} + #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct SlabGeneration(u64); impl SlabGeneration { @@ -52,11 +69,11 @@ trait SlabTrait { fn free(&mut self, extent: Extent); fn flush_to_spacemap(&mut self, spacemap: &mut SpaceMap); fn condense_to_spacemap(&self, spacemap: &mut 
SpaceMap); - fn get_max_size(&self) -> u32; - fn get_free_space(&self) -> u64; - fn get_allocated_space(&self) -> u64; - fn get_phys_type(&self) -> SlabPhysType; - fn get_num_segments(&self) -> u64; + fn max_size(&self) -> u32; + fn free_space(&self) -> u64; + fn allocated_space(&self) -> u64; + fn phys_type(&self) -> SlabPhysType; + fn num_segments(&self) -> u64; fn dump_info(&self); } @@ -67,17 +84,12 @@ struct BitmapSlab { total_slots: u32, slot_size: u32, - slab_location: DiskLocation, + location: DiskLocation, } impl BitmapSlab { - fn new_slab( - id: SlabId, - generation: SlabGeneration, - slab_location: DiskLocation, - slab_size: u32, - block_size: u32, - ) -> Slab { + fn new_slab(id: SlabId, generation: SlabGeneration, extent: Extent, block_size: u32) -> Slab { + let slab_size: u32 = extent.size.try_into().unwrap(); let free_slots = slab_size / block_size; let mut allocatable = RoaringBitmap::new(); @@ -93,32 +105,30 @@ impl BitmapSlab { freeing: Default::default(), total_slots: free_slots, slot_size: block_size, - slab_location, + location: extent.location, }), ) } - fn slot_to_offset(&self, slot: u32) -> u64 { - self.slab_location.offset + u64::from(slot * self.slot_size) + fn slot_to_offset(&self, slot: u32) -> DiskLocation { + self.location + u64::from(slot * self.slot_size) } fn slab_end(&self) -> DiskLocation { - DiskLocation { - offset: self.slot_to_offset(self.total_slots), - } + self.slot_to_offset(self.total_slots) } - fn verify_slab_extent(&self, extent: Extent) { - assert_eq!(extent.size % u64::from(self.get_max_size()), 0); - assert_ge!(extent.location, self.slab_location); + fn verify_contains(&self, extent: Extent) { + assert_eq!(extent.location.disk, self.location.disk); + assert_eq!(extent.size % u64::from(self.max_size()), 0); + assert_ge!(extent.location, self.location); assert_le!(extent.location + extent.size, self.slab_end()); } fn import_extent_impl(&mut self, extent: Extent, is_alloc: bool) { - self.verify_slab_extent(extent); + self.verify_contains(extent); - let internal_offset = - u32::try_from(extent.location.offset - self.slab_location.offset).unwrap(); + let internal_offset = u32::try_from(extent.location.offset - self.location.offset).unwrap(); assert_eq!(internal_offset % self.slot_size, 0); let num_slots = u32::try_from(extent.size).unwrap() / self.slot_size; assert_ge!(num_slots, 1); @@ -174,18 +184,15 @@ impl SlabTrait for BitmapSlab { assert!(!self.freeing.contains(slot)); Some(Extent { - location: DiskLocation { - offset: self.slot_to_offset(slot), - }, + location: self.slot_to_offset(slot), size: self.slot_size.into(), }) } fn free(&mut self, extent: Extent) { - self.verify_slab_extent(extent); + self.verify_contains(extent); - let internal_offset = - u32::try_from(extent.location.offset - self.slab_location.offset).unwrap(); + let internal_offset = u32::try_from(extent.location.offset - self.location.offset).unwrap(); assert_eq!(internal_offset % self.slot_size, 0); let slot = internal_offset / self.slot_size; @@ -208,19 +215,19 @@ impl SlabTrait for BitmapSlab { // checkpoint period. 
for (first, last) in self.allocating.iter_ranges() { assert_ge!(last, first); - spacemap.alloc( - self.slot_to_offset(first), - u64::from((last - first + 1) * self.slot_size), - ); + spacemap.alloc(Extent { + location: self.slot_to_offset(first), + size: u64::from((last - first + 1) * self.slot_size), + }); } self.allocating.clear(); for (first, last) in self.freeing.iter_ranges() { assert_ge!(last, first); - spacemap.free( - self.slot_to_offset(first), - u64::from((last - first + 1) * self.slot_size), - ); + spacemap.free(Extent { + location: self.slot_to_offset(first), + size: u64::from((last - first + 1) * self.slot_size), + }); } // Space freed during this checkpoint is now available for reallocation. for slot in self.freeing.iter() { @@ -237,10 +244,10 @@ impl SlabTrait for BitmapSlab { let mut written_slots = 0; for (first, last) in self.allocatable.iter_inverse_ranges(self.total_slots) { assert_ge!(last, first); - spacemap.alloc( - self.slot_to_offset(first), - u64::from((last - first + 1) * self.slot_size), - ); + spacemap.alloc(Extent { + location: self.slot_to_offset(first), + size: u64::from((last - first + 1) * self.slot_size), + }); written_slots += u64::from(last - first + 1); } assert_eq!( @@ -253,26 +260,26 @@ impl SlabTrait for BitmapSlab { // from the allocating bitmap as free. The latter is because these // entries will be later marked as allocated in flush_to_spacemap(). for (first, last) in self.allocating.iter_ranges() { - spacemap.free( - self.slot_to_offset(first), - u64::from((last - first + 1) * self.slot_size), - ); + spacemap.free(Extent { + location: self.slot_to_offset(first), + size: u64::from((last - first + 1) * self.slot_size), + }); } } - fn get_max_size(&self) -> u32 { + fn max_size(&self) -> u32 { self.slot_size } - fn get_free_space(&self) -> u64 { + fn free_space(&self) -> u64 { self.allocatable.len() * u64::from(self.slot_size) } - fn get_allocated_space(&self) -> u64 { + fn allocated_space(&self) -> u64 { (u64::from(self.total_slots) - self.allocatable.len()) * u64::from(self.slot_size) } - fn get_phys_type(&self) -> SlabPhysType { + fn phys_type(&self) -> SlabPhysType { SlabPhysType::BitmapBased { block_size: self.slot_size, } @@ -282,7 +289,7 @@ impl SlabTrait for BitmapSlab { let used_slots = self.total_slots - u32::try_from(self.allocatable.len()).unwrap(); println!( "slab_offset: {} slot_size: {} slots_used: {}/{} utilization: {}%", - self.slab_location.offset, + self.location.offset, nice_p2size(u64::from(self.slot_size)), used_slots, self.total_slots, @@ -292,9 +299,10 @@ impl SlabTrait for BitmapSlab { let first_offset = self.slot_to_offset(first); let last_offset = self.slot_to_offset(last + 1); println!( - "\tALLOC offset: [{}, {}) length: {} - slots: [{}, {}) count: {}", - first_offset, - last_offset, + "\tALLOC {:?} offset: [{}, {}) length: {} - slots: [{}, {}) count: {}", + first_offset.disk, + first_offset.offset, + last_offset.offset, nice_p2size(last_offset - first_offset), first, last + 1, @@ -304,7 +312,7 @@ impl SlabTrait for BitmapSlab { println!(); } - fn get_num_segments(&self) -> u64 { + fn num_segments(&self) -> u64 { self.allocatable .iter_inverse_ranges(self.total_slots) .count() as u64 @@ -319,19 +327,18 @@ struct ExtentSlab { total_space: u64, max_allowed_alloc_size: u32, - slab_location: DiskLocation, + location: DiskLocation, } impl ExtentSlab { fn new_slab( id: SlabId, generation: SlabGeneration, - slab_location: DiskLocation, - slab_size: u32, + extent: Extent, max_allowed_alloc_size: u32, ) -> Slab { let mut 
allocatable: RangeTree = Default::default(); - allocatable.add(slab_location.offset, slab_size.into()); + allocatable.add(extent.location.offset, extent.size); Slab::new( id, generation, @@ -340,43 +347,38 @@ impl ExtentSlab { allocating: Default::default(), freeing: Default::default(), last_location: 0, - total_space: slab_size.into(), + total_space: extent.size, max_allowed_alloc_size, - slab_location, + location: extent.location, }), ) } fn verify_slab_extent(&self, extent: Extent) { - assert_ge!(extent.location, self.slab_location); + assert_ge!(extent.location, self.location); assert_le!( extent.location + extent.size, - self.slab_location + self.total_space + self.location + self.total_space ); } - fn allocate_impl(&mut self, size: u64, min_location: u64, max_location: u64) -> Option { + fn allocate_impl(&mut self, size: u64, min_offset: u64, max_offset: u64) -> Option { for (&allocatable_offset, &allocatable_size) in - self.allocatable.range(min_location..max_location) + self.allocatable.range(min_offset..max_offset) { if allocatable_size >= size { self.freeing.verify_absent(allocatable_offset, size); self.allocatable.remove(allocatable_offset, size); self.allocating.add(allocatable_offset, size); self.last_location = allocatable_offset + size; - return Some(Extent { - location: DiskLocation { - offset: allocatable_offset, - }, - size, - }); + return Some(Extent::new(self.location.disk, allocatable_offset, size)); } } None } fn slab_end(&self) -> DiskLocation { - self.slab_location + self.total_space + self.location + self.total_space } } @@ -392,7 +394,7 @@ impl SlabTrait for ExtentSlab { } fn allocate(&mut self, size: u32) -> Option { - assert_le!(size, self.get_max_size()); + assert_le!(size, self.max_size()); let request_size = u64::from(size); // find next segment where this fits match self.allocate_impl(request_size, self.last_location, u64::MAX) { @@ -415,6 +417,8 @@ impl SlabTrait for ExtentSlab { self.allocating.verify_space(); self.allocatable.verify_space(); + let disk = self.location.disk; + // It could happen that a segment was allocated and then freed within // the same checkpoint period at which point it would be part of both // `allocating` and `freeing` sets. For this reason we always record @@ -423,25 +427,27 @@ impl SlabTrait for ExtentSlab { // checkpoint period. for (&start, &size) in self.allocating.iter() { self.allocatable.verify_absent(start, size); - spacemap.alloc(start, size); + spacemap.alloc(Extent::new(disk, start, size)); } self.allocating.clear(); // Space freed during this checkpoint is now available for reallocation. for (&start, &size) in self.freeing.iter() { self.allocating.verify_absent(start, size); - spacemap.free(start, size); + spacemap.free(Extent::new(disk, start, size)); self.allocatable.add(start, size); } self.freeing.clear(); } fn condense_to_spacemap(&self, spacemap: &mut SpaceMap) { + let disk = self.location.disk; + for (offset, size) in self .allocatable - .iter_inverse(self.slab_location.offset, self.slab_end().offset) + .iter_inverse(self.location.offset, self.slab_end().offset) { - spacemap.alloc(offset, size); + spacemap.alloc(Extent::new(disk, offset, size)); } // In our attempt to make this independent of flush_to_spacemap(), we do @@ -450,23 +456,23 @@ impl SlabTrait for ExtentSlab { // will be later marked as allocated in flush_to_spacemap(). 
for (&start, &size) in self.allocating.iter() { self.allocatable.verify_absent(start, size); - spacemap.free(start, size); + spacemap.free(Extent::new(disk, start, size)); } } - fn get_max_size(&self) -> u32 { + fn max_size(&self) -> u32 { self.max_allowed_alloc_size } - fn get_free_space(&self) -> u64 { + fn free_space(&self) -> u64 { self.allocatable.space() } - fn get_allocated_space(&self) -> u64 { - self.total_space - self.get_free_space() + fn allocated_space(&self) -> u64 { + self.total_space - self.free_space() } - fn get_phys_type(&self) -> SlabPhysType { + fn phys_type(&self) -> SlabPhysType { SlabPhysType::ExtentBased { max_size: self.max_allowed_alloc_size, } @@ -475,14 +481,14 @@ impl SlabTrait for ExtentSlab { fn dump_info(&self) { println!( "slab_offset: {} max_allowed_alloc_size: {} allocated_bytes: {} utilization: {}%", - self.slab_location.offset, + self.location.offset, nice_p2size(u64::from(self.max_allowed_alloc_size)), nice_p2size(self.total_space - self.allocatable.space()), ((self.total_space - self.allocatable.space()) * 100) / self.total_space ); for (offset, size) in self .allocatable - .iter_inverse(self.slab_location.offset, self.slab_end().offset) + .iter_inverse(self.location.offset, self.slab_end().offset) { println!( "\tALLOC offset: [{} {}) length: {}", @@ -494,33 +500,20 @@ impl SlabTrait for ExtentSlab { println!(); } - fn get_num_segments(&self) -> u64 { + fn num_segments(&self) -> u64 { self.allocatable - .iter_inverse(self.slab_location.offset, self.slab_end().offset) + .iter_inverse(self.location.offset, self.slab_end().offset) .count() as u64 } } struct FreeSlab { - slab_location: DiskLocation, - slab_size: u32, + extent: Extent, } impl FreeSlab { - fn new_slab( - id: SlabId, - generation: SlabGeneration, - slab_location: DiskLocation, - slab_size: u32, - ) -> Slab { - Slab::new( - id, - generation, - SlabType::Free(FreeSlab { - slab_location, - slab_size, - }), - ) + fn new_slab(id: SlabId, generation: SlabGeneration, extent: Extent) -> Slab { + Slab::new(id, generation, SlabType::Free(FreeSlab { extent })) } } @@ -552,28 +545,28 @@ impl SlabTrait for FreeSlab { // Nothing to condense for free slabs } - fn get_max_size(&self) -> u32 { + fn max_size(&self) -> u32 { panic!("free slab doesn't have a maximum allocation size"); } - fn get_free_space(&self) -> u64 { - u64::from(self.slab_size) + fn free_space(&self) -> u64 { + self.extent.size } - fn get_allocated_space(&self) -> u64 { + fn allocated_space(&self) -> u64 { 0 } - fn get_phys_type(&self) -> SlabPhysType { + fn phys_type(&self) -> SlabPhysType { SlabPhysType::Free } fn dump_info(&self) { - println!("slab_offset: {}", self.slab_location.offset); + println!("{:?}", self.extent); println!(); } - fn get_num_segments(&self) -> u64 { + fn num_segments(&self) -> u64 { 0 } } @@ -656,28 +649,32 @@ impl Slab { .with_trait_mut(|t| t.condense_to_spacemap(spacemap)); } - fn get_max_size(&self) -> u32 { - self.info.with_trait(|t| t.get_max_size()) + fn max_size(&self) -> u32 { + self.info.with_trait(|t| t.max_size()) } - fn get_free_space(&self) -> u64 { - self.info.with_trait(|t| t.get_free_space()) + fn free_space(&self) -> u64 { + self.info.with_trait(|t| t.free_space()) + } + + fn allocated_space(&self) -> u64 { + self.info.with_trait(|t| t.allocated_space()) } - fn get_allocated_space(&self) -> u64 { - self.info.with_trait(|t| t.get_allocated_space()) + fn num_segments(&self) -> u64 { + self.info.with_trait(|t| t.num_segments()) } fn get_phys(&self) -> SlabPhys { SlabPhys { generation: 
self.generation, - slab_type: self.info.with_trait(|t| t.get_phys_type()), + slab_type: self.info.with_trait(|t| t.phys_type()), } } fn to_sorted_slab_entry(&self) -> SortedSlabEntry { SortedSlabEntry { - allocated_space: self.get_allocated_space(), + allocated_space: self.allocated_space(), slab_id: self.id, } } @@ -686,10 +683,6 @@ impl Slab { println!("{:?} {:?}", self.id, self.generation); self.info.with_trait(|t| t.dump_info()); } - - fn get_num_segments(&self) -> u64 { - self.info.with_trait(|t| t.get_num_segments()) - } } #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] @@ -808,14 +801,14 @@ impl Slabs { } async fn open( + capacity: &BiBTreeMap, spacemap: &SpaceMap, spacemap_next: &SpaceMap, slab_size: u32, slabs_phys: &TerseVec, ) -> Self { - let coverage = spacemap.get_coverage(); - let num_slabs = usize::from64(coverage.size / u64::from(slab_size)); - assert_eq!(num_slabs, slabs_phys.0.len()); + let mut extent_iter = capacity.iter().map(|(&extent, _)| extent); + let mut current_extent = extent_iter.next().unwrap(); let mut slabs = Slabs( slabs_phys @@ -824,67 +817,46 @@ impl Slabs { .enumerate() .map(|(slab_id, phys_slab)| { let sid = SlabId(slab_id as u64); - let slab_location = DiskLocation { - offset: coverage.location.offset - + (u64::from(slab_size) * num_slabs as u64) - - (sid.0 + 1) * u64::from(slab_size), - }; + + if current_extent.size < slab_size.into() { + current_extent = extent_iter.next().unwrap(); + } + + let slab_extent = current_extent.range(0, slab_size.into()); + current_extent = current_extent + .range(slab_size.into(), current_extent.size - u64::from(slab_size)); + match phys_slab.slab_type { - SlabPhysType::BitmapBased { block_size } => BitmapSlab::new_slab( - sid, - phys_slab.generation, - slab_location, - slab_size, - block_size, - ), - SlabPhysType::ExtentBased { max_size } => ExtentSlab::new_slab( - sid, - phys_slab.generation, - slab_location, - slab_size, - max_size, - ), + SlabPhysType::BitmapBased { block_size } => { + BitmapSlab::new_slab(sid, phys_slab.generation, slab_extent, block_size) + } + SlabPhysType::ExtentBased { max_size } => { + ExtentSlab::new_slab(sid, phys_slab.generation, slab_extent, max_size) + } SlabPhysType::Free => { - FreeSlab::new_slab(sid, phys_slab.generation, slab_location, slab_size) + FreeSlab::new_slab(sid, phys_slab.generation, slab_extent) } } }) .collect(), ); - assert_eq!(slabs.0.len(), num_slabs); - let mut slab_import_generations = vec![SlabGeneration(0); num_slabs]; + // There should be no leftover capacity; it should have all been consumed by the slabs_phys. 
+ assert_lt!(current_extent.size, slab_size.into()); + assert!(extent_iter.next().is_none()); + + let mut slab_import_generations = vec![SlabGeneration(0); slabs.0.len()]; let mut import_cb = |entry| match entry { SpaceMapEntry::Alloc(extent) => { - let extent = Extent { - location: DiskLocation { - offset: extent.offset, - }, - size: extent.size, - }; - let slab_id = BlockAllocator::slab_id_from_extent_impl( - coverage.location.offset, - slab_size, - num_slabs as u64, - extent, - ); + let slab_id = + BlockAllocator::slab_id_from_extent_impl(capacity, slab_size.into(), extent); if slabs.get(slab_id).generation == slab_import_generations[slab_id.as_index()] { slabs.get_mut(slab_id).import_alloc(extent) } } SpaceMapEntry::Free(extent) => { - let extent = Extent { - location: DiskLocation { - offset: extent.offset, - }, - size: extent.size, - }; - let slab_id = BlockAllocator::slab_id_from_extent_impl( - coverage.location.offset, - slab_size, - num_slabs as u64, - extent, - ); + let slab_id = + BlockAllocator::slab_id_from_extent_impl(capacity, slab_size.into(), extent); if slabs.get(slab_id).generation == slab_import_generations[slab_id.as_index()] { slabs.get_mut(slab_id).import_free(extent) } @@ -905,7 +877,7 @@ impl Slabs { } pub struct BlockAllocator { - coverage: Extent, + capacity: BiBTreeMap, slab_size: u32, // # Spacemap Condensing - Design Overview @@ -983,9 +955,19 @@ impl BlockAllocator { extent_allocator.clone(), phys.spacemap_next, ); - let coverage = spacemap.get_coverage(); let slab_size = phys.slab_size; - let slabs = Slabs::open(&spacemap, &spacemap_next, slab_size, &phys.slabs).await; + let capacity: BiBTreeMap = { + let mut id = SlabId(0); + phys.capacity + .into_iter() + .map(|extent| { + let start = id; + id = id + extent.size / u64::from(slab_size); + (extent, start) + }) + .collect() + }; + let slabs = Slabs::open(&capacity, &spacemap, &spacemap_next, slab_size, &phys.slabs).await; let mut available_space = 0u64; let mut free_slabs = Vec::new(); @@ -995,10 +977,10 @@ impl BlockAllocator { match &slab.info { SlabType::BitmapBased(_) | SlabType::ExtentBased(_) => { slabs_by_bucket - .entry(slab.get_max_size()) + .entry(slab.max_size()) .or_default() .push(slab.to_sorted_slab_entry()); - available_space += slab.get_free_space(); + available_space += slab.free_space(); } SlabType::Free(_) => { free_slabs.push(slab.id); @@ -1006,6 +988,8 @@ impl BlockAllocator { } } } + // So that we'll hit multiple disks. 
+ free_slabs.shuffle(&mut thread_rng()); let slab_buckets = SlabAllocationBuckets::new(phys.slab_buckets, slabs_by_bucket); @@ -1015,7 +999,7 @@ impl BlockAllocator { ); BlockAllocator { - coverage, + capacity, slab_size, spacemap, spacemap_next, @@ -1039,14 +1023,13 @@ impl BlockAllocator { } fn allocate_from_new_slab(&mut self, request_size: u32) -> Option { - let slab_size = self.slab_size; let new_id = match self.free_slabs.pop() { Some(id) => id, None => { return None; } }; - let slab_location = self.slab_location_from_slab_id(new_id); + let extent = self.slab_extent_from_id(new_id); let slab_next_generation = self.slabs.get(new_id).generation.next(); let (&max_allocation_size, sorted_slabs) = self @@ -1054,21 +1037,9 @@ impl BlockAllocator { .get_bucket_for_allocation_size(request_size); let mut new_slab = if sorted_slabs.is_extent_based { - ExtentSlab::new_slab( - new_id, - slab_next_generation, - slab_location, - slab_size, - max_allocation_size, - ) + ExtentSlab::new_slab(new_id, slab_next_generation, extent, max_allocation_size) } else { - BitmapSlab::new_slab( - new_id, - slab_next_generation, - slab_location, - slab_size, - max_allocation_size, - ) + BitmapSlab::new_slab(new_id, slab_next_generation, extent, max_allocation_size) }; let target_spacemap = if self.next_slab_to_condense <= new_id { &mut self.spacemap @@ -1083,11 +1054,7 @@ impl BlockAllocator { assert!(matches!(self.slabs.get(new_id).info, SlabType::Free(_))); *self.slabs.get_mut(new_id) = new_slab; self.dirty_slab_id(new_id); - trace!( - "BLOCK-ALLOCATOR: {:?} added to {} byte bucket", - new_id, - max_allocation_size, - ); + trace!("{:?} added to {} byte bucket", new_id, max_allocation_size,); self.available_space -= extent.unwrap().size; extent } @@ -1097,10 +1064,7 @@ impl BlockAllocator { // Note: we assume allocation sizes are guaranteed to be aligned // from the caller for now. 
- assert_eq!( - request_size, - self.block_access.round_up_to_sector(request_size) - ); + self.block_access.verify_aligned(request_size); let (&max_allocation_size, sorted_slabs) = self .slab_buckets @@ -1134,7 +1098,7 @@ impl BlockAllocator { Some(id) => match self.slabs.get_mut(id).allocate(request_size) { Some(extent) => { trace!( - "BLOCK-ALLOCATOR: satisfied {} byte allocation request: {:?}", + "satisfied {} byte allocation request: {:?}", request_size, extent ); @@ -1145,7 +1109,7 @@ impl BlockAllocator { None => { let debug = sorted_slabs.advance(); trace!( - "BLOCK-ALLOCATOR: advance slab bucket {} cursor to {:?}", + "advance slab bucket {} cursor to {:?}", max_allocation_size, debug ); @@ -1154,7 +1118,7 @@ impl BlockAllocator { None => match self.allocate_from_new_slab(request_size) { Some(extent) => { trace!( - "BLOCK-ALLOCATOR: satisfied {} byte allocation request: {:?}", + "satisfied {} byte allocation request: {:?}", request_size, extent ); @@ -1162,7 +1126,7 @@ impl BlockAllocator { } None => { trace!( - "BLOCK-ALLOCATOR: allocation of {} bytes failed; no free slabs left; {} slabs used for {} byte bucket", + "allocation of {} bytes failed; no free slabs left; {} slabs used for {} byte bucket", request_size, slabs_in_bucket, max_allocation_size @@ -1175,11 +1139,9 @@ impl BlockAllocator { } pub fn free(&mut self, extent: Extent) { - assert_eq!( - extent.size, - self.block_access.round_up_to_sector(extent.size) - ); - trace!("BLOCK-ALLOCATOR: free request: {:?}", extent); + self.block_access.verify_aligned(extent.location.offset); + self.block_access.verify_aligned(extent.size); + trace!("free request: {:?}", extent); let slab_id = self.slab_id_from_extent(extent); self.slabs.get_mut(slab_id).free(extent); @@ -1197,7 +1159,7 @@ impl BlockAllocator { (self.slabs.0.len() - self.next_slab_to_condense.as_index()) as u64, ); trace!( - "BLOCK-ALLOCATOR: condensing the next {} slabs starting from {:?}", + "condensing the next {} slabs starting from {:?}", slabs_to_condense, self.next_slab_to_condense ); @@ -1214,26 +1176,23 @@ impl BlockAllocator { } assert_lt!(self.next_slab_to_condense.as_index(), self.slabs.0.len()); trace!( - "BLOCK-ALLOCATOR: spacemap has {} alloc and {} total entries", - self.spacemap.get_alloc_entries(), - self.spacemap.get_total_entries() + "spacemap has {} alloc and {} total entries", + self.spacemap.alloc_entries(), + self.spacemap.total_entries() ); trace!( - "BLOCK-ALLOCATOR: spacemap_next has {} alloc and {} total entries", - self.spacemap_next.get_alloc_entries(), - self.spacemap_next.get_total_entries() + "spacemap_next has {} alloc and {} total entries", + self.spacemap_next.alloc_entries(), + self.spacemap_next.total_entries() ); // Flush any dirty slabs. If any slab is completely empty mark it as free. // Keep track of the buckets/SortedSlabs sets that these dirty slabs belong // to so later we can update their slab order by freeness. 
- trace!( - "BLOCK-ALLOCATOR: flushing {} dirty slabs", - self.dirty_slabs.len() - ); - let num_slabs = self.slabs.0.len(); + trace!("flushing {} dirty slabs", self.dirty_slabs.len()); let mut dirty_buckets = HashSet::new(); for slab_id in std::mem::take(&mut self.dirty_slabs) { + let extent = self.slab_extent_from_id(slab_id); let slab = self.slabs.get_mut(slab_id); let target_spacemap = if self.next_slab_to_condense <= slab_id { &mut self.spacemap @@ -1241,23 +1200,18 @@ impl BlockAllocator { &mut self.spacemap_next }; slab.flush_to_spacemap(target_spacemap); - dirty_buckets.insert(slab.get_max_size()); - if slab.get_free_space() == u64::from(self.slab_size) { + dirty_buckets.insert(slab.max_size()); + if slab.free_space() == u64::from(self.slab_size) { self.free_slabs.push(slab.id); - *slab = FreeSlab::new_slab( - slab_id, - slab.generation.next(), - slab_location_from_slab_id(self.slab_size, num_slabs, self.coverage, slab_id), - self.slab_size, - ); + *slab = FreeSlab::new_slab(slab_id, slab.generation.next(), extent); target_spacemap.mark_generation(slab.id, slab.generation); } } - trace!( - "BLOCK-ALLOCATOR: allocation buckets to be resorted: {:?}", - dirty_buckets - ); + // So that we'll hit multiple disks. + self.free_slabs.shuffle(&mut thread_rng()); + + trace!("allocation buckets to be resorted: {:?}", dirty_buckets); // Update any buckets which we've performed any allocations/frees during // this checkpoint by recreating their SortedSlabs (which in turn @@ -1278,10 +1232,14 @@ impl BlockAllocator { self.available_space += self.freeing_space; self.freeing_space = 0; + let (spacemap, spacemap_next) = + futures::future::join(self.spacemap.flush(), self.spacemap_next.flush()).await; + let phys = BlockAllocatorPhys { + capacity: self.capacity.iter().map(|(&extent, _)| extent).collect(), slab_size: self.slab_size, - spacemap: self.spacemap.flush().await, - spacemap_next: self.spacemap_next.flush().await, + spacemap, + spacemap_next, next_slab_to_condense: self.next_slab_to_condense, slabs: self .slabs @@ -1306,85 +1264,57 @@ impl BlockAllocator { phys } - pub fn get_available(&self) -> u64 { + pub fn available(&self) -> u64 { self.available_space } - pub fn get_freeing(&self) -> u64 { + pub fn freeing(&self) -> u64 { self.freeing_space } pub fn size(&self) -> u64 { - self.coverage.size + self.capacity.iter().map(|(extent, _)| extent.size).sum() } - // - // |----------------| Device Offset 0 - // |... metadata ...| - // |----------------| coverage.offset - // | | - // | ...... | .... - // | | - // |----------------| - // | | Slab n - // |----------------| - // | ...... | .... - // |----------------| - // | | Slab 1 - // |----------------| - // | | Slab 0 - // |----------------| coverage.offset + (slab_size * slabs.len()) - // | ...... | .... 
- // fn slab_id_from_extent_impl( - coverage_start: u64, - slab_size: u32, - num_slabs: u64, + capacity: &BiBTreeMap, + slab_size: u64, extent: Extent, ) -> SlabId { - let id = - (num_slabs - 1) - ((extent.location.offset - coverage_start) / u64::from(slab_size)); - assert_lt!(id, num_slabs); - SlabId(id) + let (capacity_extent, &capacity_slab) = capacity + .left_range((Unbounded, Included(extent.location))) + .next_back() + .unwrap(); + + assert!(capacity_extent.contains(&extent)); + capacity_slab + ((extent.location - capacity_extent.location) / slab_size) } fn slab_id_from_extent(&self, extent: Extent) -> SlabId { let slab_size64 = u64::from(self.slab_size); - let num_slabs = self.slabs.0.len() as u64; assert_le!(extent.size, slab_size64); - let slab_id = BlockAllocator::slab_id_from_extent_impl( - self.coverage.location.offset, - self.slab_size, - num_slabs, - extent, - ); + let slab_id = BlockAllocator::slab_id_from_extent_impl(&self.capacity, slab_size64, extent); - // check all boundaries now before proceeding - let slab_location = self.slab_location_from_slab_id(slab_id); - assert_ge!(extent.location, slab_location); - assert_lt!(extent.location, slab_location + slab_size64); - assert_le!(extent.location + extent.size, slab_location + slab_size64); + assert_lt!(slab_id.0, self.slabs.0.len() as u64); + assert!(self.slab_extent_from_id(slab_id).contains(&extent)); slab_id } - fn slab_location_from_slab_id(&self, slab_id: SlabId) -> DiskLocation { - slab_location_from_slab_id(self.slab_size, self.slabs.0.len(), self.coverage, slab_id) + fn slab_extent_from_id(&self, slab_id: SlabId) -> Extent { + let (containing_extent, &extent_slab) = self + .capacity + .right_range((Unbounded, Included(slab_id))) + .next_back() + .unwrap(); + containing_extent.range( + (slab_id - extent_slab) * u64::from(self.slab_size), + self.slab_size.into(), + ) } } -fn slab_location_from_slab_id( - slab_size: u32, - num_slabs: usize, - allocator_coverage: Extent, - slab_id: SlabId, -) -> DiskLocation { - let slab_size64 = u64::from(slab_size); - let num_slabs64 = num_slabs as u64; - allocator_coverage.location + ((slab_size64 * num_slabs64) - (slab_id.0 + 1) * slab_size64) -} - #[derive(Debug, Serialize, Deserialize, Clone)] enum SlabPhysType { BitmapBased { block_size: u32 }, @@ -1433,32 +1363,45 @@ pub struct BlockAllocatorPhys { spacemap_next: SpaceMapPhys, next_slab_to_condense: SlabId, + capacity: Vec, + // TODO: if this is too big to be writing every checkpoint, // we could use a BlockBasedLog<(SlabId, SlabPhysType)> + // Note: slabs are located within the `capacity` in the order given slabs: TerseVec, slab_buckets: SlabAllocationBucketsPhys, } impl OnDisk for BlockAllocatorPhys {} impl BlockAllocatorPhys { - // TODO: eventually change this to indicate the size of each - // of the disks that we're managing - pub fn new(offset: u64, size: u64) -> BlockAllocatorPhys { + pub fn new(capacity: Vec) -> BlockAllocatorPhys { let slab_size = *DEFAULT_SLAB_SIZE; - let num_slabs = size / u64::from(slab_size); + let slab_size64 = u64::from(slab_size); + + // Truncate each extent to a multiple of slab_size + let capacity: Vec = capacity + .iter() + .map(|extent| extent.range(0, extent.size / slab_size64 * slab_size64)) + .collect(); let slabs = vec![ SlabPhys { generation: SlabGeneration(0), slab_type: SlabPhysType::Free }; - usize::from64(num_slabs) + usize::from64( + capacity + .iter() + .map(|extent| extent.size / slab_size64) + .sum() + ) ]; BlockAllocatorPhys { slab_size, - spacemap: SpaceMapPhys::new(offset, 
size), - spacemap_next: SpaceMapPhys::new(offset, size), + spacemap: SpaceMapPhys::new(), + spacemap_next: SpaceMapPhys::new(), next_slab_to_condense: SlabId(0), + capacity, slabs: slabs.into(), slab_buckets: DEFAULT_SLAB_BUCKETS.clone(), } @@ -1469,16 +1412,16 @@ impl BlockAllocatorPhys { self.spacemap_next.claim(builder); } - pub fn coverage(&self) -> Extent { - self.spacemap.coverage() + pub fn capacity(&self) -> Vec { + self.capacity.clone() } pub fn spacemap_bytes(&self) -> u64 { - self.spacemap.len_bytes() + self.spacemap.bytes() } pub fn spacemap_next_bytes(&self) -> u64 { - self.spacemap_next.len_bytes() + self.spacemap_next.bytes() } pub fn spacemap_capacity_bytes(&self) -> u64 { @@ -1531,7 +1474,18 @@ async fn zcachedb_load_slab_state( phys.spacemap_next, ); let slab_size = phys.slab_size; - Slabs::open(&spacemap, &spacemap_next, slab_size, &phys.slabs).await + let capacity: BiBTreeMap = { + let mut id = SlabId(0); + phys.capacity + .into_iter() + .map(|extent| { + let start = id; + id = id + extent.size / u64::from(slab_size); + (extent, start) + }) + .collect() + }; + Slabs::open(&capacity, &spacemap, &spacemap_next, slab_size, &phys.slabs).await } struct AllocationBucketStatistics { @@ -1552,8 +1506,8 @@ impl AllocationBucketStatistics { } fn add_slab(&mut self, slab: &Slab) { - self.total_segments += slab.get_num_segments(); - self.free_space += slab.get_free_space(); + self.total_segments += slab.num_segments(); + self.free_space += slab.free_space(); self.nslabs += 1; } @@ -1633,8 +1587,7 @@ impl AllocationBucketInfo { fn add_slab(&mut self, slab: &Slab) { self.stats.add_slab(slab); - self.slabs_by_freeness - .insert((slab.get_free_space(), slab.id)); + self.slabs_by_freeness.insert((slab.free_space(), slab.id)); } // Given a permille value (1000-quantile) for the number of slabs in this @@ -1703,12 +1656,7 @@ impl SlabBucketsReport { // Free slabs don't belong on a bucket, just log them for the total stats match slab.info { SlabType::BitmapBased(_) | SlabType::ExtentBased(_) => { - let bucket_info = self - .buckets - .range_mut(slab.get_max_size()..) 
- .next() - .unwrap() - .1; + let bucket_info = self.buckets.range_mut(slab.max_size()..).next().unwrap().1; bucket_info.add_slab(slab); let nslabs = bucket_info.stats.nslabs; self.reset_hist_scaling_factor(nslabs); diff --git a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs index 505aba623e78..5cab0cdf63bf 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs @@ -98,9 +98,7 @@ impl BlockBasedLogPhys { let extent_bytes = block_access.read_raw(truncated_extent).await; let mut total_consumed = 0; while total_consumed < extent_bytes.len() { - let chunk_location = DiskLocation { - offset: extent.location.offset + total_consumed as u64, - }; + let chunk_location = extent.location.offset + total_consumed as u64; trace!("decoding {:?} from {:?}", chunk_id, chunk_location); // XXX handle checksum error here let (chunk, consumed): (BlockBasedLogChunk, usize) = block_access @@ -136,7 +134,7 @@ impl BlockBasedLogPhys { } } - pub fn len_bytes(&self) -> u64 { + pub fn bytes(&self) -> u64 { self.next_chunk_offset.0 } @@ -185,11 +183,11 @@ impl BlockBasedLogWithSummaryPhys { self.chunk_summary.iter_chunks(block_access) } - pub fn num_bytes(&self) -> u64 { - self.chunk_summary.len_bytes() + self.this.len_bytes() + pub fn bytes(&self) -> u64 { + self.chunk_summary.bytes() + self.this.bytes() } - pub fn num_reserved_bytes(&self) -> u64 { + pub fn capacity_bytes(&self) -> u64 { self.chunk_summary.capacity_bytes() + self.this.capacity_bytes() } } @@ -384,15 +382,13 @@ impl BlockBasedLog { let offset_within_extent = self.phys.next_chunk_offset.0 - offset.0; // The last extent should go at least to the end of the chunks. assert_le!(offset_within_extent, extent.size); - Extent { - location: DiskLocation { - offset: extent.location.offset + offset_within_extent, - }, - size: extent.size - offset_within_extent, - } + extent.range(offset_within_extent, extent.size - offset_within_extent) } None => Extent { - location: DiskLocation { offset: 0 }, + location: DiskLocation { + disk: DiskId(0), + offset: 0, + }, size: 0, }, } diff --git a/cmd/zfs_object_agent/zettacache/src/extent_allocator.rs b/cmd/zfs_object_agent/zettacache/src/extent_allocator.rs index 6adc139c7109..085e1be4fccd 100644 --- a/cmd/zfs_object_agent/zettacache/src/extent_allocator.rs +++ b/cmd/zfs_object_agent/zettacache/src/extent_allocator.rs @@ -1,51 +1,75 @@ -use crate::base_types::DiskLocation; +use crate::base_types::DiskId; use crate::base_types::Extent; use log::*; use more_asserts::*; use serde::{Deserialize, Serialize}; +use std::cmp::min; +use std::collections::BTreeMap; use std::mem; +use util::iter_wrapping; use util::RangeTree; -#[derive(Serialize, Deserialize, Debug, Copy, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct ExtentAllocatorPhys { - pub first_valid_offset: u64, - pub last_valid_offset: u64, + pub capacity: Vec, +} + +impl ExtentAllocatorPhys { + pub fn new(capacity: Vec) -> Self { + Self { capacity } + } } pub struct ExtentAllocator { - state: std::sync::Mutex, + inner: std::sync::Mutex, +} + +struct ExtentAllocatorInner { + disks: BTreeMap, + next: DiskId, } -struct ExtentAllocatorState { - phys: ExtentAllocatorPhys, +struct ExtentAllocatorDisk { + capacity: Extent, allocatable: RangeTree, freeing: RangeTree, // not yet available for reallocation until this checkpoint completes } pub struct ExtentAllocatorBuilder { - phys: ExtentAllocatorPhys, - allocatable: RangeTree, 
+ allocatable: BTreeMap, } impl ExtentAllocatorBuilder { - pub fn new(phys: ExtentAllocatorPhys) -> ExtentAllocatorBuilder { - let mut metadata_allocatable = RangeTree::new(); - metadata_allocatable.add( - phys.first_valid_offset, - phys.last_valid_offset - phys.first_valid_offset, - ); - ExtentAllocatorBuilder { - phys, - allocatable: metadata_allocatable, + pub fn new(phys: &ExtentAllocatorPhys) -> ExtentAllocatorBuilder { + let mut allocatable = BTreeMap::new(); + for extent in &phys.capacity { + assert_eq!(extent.location.offset % 512, 0); + assert_eq!(extent.size % 512, 0); + let mut rt = RangeTree::new(); + rt.add(extent.location.offset, extent.size); + let existing = allocatable.insert(extent.location.disk, (*extent, rt)); + // phys must have at most one extent per disk + assert!(existing.is_none()); } + ExtentAllocatorBuilder { allocatable } } pub fn claim(&mut self, extent: &Extent) { - self.allocatable.remove(extent.location.offset, extent.size); + self.allocatable + .get_mut(&extent.location.disk) + .unwrap() + .1 + .remove(extent.location.offset, extent.size); } pub fn allocatable_bytes(&self) -> u64 { - self.allocatable.space() + self.allocatable.iter().map(|(_, (_, rt))| rt.space()).sum() + } +} + +impl ExtentAllocatorInner { + fn iter_disks(&self) -> impl Iterator { + iter_wrapping(&self.disks, self.next) } } @@ -54,81 +78,125 @@ impl ExtentAllocator { /// allocated, they must all be .claim()ed first, via the /// ExtentAllocatorBuilder. pub fn open(builder: ExtentAllocatorBuilder) -> ExtentAllocator { + let disks: BTreeMap = builder + .allocatable + .into_iter() + .map(|(disk, (capacity, allocatable))| { + ( + disk, + ExtentAllocatorDisk { + capacity, + allocatable, + freeing: Default::default(), + }, + ) + }) + .collect(); ExtentAllocator { - state: std::sync::Mutex::new(ExtentAllocatorState { - phys: builder.phys, - allocatable: builder.allocatable, - freeing: RangeTree::new(), + inner: std::sync::Mutex::new(ExtentAllocatorInner { + next: disks.iter().next().map(|(&disk, _)| disk).unwrap(), + disks, }), } } pub fn get_phys(&self) -> ExtentAllocatorPhys { - self.state.lock().unwrap().phys + ExtentAllocatorPhys { + capacity: self + .inner + .lock() + .unwrap() + .disks + .iter() + .map(|(&id, disk)| { + assert_eq!(id, disk.capacity.location.disk); + disk.capacity + }) + .collect(), + } } pub fn allocatable_bytes(&self) -> u64 { - self.state.lock().unwrap().allocatable.space() + self.inner + .lock() + .unwrap() + .disks + .iter() + .map(|(_, disk)| disk.allocatable.space()) + .sum() } pub fn checkpoint_done(&self) { - let mut state = self.state.lock().unwrap(); - - // Space freed during this checkpoint is now available for reallocation. - for (start, size) in mem::take(&mut state.freeing).iter() { - state.allocatable.add(*start, *size); + let mut inner = self.inner.lock().unwrap(); + + for (&id, disk) in inner.disks.iter_mut() { + assert_eq!(id, disk.capacity.location.disk); + // Space freed during this checkpoint is now available for reallocation. + for (&start, &size) in mem::take(&mut disk.freeing).iter() { + assert_eq!(start % 512, 0); + assert_eq!(size % 512, 0); + disk.allocatable.add(start, size); + } } } pub fn allocate(&self, min_size: u64, max_size: u64) -> Extent { - let mut state = self.state.lock().unwrap(); + let mut inner = self.inner.lock().unwrap(); // find first segment where this fits, or largest free segment. // XXX keep size-sorted tree as well? 
- let mut best_size = 0; - let mut best_offset = 0; - for (offset, size) in state.allocatable.iter() { - if *size > best_size { - best_size = *size; - best_offset = *offset; + let mut best_extent: Option = None; + for disk in inner.iter_disks() { + for (&offset, &size) in disk.allocatable.iter() { + if size > min_size && size > best_extent.map_or(0, |extent| extent.size) { + best_extent = Some(Extent::new( + disk.capacity.location.disk, + offset, + min(size, max_size), + )); + if size >= max_size { + break; + } + } } - if *size >= max_size { - best_size = max_size; - break; - } - } - assert_le!(best_size, max_size); - - if best_size < min_size { - /* - best_offset = state.phys.last_valid_offset; - best_size = max_size64; - state.phys.last_valid_offset += max_size64; - */ - // XXX the block allocator will keep using this, and overwriting our - // metadata, until we notify it. - panic!( - "no extents of at least {} bytes available; need to overwrite {} bytes of data blocks at offset {}", - min_size, max_size, state.phys.last_valid_offset - ); - } else { - // remove segment from allocatable - state.allocatable.remove(best_offset, best_size); } - - let this = Extent { - location: DiskLocation { - offset: best_offset, - }, - size: best_size, - }; - debug!("allocated {:?} for min={} max={}", this, min_size, max_size); - this + let extent = best_extent + .unwrap_or_else(|| panic!("no free metadata chunk of at least {}KB", min_size / 1024)); + assert_ge!(extent.size, min_size); + assert_le!(extent.size, max_size); + + // advance cursor + inner.next = iter_wrapping(&inner.disks, extent.location.disk) + .take(2) + .last() + .unwrap() + .capacity + .location + .disk; + + // remove segment from allocatable + inner + .disks + .get_mut(&extent.location.disk) + .unwrap() + .allocatable + .remove(extent.location.offset, extent.size); + + debug!( + "allocated {:?} for min={} max={}", + extent, min_size, max_size + ); + extent } /// extent can be a subset of what was previously allocated pub fn free(&self, extent: &Extent) { - let mut state = self.state.lock().unwrap(); - state.freeing.add(extent.location.offset, extent.size); + let mut inner = self.inner.lock().unwrap(); + inner + .disks + .get_mut(&extent.location.disk) + .unwrap() + .freeing + .add(extent.location.offset, extent.size); } } diff --git a/cmd/zfs_object_agent/zettacache/src/index.rs b/cmd/zfs_object_agent/zettacache/src/index.rs index 4368ce6ce461..b4809b5f895d 100644 --- a/cmd/zfs_object_agent/zettacache/src/index.rs +++ b/cmd/zfs_object_agent/zettacache/src/index.rs @@ -80,11 +80,11 @@ impl ZettaCacheIndexPhys { } pub fn log_bytes(&self) -> u64 { - self.log.num_bytes() + self.log.bytes() } - pub fn log_reserved_bytes(&self) -> u64 { - self.log.num_reserved_bytes() + pub fn log_capacity_bytes(&self) -> u64 { + self.log.capacity_bytes() } } diff --git a/cmd/zfs_object_agent/zettacache/src/space_map.rs b/cmd/zfs_object_agent/zettacache/src/space_map.rs index c853bc0523ef..3bfffaa40b2e 100644 --- a/cmd/zfs_object_agent/zettacache/src/space_map.rs +++ b/cmd/zfs_object_agent/zettacache/src/space_map.rs @@ -1,26 +1,19 @@ -use crate::base_types::DiskLocation; use crate::base_types::Extent; -use crate::block_access::*; +use crate::base_types::OnDisk; +use crate::block_access::BlockAccess; use crate::block_allocator::SlabGeneration; use crate::block_allocator::SlabId; -use crate::block_based_log::*; +use crate::block_based_log::BlockBasedLog; +use crate::block_based_log::BlockBasedLogEntry; +use crate::block_based_log::BlockBasedLogPhys; use 
crate::extent_allocator::ExtentAllocator; use crate::extent_allocator::ExtentAllocatorBuilder; -use crate::{ - base_types::OnDisk, - block_based_log::{BlockBasedLog, BlockBasedLogEntry}, -}; use futures::future; -use futures::stream::*; -use serde::{Deserialize, Serialize}; +use futures::stream::StreamExt; +use serde::Deserialize; +use serde::Serialize; use std::sync::Arc; -#[derive(Debug, Serialize, Deserialize, Copy, Clone)] -pub struct SpaceMapExtent { - pub offset: u64, - pub size: u64, -} - #[derive(Debug, Serialize, Deserialize, Copy, Clone)] pub struct MarkGenerationEntry { pub slab_id: SlabId, @@ -29,8 +22,8 @@ pub struct MarkGenerationEntry { #[derive(Debug, Serialize, Deserialize, Copy, Clone)] pub enum SpaceMapEntry { - Alloc(SpaceMapExtent), - Free(SpaceMapExtent), + Alloc(Extent), + Free(Extent), MarkGeneration(MarkGenerationEntry), } impl OnDisk for SpaceMapEntry {} @@ -38,8 +31,6 @@ impl BlockBasedLogEntry for SpaceMapEntry {} pub struct SpaceMap { log: BlockBasedLog, - coverage: SpaceMapExtent, - // This is only used currently for printing out the ideal size that the // spacemap would have if it was condensed to our logs. alloc_entries: u64, @@ -48,16 +39,14 @@ pub struct SpaceMap { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct SpaceMapPhys { log: BlockBasedLogPhys, - coverage: SpaceMapExtent, alloc_entries: u64, } impl OnDisk for SpaceMapPhys {} impl SpaceMapPhys { - pub fn new(offset: u64, size: u64) -> SpaceMapPhys { + pub fn new() -> SpaceMapPhys { SpaceMapPhys { log: Default::default(), - coverage: SpaceMapExtent { offset, size }, alloc_entries: 0, } } @@ -66,17 +55,8 @@ impl SpaceMapPhys { self.log.claim(builder); } - pub fn coverage(&self) -> Extent { - Extent { - location: DiskLocation { - offset: self.coverage.offset, - }, - size: self.coverage.size, - } - } - - pub fn len_bytes(&self) -> u64 { - self.log.len_bytes() + pub fn bytes(&self) -> u64 { + self.log.bytes() } pub fn capacity_bytes(&self) -> u64 { @@ -92,7 +72,6 @@ impl SpaceMap { ) -> SpaceMap { SpaceMap { log: BlockBasedLog::open(block_access, extent_allocator, phys.log), - coverage: phys.coverage, alloc_entries: phys.alloc_entries, } } @@ -110,18 +89,16 @@ impl SpaceMap { .await; } - pub fn alloc(&mut self, offset: u64, size: u64) { - if size != 0 { - self.log - .append(SpaceMapEntry::Alloc(SpaceMapExtent { offset, size })); + pub fn alloc(&mut self, extent: Extent) { + if extent.size != 0 { + self.log.append(SpaceMapEntry::Alloc(extent)); self.alloc_entries += 1; } } - pub fn free(&mut self, offset: u64, size: u64) { - if size != 0 { - self.log - .append(SpaceMapEntry::Free(SpaceMapExtent { offset, size })); + pub fn free(&mut self, extent: Extent) { + if extent.size != 0 { + self.log.append(SpaceMapEntry::Free(extent)); } } @@ -136,25 +113,15 @@ impl SpaceMap { pub async fn flush(&mut self) -> SpaceMapPhys { SpaceMapPhys { log: self.log.flush().await, - coverage: self.coverage, alloc_entries: self.alloc_entries, } } - pub fn get_coverage(&self) -> Extent { - Extent { - location: DiskLocation { - offset: self.coverage.offset, - }, - size: self.coverage.size, - } - } - - pub fn get_total_entries(&self) -> u64 { + pub fn total_entries(&self) -> u64 { self.log.len() } - pub fn get_alloc_entries(&self) -> u64 { + pub fn alloc_entries(&self) -> u64 { self.alloc_entries } diff --git a/cmd/zfs_object_agent/zettacache/src/zcachedb.rs b/cmd/zfs_object_agent/zettacache/src/zcachedb.rs index 7c696e3cf6e5..b30c28f4fb34 100644 --- a/cmd/zfs_object_agent/zettacache/src/zcachedb.rs +++ 
b/cmd/zfs_object_agent/zettacache/src/zcachedb.rs @@ -106,13 +106,12 @@ impl DumpSlabsOptions { } impl ZettaCacheDBCommand { - pub async fn issue_command(command: ZettaCacheDBCommand, path: &str) { - if let Some(handle) = ZCacheDBHandle::open(path).await { - match command { - ZettaCacheDBCommand::DumpStructures(opts) => handle.dump_structures(opts).await, - ZettaCacheDBCommand::DumpSlabs(opts) => handle.dump_slabs(opts).await, - ZettaCacheDBCommand::DumpSpaceUsage => handle.dump_free_space().await, - } + pub async fn issue_command(command: ZettaCacheDBCommand, paths: Vec<&str>) { + let handle = ZCacheDBHandle::open(paths).await.unwrap(); + match command { + ZettaCacheDBCommand::DumpStructures(opts) => handle.dump_structures(opts).await, + ZettaCacheDBCommand::DumpSlabs(opts) => handle.dump_slabs(opts).await, + ZettaCacheDBCommand::DumpSpaceUsage => handle.dump_free_space().await, } } } diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache.rs b/cmd/zfs_object_agent/zettacache/src/zettacache.rs index 478365a948b0..e55b864c8b9b 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache.rs @@ -27,6 +27,7 @@ use metered::metered; use metered::time_source::StdInstantMicros; use more_asserts::*; use serde::{Deserialize, Serialize}; +use std::cmp::Ordering; use std::collections::btree_map; use std::collections::BTreeMap; use std::ops::Bound::{Excluded, Unbounded}; @@ -34,21 +35,19 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use std::time::Instant; -use tokio::sync::OwnedSemaphorePermit; use tokio::sync::Semaphore; use tokio::time::{sleep_until, timeout_at}; use util::get_tunable; use util::maybe_die_with; use util::nice_p2size; use util::AlignedBytes; -use util::From64; use util::LockSet; use util::LockedItem; use util::MutexExt; lazy_static! { static ref SUPERBLOCK_SIZE: u64 = get_tunable("superblock_size", 4 * 1024); - static ref DEFAULT_CHECKPOINT_RING_BUFFER_SIZE: u32 = get_tunable("default_checkpoint_ring_buffer_size", 1024 * 1024); + static ref DEFAULT_CHECKPOINT_SIZE_PCT: f64 = get_tunable("default_checkpoint_size_pct", 0.1); pub static ref DEFAULT_SLAB_SIZE: u32 = get_tunable("default_slab_size", 16 * 1024 * 1024); static ref DEFAULT_METADATA_SIZE_PCT: f64 = get_tunable("default_metadata_size_pct", 15.0); // Can lower this to test forced eviction. static ref MAX_PENDING_CHANGES: usize = get_tunable("max_pending_changes", 50_000); // XXX should be based on RAM usage, ~tens of millions at least @@ -58,41 +57,65 @@ lazy_static! { static ref TARGET_CACHE_SIZE_PCT: u64 = get_tunable("target_cache_size_pct", 80); static ref HIGH_WATER_CACHE_SIZE_PCT: u64 = get_tunable("high_water_cache_size_pct", 82); static ref QUANTILES_IN_SIZE_HISTOGRAM: usize = get_tunable("quantiles_in_size_histogram", 100); - static ref CACHE_INSERT_BLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_blocking_buffer_bytes", 256_000_000); static ref CACHE_INSERT_NONBLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_nonblocking_buffer_bytes", 256_000_000); } #[derive(Serialize, Deserialize, Debug)] struct ZettaSuperBlockPhys { - checkpoint_ring_buffer_size: u32, + checkpoint_id: CheckpointId, + checkpoint_capacity: Extent, // space available for checkpoints + checkpoint: Extent, // space used by latest checkpoint slab_size: u32, - last_checkpoint_id: CheckpointId, - last_checkpoint_extent: Extent, + disk: DiskId, + num_disks: usize, + guid: u64, // XXX put sector size in here too and verify it matches what the disk says now? 
// XXX put disk size in here so we can detect expansion? } impl ZettaSuperBlockPhys { - // XXX when we have multiple disks, will this be stored on a specific one? Or copied on all of them? - async fn read(block_access: &BlockAccess) -> Result { + async fn read(block_access: &BlockAccess, disk: DiskId) -> Result { let raw = block_access - .read_raw(Extent { - location: DiskLocation { offset: 0 }, - size: *SUPERBLOCK_SIZE, - }) + .read_raw(Extent::new(disk, 0, *SUPERBLOCK_SIZE)) .await; let (this, _): (Self, usize) = block_access.chunk_from_raw(&raw)?; debug!("got {:#?}", this); + assert_eq!(this.disk, disk); Ok(this) } - async fn write(&self, block_access: &BlockAccess) { + async fn read_all(block_access: &BlockAccess) -> Result> { + block_access + .disks() + .map(|disk| ZettaSuperBlockPhys::read(block_access, disk)) + .collect::>() + .try_collect() + .await + } + + async fn write(&self, block_access: &BlockAccess, disk: DiskId) { maybe_die_with(|| format!("before writing {:#?}", self)); debug!("writing {:#?}", self); let raw = block_access.chunk_to_raw(EncodeType::Json, self); + // XXX pad it out to SUPERBLOCK_SIZE? block_access - .write_raw(DiskLocation { offset: 0 }, raw) + .write_raw(DiskLocation { offset: 0, disk }, raw) + .await; + } + + /// Write superblock to all disks. Note that ZettaSuperBlockPhys::disk will + /// be changed to the DiskId of each disk that's written. + async fn write_all(&self, block_access: &BlockAccess) { + block_access + .disks() + .map(|disk| async move { + // Change the DiskId of this superblock to match the disk we're writing to. + let phys = Self { disk, ..*self }; + phys.write(block_access, disk).await + }) + .collect::>() + .for_each(|_| async move {}) .await; } } @@ -147,6 +170,7 @@ pub struct ZettaCache { metrics: Arc, blocking_buffer_bytes_available: Arc, nonblocking_buffer_bytes_available: Arc, + write_slots: Arc, } #[derive(Debug, Serialize, Deserialize, Copy, Clone)] @@ -471,83 +495,148 @@ pub enum InsertSource { #[metered(registry=ZettaCacheMetrics)] impl ZettaCache { - pub async fn create(path: &str) { - let block_access = BlockAccess::new(path, false).await; - let metadata_start = *SUPERBLOCK_SIZE + u64::from(*DEFAULT_CHECKPOINT_RING_BUFFER_SIZE); - let data_start = block_access.round_up_to_sector( - metadata_start - + (*DEFAULT_METADATA_SIZE_PCT / 100.0 * block_access.size() as f64) + pub async fn create(block_access: &BlockAccess) { + let guid: u64 = rand::random(); + + let total_capacity = block_access.total_capacity(); + + // checkpoint is stored on the largest disk, its size a percent of the whole cache + let checkpoint_capacity = Extent::new( + block_access + .disks() + .reduce(|a, b| { + if block_access.disk_size(a) > block_access.disk_size(b) { + a + } else { + b + } + }) + .unwrap(), + *SUPERBLOCK_SIZE, + block_access.round_up_to_sector( + (*DEFAULT_CHECKPOINT_SIZE_PCT / 100.0 * total_capacity as f64) .approx_as::() .unwrap(), + ), ); + + // metadata is stored on each disk, its size a percent of that disk + let metadata_capacity: Vec = block_access + .disks() + .map(|disk| { + let start = if disk == checkpoint_capacity.location.disk { + checkpoint_capacity.location.offset + checkpoint_capacity.size + } else { + *SUPERBLOCK_SIZE + }; + Extent::new( + disk, + start, + block_access.round_up_to_sector( + start + + (*DEFAULT_METADATA_SIZE_PCT / 100.0 + * block_access.disk_size(disk) as f64) + .approx_as::() + .unwrap(), + ), + ) + }) + .collect(); + + let data_capacity = metadata_capacity + .iter() + .map(|extent| { + Extent::new( + 
extent.location.disk, + extent.location.offset + extent.size, + block_access.disk_size(extent.location.disk) + - (extent.location.offset + extent.size), + ) + }) + .collect(); + let checkpoint = ZettaCheckpointPhys { generation: CheckpointId(0), - extent_allocator: ExtentAllocatorPhys { - first_valid_offset: metadata_start, - last_valid_offset: data_start, - }, + extent_allocator: ExtentAllocatorPhys::new(metadata_capacity), index: Default::default(), operation_log: Default::default(), last_atime: Atime(0), - block_allocator: BlockAllocatorPhys::new(data_start, block_access.size() - data_start), - size_histogram: SizeHistogramPhys::new( - block_access.size() - data_start, - *QUANTILES_IN_SIZE_HISTOGRAM, - ), + block_allocator: BlockAllocatorPhys::new(data_capacity), + size_histogram: SizeHistogramPhys::new(total_capacity, *QUANTILES_IN_SIZE_HISTOGRAM), + merge_progress: None, }; let raw = block_access.chunk_to_raw(EncodeType::Json, &checkpoint); - assert_le!(raw.len(), *DEFAULT_CHECKPOINT_RING_BUFFER_SIZE as usize); - let checkpoint_size = raw.len() as u64; + let checkpoint_extent = checkpoint_capacity.range(0, raw.len() as u64); + block_access - .write_raw( - DiskLocation { - offset: *SUPERBLOCK_SIZE, - }, - raw, - ) + .write_raw(checkpoint_extent.location, raw) .await; - let phys = ZettaSuperBlockPhys { - checkpoint_ring_buffer_size: *DEFAULT_CHECKPOINT_RING_BUFFER_SIZE, + let num_disks = block_access.disks().count(); + ZettaSuperBlockPhys { + checkpoint_id: CheckpointId(0), + checkpoint_capacity, + checkpoint: checkpoint_extent, slab_size: *DEFAULT_SLAB_SIZE, - last_checkpoint_extent: Extent { - location: DiskLocation { - offset: *SUPERBLOCK_SIZE, - }, - size: checkpoint_size, - }, - last_checkpoint_id: CheckpointId(0), - }; - phys.write(&block_access).await; + disk: DiskId(0), // will be changed by .write_all() + num_disks, + guid, + } + .write_all(block_access) + .await; } - pub async fn open(path: &str) -> ZettaCache { - let block_access = Arc::new(BlockAccess::new(path, false).await); + pub async fn open(paths: Vec<&str>) -> ZettaCache { + let block_access = Arc::new(BlockAccess::new( + paths.iter().map(|path| Disk::new(path, false)).collect(), + false, + )); - // if superblock not present, create new cache - // XXX need a real mechanism for creating/managing the cache devices - let phys = match ZettaSuperBlockPhys::read(&block_access).await { - Ok(phys) => phys, + let super_blocks = match ZettaSuperBlockPhys::read_all(&block_access).await { + Ok(super_blocks) => super_blocks, Err(_) => { - Self::create(path).await; - ZettaSuperBlockPhys::read(&block_access).await.unwrap() + // XXX need proper create CLI + Self::create(&block_access).await; + ZettaSuperBlockPhys::read_all(&block_access).await.unwrap() } }; - let checkpoint = - ZettaCheckpointPhys::read(&block_access, phys.last_checkpoint_extent).await; + let latest_super_block = super_blocks + .into_iter() + .enumerate() + .map(|(id, phys)| { + // XXX proper error handling + // XXX we should be able to reorder them? 
+ assert_eq!(DiskId(id.try_into().unwrap()), phys.disk); + phys + }) + .reduce(|x, y| { + // XXX proper error handling + assert_eq!(x.guid, y.guid); + assert_eq!(x.num_disks, y.num_disks); + match x.checkpoint_id.cmp(&y.checkpoint_id) { + Ordering::Greater => x, + Ordering::Less => y, + Ordering::Equal => { + assert_eq!(x.checkpoint, y.checkpoint); + x + } + } + }) + .unwrap(); - assert_eq!(checkpoint.generation, phys.last_checkpoint_id); + // XXX proper error handling + assert_eq!(paths.len(), latest_super_block.num_disks); + assert!(latest_super_block + .checkpoint_capacity + .contains(&latest_super_block.checkpoint)); - let metadata_start = *SUPERBLOCK_SIZE + u64::from(phys.checkpoint_ring_buffer_size); - // XXX pass in the metadata_start to ExtentAllocator::open, rather than - // having this represented twice in the on-disk format? - assert_eq!( - metadata_start, - checkpoint.extent_allocator.first_valid_offset - ); + let checkpoint = + ZettaCheckpointPhys::read(&block_access, latest_super_block.checkpoint).await; - let mut builder = ExtentAllocatorBuilder::new(checkpoint.extent_allocator); + assert_eq!(checkpoint.generation, latest_super_block.checkpoint_id); + + let mut builder = ExtentAllocatorBuilder::new(&checkpoint.extent_allocator); checkpoint.claim(&mut builder); let extent_allocator = Arc::new(ExtentAllocator::open(builder)); @@ -577,7 +666,7 @@ impl ZettaCache { atime_histogram, size_histogram: checkpoint.size_histogram, operation_log, - super_phys: phys, + super_phys: latest_super_block, outstanding_reads: Default::default(), outstanding_writes: Default::default(), atime: checkpoint.last_atime, @@ -591,7 +680,6 @@ impl ZettaCache { }; let this = ZettaCache { - block_access, index: Arc::new(tokio::sync::RwLock::new(index)), state: Arc::new(tokio::sync::Mutex::new(state)), outstanding_lookups: LockSet::new(), @@ -602,6 +690,10 @@ impl ZettaCache { nonblocking_buffer_bytes_available: Arc::new(Semaphore::new( *CACHE_INSERT_NONBLOCKING_BUFFER_BYTES, )), + write_slots: Arc::new(Semaphore::new( + block_access.disks().count() * *DISK_WRITE_MAX_QUEUE_DEPTH, + )), + block_access, }; let (merge_rx, merge_index) = match checkpoint.merge_progress { @@ -627,14 +719,12 @@ impl ZettaCache { my_cache.checkpoint_task(merge_rx, merge_index).await; }); - let state = this.state.clone(); let metrics = this.metrics.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(10)); loop { interval.tick().await; debug!("metrics: {:#?}", metrics); - state.lock().await.block_access.dump_metrics(); } }); @@ -1021,8 +1111,8 @@ impl ZettaCache { }, }; - let block_access = self.block_access.clone(); let state = self.state.clone(); + let write_slots = self.write_slots.clone(); tokio::spawn(async move { // Get a permit to write to disk before waiting on the state lock. // This ensures that once we assign this insertion to a checkpoint, @@ -1032,17 +1122,19 @@ impl ZettaCache { // progress. Acquiring the WritePermit may take a long time, // because we have to wait for any in-progress insertions (up to // CACHE_INSERT_MAX_BUFFER) to complete before we can write to disk. - let write_permit = block_access.acquire_write().await; + let _write_permit = write_slots.acquire_owned().await.unwrap(); // Now that we are ready to issue the write to disk, insert to the // cache in the current checkpoint (allocate a block, add to // pending_changes and outstanding_writes). 
- let fut = - state - .lock_non_send() - .await - .insert(insert_permit, write_permit, locked_key, bytes); + let fut = state.lock_non_send().await.insert(locked_key, bytes); fut.await; + // We want to hold onto the insert_permit until the write completes + // because it represents the memory that's required to buffer this + // insertion, which isn't released until the io completes. + // Similarly, the write_permit (roughly) represents the disks' + // capacity to perform i/o. + drop(insert_permit); }); } @@ -1098,8 +1190,10 @@ impl ZettaCache { pub async fn clear_hit_data(&self) { let mut state = self.state.lock().await; - state.size_histogram = - SizeHistogramPhys::new(state.block_access.size(), *QUANTILES_IN_SIZE_HISTOGRAM) + state.size_histogram = SizeHistogramPhys::new( + state.block_access.total_capacity(), + *QUANTILES_IN_SIZE_HISTOGRAM, + ) } } @@ -1111,28 +1205,55 @@ pub struct ZCacheDBHandle { } impl ZCacheDBHandle { - pub async fn open(path: &str) -> Option { - let block_access = Arc::new(BlockAccess::new(path, false).await); - let superblock = match ZettaSuperBlockPhys::read(&block_access).await { - Ok(phys) => phys, - Err(e) => { - println!("Couldn't read ZettaCache SuperBlock!"); - println!("{:?}", e); - return None; - } - }; - let checkpoint = Arc::new( - ZettaCheckpointPhys::read(&block_access, superblock.last_checkpoint_extent).await, - ); + pub async fn open(paths: Vec<&str>) -> Result { + let block_access = Arc::new(BlockAccess::new( + paths.iter().map(|path| Disk::new(path, true)).collect(), + true, + )); + + let super_blocks = ZettaSuperBlockPhys::read_all(&block_access).await?; + + let superblock = super_blocks + .into_iter() + .enumerate() + .map(|(id, phys)| { + // XXX proper error handling + // XXX we should be able to reorder them? + assert_eq!(DiskId(id.try_into().unwrap()), phys.disk); + phys + }) + .reduce(|x, y| { + // XXX proper error handling + assert_eq!(x.guid, y.guid); + assert_eq!(x.num_disks, y.num_disks); + match x.checkpoint_id.cmp(&y.checkpoint_id) { + Ordering::Greater => x, + Ordering::Less => y, + Ordering::Equal => { + assert_eq!(x.checkpoint, y.checkpoint); + x + } + } + }) + .unwrap(); + + // XXX proper error handling + assert_eq!(paths.len(), superblock.num_disks); + assert!(superblock + .checkpoint_capacity + .contains(&superblock.checkpoint)); - let mut builder = ExtentAllocatorBuilder::new(checkpoint.extent_allocator); + let checkpoint = + Arc::new(ZettaCheckpointPhys::read(&block_access, superblock.checkpoint).await); + + let mut builder = ExtentAllocatorBuilder::new(&checkpoint.extent_allocator); // We should be able to get away without claiming the metadata space, // since we aren't allocating anything, but we may also want to do this // for verification (e.g. that there aren't overlapping Extents). 
checkpoint.claim(&mut builder); let extent_allocator = Arc::new(ExtentAllocator::open(builder)); - Some(ZCacheDBHandle { + Ok(ZCacheDBHandle { block_access, superblock, checkpoint, @@ -1151,34 +1272,33 @@ impl ZCacheDBHandle { .chunk_to_raw(EncodeType::Json, &self.superblock) .len() as u64; println!( - " {:>6} used out of {:>6}", + " {} used out of {} ({:.1}%)", nice_p2size(superblock_len), - nice_p2size(*SUPERBLOCK_SIZE) - ); - println!(); - println!( - "[{:>6}-{:>6}) Checkpoint Region", nice_p2size(*SUPERBLOCK_SIZE), - nice_p2size(*SUPERBLOCK_SIZE + u64::from(self.superblock.checkpoint_ring_buffer_size)) - ); - println!( - " checkpoint size: {:>6}", - nice_p2size(self.superblock.last_checkpoint_extent.size) + superblock_len as f64 * 100.0 / *SUPERBLOCK_SIZE as f64 ); println!(); + println!("Checkpoint Region"); + println!(" {:?}", self.superblock.checkpoint_capacity); println!( - "[{:>6}-{:>6}) Metadata Region", - nice_p2size(self.checkpoint.extent_allocator.first_valid_offset), - nice_p2size(self.checkpoint.extent_allocator.last_valid_offset) + " checkpoint: {} used out of {} ({:.1}%, must be <50%)", + nice_p2size(self.superblock.checkpoint.size), + nice_p2size(self.superblock.checkpoint_capacity.size), + self.superblock.checkpoint.size as f64 * 100.0 + / self.superblock.checkpoint_capacity.size as f64 ); + println!(); + println!("Metadata Region"); let mut total_used_bytes = 0; + let mut total_allocated_bytes = 0; println!( " {:>13} - {:>6} used out of {:>6} allocated", "operation log", - nice_p2size(self.checkpoint.operation_log.len_bytes()), + nice_p2size(self.checkpoint.operation_log.bytes()), nice_p2size(self.checkpoint.operation_log.capacity_bytes()) ); - total_used_bytes += self.checkpoint.operation_log.len_bytes(); + total_used_bytes += self.checkpoint.operation_log.bytes(); + total_allocated_bytes += self.checkpoint.operation_log.capacity_bytes(); println!( " {:>13} - {:>6} used out of {:>6} allocated", @@ -1187,6 +1307,7 @@ impl ZCacheDBHandle { nice_p2size(self.checkpoint.block_allocator.spacemap_capacity_bytes()) ); total_used_bytes += self.checkpoint.block_allocator.spacemap_bytes(); + total_allocated_bytes += self.checkpoint.block_allocator.spacemap_capacity_bytes(); println!( " {:>13} - {:>6} used out of {:>6} allocated", @@ -1199,50 +1320,65 @@ impl ZCacheDBHandle { ) ); total_used_bytes += self.checkpoint.block_allocator.spacemap_next_bytes(); + total_allocated_bytes += self + .checkpoint + .block_allocator + .spacemap_next_capacity_bytes(); println!( " {:>13} - {:>6} used out of {:>6} allocated", "index log", nice_p2size(self.checkpoint.index.log_bytes()), - nice_p2size(self.checkpoint.index.log_reserved_bytes()) + nice_p2size(self.checkpoint.index.log_capacity_bytes()) ); total_used_bytes += self.checkpoint.index.log_bytes(); + total_allocated_bytes += self.checkpoint.index.log_capacity_bytes(); if let Some((log, idx)) = self.checkpoint.merge_progress.clone() { println!( " {:>13} - {:>6} used out of {:>6} allocated", "progress log", - nice_p2size(log.len_bytes()), + nice_p2size(log.bytes()), nice_p2size(log.capacity_bytes()) ); - total_used_bytes += log.len_bytes(); + total_used_bytes += log.bytes(); + total_allocated_bytes += log.capacity_bytes(); println!( " {:>13} - {:>6} used out of {:>6} allocated", "progress index", nice_p2size(idx.log_bytes()), - nice_p2size(idx.log_reserved_bytes()) + nice_p2size(idx.log_capacity_bytes()) ); total_used_bytes += idx.log_bytes(); + total_allocated_bytes += idx.log_capacity_bytes(); } println!(" ----------------------"); - let 
metadata_region_size = self.checkpoint.extent_allocator.last_valid_offset - - self.checkpoint.extent_allocator.first_valid_offset; + let metadata_region_size = self + .checkpoint + .extent_allocator + .capacity + .iter() + .map(|extent| extent.size) + .sum(); println!( - " {:>13} - {:>6} used out of {:>6} allocated ({:>6} total)", + " {:>13} - {} ({:.1}%) used, {} ({:.1}%) allocated out of {:>6} total", "total", nice_p2size(total_used_bytes), - nice_p2size(metadata_region_size - self.extent_allocator.allocatable_bytes()), + total_used_bytes as f64 * 100.0 / metadata_region_size as f64, + nice_p2size(total_allocated_bytes), + total_allocated_bytes as f64 * 100.0 / metadata_region_size as f64, nice_p2size(metadata_region_size) ); println!(); - let balloc_coverage = self.checkpoint.block_allocator.coverage(); - let balloc_end = balloc_coverage.location.offset + balloc_coverage.size; - println!( - "[{:>6}-{:>6}) User Data Region", - nice_p2size(balloc_coverage.location.offset), - nice_p2size(balloc_end) - ); + let balloc_size = self + .checkpoint + .block_allocator + .capacity() + .iter() + .map(|extent| extent.size) + .sum(); + println!("{:>6} User Data Region", nice_p2size(balloc_size)); } pub async fn dump_structures(&self, opts: DumpStructuresOptions) { @@ -1336,20 +1472,6 @@ impl ZettaCacheState { } fn lookup(&mut self, key: IndexKey, mut value: IndexValue) -> DataReader { - if value.location.offset < self.extent_allocator.get_phys().last_valid_offset { - // The metadata overwrote this data, so it's no longer in the cache. - // Remove from index and return None. - trace!( - "cache miss: {:?} at {:?} was overwritten by metadata allocator; removing from cache", - key, - value - ); - // Note: we could pass in the (mutable) pending_change reference, - // which would let evict_block() avoid looking it up again. But - // this is not a common code path, and this interface seems cleaner. - self.remove_from_index(key, value); - return data_reader_none(); - } // If value.atime is before eviction cutoff, return a cache miss if let Some(ms) = &self.merging_state { if value.atime < ms.eviction_cutoff { @@ -1468,19 +1590,12 @@ impl ZettaCacheState { /// Insert this block to the cache, if space and performance parameters /// allow. It may be a recent cache miss, or a recently-written block. /// Returns a Future to be executed after the state lock has been dropped. - fn insert( - &mut self, - insert_permit: OwnedSemaphorePermit, - write_permit: WritePermit, - locked_key: LockedKey, - bytes: AlignedBytes, - ) -> impl Future { + fn insert(&mut self, locked_key: LockedKey, bytes: AlignedBytes) -> impl Future { let buf_size = bytes.len(); - let location_opt = self.allocate_block(u32::try_from(bytes.len()).unwrap()); - if location_opt.is_none() { - return future::Either::Left(async {}); - } - let location = location_opt.unwrap(); + let location = match self.allocate_block(u32::try_from(bytes.len()).unwrap()) { + Some(location) => location, + None => return future::Either::Left(async {}), + }; // XXX if this is past the last block of the main index, we can write it // there (and location_dirty:false) instead of logging it @@ -1529,25 +1644,19 @@ impl ZettaCacheState { let block_access = self.block_access.clone(); // Note: locked_key can be dropped before the i/o completes, since the - // changes to the State have already been made. 
We want to hold onto - // the insert_permit until the write completes because it represents the - // memory that's required to buffer this insertion, which isn't released - // until the io completes. + // changes to the State have already been made. future::Either::Right(async move { - block_access - .write_raw_permit(write_permit, location, bytes) - .await - .unwrap(); + block_access.write_raw(location, bytes).await; sem.add_permits(1); - drop(insert_permit); }) } /// returns offset, or None if there's no space fn allocate_block(&mut self, size: u32) -> Option { - self.block_allocator - .allocate(size) - .map(|extent| extent.location) + self.block_allocator.allocate(size).map(|extent| { + self.block_access.verify_aligned(extent.location.offset); + extent.location + }) } /// Flush out the current set of pending index changes. This is a recovery point in case of @@ -1559,7 +1668,7 @@ impl ZettaCacheState { ) { debug!( "flushing checkpoint {:?}", - self.super_phys.last_checkpoint_id.next() + self.super_phys.checkpoint_id.next() ); let begin_checkpoint = Instant::now(); @@ -1628,7 +1737,7 @@ impl ZettaCacheState { }); let checkpoint = ZettaCheckpointPhys { - generation: self.super_phys.last_checkpoint_id.next(), + generation: self.super_phys.checkpoint_id.next(), extent_allocator: self.extent_allocator.get_phys(), index: index.get_phys(), operation_log: operation_log_phys, @@ -1646,7 +1755,7 @@ impl ZettaCacheState { // checkpoint, it likely indicates a leak, e.g. a BlockBasedLog is no // longer in the Checkpoint but wasn't .clear()'ed. { - let mut checkpoint_extents = ExtentAllocatorBuilder::new(checkpoint.extent_allocator); + let mut checkpoint_extents = ExtentAllocatorBuilder::new(&checkpoint.extent_allocator); checkpoint.claim(&mut checkpoint_extents); let checkpoint_bytes = checkpoint_extents.allocatable_bytes(); let allocator_bytes = self.extent_allocator.allocatable_bytes(); @@ -1656,25 +1765,30 @@ impl ZettaCacheState { } } - let mut checkpoint_location = self.super_phys.last_checkpoint_extent.location - + self.super_phys.last_checkpoint_extent.size; - let raw = self .block_access .chunk_to_raw(EncodeType::Json, &checkpoint); - if raw.len() - > usize::from64( - checkpoint.extent_allocator.first_valid_offset - checkpoint_location.offset, - ) + + let mut checkpoint_extent = Extent::new( + self.super_phys.checkpoint.location.disk, + self.super_phys.checkpoint.location.offset + self.super_phys.checkpoint.size, + raw.len() as u64, + ); + + if !self + .super_phys + .checkpoint_capacity + .contains(&checkpoint_extent) { // Out of space; go back to the beginning of the checkpoint space. - checkpoint_location.offset = *SUPERBLOCK_SIZE; + checkpoint_extent.location.offset = self.super_phys.checkpoint_capacity.location.offset; + assert!(self + .super_phys + .checkpoint_capacity + .contains(&checkpoint_extent)); assert_le!( - raw.len(), - usize::from64( - self.super_phys.last_checkpoint_extent.location.offset - - checkpoint_location.offset - ), + checkpoint_extent.location.offset + checkpoint_extent.size, + self.super_phys.checkpoint.location.offset ); // XXX The above assertion could fail if there isn't enough // checkpoint space for 3 checkpoints (the existing one that @@ -1685,22 +1799,21 @@ impl ZettaCacheState { // then part at the beginning of the space). 
} maybe_die_with(|| format!("before writing {:#?}", checkpoint)); - debug!("writing to {:?}: {:#?}", checkpoint_location, checkpoint); + debug!("writing to {:?}: {:#?}", checkpoint_extent, checkpoint); - self.super_phys.last_checkpoint_extent = Extent { - location: checkpoint_location, - size: raw.len() as u64, - }; - self.block_access.write_raw(checkpoint_location, raw).await; + self.block_access + .write_raw(checkpoint_extent.location, raw) + .await; - self.super_phys.last_checkpoint_id = self.super_phys.last_checkpoint_id.next(); - self.super_phys.write(&self.block_access).await; + self.super_phys.checkpoint = checkpoint_extent; + self.super_phys.checkpoint_id = self.super_phys.checkpoint_id.next(); + self.super_phys.write_all(&self.block_access).await; self.extent_allocator.checkpoint_done(); info!( "completed {:?} in {}ms; flushed {} operations ({}KB) to log", - self.super_phys.last_checkpoint_id, + self.super_phys.checkpoint_id, begin_checkpoint.elapsed().as_millis(), operation_log_len, operation_log_bytes / 1024, @@ -1770,9 +1883,9 @@ impl ZettaCacheState { "target cache size for storage size {}GB is {}GB; {}MB used; {}MB high-water; {}MB freeing; histogram covers {}MB", self.block_allocator.size() / 1024 / 1024 / 1024, target_size / 1024 / 1024 / 1024, - (self.block_allocator.size() - self.block_allocator.get_available()) / 1024 / 1024, + (self.block_allocator.size() - self.block_allocator.available()) / 1024 / 1024, (self.block_allocator.size() / 100) * *HIGH_WATER_CACHE_SIZE_PCT / 1024 / 1024, - self.block_allocator.get_freeing() / 1024 / 1024, + self.block_allocator.freeing() / 1024 / 1024, self.atime_histogram.sum() / 1024 / 1024, ); let eviction_atime = self.atime_histogram.atime_for_target_size(target_size); diff --git a/cmd/zfs_object_agent/zettaobject/src/init.rs b/cmd/zfs_object_agent/zettaobject/src/init.rs index 4f956e32f391..7ccfc6edc5fe 100644 --- a/cmd/zfs_object_agent/zettaobject/src/init.rs +++ b/cmd/zfs_object_agent/zettaobject/src/init.rs @@ -62,7 +62,7 @@ fn parse_id_from_file(id_path: &Path) -> Result { Ok(Uuid::parse_str(std::str::from_utf8(&bytes)?)?) } -pub fn start(socket_dir: &str, cache_path: Option<&str>) { +pub fn start(socket_dir: &str, cache_paths: Vec<&str>) { /* * Take an exclusive lock on a lock file. This prevents multiple agent * processes from operating out of the same socket_dir. @@ -78,9 +78,9 @@ pub fn start(socket_dir: &str, cache_path: Option<&str>) { // Kick off zpool destroy tasks. 
pool_destroy::init_pool_destroyer(socket_dir).await; - let cache = match cache_path { - Some(path) => Some(ZettaCache::open(path).await), - None => None, + let cache = match cache_paths.is_empty() { + false => Some(ZettaCache::open(cache_paths).await), + true => None, }; PublicServerState::start(socket_dir, cache.as_ref().cloned()); diff --git a/cmd/zfs_object_agent/zoa/src/lib.rs b/cmd/zfs_object_agent/zoa/src/lib.rs index 38be28e4c790..358714c5742a 100644 --- a/cmd/zfs_object_agent/zoa/src/lib.rs +++ b/cmd/zfs_object_agent/zoa/src/lib.rs @@ -7,7 +7,7 @@ use std::os::raw::c_char; pub unsafe extern "C" fn libzoa_init( socket_dir_ptr: *const c_char, log_file_ptr: *const c_char, - cache_path_ptr: *const c_char, + cache_path_ptr: *const c_char, // XXX change to take a list of paths ) { let socket_dir = CStr::from_ptr(socket_dir_ptr) .to_string_lossy() @@ -18,11 +18,11 @@ pub unsafe extern "C" fn libzoa_init( util::setup_logging(verbosity, Some(log_file.as_str()), None); if cache_path_ptr.is_null() { - zettaobject::init::start(&socket_dir, None); + zettaobject::init::start(&socket_dir, Vec::new()); } else { let cache = CStr::from_ptr(cache_path_ptr) .to_string_lossy() .into_owned(); - zettaobject::init::start(&socket_dir, Some(cache.as_str())); + zettaobject::init::start(&socket_dir, vec![cache.as_str()]); } }
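
Note (not part of the patch): a minimal, standalone sketch of the superblock-reconciliation rule that both ZettaCache::open() and ZCacheDBHandle::open() apply above when a cache spans multiple disks: every per-disk copy must agree on guid and num_disks, and the copy recording the highest checkpoint_id is taken as authoritative. The struct here is a hypothetical, simplified stand-in for ZettaSuperBlockPhys (reduced to the fields the rule needs), not the type from the patch.

    // Simplified stand-in for the per-disk superblock copy.
    #[derive(Debug, Clone, Copy)]
    struct SuperBlockSketch {
        checkpoint_id: u64, // stands in for CheckpointId
        guid: u64,          // cache identity; must match across copies
        num_disks: usize,   // expected disk count; must match across copies
    }

    // Pick the authoritative copy among the per-disk superblocks.
    fn latest_superblock(copies: Vec<SuperBlockSketch>) -> SuperBlockSketch {
        copies
            .into_iter()
            .reduce(|x, y| {
                // All copies must describe the same cache.
                assert_eq!(x.guid, y.guid);
                assert_eq!(x.num_disks, y.num_disks);
                // The copy written by the most recent checkpoint wins;
                // equal checkpoint ids are expected to be identical copies.
                if y.checkpoint_id > x.checkpoint_id { y } else { x }
            })
            .expect("at least one superblock copy")
    }

    fn main() {
        let copies = vec![
            SuperBlockSketch { checkpoint_id: 7, guid: 42, num_disks: 2 },
            SuperBlockSketch { checkpoint_id: 8, guid: 42, num_disks: 2 },
        ];
        // The copy from checkpoint 8 is chosen as authoritative.
        assert_eq!(latest_superblock(copies).checkpoint_id, 8);
    }

This mirrors the reduce() over read_all() results in the patch, where a stale superblock can legitimately exist on one disk if the agent crashed between per-disk writes of the same checkpoint.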