diff --git a/cmd/zfs_object_agent/zettaobject/src/pool.rs b/cmd/zfs_object_agent/zettaobject/src/pool.rs index 10ea57198e2f..cf1a0407017d 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool.rs @@ -1683,16 +1683,22 @@ impl Pool { } } - pub fn free_block(&self, block: BlockId, size: u32) { + pub fn free_blocks(&self, blocks: &[u64], sizes: &[u32]) { // the syncing_state is only held from the thread that owns the Pool // (i.e. this thread) and from end_txg(). It's not allowed to call this // function while in the middle of an end_txg(), so the lock must not be // held. XXX change this to return an error to the client + assert_eq!(blocks.len(), sizes.len()); self.state.with_syncing_state(|syncing_state| { - syncing_state.log_free( - PendingFreesLogEntry { block, size }, - &self.state.object_block_map, - ) + for (&block, &size) in blocks.iter().zip(sizes.iter()) { + syncing_state.log_free( + PendingFreesLogEntry { + block: BlockId(block), + size, + }, + &self.state.object_block_map, + ); + } }) } diff --git a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs index 57deb00a41e3..d23bd86bace3 100644 --- a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs @@ -13,7 +13,6 @@ use cstr_argument::CStrArgument; use lazy_static::lazy_static; use log::*; use nvpair::{NvData, NvList, NvListRef}; -use std::convert::TryFrom; use std::sync::Arc; use util::get_tunable; use util::maybe_die_with; @@ -72,7 +71,7 @@ impl RootConnectionState { server.register_handler("flush writes", Box::new(Self::flush_writes)); server.register_handler("end txg", Box::new(Self::end_txg)); server.register_handler("write block", Box::new(Self::write_block)); - server.register_handler("free block", Box::new(Self::free_block)); + server.register_handler("free blocks", Box::new(Self::free_blocks)); server.register_handler("read 
block", Box::new(Self::read_block)); server.register_handler("get stats", Box::new(Self::get_stats)); server.register_handler("close pool", Box::new(Self::close_pool)); @@ -314,14 +313,14 @@ impl RootConnectionState { })) } - fn free_block(&mut self, nvl: NvList) -> HandlerReturn { + fn free_blocks(&mut self, nvl: NvList) -> HandlerReturn { trace!("got request: {:?}", nvl); - let block = BlockId(nvl.lookup_uint64("block")?); - let size = u32::try_from(nvl.lookup_uint64("size")?)?; + let blocks = u64_array_value(&nvl, "block")?; + let sizes = u32_array_value(&nvl, "size")?; let pool = self.pool.as_ref().ok_or_else(|| anyhow!("no pool open"))?; - pool.free_block(block, size); - maybe_die_with(|| format!("after free block request: {:?}", block)); + pool.free_blocks(blocks, sizes); + maybe_die_with(|| "after free block request".to_string()); handler_return_ok(None) } @@ -482,8 +481,8 @@ where } } -/// Get the BoolV type value, or if not present then default to false. -/// Return Err if value is present but not BoolV type. +/// Get the uint8_array type value. +/// Return Err if value is not present, or not the expected type. fn u8_array_value<S>(nvl: &NvListRef, name: S) -> Result<&[u8]> where S: CStrArgument, { @@ -495,3 +494,31 @@ where Err(anyhow!("pair {:?} not expected type (uint8_array)", pair)) } } + +/// Get the uint32_array type value. +/// Return Err if value is not present, or not the expected type. +fn u32_array_value<S>(nvl: &NvListRef, name: S) -> Result<&[u32]> +where + S: CStrArgument, +{ + let pair = nvl.lookup(name)?; + if let NvData::Uint32Array(slice) = pair.data() { + Ok(slice) + } else { + Err(anyhow!("pair {:?} not expected type (uint32_array)", pair)) + } +} + +/// Get the uint64_array type value. +/// Return Err if value is not present, or not the expected type. 
+fn u64_array_value<S>(nvl: &NvListRef, name: S) -> Result<&[u64]> +where + S: CStrArgument, +{ + let pair = nvl.lookup(name)?; + if let NvData::Uint64Array(slice) = pair.data() { + Ok(slice) + } else { + Err(anyhow!("pair {:?} not expected type (uint64_array)", pair)) + } +} diff --git a/cmd/zfs_object_agent/zettaobject/src/server.rs b/cmd/zfs_object_agent/zettaobject/src/server.rs index bf0d4e2389a9..1dfa01571fcc 100644 --- a/cmd/zfs_object_agent/zettaobject/src/server.rs +++ b/cmd/zfs_object_agent/zettaobject/src/server.rs @@ -10,6 +10,7 @@ use anyhow::anyhow; use anyhow::Result; use futures::{future, Future, FutureExt}; +use lazy_static::lazy_static; use log::*; use nvpair::{NvEncoding, NvList}; use std::collections::HashMap; @@ -19,8 +20,15 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{mpsc, Mutex}; +use util::get_tunable; use util::From64; +lazy_static! { + // max zfs block size is 16MB + pub static ref UNREASONABLE_REQUEST_SIZE: u64 = + get_tunable("unreasonable_request_size", 20_000_000); +} + // Ss: ServerState (consumer's state associated with the server) // Cs: ConnectionState (consumer's state associated with the connection) pub struct Server<Ss, Cs> { @@ -126,15 +134,12 @@ impl Server { // XXX kernel sends this as host byte order let len64 = input.read_u64_le().await?; //trace!("got request len: {}", len64); - if len64 > 20_000_000 { - // max zfs block size is 16MB + if len64 > *UNREASONABLE_REQUEST_SIZE { panic!("got unreasonable request length {} ({:#x})", len64, len64); } let mut v = Vec::new(); - // XXX would be nice if we didn't have to zero it out. Should be able - // to do that using read_buf(), treating the Vec as a BufMut, but will - // require multiple calls to do the equivalent of read_exact(). + // XXX Would be nice if we didn't have to zero it out. 
v.resize(usize::from64(len64), 0); input.read_exact(v.as_mut()).await?; let nvl = NvList::try_unpack(v.as_ref()).unwrap(); diff --git a/include/sys/vdev_object_store.h b/include/sys/vdev_object_store.h index b60517b11cb5..31f265cc4617 100644 --- a/include/sys/vdev_object_store.h +++ b/include/sys/vdev_object_store.h @@ -31,7 +31,7 @@ #define AGENT_TYPE_READ_DONE "read done" #define AGENT_TYPE_WRITE_BLOCK "write block" #define AGENT_TYPE_WRITE_DONE "write done" -#define AGENT_TYPE_FREE_BLOCK "free block" +#define AGENT_TYPE_FREE_BLOCKS "free blocks" #define AGENT_TYPE_BEGIN_TXG "begin txg" #define AGENT_TYPE_RESUME_COMPLETE "resume complete" #define AGENT_TYPE_END_TXG "end txg" diff --git a/module/os/linux/zfs/vdev_object_store.c b/module/os/linux/zfs/vdev_object_store.c index 8e6fa07a32fb..b64bb3643a95 100644 --- a/module/os/linux/zfs/vdev_object_store.c +++ b/module/os/linux/zfs/vdev_object_store.c @@ -48,6 +48,13 @@ struct sockaddr_un zfs_root_socket = { AF_UNIX, "/etc/zfs/zfs_root_socket" }; +/* + * Free in batches of 100,000 blocks, to limit memory usage. Note that + * the Agent accepts requests of up to ~20MB, and each block uses 12 bytes, + * so the max "free blocks" request is ~1.2MB. 
+ */ +int vdev_object_store_max_frees = 100000; + typedef enum { VOS_SOCK_CLOSED = (1 << 0), VOS_SOCK_SHUTTING_DOWN = (1 << 1), @@ -118,6 +125,7 @@ typedef struct vdev_object_store { uint64_t vos_flush_point; list_t vos_free_list; + uint64_t vos_free_list_len; } vdev_object_store_t; /* @@ -473,37 +481,68 @@ object_store_stop_agent(vdev_t *vd) agent_wait_serial(vos, VOS_SERIAL_CLOSE_POOL); } +static int +agent_free_blocks_impl(vdev_object_store_t *vos, + uint64_t *blkids, uint32_t *sizes, int num) +{ + nvlist_t *nv = fnvlist_alloc(); + fnvlist_add_string(nv, AGENT_TYPE, AGENT_TYPE_FREE_BLOCKS); + fnvlist_add_uint64_array(nv, AGENT_BLKID, blkids, num); + fnvlist_add_uint32_array(nv, AGENT_SIZE, sizes, num); + int err = agent_request(vos, nv, FTAG); + fnvlist_free(nv); + if (err == 0) { + zfs_dbgmsg("agent_free_blocks freed %d blocks", num); + } else { + zfs_dbgmsg("agent_free_blocks failed to send: %d", err); + } + return (err); +} + static int agent_free_blocks(vdev_object_store_t *vos) { ASSERT(MUTEX_HELD(&vos->vos_sock_lock)); - int blocks_freed = 0; + int buf_len = MIN(vos->vos_free_list_len, vdev_object_store_max_frees); + if (buf_len == 0) + return (0); + int err = 0; + + uint64_t *blkid_array = + vmem_alloc(buf_len * sizeof (*blkid_array), KM_SLEEP); + uint32_t *size_array = + vmem_alloc(buf_len * sizeof (*size_array), KM_SLEEP); + + int num_freed = 0; for (object_store_free_block_t *osfb = list_head(&vos->vos_free_list); osfb != NULL; osfb = list_next(&vos->vos_free_list, osfb)) { - - blocks_freed++; uint64_t blockid = osfb->osfb_offset >> 9; - nvlist_t *nv = fnvlist_alloc(); - fnvlist_add_string(nv, AGENT_TYPE, AGENT_TYPE_FREE_BLOCK); + blkid_array[num_freed] = blockid; + size_array[num_freed] = osfb->osfb_size; + num_freed++; - fnvlist_add_uint64(nv, AGENT_BLKID, blockid); - fnvlist_add_uint64(nv, AGENT_SIZE, osfb->osfb_size); if (zfs_flags & ZFS_DEBUG_OBJECT_STORE) { zfs_dbgmsg("agent_free_blocks(blkid=%llu, asize=%llu)", (u_longlong_t)blockid, 
(u_longlong_t)osfb->osfb_size); } - int err = agent_request(vos, nv, FTAG); - if (err != 0) { - fnvlist_free(nv); - zfs_dbgmsg("agnet_free_block failed to send: %d", err); - return (err); + + if (num_freed == buf_len) { + err = agent_free_blocks_impl(vos, + blkid_array, size_array, num_freed); + num_freed = 0; + if (err != 0) + break; } - fnvlist_free(nv); } - zfs_dbgmsg("agent_free_blocks freed %d blocks", blocks_freed); - return (0); + if (num_freed != 0) { + err = agent_free_blocks_impl(vos, + blkid_array, size_array, num_freed); + } + vmem_free(blkid_array, buf_len * sizeof (*blkid_array)); + vmem_free(size_array, buf_len * sizeof (*size_array)); + return (err); } static void @@ -910,10 +949,14 @@ object_store_end_txg(vdev_t *vd, nvlist_t *config, uint64_t txg) agent_wait_serial(vos, VOS_SERIAL_END_TXG); object_store_free_block_t *osfb; + uint64_t len = 0; while ((osfb = list_remove_head(&vos->vos_free_list)) != NULL) { kmem_free(osfb, sizeof (object_store_free_block_t)); + len++; } ASSERT(list_is_empty(&vos->vos_free_list)); + ASSERT3U(len, ==, vos->vos_free_list_len); + vos->vos_free_list_len = 0; vos->vos_send_txg_selector = VOS_TXG_NONE; } @@ -933,6 +976,7 @@ object_store_free_block(vdev_t *vd, uint64_t offset, uint64_t asize) osfb->osfb_offset = offset; osfb->osfb_size = asize; list_insert_tail(&vos->vos_free_list, osfb); + vos->vos_free_list_len++; } void