From 4daedb623e71c4ebf63ce7b473d7a89d0d83e188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20b=C3=BCttner?= Date: Tue, 14 Nov 2023 07:45:51 +0100 Subject: [PATCH] Implement reset iterator, small improvements --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 52 ++++---------- examples/demo.rs | 30 ++++++++ src/checkedfile.rs | 144 ++++++++++++++++++++++++++++++++++++++ src/error.rs | 4 +- src/lib.rs | 1 + src/swapvec.rs | 80 +++++---------------- src/swapveciter.rs | 149 ++++++++++++++++++++++------------ tests/reset_iterator.rs | 25 +++++++ 10 files changed, 318 insertions(+), 171 deletions(-) create mode 100644 examples/demo.rs create mode 100644 src/checkedfile.rs create mode 100644 tests/reset_iterator.rs diff --git a/Cargo.lock b/Cargo.lock index b2a8939..3715a81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,7 +158,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "swapvec" -version = "0.3.0" +version = "0.4.0" dependencies = [ "bincode", "lz4_flex", diff --git a/Cargo.toml b/Cargo.toml index 06fe9b1..9907b8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "swapvec" -version = "0.3.0" +version = "0.4.0" edition = "2021" authors = ["Julian Büttner "] license = "MIT" diff --git a/README.md b/README.md index 278b6b4..68a7173 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ A vector which swaps to disk when exceeding a certain length. -Useful if creation and consumption of data should be -separated by time, but not much memory should be consumed. +Useful if you do not want to use a queue, but would rather +first collect all data and then consume it. Imagine multiple threads slowly producing giant vectors of data, -passing it to a single fast consumer. +passing them to a single consumer later on. Or a CSV upload of multiple gigabytes to an HTTP server, in which you want to validate every @@ -16,7 +16,7 @@ transaction or keeping everything in memory. 
## Features - Multiplatform (Linux, Windows, MacOS) - Creates temporary file only after exceeding threshold -Works on `T: Serialize + Deserialize` +Works on `T: Serialize + Deserialize + Clone` - Temporary file removed even when terminating the program - Checksums to guarantee integrity - Can be moved across threads @@ -26,9 +26,10 @@ transaction or keeping everything in memory. - Currently, no "start swapping after n MiB" is implemented - Would need element wise space calculation due to heap elements (e.g. `String`) - `Compression` currently does not compress. It is there to keep the API stable. -No async support yet +No async support (yet) - When pushing elements or consuming iterators, SwapVec is "write only" -SwapVecIter can only be iterated once +Only forward iteration + - The iterator can be reset, though ## Examples @@ -45,38 +46,13 @@ for value in much_data.into_iter() { } ``` -### Extended Usage -This is the code for `cargo run` (`src/main.rs`). -```rust -use swapvec::{SwapVec, SwapVecConfig}; - -const DATA_MB: u64 = 20; - -fn main() { - let element_count = DATA_MB / 8; - let big_iterator = 0..element_count * 1024 * 1024; - - let config = swapvec::SwapVecConfig { - batch_size: 8 * 1024, - ..SwapVecConfig::default() - }; - let mut swapvec: SwapVec<_> = SwapVec::with_config(config); - swapvec.consume(big_iterator.into_iter()).unwrap(); +### Examples - println!("Data size: {}MB", DATA_MB); - println!("Done. Batches written: {}", swapvec.batches_written()); - println!( - "Filesize: {}MB", - swapvec - .file_size() - .map(|x| x / 1024 / 1024) - .unwrap_or(0) - ); - println!("Read back"); +Currently there is only one simple example, +doing some basic operations and reading back metrics, +such as the number of batches/bytes written to file. +
Run it with - let read_back: Vec<_> = swapvec.into_iter().map(|x| x.unwrap()).collect(); - - println!("{:#?}", read_back.len()); -} +```bash +cargo run --example demo ``` - diff --git a/examples/demo.rs b/examples/demo.rs new file mode 100644 index 0000000..217a973 --- /dev/null +++ b/examples/demo.rs @@ -0,0 +1,30 @@ +use swapvec::{SwapVec, SwapVecConfig}; + +const DATA_MB: u64 = 20; + +fn main() { + let element_count = DATA_MB / 8; + let big_iterator = 0..element_count * 1024 * 1024; + + let config = swapvec::SwapVecConfig { + batch_size: 8 * 1024, + ..SwapVecConfig::default() + }; + let mut swapvec: SwapVec<_> = SwapVec::with_config(config); + swapvec.consume(big_iterator.into_iter()).unwrap(); + + println!("Data size: {}MB", DATA_MB); + println!("Done. Batches written: {}", swapvec.batches_written()); + println!( + "Filesize: {}MB", + swapvec + .file_size() + .map(|x| x as f32 / 1024. / 1024.) + .unwrap_or(0.) + ); + println!("Read back"); + + let read_back: Vec<_> = swapvec.into_iter().map(|x| x.unwrap()).collect(); + + println!("Elements read back: {}", read_back.len()); +} diff --git a/src/checkedfile.rs b/src/checkedfile.rs new file mode 100644 index 0000000..b5c0f26 --- /dev/null +++ b/src/checkedfile.rs @@ -0,0 +1,144 @@ +use std::{ + hash::{DefaultHasher, Hash, Hasher}, + io::{self, BufReader, BufWriter, Error, Read, Seek, Write}, +}; + +use crate::SwapVecError; + +#[derive(Debug)] +pub struct BatchInfo { + pub hash: u64, + pub bytes: usize, +} + +pub(crate) struct BatchWriter { + inner: BufWriter, + batch_infos: Vec, +} + +pub(crate) struct BatchReader { + inner: BufReader, + batch_infos: Vec, + batch_index: usize, + buffer: Vec, +} + +fn hash_bytes(bytes: &[u8]) -> u64 { + let mut hasher = DefaultHasher::new(); + bytes.hash(&mut hasher); + hasher.finish() +} + +impl BatchWriter { + pub fn new(writer: T) -> Self { + Self { + batch_infos: Vec::new(), + inner: BufWriter::new(writer), + } + } + pub fn write_batch(&mut self, buffer: &[u8]) -> Result<(), 
io::Error> { + self.inner.write_all(buffer)?; + self.batch_infos.push(BatchInfo { + hash: hash_bytes(buffer), + bytes: buffer.len(), + }); + self.inner.flush() + } + pub fn bytes_written(&self) -> usize { + self.batch_infos.iter().map(|b| b.bytes).sum() + } + pub fn batch_count(&self) -> usize { + self.batch_infos.len() + } +} + +impl BatchReader { + pub fn reset(&mut self) -> Result<(), Error> { + self.inner.seek(io::SeekFrom::Start(0))?; + self.batch_index = 0; + self.buffer.clear(); + Ok(()) + } +} + +impl BatchReader { + pub fn read_batch(&mut self) -> Result, SwapVecError> { + let batch_info = self.batch_infos.get(self.batch_index); + self.batch_index += 1; + if batch_info.is_none() { + return Ok(None); + } + let batch_info = batch_info.unwrap(); + self.buffer.resize(batch_info.bytes, 0); + self.inner.read_exact(self.buffer.as_mut_slice())?; + if hash_bytes(self.buffer.as_slice()) != batch_info.hash { + // Integrity check: fail loudly on corrupted batches instead of + // silently handing back bad data (README promises checksums). + return Err(SwapVecError::WrongChecksum); + } + Ok(Some(self.buffer.as_slice())) + } +} + +impl TryFrom> for BatchReader { + type Error = std::io::Error; + + fn try_from(value: BatchWriter) -> Result { + let mut inner = value + .inner + .into_inner() + .map_err(|inner_error| inner_error.into_error())?; + inner.seek(io::SeekFrom::Start(0))?; + Ok(Self { + inner: BufReader::new(inner), + batch_infos: value.batch_infos, + batch_index: 0, + buffer: Vec::new(), + }) + } +} + +#[cfg(test)] +mod test { + use std::io::Cursor; + + use super::*; + + #[test] + fn read_write_checked_io() { + let buffer = Cursor::new(vec![0; 128]); + let mut batch_writer = BatchWriter::new(buffer); + batch_writer + .write_batch(&[1, 2, 3]) + .expect("Could not write to IO buffer"); + batch_writer + .write_batch(&[44, 55]) + .expect("Could not write to IO buffer"); + + let mut reader: BatchReader<_> = batch_writer + .try_into() + .expect("Could not flush into IO buffer"); + assert_eq!( + reader + .read_batch() + .expect("Could not read batch") + 
.expect("Batch was unexpectedly empty"), + &[1, 2, 3] + ); + reader.reset().expect("Could not reset"); + assert_eq!( + reader + .read_batch() + .expect("Could not read batch") + .expect("Batch was unexpectedly empty"), + &[1, 2, 3] + ); + assert_eq!( + reader + .read_batch() + .expect("Could not read batch") + .expect("Batch was unexpectedly empty"), + &[44, 55] + ); + } +} diff --git a/src/error.rs b/src/error.rs index 6b198cd..2f99fd5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -22,7 +22,7 @@ pub enum SwapVecError { /// of your type `T`. SerializationFailed(bincode::ErrorKind), /// Every other possibility - Other, + Other(std::io::ErrorKind), } impl From for SwapVecError { @@ -31,7 +31,7 @@ impl From for SwapVecError { // TODO https://github.com/rust-lang/rust/issues/86442 // std::io::ErrorKind::StorageFull => Self::OutOfDisk, std::io::ErrorKind::PermissionDenied => Self::MissingPermissions, - _ => Self::Other, + e => Self::Other(e), } } } diff --git a/src/lib.rs b/src/lib.rs index c818abc..c1db4f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ mod compression; mod error; mod swapvec; mod swapveciter; +mod checkedfile; pub use self::swapvec::{Compression, CompressionLevel, SwapVec, SwapVecConfig}; pub use compression::{Compress, CompressBoxedClone}; diff --git a/src/swapvec.rs b/src/swapvec.rs index e40f112..9bb3487 100644 --- a/src/swapvec.rs +++ b/src/swapvec.rs @@ -1,17 +1,13 @@ use std::{ - collections::{hash_map::DefaultHasher, VecDeque}, + collections::VecDeque, fmt::Debug, fs::File, - hash::{Hash, Hasher}, - io::Write, }; -#[cfg(any(unix, target_os = "wasi"))] -use std::os::unix::io::AsRawFd; - use serde::{Deserialize, Serialize}; use crate::{ + checkedfile::BatchWriter, compression::{Compress, CompressBoxedClone}, error::SwapVecError, swapveciter::SwapVecIter, @@ -106,33 +102,6 @@ impl Default for SwapVecConfig { } } -pub struct BatchInfo { - pub hash: u64, - pub bytes: usize, -} - -pub(crate) struct CheckedFile { - pub file: File, - 
pub batch_info: Vec, -} - -impl CheckedFile { - fn write_all(&mut self, buffer: &Vec) -> Result<(), std::io::Error> { - let mut hasher = DefaultHasher::new(); - buffer.hash(&mut hasher); - self.file.write_all(buffer)?; - self.batch_info.push(BatchInfo { - hash: hasher.finish(), - bytes: buffer.len(), - }); - self.file.flush() - } - - fn file_size(&self) -> u64 { - self.batch_info.iter().map(|x| x.bytes as u64).sum() - } -} - /// An only growing array type /// which swaps to disk, based on it's initial configuration. /// @@ -149,7 +118,7 @@ pub struct SwapVec where for<'a> T: Serialize + Deserialize<'a>, { - tempfile: Option, + tempfile: Option>, vector: VecDeque, config: SwapVecConfig, } @@ -166,28 +135,18 @@ impl Deserialize<'a>> Default for SwapVec { impl Deserialize<'a>> Debug for SwapVec { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - #[cfg(not(any(unix, target_os = "wasi")))] - let file_descriptor: Option = None; - #[cfg(any(unix, target_os = "wasi"))] - let file_descriptor = self.tempfile.as_ref().map(|x| x.file.as_raw_fd()); - write!( f, - "SwapVec {{elements_in_ram: {}, elements_in_file: {}, filedescriptor: {:#?}}}", + "SwapVec {{elements_in_ram: {}, elements_in_file: {}}}", self.vector.len(), - self.tempfile - .as_ref() - .map(|x| x.batch_info.len()) - .unwrap_or(0) - * self.config.batch_size, - file_descriptor + self.tempfile.as_ref().map(|x| x.batch_count()).unwrap_or(0) * self.config.batch_size, ) } } impl SwapVec where - for<'a> T: Serialize + Deserialize<'a>, + for<'a> T: Serialize + Deserialize<'a> + Clone, { /// Intialize with non-default configuration. pub fn with_config(config: SwapVecConfig) -> Self { @@ -210,6 +169,11 @@ where /// Push a single element. /// Might return an error, due to possibly triggered batch flush (IO). + /// Will write at most one batch per insert. 
+ /// If `swap_after` is bigger than `batch_size` and a file is created, + /// every insert will + /// write one batch to disk, until the elements in memory have a count + /// smaller than or equal to batch size. pub fn push(&mut self, element: T) -> Result<(), SwapVecError> { self.vector.push_back(element); self.after_push_work() @@ -224,15 +188,15 @@ where /// Get the file size in bytes of the temporary file. /// Might do IO and therefore could return some Result. - pub fn file_size(&self) -> Option { - self.tempfile.as_ref().map(|f| f.file_size()) + pub fn file_size(&self) -> Option { + self.tempfile.as_ref().map(|f| f.bytes_written()) } /// Basically int(elements pushed / batch size) pub fn batches_written(&self) -> usize { match self.tempfile.as_ref() { None => 0, - Some(f) => f.batch_info.len(), + Some(f) => f.batch_count(), } } @@ -247,25 +211,19 @@ where // Flush batch if self.tempfile.is_none() { let tf = tempfile::tempfile()?; - self.tempfile = Some(CheckedFile { - file: tf, - batch_info: Vec::new(), - }) + self.tempfile = Some(BatchWriter::new(tf)); } - let batch: Vec = (0..self.config.batch_size) - .map(|_| self.vector.pop_front().unwrap()) - .collect::>(); - // TODO: shrink self.vector by writing double - // sized batches and calling self.vector.shrink_to() + assert!(self.tempfile.is_some()); + let batch: Vec<_> = self.vector.drain(0..self.config.batch_size).collect(); let buffer = bincode::serialize(&batch)?; let compressed = self.config.compression.compress(buffer); - self.tempfile.as_mut().unwrap().write_all(&compressed)?; + self.tempfile.as_mut().unwrap().write_batch(&compressed)?; Ok(()) } } -impl Deserialize<'a>> IntoIterator for SwapVec { +impl Deserialize<'a> + Clone> IntoIterator for SwapVec { type Item = Result; type IntoIter = SwapVecIter; diff --git a/src/swapveciter.rs b/src/swapveciter.rs index 7d7f269..8d0315e 100644 --- a/src/swapveciter.rs +++ b/src/swapveciter.rs @@ -1,19 +1,32 @@ -use std::collections::hash_map::DefaultHasher; - +use 
std::collections::VecDeque; use std::fs::File; -use std::hash::{Hash, Hasher}; -use std::io::Read; -use std::{collections::VecDeque, io::Seek}; use serde::{Deserialize, Serialize}; +use crate::checkedfile::{BatchReader, BatchWriter}; use crate::compression::Compress; use crate::error::SwapVecError; -use crate::swapvec::{BatchInfo, CheckedFile, SwapVecConfig}; +use crate::swapvec::SwapVecConfig; + +struct VecDequeIndex { + value: VecDeque, +} -pub struct CheckedFileRead { - pub file: File, - pub batch_info_rev: Vec, +impl From> for VecDequeIndex { + fn from(value: VecDeque) -> Self { + Self { value } + } +} + +impl VecDequeIndex { + fn get(&self, i: usize) -> Option { + let (a, b) = self.value.as_slices(); + if i < a.len() { + a.get(i).cloned() + } else { + b.get(i - a.len()).cloned() + } + } } /// Iterator for SwapVec. @@ -26,92 +39,69 @@ pub struct CheckedFileRead { /// Also quitting the program should remove the temporary file. pub struct SwapVecIter where - for<'a> T: Serialize + Deserialize<'a>, + for<'a> T: Serialize + Deserialize<'a> + Clone, { - seeked_zero: bool, + // Do not error on new, because into_iter() + // is not allowed to fail. Fail at first try then. + new_error: Option, current_batch_rev: Vec, - tempfile: Option, + tempfile: Option>, // last_elements are elements, // which have not been written to disk. // Therefore, for iterating from zero, // first read elements from disk and // then from last_elements. - last_elements: VecDeque, + last_elements: VecDequeIndex, + last_elements_index: usize, config: SwapVecConfig, } -impl Deserialize<'a>> SwapVecIter { - /// This method should not even be public, - /// but I don't know how to make it private. 
+impl Deserialize<'a> + Clone> SwapVecIter { pub(crate) fn new( - tempfile_written: Option, + tempfile_written: Option>, last_elements: VecDeque, config: SwapVecConfig, ) -> Self { - let tempfile = tempfile_written.map(|mut x| { - x.batch_info.reverse(); - CheckedFileRead { - file: x.file, - batch_info_rev: x.batch_info, - } - }); + let (tempfile, new_error) = match tempfile_written.map(|v| v.try_into()) { + None => (None, None), + Some(Ok(v)) => (Some(v), None), + Some(Err(e)) => (None, Some(e)), + }; + + let last_elements: VecDequeIndex<_> = last_elements.into(); Self { - seeked_zero: false, + new_error, current_batch_rev: Vec::with_capacity(config.batch_size), last_elements, + last_elements_index: 0, tempfile, config, } } - fn ensure_seeked_zero(&mut self) -> Result<(), SwapVecError> { - if !self.seeked_zero { - if let Some(some_tempfile) = self.tempfile.as_mut() { - if let Err(err) = some_tempfile.file.seek(std::io::SeekFrom::Start(0)) { - return Err(err.into()); - } - } - self.seeked_zero = true; - } - Ok(()) - } - fn read_batch(&mut self) -> Result>, SwapVecError> { if self.tempfile.is_none() { return Ok(None); } - self.ensure_seeked_zero()?; + assert!(self.tempfile.is_some()); + if let Some(err) = self.new_error.take() { + return Err(err.into()); + } let tempfile = self.tempfile.as_mut().unwrap(); - let batch_info = tempfile.batch_info_rev.pop(); - if batch_info.is_none() { + let buffer = tempfile.read_batch()?; + if buffer.is_none() { return Ok(None); } - - let batch_info = batch_info.unwrap(); - let mut buffer = vec![0; batch_info.bytes]; - tempfile.file.read_exact(&mut buffer)?; - - let mut hasher = DefaultHasher::new(); - buffer.hash(&mut hasher); - if hasher.finish() != batch_info.hash { - return Err(SwapVecError::WrongChecksum); - } - + let buffer = buffer.unwrap(); let decompressed: Vec = self .config .compression - .decompress(buffer) + .decompress(buffer.to_vec()) .map_err(|_| SwapVecError::Decompression)?; let batch: Vec = 
bincode::deserialize(&decompressed)?; - // If everything from file has been read, - // mark as empty. - if tempfile.batch_info_rev.is_empty() { - self.tempfile = None; - } - Ok(Some(batch)) } @@ -127,9 +117,33 @@ impl Deserialize<'a>> SwapVecIter { Ok(None) } } + + /// Resets the iteration, starting from the first element. + /// If a file exists, it will be read from the beginning. + /// + /// To use this feature, you probably don't want to consume + /// the iterator (`bigvec.map(|x| x * 2)`), but to use + /// [`Iterator::by_ref()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.by_ref) + /// ```rust + /// let mut bigvec = swapvec::SwapVec::default(); + /// bigvec.consume(0..99); + /// let mut new_iterator = bigvec.into_iter(); + /// let sum: usize = new_iterator.by_ref().map(|v| v.unwrap()).sum(); + /// new_iterator.reset(); + /// let sum_double: usize = new_iterator.by_ref().map(|v| v.unwrap() * 2).sum(); + /// ``` + pub fn reset(&mut self) { + self.current_batch_rev.clear(); + self.last_elements_index = 0; + if let Some(tempfile) = self.tempfile.as_mut() { + if let Err(e) = tempfile.reset() { + self.new_error = Some(e); + } + } + } } -impl Deserialize<'a>> Iterator for SwapVecIter { +impl Deserialize<'a> + Clone> Iterator for SwapVecIter { type Item = Result; fn next(&mut self) -> Option { @@ -137,15 +151,14 @@ impl Deserialize<'a>> Iterator for SwapVecIter { return Some(Ok(item)); } - let next_in_batch = self.next_in_batch(); - if let Err(err) = next_in_batch { - return Some(Err(err)); - } - if let Ok(Some(item)) = next_in_batch { - return Some(Ok(item)); + match self.next_in_batch() { + Err(err) => Some(Err(err)), + Ok(Some(item)) => Some(Ok(item)), + Ok(None) => { + let index = self.last_elements_index; + self.last_elements_index += 1; + self.last_elements.get(index).map(|x| Ok(x)) + } } - - // File has been exhausted. 
- self.last_elements.pop_front().map(|x| Ok(x)) } } diff --git a/tests/reset_iterator.rs b/tests/reset_iterator.rs new file mode 100644 index 0000000..b3ab62a --- /dev/null +++ b/tests/reset_iterator.rs @@ -0,0 +1,25 @@ +use swapvec::{SwapVec, SwapVecConfig}; + +#[test] +fn reset_with_file() { + let config = SwapVecConfig { + compression: None, + swap_after: 16, + batch_size: 5, + }; + + let vector: Vec = (0..999).collect(); + + let mut v = SwapVec::with_config(config); + v.consume(vector.clone().into_iter()).unwrap(); + + assert!(v.written_to_file()); + + let mut iterator = v.into_iter(); + let vector_read_back: Vec = iterator.by_ref().map(|x| x.unwrap()).collect(); + assert_eq!(vector, vector_read_back); + + iterator.reset(); + let vector_read_back2: Vec = iterator.map(|x| x.unwrap()).collect(); + assert_eq!(vector, vector_read_back2); +}