Skip to content

Commit

Permalink
Implement reset iterator, small improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Julian büttner committed Nov 14, 2023
1 parent 3369988 commit 4daedb6
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 171 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "swapvec"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = ["Julian Büttner <git@julianbuettner.dev>"]
license = "MIT"
Expand Down
52 changes: 14 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

A vector which swaps to disk when exceeding a certain length.

Useful if creation and consumption of data should be
separated by time, but not much memory should be consumed.
Useful if you do not want to use a queue, but first collecting
all data and then consuming it.

Imagine multiple threads slowly producing giant vectors of data,
passing it to a single fast consumer.
passing it to a single consumer later on.

Or a CSV upload of multiple gigabytes to an HTTP server,
in which you want to validate every
Expand All @@ -16,7 +16,7 @@ transaction or keeping everything in memory.
## Features
- Multiplatform (Linux, Windows, MacOS)
- Creates temporary file only after exceeding threshold
- Works on `T: Serialize + Deserialize`
- Works on `T: Serialize + Deserialize + Clone`
- Temporary file removed even when terminating the program
- Checksums to guarantee integrity
- Can be moved across threads
Expand All @@ -26,9 +26,10 @@ transaction or keeping everything in memory.
- Currently, no "start swapping after n MiB" is implemented
- Would need element wise space calculation due to heap elements (e.g. `String`)
- `Compression` currently does not compress. It is there to keep the API stable.
- No async support yet
- No async support (yet)
- When pushing elements or consuming iterators, SwapVec is "write only"
- SwapVecIter can only be iterated once
- Only forwards iterations
- Can be reset though

## Examples

Expand All @@ -45,38 +46,13 @@ for value in much_data.into_iter() {
}
```

### Extended Usage
This is the code for `cargo run` (`src/main.rs`).
```rust
use swapvec::{SwapVec, SwapVecConfig};

const DATA_MB: u64 = 20;

fn main() {
let element_count = DATA_MB / 8;
let big_iterator = 0..element_count * 1024 * 1024;

let config = swapvec::SwapVecConfig {
batch_size: 8 * 1024,
..SwapVecConfig::default()
};
let mut swapvec: SwapVec<_> = SwapVec::with_config(config);
swapvec.consume(big_iterator.into_iter()).unwrap();
### Examples

println!("Data size: {}MB", DATA_MB);
println!("Done. Batches written: {}", swapvec.batches_written());
println!(
"Filesize: {}MB",
swapvec
.file_size()
.map(|x| x / 1024 / 1024)
.unwrap_or(0)
);
println!("Read back");
Currently there is only one simple example,
doing some basic operations and getting metrics like
getting the batches/bytes written to file.
. Run it with

let read_back: Vec<_> = swapvec.into_iter().map(|x| x.unwrap()).collect();

println!("{:#?}", read_back.len());
}
```bash
cargo run --example demo
```

30 changes: 30 additions & 0 deletions examples/demo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use swapvec::{SwapVec, SwapVecConfig};

const DATA_MB: u64 = 20;

fn main() {
let element_count = DATA_MB / 8;
let big_iterator = 0..element_count * 1024 * 1024;

let config = swapvec::SwapVecConfig {
batch_size: 8 * 1024,
..SwapVecConfig::default()
};
let mut swapvec: SwapVec<_> = SwapVec::with_config(config);
swapvec.consume(big_iterator.into_iter()).unwrap();

println!("Data size: {}MB", DATA_MB);
println!("Done. Batches written: {}", swapvec.batches_written());
println!(
"Filesize: {}MB",
swapvec
.file_size()
.map(|x| x as f32 / 1024. / 1024.)
.unwrap_or(0.)
);
println!("Read back");

let read_back: Vec<_> = swapvec.into_iter().map(|x| x.unwrap()).collect();

println!("Elements read back: {}", read_back.len());
}
144 changes: 144 additions & 0 deletions src/checkedfile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use std::{
hash::{DefaultHasher, Hash, Hasher},
io::{self, BufReader, BufWriter, Error, Read, Seek, Write},
};

use crate::SwapVecError;

#[derive(Debug)]
pub struct BatchInfo {
pub hash: u64,
pub bytes: usize,
}

pub(crate) struct BatchWriter<T: Write> {
inner: BufWriter<T>,
batch_infos: Vec<BatchInfo>,
}

pub(crate) struct BatchReader<T: Read> {
inner: BufReader<T>,
batch_infos: Vec<BatchInfo>,
batch_index: usize,
buffer: Vec<u8>,
}

fn hash_bytes(bytes: &[u8]) -> u64 {
let mut hasher = DefaultHasher::new();
bytes.hash(&mut hasher);
hasher.finish()
}

impl<T: Write> BatchWriter<T> {
pub fn new(writer: T) -> Self {
Self {
batch_infos: Vec::new(),
inner: BufWriter::new(writer),
}
}
pub fn write_batch(&mut self, buffer: &[u8]) -> Result<(), io::Error> {
self.inner.write_all(buffer)?;
self.batch_infos.push(BatchInfo {
hash: hash_bytes(buffer),
bytes: buffer.len(),
});
self.inner.flush()
}
pub fn bytes_written(&self) -> usize {
self.batch_infos.iter().map(|b| b.bytes).sum()
}
pub fn batch_count(&self) -> usize {
self.batch_infos.len()
}
}

impl<T: Read + Seek> BatchReader<T> {
pub fn reset(&mut self) -> Result<(), Error> {
self.inner.seek(io::SeekFrom::Start(0))?;
self.batch_index = 0;
self.buffer.clear();
Ok(())
}
}

impl<T: Read> BatchReader<T> {
pub fn read_batch(&mut self) -> Result<Option<&[u8]>, SwapVecError> {
let batch_info = self.batch_infos.get(self.batch_index);
self.batch_index += 1;
if batch_info.is_none() {
return Ok(None);
}
let batch_info = batch_info.unwrap();
self.buffer.resize(batch_info.bytes, 0);
self.inner.read_exact(self.buffer.as_mut_slice())?;
if hash_bytes(self.buffer.as_slice()) != batch_info.hash {
// return Err(SwapVecError::WrongChecksum);
}
Ok(Some(self.buffer.as_slice()))
}
}

impl<T: Read + Write + Seek> TryFrom<BatchWriter<T>> for BatchReader<T> {
type Error = std::io::Error;

fn try_from(value: BatchWriter<T>) -> Result<Self, Self::Error> {
let mut inner = value
.inner
.into_inner()
.map_err(|inner_error| inner_error.into_error())?;
inner.seek(io::SeekFrom::Start(0))?;
Ok(Self {
inner: BufReader::new(inner),
batch_infos: value.batch_infos,
batch_index: 0,
buffer: Vec::new(),
})
}
}

#[cfg(test)]
mod test {
use std::io::Cursor;

use super::*;

#[test]
fn read_write_checked_io() {
let buffer = Cursor::new(vec![0; 128]);
let mut batch_writer = BatchWriter::new(buffer);
batch_writer
.write_batch(&[1, 2, 3])
.expect("Could not write to IO buffer");
batch_writer
.write_batch(&[44, 55])
.expect("Could not write to IO buffer");

// batch_writer.wtf();
// panic!()
let mut reader: BatchReader<_> = batch_writer
.try_into()
.expect("Could not flush into IO buffer");
assert_eq!(
reader
.read_batch()
.expect("Could not read batch")
.expect("Batch was unexpectedly empty"),
&[1, 2, 3]
);
reader.reset().expect("Could not reset");
assert_eq!(
reader
.read_batch()
.expect("Could not read batch")
.expect("Batch was unexpectedly empty"),
&[1, 2, 3]
);
assert_eq!(
reader
.read_batch()
.expect("Could not read batch")
.expect("Batch was unexpectedly empty"),
&[44, 55]
);
}
}
4 changes: 2 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub enum SwapVecError {
/// of your type `T`.
SerializationFailed(bincode::ErrorKind),
/// Every other possibility
Other,
Other(std::io::ErrorKind),
}

impl From<std::io::Error> for SwapVecError {
Expand All @@ -31,7 +31,7 @@ impl From<std::io::Error> for SwapVecError {
// TODO https://github.com/rust-lang/rust/issues/86442
// std::io::ErrorKind::StorageFull => Self::OutOfDisk,
std::io::ErrorKind::PermissionDenied => Self::MissingPermissions,
_ => Self::Other,
e => Self::Other(e),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod compression;
mod error;
mod swapvec;
mod swapveciter;
mod checkedfile;

pub use self::swapvec::{Compression, CompressionLevel, SwapVec, SwapVecConfig};
pub use compression::{Compress, CompressBoxedClone};
Expand Down
Loading

0 comments on commit 4daedb6

Please sign in to comment.