Disk::new() may leak file descriptors (openzfs#459)
= Problem

If you try to add a disk that already exists in the ZettaCache, we end up
reaching this point:
```
fn add_disk(&mut self, path: &Path) -> Result<()> {
...
  let disk_id = self.block_access.add_disk(Disk::new(path, false)?)?;
```
At that point `Disk::new()` has already succeeded (opening a file descriptor
to the device file and stashing it behind a `&'static` reference), but
`BlockAccess::add_disk()` then fails, which leaks the file descriptor.
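
For illustration, here is a minimal sketch (a hypothetical helper, not the agent's code) of why that pattern leaks: once the `File` is handed to `Box::leak`, nothing ever drops it, so the underlying fd stays open even if the caller bails out with an error immediately afterwards.
```
use std::fs::File;
use std::path::Path;

// Hypothetical helper mirroring the old Disk::new() pattern: the File is
// leaked into a &'static reference, so its fd is never closed, even when
// the caller returns an error right after this call succeeds.
fn open_static(path: &Path) -> std::io::Result<&'static File> {
    let file = File::open(path)?;
    Ok(&*Box::leak(Box::new(file)))
}
```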

Here is a test where we can see how the file descriptor to the device
file is leaked:
```
~$ sudo lsof -p $(pgrep zfs_o) | grep "/dev/sdb" | wc -l
58
~$ for i in {0..20}; do sudo zcache add /dev/sdb; done
Error: disk "/dev/sdb" ("/dev/sdb") is already part of the zettacache
.. 20 error messages ...
~$ sudo lsof -p $(pgrep zfs_o) | grep "/dev/sdb" | wc -l
79
```

= Patch

The `File` (and hence the file descriptor) is now kept behind an `Arc`. The
read/write threads each hold a clone of it and terminate once all the
`Sender`s are dropped and their `recv()` calls return an error; when the
"main" thread then drops `Disk`, the last `Arc` clone goes away and the file
descriptor is automatically closed.
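
As a rough sketch of that ownership scheme (illustrative names and channel types, not the agent's exact code): each worker thread owns an `Arc<File>` clone, exits when its channel's senders are gone, and the fd is closed when the last clone is dropped.
```
use std::fs::File;
use std::os::unix::fs::FileExt;
use std::sync::{mpsc, Arc};
use std::thread;

// Illustrative reader loop: the thread holds its own Arc<File> clone.
// Once every Sender is dropped (i.e. the owning Disk is dropped), recv()
// returns Err, the loop ends, and this Arc clone is dropped; the last
// clone to go away closes the file descriptor.
fn spawn_reader(file: Arc<File>, rx: mpsc::Receiver<(u64, usize)>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        while let Ok((offset, len)) = rx.recv() {
            let mut buf = vec![0u8; len];
            let _ = file.read_at(&mut buf, offset);
        }
    })
}
```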

= Testing

I made sure that the file descriptor count stayed the same while performing the above test:
```
~$ sudo lsof -p $(pgrep zfs_o) | grep "/dev/sdb" | wc -l
1
~$ for i in {0..20}; do sudo zcache add /dev/sdb; done
Error: disk "/dev/sdb" ("/dev/sdb") is already part of the zettacache
... 20 error messages ...
~$ sudo lsof -p $(pgrep zfs_o) | grep "/dev/sdb" | wc -l
1
```
sdimitro authored Jun 1, 2022
1 parent 27a45ca commit 3075380
Showing 1 changed file with 17 additions and 13 deletions.
cmd/zfs_object_agent/zettacache/src/block_access.rs
@@ -9,6 +9,7 @@ use std::os::unix::prelude::OpenOptionsExt;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
+ use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::thread::sleep;
@@ -141,13 +142,7 @@ pub struct BlockAccess {
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Disk {
- // We want all the reader/writer_threads to share the same file descriptor, but we don't have
- // a mechanism to ensure that they stop using the fd when the DiskStruct is dropped and the
- // fd is closed. To solve this we simply never close the fd. The fd is owned by the File,
- // and we leave a reference to it here to indicate that it's related to this Disk, even
- // though it's only used via the reader/writer_threads.
- #[allow(dead_code)]
- file: &'static File,
+ file: Arc<File>,

path: PathBuf,
canonical_path: PathBuf,
@@ -217,8 +212,8 @@ impl Disk {
.open(path)
.with_context(|| format!("opening disk {path:?}"))?;
// see comment in `struct Disk`
- let file = &*Box::leak(Box::new(file));
- let (sector_size, size) = disk_sizes(file)?;
+ let file = Arc::new(file);
+ let (sector_size, size) = disk_sizes(&file)?;

let short_name = path.file_name().unwrap().to_string_lossy().to_string();
let canonical_path = Path::new(path).canonicalize()?;
@@ -260,12 +255,14 @@ impl Disk {
// note, we want to use a "std" thread here rather than
// tokio::task::spawn_blocking() because the latter has a limit of how many
// threads it will create (default 512)
+ let file = this.file.clone();
std::thread::spawn(move || {
Self::reader_thread(file, io_stats, sector_size, rx);
});
}
if !readonly {
for rx in writer_rxs {
+ let file = this.file.clone();
std::thread::spawn(move || {
Self::writer_thread(
file,
@@ -276,6 +273,7 @@
});
}
for rx in metadata_writer_rxs {
+ let file = this.file.clone();
std::thread::spawn(move || {
Self::writer_thread(
file,
Expand All @@ -292,17 +290,20 @@ impl Disk {
}

fn reader_thread(
- file: &'static File,
+ file: Arc<File>,
io_stats: &'static DiskIoStats,
sector_size: usize,
rx: flume::Receiver<ReadMessage>,
) {
+ // When all the Senders of receiver `rx` are dropped (i.e. when the
+ // respective Disk of this thread is dropped), `recv()` will return an
+ // error and this thread will terminate without being leaked.
while let Ok(message) = rx.recv() {
let op = OpInProgress::new(&io_stats.stats[message.io_type]);
let vec = measure!()
.func(|| {
pread_aligned(
- file,
+ &file,
message.offset.try_into().unwrap(),
message.size,
sector_size,
@@ -347,7 +348,7 @@ impl Disk {
}

fn writer_thread(
- file: &'static File,
+ file: Arc<File>,
stat_values: &'static IoStatValues,
sector_size: usize,
mut rx: mpsc::UnboundedReceiver<WriteMessage>,
@@ -379,6 +380,9 @@ impl Disk {
let mut sorted: BTreeMap<u64, WriteMessage> = BTreeMap::new();
let mut prev_offset = 0;

+ // When all the Senders of receiver `rx` are dropped (i.e. when the
+ // respective Disk of this thread is dropped), `blocking_recv()` will
+ // return an error and this thread will terminate without being leaked.
loop {
// Look for next run of messages in sorted queue
let (run, len) = find_run(iter_wrapping(&sorted, prev_offset));
@@ -553,7 +557,7 @@ impl BlockAccess {
pub fn expand_disk(&self, disk: DiskId) -> Result<ExpandDiskResponse> {
let disks = self.disks.read().unwrap();
let disk = &disks[disk.index()];
- let (_, new_size) = disk_sizes(disk.file)?;
+ let (_, new_size) = disk_sizes(&disk.file)?;
let mut size = disk.size.lock().unwrap();
let additional_bytes = new_size.checked_sub(*size).ok_or_else(|| {
anyhow!(
