From e369bd7769281393c38f50119f3b650890ed294a Mon Sep 17 00:00:00 2001 From: "Jonathan D. Simms" Date: Wed, 27 Jun 2018 18:44:58 -0400 Subject: [PATCH] shit, revert this mess --- src/storage/cdb/cdb_rs/src/cdb/errors.rs | 6 +- src/storage/cdb/cdb_rs/src/cdb/mod.rs | 192 +++++++++++++++++----- src/storage/cdb/cdb_rs/src/cdb/storage.rs | 90 +++++----- src/storage/cdb/cdb_utils/src/main.rs | 10 +- 4 files changed, 202 insertions(+), 96 deletions(-) diff --git a/src/storage/cdb/cdb_rs/src/cdb/errors.rs b/src/storage/cdb/cdb_rs/src/cdb/errors.rs index 74f95104a..07260588b 100644 --- a/src/storage/cdb/cdb_rs/src/cdb/errors.rs +++ b/src/storage/cdb/cdb_rs/src/cdb/errors.rs @@ -1,9 +1,9 @@ use std::error; +use std::ffi::NulError; use std::fmt; use std::io; use std::num::ParseIntError; use std::str::Utf8Error; -use std::ffi::NulError; #[derive(Debug)] pub enum CDBError { @@ -32,7 +32,9 @@ impl From for CDBError { } impl From for CDBError { - fn from(err: NulError) -> CDBError { CDBError::NulError(err) } + fn from(err: NulError) -> CDBError { + CDBError::NulError(err) + } } impl error::Error for CDBError { diff --git a/src/storage/cdb/cdb_rs/src/cdb/mod.rs b/src/storage/cdb/cdb_rs/src/cdb/mod.rs index be835223b..a026db997 100644 --- a/src/storage/cdb/cdb_rs/src/cdb/mod.rs +++ b/src/storage/cdb/cdb_rs/src/cdb/mod.rs @@ -6,10 +6,10 @@ use std::result; pub mod errors; pub mod input; -pub mod writer; pub mod storage; +pub mod writer; -use self::storage::SliceFactory; +use self::storage::Sliceable; pub use self::errors::CDBError; @@ -124,7 +124,6 @@ impl KV { } } - struct IndexEntry { hash: CDBHash, // the hash of the stored key ptr: usize, // pointer to the absolute position of the data in the db @@ -141,7 +140,12 @@ pub struct KVIter { impl KVIter { fn new(cdb: Box) -> Result { let bkt = cdb.bucket_at(0)?; - Ok(KVIter{cdb, bkt_idx: 0, entry_n: 0, bkt}) + Ok(KVIter { + cdb, + bkt_idx: 0, + entry_n: 0, + bkt, + }) } } @@ -151,7 +155,7 @@ impl Iterator for KVIter { fn next(&mut self) -> Option { loop { if self.bkt_idx >= MAIN_TABLE_SIZE { - return None + return None; } if self.entry_n >= self.bkt.num_ents { @@ -163,24 +167,23 @@ impl Iterator for KVIter { Err(err) => return Some(Err(err)), } } - continue + continue; } - let idx_ent = - match self.cdb.index_entry_at(self.bkt.entry_n_pos(self.entry_n)) { - Ok(index_entry) => index_entry, - Err(err) => return Some(Err(err)), - }; + let idx_ent = match self.cdb.index_entry_at(self.bkt.entry_n_pos(self.entry_n)) { + Ok(index_entry) => index_entry, + Err(err) => return Some(Err(err)), + }; self.entry_n += 1; if idx_ent.ptr == 0 { - continue + continue; } else { return match self.cdb.get_kv(idx_ent) { Ok(kv) => Some(Ok(kv)), Err(err) => Some(Err(err)), - } + }; } } } @@ -189,29 +192,36 @@ impl Iterator for KVIter { #[derive(Debug)] #[repr(C)] pub struct CDB { - data: SliceFactory, + inner: Box, } impl Clone for CDB { fn clone(&self) -> Self { - CDB{data: self.data.clone()} + CDB { + inner: Box::new((*self.inner).clone()), + } } } impl CDB { - pub fn new(sf: SliceFactory) -> CDB { CDB{data: sf} } - - pub fn stdio(path: &str) -> Result { - Ok(CDB::new(SliceFactory::make_filewrap(path)?)) + pub fn new(inner: T) -> CDB + where + T: Sliceable + Clone, + { + CDB{inner: Box::new(inner)} } - pub fn mmap(path: &str) -> Result { - Ok(CDB::new(SliceFactory::make_map(path)?)) + pub fn stdio(path: &str) -> Result { + Ok(CDB::new(Sliceable::make_filewrap(path)?)) } - pub fn load(path: &str) -> Result { - Ok(CDB::new(SliceFactory::load(path)?)) - } + // pub fn mmap(path: &str) -> Result { + // Ok(CDB::new(Sliceable::make_map(path)?)) + // } + // + // pub fn load(path: &str) -> Result { + // Ok(CDB::new(Sliceable::load(path)?)) + // } pub fn kvs_iter(&self) -> Result { Ok(KVIter::new(Box::new(self.clone()))?) @@ -223,7 +233,7 @@ impl CDB { let off = 8 * idx; - let b = self.data.slice(off, off + 8)?; + let b = self.data.slice(off..(off + 8))?; assert_eq!(b.len(), 8); trace!("bucket_at idx: {}, got buf: {:?}", idx, b); @@ -232,7 +242,7 @@ impl CDB { let ptr = buf.get_u32_le() as usize; let num_ents = buf.get_u32_le() as usize; - Ok(Bucket{ptr, num_ents}) + Ok(Bucket { ptr, num_ents }) } // returns the index entry at absolute position 'pos' in the db @@ -244,16 +254,16 @@ impl CDB { panic!("position {:?} was in the main table!", pos) } - let mut b = self.data.slice(pos, pos + 8)?.into_buf(); + let mut b = self.data.slice(pos..(pos + 8))?.into_buf(); let hash = CDBHash(b.get_u32_le()); let ptr = b.get_u32_le() as usize; - Ok(IndexEntry{hash, ptr}) + Ok(IndexEntry { hash, ptr }) } #[inline] fn get_kv(&self, ie: IndexEntry) -> Result { - let b = self.data.slice(ie.ptr, ie.ptr + DATA_HEADER_SIZE)?; + let b = self.data.slice(ie.ptr..(ie.ptr + DATA_HEADER_SIZE))?; let ksize = b.slice_to(4).into_buf().get_u32_le() as usize; let vsize = b.slice_from(4).into_buf().get_u32_le() as usize; @@ -261,10 +271,13 @@ impl CDB { let kstart = ie.ptr + DATA_HEADER_SIZE; let vstart = kstart + ksize; - let k = self.data.slice(kstart, kstart + ksize)?; - let v = self.data.slice(vstart, vstart + vsize)?; + let k = self.data.slice(kstart..(kstart + ksize))?; + let v = self.data.slice(vstart..(vstart + vsize))?; - Ok(KV{k: Bytes::from(k), v: Bytes::from(v)}) + Ok(KV { + k: Bytes::from(k), + v: Bytes::from(v), + }) } pub fn get(&self, key: &[u8], buf: &mut Vec) -> Result> { @@ -301,8 +314,101 @@ impl CDB { } } +trait KVStore { + type S: Sliceable + Clone; + + fn inner<'a>(&'a self) -> &'a Self::S; + + #[inline] + fn bucket_at(&self, idx: usize) -> Result { + assert!(idx < MAIN_TABLE_SIZE); + + let off = 8 * idx; + + let b = self.inner().slice(off..(off + 8))?; + assert_eq!(b.len(), 8); + trace!("bucket_at idx: {}, got buf: {:?}", idx, b); + + let mut buf = b.into_buf(); + + let ptr = buf.get_u32_le() as usize; + let num_ents = buf.get_u32_le() as usize; + + Ok(Bucket { ptr, num_ents }) + } + + // returns the index entry at absolute position 'pos' in the db + #[inline] + fn index_entry_at(&self, pos: IndexEntryPos) -> Result { + let pos: usize = pos.into(); + + if pos < MAIN_TABLE_SIZE_BYTES { + panic!("position {:?} was in the main table!", pos) + } + + let mut b = self.inner().slice(pos..(pos + 8))?.into_buf(); + let hash = CDBHash(b.get_u32_le()); + let ptr = b.get_u32_le() as usize; + + Ok(IndexEntry { hash, ptr }) + } + + #[inline] + fn get_kv(&self, ie: IndexEntry) -> Result { + let b = self.inner().slice(ie.ptr..(ie.ptr + DATA_HEADER_SIZE))?; + + let ksize = b.slice_to(4).into_buf().get_u32_le() as usize; + let vsize = b.slice_from(4).into_buf().get_u32_le() as usize; + + let kstart = ie.ptr + DATA_HEADER_SIZE; + let vstart = kstart + ksize; + + let k = self.inner().slice(kstart..(kstart + ksize))?; + let v = self.inner().slice(vstart..(vstart + vsize))?; + + Ok(KV { + k: Bytes::from(k), + v: Bytes::from(v), + }) + } + + fn get(&self, key: &[u8], buf: &mut Vec) -> Result> { + let key = key.into(); + let hash = CDBHash::new(key); + let bucket = self.bucket_at(hash.table())?; + + if bucket.num_ents == 0 { + trace!("bucket empty, returning none"); + return Ok(None); + } + + let slot = hash.slot(bucket.num_ents); + + for x in 0..bucket.num_ents { + let index_entry_pos = bucket.entry_n_pos((x + slot) % bucket.num_ents); + + let idx_ent = self.index_entry_at(index_entry_pos)?; + + if idx_ent.ptr == 0 { + return Ok(None); + } else if idx_ent.hash == hash { + let kv = self.get_kv(idx_ent)?; + if &kv.k[..] == key { + buf.write_all(&kv.k[..]).unwrap(); + return Ok(Some(kv.k.len())); + } else { + continue; + } + } + } + + Ok(None) + } +} + #[cfg(test)] mod tests { + use super::*; use env_logger; use proptest::collection::vec; use proptest::prelude::*; @@ -313,7 +419,6 @@ mod tests { use std::io::{BufRead, BufReader}; use std::path::Path; use std::path::PathBuf; - use super::*; use tempfile::NamedTempFile; use tinycdb::Cdb as TCDB; @@ -325,7 +430,7 @@ mod tests { struct QueryResult(String, Option); #[allow(dead_code)] - fn create_temp_cdb<'a>(kvs: &Vec<(String, String)>) -> Result { + fn create_temp_cdb<'a>(kvs: &Vec<(String, String)>) -> Result> { let path: PathBuf; { @@ -349,9 +454,9 @@ mod tests { } }).unwrap(); - let sf = self::storage::SliceFactory::load(path.to_str().unwrap())?; + let sf = self::storage::Sliceable::load(path.to_str().unwrap())?; - Ok(sf) + Ok(Box::new(sf)) } proptest! { @@ -370,21 +475,20 @@ mod tests { type QueryResultIter<'a> = Box + 'a>; - fn read_keys<'a>(cdb: &'a CDB, xs: &'a Vec) -> QueryResultIter<'a> { + fn read_keys<'a, T>(cdb: &'a CDB, xs: &'a Vec) -> QueryResultIter<'a> + where + T: Sliceable + Clone, + { Box::new(xs.iter().map(move |x| { let mut buf = Vec::with_capacity(1024 * 1024); let res = cdb.get(x.as_ref(), &mut buf).unwrap(); - QueryResult( - x.clone(), - res.map(|_| String::from_utf8(buf).unwrap()), - ) + QueryResult(x.clone(), res.map(|_| String::from_utf8(buf).unwrap())) })) } #[allow(dead_code)] - fn make_temp_cdb_single_vals(xs: &Vec) -> SliceFactory { - let kvs: Vec<(String, String)> = - xs.iter().map(|k| (k.to_owned(), k.to_owned())).collect(); + fn make_temp_cdb_single_vals(xs: &Vec) -> Sliceable { + let kvs: Vec<(String, String)> = xs.iter().map(|k| (k.to_owned(), k.to_owned())).collect(); create_temp_cdb(&kvs).unwrap() } diff --git a/src/storage/cdb/cdb_rs/src/cdb/storage.rs b/src/storage/cdb/cdb_rs/src/cdb/storage.rs index 4926defe9..975a851ca 100644 --- a/src/storage/cdb/cdb_rs/src/cdb/storage.rs +++ b/src/storage/cdb/cdb_rs/src/cdb/storage.rs @@ -1,19 +1,15 @@ +use super::Result; use bytes::{Buf, Bytes, BytesMut}; use memmap::{Mmap, MmapOptions}; use std::cell::RefCell; use std::fs::File; use std::io::{Cursor, Read}; -use std::ops::Deref; +use std::ops::{Deref, Range}; use std::os::unix::fs::FileExt; use std::sync::Arc; -use super::Result; -#[derive(Debug)] -#[repr(C)] -pub enum SliceFactory { - HeapStorage(Bytes), - MmapStorage(MMapWrap), - StdioStorage(FileWrap), +pub trait Sliceable { + fn slice(&self, r: Range) -> Result; } const BUF_LEN: usize = 8192; @@ -24,15 +20,15 @@ pub fn readybuf(size: usize) -> BytesMut { b } -impl SliceFactory { - pub fn load(path: &str) -> Result { +impl Sliceable { + pub fn load(path: &str) -> Result { let mut f = File::open(path)?; let mut buffer = Vec::new(); f.read_to_end(&mut buffer)?; - Ok(SliceFactory::HeapStorage(Bytes::from(buffer))) + Ok(HeapWrap(Bytes::from(buffer))) } - pub fn make_map(path: &str) -> Result { + pub fn make_map(path: &str) -> Result { let f = File::open(path)?; let mmap: Mmap = unsafe { MmapOptions::new().map(&f)? }; @@ -57,53 +53,43 @@ impl SliceFactory { } debug!("end pretouch pages: {} bytes", count); - Ok(SliceFactory::MmapStorage(MMapWrap::new(mmap))) + Ok(MMapWrap::new(mmap)) } - pub fn make_filewrap(path: &str) -> Result { - Ok(SliceFactory::StdioStorage(FileWrap::open(path)?)) + pub fn make_filewrap(path: &str) -> Result { + Ok(FileWrap::open(path)?) } +} - pub fn slice(&self, start: usize, end: usize) -> Result { - assert!(end >= start); - - if end == start { - return Ok(Bytes::new()); - } +#[derive(Debug)] +#[repr(C)] +pub struct HeapWrap(Bytes); - let range_len = end - start; +impl Sliceable for HeapWrap { + fn slice(&self, r: Range) -> Result { + let Range { start, end } = r; - match self { - SliceFactory::HeapStorage(bytes) => Ok(Bytes::from(&bytes[start..end])), - SliceFactory::MmapStorage(mmap) => { - let mut v = Vec::with_capacity(range_len); - v.extend_from_slice(&mmap[start..end]); - Ok(Bytes::from(v)) - } - SliceFactory::StdioStorage(filewrap) => filewrap.slice(start, end), + if (end - 1) == start { + return Ok(Bytes::new()); } - } -} -impl Clone for SliceFactory { - fn clone(&self) -> Self { - match self { - SliceFactory::HeapStorage(bytes) => SliceFactory::HeapStorage(bytes.clone()), - SliceFactory::MmapStorage(mmap) => SliceFactory::MmapStorage(mmap.clone()), - SliceFactory::StdioStorage(fw) => SliceFactory::StdioStorage(fw.clone()), - } + // TODO(simms): yes this is a heap copy. change to zero-copy once we understand how to + // integrate that + let mut v = Vec::with_capacity(end - start); + v.extend_from_slice(&self.0[start..end]); + Ok(Bytes::from(v)) } } #[derive(Debug)] #[repr(C)] pub struct MMapWrap { - inner: Arc + inner: Arc, } impl MMapWrap { fn new(m: Mmap) -> MMapWrap { - MMapWrap{inner: Arc::new(m)} + MMapWrap { inner: Arc::new(m) } } } @@ -117,7 +103,9 @@ impl Deref for MMapWrap { impl Clone for MMapWrap { fn clone(&self) -> Self { - MMapWrap{inner: self.inner.clone()} + MMapWrap { + inner: self.inner.clone(), + } } } @@ -128,6 +116,18 @@ pub struct FileWrap { path: String, } +impl Sliceable for FileWrap { + fn slice(&self, r: Range) -> Result { + let Range { start, end } = r; + + if (end - 1) == start { + return Ok(Bytes::new()); + } + + self.slice(start, end) + } +} + impl FileWrap { fn new(f: File, path: &str) -> Self { FileWrap { @@ -172,7 +172,7 @@ struct BMString(BytesMut); impl ToString for BMString { fn to_string(&self) -> String { - String::from(self) + String::from(self) } } @@ -184,9 +184,9 @@ impl<'a> From<&'a BMString> for String { #[cfg(test)] mod tests { + use super::*; use std::fs::File; use std::io::prelude::*; - use super::*; use tempfile; fn assert_ok(f: T) @@ -215,7 +215,7 @@ mod tests { #[test] fn file_wrap_slice_test() { - assert_ok(||{ + assert_ok(|| { let fw = FileWrap::temp()?; { diff --git a/src/storage/cdb/cdb_utils/src/main.rs b/src/storage/cdb/cdb_utils/src/main.rs index 38193ea1e..d8612d557 100644 --- a/src/storage/cdb/cdb_utils/src/main.rs +++ b/src/storage/cdb/cdb_utils/src/main.rs @@ -13,7 +13,7 @@ use bytes::Bytes; use std::time::{Duration, Instant}; use cdb_rs::cdb; -use cdb_rs::cdb::storage::SliceFactory; +use cdb_rs::cdb::storage::Sliceable; use cdb_rs::cdb::{Result, CDB}; @@ -130,17 +130,17 @@ fn dur2sec(d: &Duration) -> f64 { } fn randoread(filename: &str, config: &RandoConfig) -> Result<()> { - let sf: SliceFactory; + let sf: Sliceable; let db = if config.use_mmap { - cdb::CDB::new(SliceFactory::make_map(filename)?) + cdb::CDB::new(Sliceable::make_map(filename)?) } else { { if config.use_stdio { - sf = SliceFactory::make_filewrap(filename)?; + sf = Sliceable::make_filewrap(filename)?; } else { - sf = SliceFactory::load(filename)?; + sf = Sliceable::load(filename)?; } } cdb::CDB::new(sf)