Skip to content

Commit

Permalink
Completely remove the option of a zero-sized block cache to simplify the implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreichold committed Jan 10, 2023
1 parent 7a8fce0 commit 1d0fbf4
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 41 deletions.
3 changes: 2 additions & 1 deletion src/core/searcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::{fmt, io};

Expand Down Expand Up @@ -250,7 +251,7 @@ impl SearcherInner {
index: Index,
segment_readers: Vec<SegmentReader>,
generation: TrackedObject<SearcherGeneration>,
doc_store_cache_size: usize,
doc_store_cache_size: NonZeroUsize,
) -> io::Result<SearcherInner> {
assert_eq!(
&segment_readers
Expand Down
6 changes: 4 additions & 2 deletions src/core/segment_reader.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::{Arc, RwLock};
use std::{fmt, io};

Expand Down Expand Up @@ -134,7 +135,7 @@ impl SegmentReader {
}

/// Accessor to the segment's `StoreReader`.
pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> {
pub fn get_store_reader(&self, cache_size: NonZeroUsize) -> io::Result<StoreReader> {
StoreReader::open(self.store_file.clone(), cache_size)
}

Expand Down Expand Up @@ -327,7 +328,8 @@ impl SegmentReader {
self.positions_composite.space_usage(),
self.fast_fields_readers.space_usage(),
self.fieldnorm_readers.space_usage(),
self.get_store_reader(0)?.space_usage(),
self.get_store_reader(NonZeroUsize::new(1).unwrap())?
.space_usage(),
self.alive_bitset_opt
.as_ref()
.map(AliveBitSet::space_usage)
Expand Down
3 changes: 2 additions & 1 deletion src/functional_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::num::NonZeroUsize;

use rand::{thread_rng, Rng};

Expand All @@ -9,7 +10,7 @@ fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
assert!(searcher.segment_readers().len() < 20);
assert_eq!(searcher.num_docs() as usize, vals.len());
for segment_reader in searcher.segment_readers() {
let store_reader = segment_reader.get_store_reader(1)?;
let store_reader = segment_reader.get_store_reader(NonZeroUsize::new(1).unwrap())?;
for doc_id in 0..segment_reader.max_doc() {
let _doc = store_reader.get(doc_id)?;
}
Expand Down
5 changes: 3 additions & 2 deletions src/indexer/merger.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::io::Write;
use std::num::NonZeroUsize;
use std::sync::Arc;

use fastfield_codecs::VecColumn;
Expand Down Expand Up @@ -953,7 +954,7 @@ impl IndexMerger {
let store_readers: Vec<_> = self
.readers
.iter()
.map(|reader| reader.get_store_reader(50))
.map(|reader| reader.get_store_reader(NonZeroUsize::new(50).unwrap()))
.collect::<Result<_, _>>()?;

let mut document_iterators: Vec<_> = store_readers
Expand All @@ -978,7 +979,7 @@ impl IndexMerger {
} else {
debug!("trivial-doc-id-mapping");
for reader in &self.readers {
let store_reader = reader.get_store_reader(1)?;
let store_reader = reader.get_store_reader(NonZeroUsize::new(1).unwrap())?;
if reader.has_deletes()
// If there is not enough data in the store, we avoid stacking in order to
// avoid creating many small blocks in the doc store. Once we have 5 full blocks,
Expand Down
14 changes: 11 additions & 3 deletions src/indexer/segment_writer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::num::NonZeroUsize;

use fastfield_codecs::MonotonicallyMappableToU64;
use itertools::Itertools;

Expand Down Expand Up @@ -429,8 +431,9 @@ fn remap_and_write(
serializer
.segment()
.open_read(SegmentComponent::TempStore)?,
1, /* The docstore is configured to have one doc per block, and each doc is accessed
* only once: we don't need caching. */
// The docstore is configured to have one doc per block,
// and each doc is accessed only once: we don't need caching.
NonZeroUsize::new(1).unwrap(),
)?;
for old_doc_id in doc_id_map.iter_old_doc_ids() {
let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
Expand All @@ -446,6 +449,7 @@ fn remap_and_write(

#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;
use std::path::Path;

use super::compute_initial_table_size;
Expand Down Expand Up @@ -500,7 +504,11 @@ mod tests {
store_writer.store(&doc, &schema).unwrap();
store_writer.close().unwrap();

let reader = StoreReader::open(directory.open_read(path).unwrap(), 0).unwrap();
let reader = StoreReader::open(
directory.open_read(path).unwrap(),
NonZeroUsize::new(1).unwrap(),
)
.unwrap();
let doc = reader.get(0).unwrap();

assert_eq!(doc.field_values().len(), 2);
Expand Down
14 changes: 9 additions & 5 deletions src/reader/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod warming;

use std::convert::TryInto;
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicU64;
use std::sync::{atomic, Arc, Weak};

Expand Down Expand Up @@ -44,7 +45,7 @@ pub struct IndexReaderBuilder {
index: Index,
warmers: Vec<Weak<dyn Warmer>>,
num_warming_threads: usize,
doc_store_cache_size: usize,
doc_store_cache_size: NonZeroUsize,
}

impl IndexReaderBuilder {
Expand Down Expand Up @@ -119,7 +120,10 @@ impl IndexReaderBuilder {
///
/// The doc store readers cache by default DOCSTORE_CACHE_CAPACITY(100) decompressed blocks.
#[must_use]
pub fn doc_store_cache_size(mut self, doc_store_cache_size: usize) -> IndexReaderBuilder {
pub fn doc_store_cache_size(
mut self,
doc_store_cache_size: NonZeroUsize,
) -> IndexReaderBuilder {
self.doc_store_cache_size = doc_store_cache_size;
self
}
Expand Down Expand Up @@ -151,7 +155,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
}

struct InnerIndexReader {
doc_store_cache_size: usize,
doc_store_cache_size: NonZeroUsize,
index: Index,
warming_state: WarmingState,
searcher: arc_swap::ArcSwap<SearcherInner>,
Expand All @@ -161,7 +165,7 @@ struct InnerIndexReader {

impl InnerIndexReader {
fn new(
doc_store_cache_size: usize,
doc_store_cache_size: NonZeroUsize,
index: Index,
warming_state: WarmingState,
// The searcher_generation_inventory is not used as source, but as target to track the
Expand Down Expand Up @@ -214,7 +218,7 @@ impl InnerIndexReader {

fn create_searcher(
index: &Index,
doc_store_cache_size: usize,
doc_store_cache_size: NonZeroUsize,
warming_state: &WarmingState,
searcher_generation_counter: &Arc<AtomicU64>,
searcher_generation_inventory: &Inventory<SearcherGeneration>,
Expand Down
18 changes: 11 additions & 7 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ mod compression_zstd_block;
#[cfg(test)]
pub mod tests {

use std::num::NonZeroUsize;
use std::path::Path;

use super::*;
Expand Down Expand Up @@ -121,7 +122,7 @@ pub mod tests {
write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE, true);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file, 10)?;
let store = StoreReader::open(store_file, NonZeroUsize::new(10).unwrap())?;
for i in 0..NUM_DOCS as u32 {
assert_eq!(
*store
Expand Down Expand Up @@ -166,7 +167,7 @@ pub mod tests {
write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize, separate_thread);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file, 10)?;
let store = StoreReader::open(store_file, NonZeroUsize::new(10).unwrap())?;
for i in 0..NUM_DOCS as u32 {
assert_eq!(
*store
Expand Down Expand Up @@ -253,7 +254,7 @@ pub mod tests {

let searcher = index.reader()?.searcher();
let reader = searcher.segment_reader(0);
let store = reader.get_store_reader(10)?;
let store = reader.get_store_reader(NonZeroUsize::new(10).unwrap())?;
for doc in store.iter(reader.alive_bitset()) {
assert_eq!(
*doc?.get_first(text_field).unwrap().as_text().unwrap(),
Expand Down Expand Up @@ -289,7 +290,7 @@ pub mod tests {
}
assert_eq!(
index.reader().unwrap().searcher().segment_readers()[0]
.get_store_reader(10)
.get_store_reader(NonZeroUsize::new(10).unwrap())
.unwrap()
.decompressor(),
Decompressor::Lz4
Expand All @@ -310,7 +311,9 @@ pub mod tests {
let searcher = index.reader().unwrap().searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let reader = searcher.segment_readers().iter().last().unwrap();
let store = reader.get_store_reader(10).unwrap();
let store = reader
.get_store_reader(NonZeroUsize::new(10).unwrap())
.unwrap();

for doc in store.iter(reader.alive_bitset()).take(50) {
assert_eq!(
Expand Down Expand Up @@ -357,7 +360,7 @@ pub mod tests {
let searcher = index.reader()?.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let reader = searcher.segment_readers().iter().last().unwrap();
let store = reader.get_store_reader(10)?;
let store = reader.get_store_reader(NonZeroUsize::new(10).unwrap())?;
assert_eq!(store.block_checkpoints().count(), 1);
Ok(())
}
Expand All @@ -366,6 +369,7 @@ pub mod tests {
#[cfg(all(test, feature = "unstable"))]
mod bench {

use std::num::NonZeroUsize;
use std::path::Path;

use test::Bencher;
Expand Down Expand Up @@ -403,7 +407,7 @@ mod bench {
true,
);
let store_file = directory.open_read(path).unwrap();
let store = StoreReader::open(store_file, 10).unwrap();
let store = StoreReader::open(store_file, NonZeroUsize::new(10).unwrap()).unwrap();
b.iter(|| store.iter(None).collect::<Vec<_>>());
}
}
30 changes: 10 additions & 20 deletions src/store/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::space_usage::StoreSpaceUsage;
use crate::store::index::Checkpoint;
use crate::DocId;

pub(crate) const DOCSTORE_CACHE_CAPACITY: usize = 100;
pub(crate) const DOCSTORE_CACHE_CAPACITY: NonZeroUsize =
unsafe { NonZeroUsize::new_unchecked(100) };

type Block = OwnedBytes;

Expand All @@ -34,29 +35,23 @@ pub struct StoreReader {

/// The cache for decompressed blocks.
struct BlockCache {
cache: Option<Mutex<LruCache<usize, Block>>>,
cache: Mutex<LruCache<usize, Block>>,
cache_hits: AtomicUsize,
cache_misses: AtomicUsize,
}

impl BlockCache {
fn get_from_cache(&self, pos: usize) -> Option<Block> {
if let Some(block) = self
.cache
.as_ref()
.and_then(|cache| cache.lock().unwrap().get(&pos).cloned())
{
if let Some(block) = self.cache.lock().unwrap().get(&pos) {
self.cache_hits.fetch_add(1, Ordering::SeqCst);
return Some(block);
return Some(block.clone());
}
self.cache_misses.fetch_add(1, Ordering::SeqCst);
None
}

fn put_into_cache(&self, pos: usize, data: Block) {
if let Some(cache) = self.cache.as_ref() {
cache.lock().unwrap().put(pos, data);
}
self.cache.lock().unwrap().put(pos, data);
}

fn stats(&self) -> CacheStats {
Expand All @@ -68,16 +63,12 @@ impl BlockCache {
}

fn len(&self) -> usize {
self.cache
.as_ref()
.map_or(0, |cache| cache.lock().unwrap().len())
self.cache.lock().unwrap().len()
}

#[cfg(test)]
fn peek_lru(&self) -> Option<usize> {
self.cache
.as_ref()
.and_then(|cache| cache.lock().unwrap().peek_lru().map(|(&k, _)| k))
self.cache.lock().unwrap().peek_lru().map(|(&k, _)| k)
}
}

Expand Down Expand Up @@ -114,7 +105,7 @@ impl Sum for CacheStats {

impl StoreReader {
/// Opens a store reader
pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
pub fn open(store_file: FileSlice, cache_size: NonZeroUsize) -> io::Result<StoreReader> {
let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;

let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
Expand All @@ -125,8 +116,7 @@ impl StoreReader {
decompressor: footer.decompressor,
data: data_file,
cache: BlockCache {
cache: NonZeroUsize::new(cache_size)
.map(|cache_size| Mutex::new(LruCache::new(cache_size))),
cache: Mutex::new(LruCache::new(cache_size)),
cache_hits: Default::default(),
cache_misses: Default::default(),
},
Expand Down

0 comments on commit 1d0fbf4

Please sign in to comment.