Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Register memory mapped files and raise when written to #16208

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions crates/polars-io/src/ipc/ipc_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
//! assert!(df.equals(&df_read));
//! ```
use std::io::{Read, Seek};
use std::path::PathBuf;

use arrow::datatypes::ArrowSchemaRef;
use arrow::io::ipc::read;
Expand Down Expand Up @@ -79,7 +80,8 @@ pub struct IpcReader<R: MmapBytesReader> {
pub(super) projection: Option<Vec<usize>>,
pub(crate) columns: Option<Vec<String>>,
pub(super) row_index: Option<RowIndex>,
memory_map: bool,
// Stores the as key semaphore to make sure we don't write to the memory mapped file.
pub(super) memory_map: Option<PathBuf>,
metadata: Option<read::FileMetadata>,
schema: Option<ArrowSchemaRef>,
}
Expand Down Expand Up @@ -138,8 +140,9 @@ impl<R: MmapBytesReader> IpcReader<R> {
}

/// Set if the file is to be memory_mapped. Only works with uncompressed files.
pub fn memory_mapped(mut self, toggle: bool) -> Self {
self.memory_map = toggle;
/// The file name must be passed to register the memory mapped file.
pub fn memory_mapped(mut self, path_buf: Option<PathBuf>) -> Self {
self.memory_map = path_buf;
self
}

Expand All @@ -150,7 +153,7 @@ impl<R: MmapBytesReader> IpcReader<R> {
predicate: Option<Arc<dyn PhysicalIoExpr>>,
verbose: bool,
) -> PolarsResult<DataFrame> {
if self.memory_map && self.reader.to_file().is_some() {
if self.memory_map.is_some() && self.reader.to_file().is_some() {
if verbose {
eprintln!("memory map ipc file")
}
Expand Down Expand Up @@ -199,7 +202,7 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
columns: None,
projection: None,
row_index: None,
memory_map: true,
memory_map: None,
metadata: None,
schema: None,
}
Expand All @@ -211,7 +214,7 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
}

fn finish(mut self) -> PolarsResult<DataFrame> {
if self.memory_map && self.reader.to_file().is_some() {
if self.memory_map.is_some() && self.reader.to_file().is_some() {
match self.finish_memmapped(None) {
Ok(df) => return Ok(df),
Err(err) => check_mmap_err(err)?,
Expand Down
16 changes: 8 additions & 8 deletions crates/polars-io/src/ipc/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use arrow::io::ipc::read;
use arrow::io::ipc::read::{Dictionaries, FileMetadata};
use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
use arrow::record_batch::RecordBatch;
use memmap::Mmap;
use polars_core::prelude::*;

use super::ipc_file::IpcReader;
use crate::mmap::MmapBytesReader;
use crate::mmap::{MMapSemaphore, MmapBytesReader};
use crate::predicates::PhysicalIoExpr;
use crate::shared::{finish_reader, ArrowReader};
use crate::utils::{apply_projection, columns_to_projection};
Expand All @@ -19,7 +18,10 @@ impl<R: MmapBytesReader> IpcReader<R> {
match self.reader.to_file() {
Some(file) => {
let mmap = unsafe { memmap::Mmap::map(file).unwrap() };
let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;
let mmap_key = self.memory_map.take().unwrap();
let semaphore = MMapSemaphore::new(mmap_key, mmap);
let metadata =
read::read_file_metadata(&mut std::io::Cursor::new(semaphore.as_ref()))?;

if let Some(columns) = &self.columns {
let schema = &metadata.schema;
Expand All @@ -33,7 +35,7 @@ impl<R: MmapBytesReader> IpcReader<R> {
metadata.schema.clone()
};

let reader = MMapChunkIter::new(mmap, metadata, &self.projection)?;
let reader = MMapChunkIter::new(Arc::new(semaphore), metadata, &self.projection)?;

finish_reader(
reader,
Expand All @@ -53,20 +55,18 @@ impl<R: MmapBytesReader> IpcReader<R> {
struct MMapChunkIter<'a> {
dictionaries: Dictionaries,
metadata: FileMetadata,
mmap: Arc<Mmap>,
mmap: Arc<MMapSemaphore>,
idx: usize,
end: usize,
projection: &'a Option<Vec<usize>>,
}

impl<'a> MMapChunkIter<'a> {
fn new(
mmap: Mmap,
mmap: Arc<MMapSemaphore>,
metadata: FileMetadata,
projection: &'a Option<Vec<usize>>,
) -> PolarsResult<Self> {
let mmap = Arc::new(mmap);

let end = metadata.blocks.len();
// mmap the dictionaries
let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };
Expand Down
58 changes: 58 additions & 0 deletions crates/polars-io/src/mmap.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,63 @@
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufReader, Cursor, Read, Seek};
use std::path::{Path, PathBuf};
use std::sync::Mutex;

use memmap::Mmap;
use once_cell::sync::Lazy;
use polars_error::{polars_bail, PolarsResult};
use polars_utils::create_file;

// Keep track of memory mapped files so we don't write to them while reading
// Use a btree as it uses less memory than a hashmap and this thing never shrinks.
static MEMORY_MAPPED_FILES: Lazy<Mutex<BTreeMap<PathBuf, u32>>> =
Lazy::new(|| Mutex::new(Default::default()));

pub(crate) struct MMapSemaphore {
path: PathBuf,
mmap: Mmap,
}

impl MMapSemaphore {
pub(super) fn new(path: PathBuf, mmap: Mmap) -> Self {
let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
guard.insert(path.clone(), 1);
Self { path, mmap }
}
}

impl AsRef<[u8]> for MMapSemaphore {
#[inline]
fn as_ref(&self) -> &[u8] {
self.mmap.as_ref()
}
}

impl Drop for MMapSemaphore {
fn drop(&mut self) {
let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
if let Entry::Occupied(mut e) = guard.entry(std::mem::take(&mut self.path)) {
let v = e.get_mut();
*v -= 1;

if *v == 0 {
e.remove_entry();
}
}
}
}

/// Open a file to get write access. This will check if the file is currently registered as memory mapped.
pub fn try_create_file(path: &Path) -> PolarsResult<File> {
let guard = MEMORY_MAPPED_FILES.lock().unwrap();
if guard.contains_key(path) {
polars_bail!(ComputeError: "cannot write to file: already memory mapped")
}
drop(guard);
create_file(path)
}

/// Trait used to get a hold to file handler or to the underlying bytes
/// without performing a Read.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ impl IpcExec {

let file = std::fs::File::open(path)?;

let memory_mapped = if self.options.memory_map {
Some(path.clone())
} else {
None
};

let df = IpcReader::new(file)
.with_n_rows(
// NOTE: If there is any file that by itself exceeds the
Expand All @@ -108,7 +114,7 @@ impl IpcExec {
)
.with_row_index(self.file_options.row_index.clone())
.with_projection(projection.clone())
.memory_mapped(self.options.memory_map)
.memory_mapped(memory_mapped)
.finish()?;

row_counter
Expand Down
31 changes: 20 additions & 11 deletions crates/polars-utils/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
use std::fs::File;
use std::io::Error;
use std::io;
use std::path::Path;

use polars_error::*;

fn map_err(path: &Path, err: io::Error) -> PolarsError {
let path = path.to_string_lossy();
let msg = if path.len() > 88 {
let truncated_path: String = path.chars().skip(path.len() - 88).collect();
format!("{err}: ...{truncated_path}")
} else {
format!("{err}: {path}")
};
io::Error::new(err.kind(), msg).into()
}

pub fn open_file<P>(path: P) -> PolarsResult<File>
where
P: AsRef<Path>,
{
std::fs::File::open(&path).map_err(|err| {
let path = path.as_ref().to_string_lossy();
let msg = if path.len() > 88 {
let truncated_path: String = path.chars().skip(path.len() - 88).collect();
format!("{err}: ...{truncated_path}")
} else {
format!("{err}: {path}")
};
Error::new(err.kind(), msg).into()
})
File::open(&path).map_err(|err| map_err(path.as_ref(), err))
}

pub fn create_file<P>(path: P) -> PolarsResult<File>
where
P: AsRef<Path>,
{
File::create(&path).map_err(|err| map_err(path.as_ref(), err))
}
2 changes: 1 addition & 1 deletion crates/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ pub mod ord;
pub mod partitioned;

pub use index::{IdxSize, NullableIdxSize};
pub use io::open_file;
pub use io::*;
15 changes: 10 additions & 5 deletions py-polars/src/dataframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::ops::Deref;

#[cfg(feature = "avro")]
use polars::io::avro::AvroCompression;
use polars::io::mmap::ReaderBytes;
use polars::io::mmap::{try_create_file, ReaderBytes};
use polars::io::RowIndex;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
Expand All @@ -14,7 +14,8 @@ use super::*;
use crate::conversion::parse_parquet_compression;
use crate::conversion::Wrap;
use crate::file::{
get_either_file, get_file_like, get_mmap_bytes_reader, read_if_bytesio, EitherRustPythonFile,
get_either_file, get_file_like, get_mmap_bytes_reader, get_mmap_bytes_reader_and_path,
read_if_bytesio, EitherRustPythonFile,
};

#[pymethods]
Expand Down Expand Up @@ -279,14 +280,16 @@ impl PyDataFrame {
offset,
});
py_f = read_if_bytesio(py_f);
let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
let (mmap_bytes_r, mmap_path) = get_mmap_bytes_reader_and_path(&py_f)?;

let mmap_path = if memory_map { mmap_path } else { None };
let df = py.allow_threads(move || {
IpcReader::new(mmap_bytes_r)
.with_projection(projection)
.with_columns(columns)
.with_n_rows(n_rows)
.with_row_index(row_index)
.memory_mapped(memory_map)
.memory_mapped(mmap_path)
.finish()
.map_err(PyPolarsErr::from)
})?;
Expand Down Expand Up @@ -488,7 +491,9 @@ impl PyDataFrame {
future: bool,
) -> PyResult<()> {
if let Ok(s) = py_f.extract::<PyBackedStr>(py) {
let f = std::fs::File::create(&*s)?;
let s: &str = s.as_ref();
let path = std::path::Path::new(s);
let f = try_create_file(path).map_err(PyPolarsErr::from)?;
py.allow_threads(|| {
IpcWriter::new(f)
.with_compression(compression.0)
Expand Down
17 changes: 12 additions & 5 deletions py-polars/src/file.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fs::File;
use std::io;
use std::io::{BufReader, Cursor, Read, Seek, SeekFrom, Write};
use std::path::PathBuf;

use polars::io::mmap::MmapBytesReader;
use polars_error::polars_warn;
Expand Down Expand Up @@ -218,17 +219,23 @@ pub fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
pub fn get_mmap_bytes_reader<'a>(
py_f: &'a Bound<'a, PyAny>,
) -> PyResult<Box<dyn MmapBytesReader + 'a>> {
get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
}

pub fn get_mmap_bytes_reader_and_path<'a>(
py_f: &'a Bound<'a, PyAny>,
) -> PyResult<(Box<dyn MmapBytesReader + 'a>, Option<PathBuf>)> {
// bytes object
if let Ok(bytes) = py_f.downcast::<PyBytes>() {
Ok(Box::new(Cursor::new(bytes.as_bytes())))
Ok((Box::new(Cursor::new(bytes.as_bytes())), None))
}
// string so read file
else if let Ok(pstring) = py_f.downcast::<PyString>() {
let s = pstring.to_cow()?;
let p = std::path::Path::new(&*s);
let p = resolve_homedir(p);
let f = polars_utils::open_file(p).map_err(PyPolarsErr::from)?;
Ok(Box::new(f))
let p_resolved = resolve_homedir(p);
let f = polars_utils::open_file(p_resolved).map_err(PyPolarsErr::from)?;
Ok((Box::new(f), Some(p.to_path_buf())))
}
// hopefully a normal python file: with open(...) as f:.
else {
Expand All @@ -242,6 +249,6 @@ pub fn get_mmap_bytes_reader<'a>(
let f = Python::with_gil(|py| {
PyFileLikeObject::with_requirements(py_f.to_object(py), true, false, true)
})?;
Ok(Box::new(f))
Ok((Box::new(f), None))
}
}
16 changes: 16 additions & 0 deletions py-polars/tests/unit/io/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,19 @@ def test_ipc_decimal_15920(
path = f"{tmp_path}/data"
df.write_ipc(path)
assert_frame_equal(pl.read_ipc(path), df)


@pytest.mark.write_disk()
def test_ipc_raise_on_writing_mmap(tmp_path: Path) -> None:
p = tmp_path / "foo.ipc"
df = pl.DataFrame({"foo": [1, 2, 3]})
# first write is allowed
df.write_ipc(p)

# now open as memory mapped
df = pl.read_ipc(p, memory_map=True)

with pytest.raises(
pl.ComputeError, match="cannot write to file: already memory mapped"
):
df.write_ipc(p)