free multiple blocks with one request (openzfs#516)
The performance of removing a large file or filesystem can be dominated
by the CPU cost of issuing many small "free block" requests - one per
block.

This commit changes the kernel-agent API, replacing the "free block"
(singular) request with a "free blocks" (plural) request.  By sending many
blocks in one request, we reduce the CPU cost.  The time to `rm` a large
file is reduced to ~1/4 of the previous time.
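
For illustration, a minimal Rust sketch of the batching idea (the FreeBatch type and its methods are invented for this example, not part of the commit): callers accumulate (block, size) pairs and submit them as one plural request, amortizing the per-request overhead over the whole batch.

// Hypothetical sketch: accumulate frees, then send them all at once.
struct FreeBatch {
    blocks: Vec<u64>, // block IDs to free
    sizes: Vec<u32>,  // per-block sizes, parallel to `blocks`
}

impl FreeBatch {
    fn push(&mut self, block: u64, size: u32) {
        self.blocks.push(block);
        self.sizes.push(size);
    }

    // Submit the accumulated frees as a single request, then reset.
    fn flush(&mut self, mut send: impl FnMut(&[u64], &[u32])) {
        if !self.blocks.is_empty() {
            send(&self.blocks, &self.sizes);
            self.blocks.clear();
            self.sizes.clear();
        }
    }
}
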
ahrens authored Oct 18, 2021
1 parent 338808e commit 2710c72
Showing 5 changed files with 117 additions and 35 deletions.
16 changes: 11 additions & 5 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
@@ -1683,16 +1683,22 @@ impl Pool {
}
}

pub fn free_block(&self, block: BlockId, size: u32) {
pub fn free_blocks(&self, blocks: &[u64], sizes: &[u32]) {
// the syncing_state is only held from the thread that owns the Pool
// (i.e. this thread) and from end_txg(). It's not allowed to call this
// function while in the middle of an end_txg(), so the lock must not be
// held. XXX change this to return an error to the client
assert_eq!(blocks.len(), sizes.len());
self.state.with_syncing_state(|syncing_state| {
syncing_state.log_free(
PendingFreesLogEntry { block, size },
&self.state.object_block_map,
)
for (&block, &size) in blocks.iter().zip(sizes.iter()) {
syncing_state.log_free(
PendingFreesLogEntry {
block: BlockId(block),
size,
},
&self.state.object_block_map,
);
}
})
}

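As a usage sketch (assuming a `pool: Pool` is in scope; the block IDs and sizes are made up), one call now frees several blocks where the old API needed one request per block. The two slices must be equal in length, or the assertion above fires:

// Hypothetical call site: free three blocks with a single request.
pool.free_blocks(&[1024, 1025, 1026], &[4096, 8192, 4096]);
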
45 changes: 36 additions & 9 deletions cmd/zfs_object_agent/zettaobject/src/root_connection.rs
@@ -13,7 +13,6 @@ use cstr_argument::CStrArgument;
use lazy_static::lazy_static;
use log::*;
use nvpair::{NvData, NvList, NvListRef};
use std::convert::TryFrom;
use std::sync::Arc;
use util::get_tunable;
use util::maybe_die_with;
@@ -72,7 +71,7 @@ impl RootConnectionState {
server.register_handler("flush writes", Box::new(Self::flush_writes));
server.register_handler("end txg", Box::new(Self::end_txg));
server.register_handler("write block", Box::new(Self::write_block));
server.register_handler("free block", Box::new(Self::free_block));
server.register_handler("free blocks", Box::new(Self::free_blocks));
server.register_handler("read block", Box::new(Self::read_block));
server.register_handler("get stats", Box::new(Self::get_stats));
server.register_handler("close pool", Box::new(Self::close_pool));
@@ -314,14 +313,14 @@ impl RootConnectionState {
}))
}

fn free_block(&mut self, nvl: NvList) -> HandlerReturn {
fn free_blocks(&mut self, nvl: NvList) -> HandlerReturn {
trace!("got request: {:?}", nvl);
let block = BlockId(nvl.lookup_uint64("block")?);
let size = u32::try_from(nvl.lookup_uint64("size")?)?;
let blocks = u64_array_value(&nvl, "block")?;
let sizes = u32_array_value(&nvl, "size")?;

let pool = self.pool.as_ref().ok_or_else(|| anyhow!("no pool open"))?;
pool.free_block(block, size);
maybe_die_with(|| format!("after free block request: {:?}", block));
pool.free_blocks(blocks, sizes);
maybe_die_with(|| "after free block request".to_string());
handler_return_ok(None)
}

@@ -482,8 +481,8 @@ where
}
}

/// Get the BoolV type value, or if not present then default to false.
/// Return Err if value is present but not BoolV type.
/// Get the uint8_array type value.
/// Return Err if value is not present, or not the expected type.
fn u8_array_value<S>(nvl: &NvListRef, name: S) -> Result<&[u8]>
where
S: CStrArgument,
@@ -495,3 +494,31 @@ where
Err(anyhow!("pair {:?} not expected type (uint8_array)", pair))
}
}

/// Get the uint32_array type value.
/// Return Err if value is not present, or not the expected type.
fn u32_array_value<S>(nvl: &NvListRef, name: S) -> Result<&[u32]>
where
S: CStrArgument,
{
let pair = nvl.lookup(name)?;
if let NvData::Uint32Array(slice) = pair.data() {
Ok(slice)
} else {
Err(anyhow!("pair {:?} not expected type (uint32_array)", pair))
}
}

/// Get the uint64_array type value.
/// Return Err if value is not present, or not the expected type.
fn u64_array_value<S>(nvl: &NvListRef, name: S) -> Result<&[u64]>
where
S: CStrArgument,
{
let pair = nvl.lookup(name)?;
if let NvData::Uint64Array(slice) = pair.data() {
Ok(slice)
} else {
Err(anyhow!("pair {:?} not expected type (uint64_array)", pair))
}
}
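
A sketch of how a handler might combine these accessors (parse_free_request is a hypothetical helper, not part of this commit). Both slices borrow from the NvList, so nothing is copied, and a length mismatch becomes an error rather than a panic:

// Hypothetical: validate and extract the parallel arrays of a
// "free blocks" request in one place.
fn parse_free_request(nvl: &NvListRef) -> Result<(&[u64], &[u32])> {
    let blocks = u64_array_value(nvl, "block")?;
    let sizes = u32_array_value(nvl, "size")?;
    if blocks.len() != sizes.len() {
        return Err(anyhow!("block/size arrays differ in length"));
    }
    Ok((blocks, sizes))
}
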
15 changes: 10 additions & 5 deletions cmd/zfs_object_agent/zettaobject/src/server.rs
@@ -10,6 +10,7 @@
use anyhow::anyhow;
use anyhow::Result;
use futures::{future, Future, FutureExt};
use lazy_static::lazy_static;
use log::*;
use nvpair::{NvEncoding, NvList};
use std::collections::HashMap;
@@ -19,8 +20,15 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{mpsc, Mutex};
use util::get_tunable;
use util::From64;

lazy_static! {
// max zfs block size is 16MB
pub static ref UNREASONABLE_REQUEST_SIZE: u64 =
get_tunable("unreasonable_request_size", 20_000_000);
}

// Ss: ServerState (consumer's state associated with the server)
// Cs: ConnectionState (consumer's state associated with the connection)
pub struct Server<Ss, Cs> {
@@ -126,15 +134,12 @@ impl<Ss: Send + Sync + 'static, Cs: Send + Sync + 'static> Server<Ss, Cs> {
// XXX kernel sends this as host byte order
let len64 = input.read_u64_le().await?;
//trace!("got request len: {}", len64);
if len64 > 20_000_000 {
// max zfs block size is 16MB
if len64 > *UNREASONABLE_REQUEST_SIZE {
panic!("got unreasonable request length {} ({:#x})", len64, len64);
}

let mut v = Vec::new();
// XXX would be nice if we didn't have to zero it out. Should be able
// to do that using read_buf(), treating the Vec as a BufMut, but will
// require multiple calls to do the equivalent of read_exact().
// XXX Would be nice if we didn't have to zero it out.
v.resize(usize::from64(len64), 0);
input.read_exact(v.as_mut()).await?;
let nvl = NvList::try_unpack(v.as_ref()).unwrap();
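The size cap interacts with the new batched frees: the kernel limits a batch to 100,000 blocks (vdev_object_store_max_frees, below), and each entry costs 12 bytes on the wire (a uint64 block ID plus a uint32 size), so a maximal "free blocks" request is about 1.2MB, well under the 20MB default. A back-of-envelope check as a Rust sketch (constants mirror the defaults described in this commit):

// Compile-time sanity check that a full batch fits in one request.
const MAX_FREES: u64 = 100_000;     // vdev_object_store_max_frees default
const BYTES_PER_ENTRY: u64 = 8 + 4; // uint64 block ID + uint32 size
const MAX_REQUEST: u64 = MAX_FREES * BYTES_PER_ENTRY; // 1,200,000 bytes
const _: () = assert!(MAX_REQUEST < 20_000_000); // under the 20MB cap
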
2 changes: 1 addition & 1 deletion include/sys/vdev_object_store.h
@@ -31,7 +31,7 @@
#define AGENT_TYPE_READ_DONE "read done"
#define AGENT_TYPE_WRITE_BLOCK "write block"
#define AGENT_TYPE_WRITE_DONE "write done"
#define AGENT_TYPE_FREE_BLOCK "free block"
#define AGENT_TYPE_FREE_BLOCKS "free blocks"
#define AGENT_TYPE_BEGIN_TXG "begin txg"
#define AGENT_TYPE_RESUME_COMPLETE "resume complete"
#define AGENT_TYPE_END_TXG "end txg"
74 changes: 59 additions & 15 deletions module/os/linux/zfs/vdev_object_store.c
@@ -48,6 +48,13 @@ struct sockaddr_un zfs_root_socket = {
AF_UNIX, "/etc/zfs/zfs_root_socket"
};

/*
* Free in batches of 100,000 blocks, to limit memory usage. Note that
* the Agent accepts requests of up to ~20MB, and each block uses 12 bytes,
* so the max "free blocks" request is ~1.2MB.
*/
int vdev_object_store_max_frees = 100000;

typedef enum {
VOS_SOCK_CLOSED = (1 << 0),
VOS_SOCK_SHUTTING_DOWN = (1 << 1),
@@ -118,6 +125,7 @@ typedef struct vdev_object_store {
uint64_t vos_flush_point;

list_t vos_free_list;
uint64_t vos_free_list_len;
} vdev_object_store_t;

/*
@@ -473,37 +481,68 @@ object_store_stop_agent(vdev_t *vd)
agent_wait_serial(vos, VOS_SERIAL_CLOSE_POOL);
}

static int
agent_free_blocks_impl(vdev_object_store_t *vos,
uint64_t *blkids, uint32_t *sizes, int num)
{
nvlist_t *nv = fnvlist_alloc();
fnvlist_add_string(nv, AGENT_TYPE, AGENT_TYPE_FREE_BLOCKS);
fnvlist_add_uint64_array(nv, AGENT_BLKID, blkids, num);
fnvlist_add_uint32_array(nv, AGENT_SIZE, sizes, num);
int err = agent_request(vos, nv, FTAG);
fnvlist_free(nv);
if (err == 0) {
zfs_dbgmsg("agent_free_blocks freed %d blocks", num);
} else {
zfs_dbgmsg("agnet_free_blocks failed to send: %d", err);
}
return (err);
}

static int
agent_free_blocks(vdev_object_store_t *vos)
{
ASSERT(MUTEX_HELD(&vos->vos_sock_lock));

int blocks_freed = 0;
int buf_len = MIN(vos->vos_free_list_len, vdev_object_store_max_frees);
if (buf_len == 0)
return (0);
int err = 0;

uint64_t *blkid_array =
vmem_alloc(buf_len * sizeof (*blkid_array), KM_SLEEP);
uint32_t *size_array =
vmem_alloc(buf_len * sizeof (*size_array), KM_SLEEP);

int num_freed = 0;
for (object_store_free_block_t *osfb = list_head(&vos->vos_free_list);
osfb != NULL; osfb = list_next(&vos->vos_free_list, osfb)) {

blocks_freed++;
uint64_t blockid = osfb->osfb_offset >> 9;
nvlist_t *nv = fnvlist_alloc();
fnvlist_add_string(nv, AGENT_TYPE, AGENT_TYPE_FREE_BLOCK);
blkid_array[num_freed] = blockid;
size_array[num_freed] = osfb->osfb_size;
num_freed++;

fnvlist_add_uint64(nv, AGENT_BLKID, blockid);
fnvlist_add_uint64(nv, AGENT_SIZE, osfb->osfb_size);
if (zfs_flags & ZFS_DEBUG_OBJECT_STORE) {
zfs_dbgmsg("agent_free_blocks(blkid=%llu, asize=%llu)",
(u_longlong_t)blockid,
(u_longlong_t)osfb->osfb_size);
}
int err = agent_request(vos, nv, FTAG);
if (err != 0) {
fnvlist_free(nv);
zfs_dbgmsg("agnet_free_block failed to send: %d", err);
return (err);

if (num_freed == buf_len) {
err = agent_free_blocks_impl(vos,
blkid_array, size_array, num_freed);
num_freed = 0;
if (err != 0)
break;
}
fnvlist_free(nv);
}
zfs_dbgmsg("agent_free_blocks freed %d blocks", blocks_freed);
return (0);
if (num_freed != 0) {
err = agent_free_blocks_impl(vos,
blkid_array, size_array, num_freed);
}
vmem_free(blkid_array, buf_len * sizeof (*blkid_array));
vmem_free(size_array, buf_len * sizeof (*size_array));
return (err);
}

static void
@@ -910,10 +949,14 @@ object_store_end_txg(vdev_t *vd, nvlist_t *config, uint64_t txg)
agent_wait_serial(vos, VOS_SERIAL_END_TXG);

object_store_free_block_t *osfb;
uint64_t len = 0;
while ((osfb = list_remove_head(&vos->vos_free_list)) != NULL) {
kmem_free(osfb, sizeof (object_store_free_block_t));
len++;
}
ASSERT(list_is_empty(&vos->vos_free_list));
ASSERT3U(len, ==, vos->vos_free_list_len);
vos->vos_free_list_len = 0;
vos->vos_send_txg_selector = VOS_TXG_NONE;
}

@@ -933,6 +976,7 @@ object_store_free_block(vdev_t *vd, uint64_t offset, uint64_t asize)
osfb->osfb_offset = offset;
osfb->osfb_size = asize;
list_insert_tail(&vos->vos_free_list, osfb);
vos->vos_free_list_len++;
}

void
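
For clarity, the batching strategy in agent_free_blocks() above, transliterated into a self-contained Rust sketch (all names invented for the example): walk the pending frees, fill a pair of parallel arrays, flush whenever the batch reaches the cap, flush the remainder at the end, and stop early if a send fails.

// Sketch of the C batching loop above, in Rust.
fn free_in_batches<E>(
    frees: &[(u64, u32)], // (block ID, size) pairs from the free list
    max_frees: usize,     // analogous to vdev_object_store_max_frees
    mut send: impl FnMut(&[u64], &[u32]) -> Result<(), E>,
) -> Result<(), E> {
    let mut blkids = Vec::with_capacity(max_frees);
    let mut sizes = Vec::with_capacity(max_frees);
    for &(blkid, size) in frees {
        blkids.push(blkid);
        sizes.push(size);
        if blkids.len() == max_frees {
            send(&blkids, &sizes)?; // flush a full batch
            blkids.clear();
            sizes.clear();
        }
    }
    if !blkids.is_empty() {
        send(&blkids, &sizes)?; // flush the final partial batch
    }
    Ok(())
}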
