Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

demux #1150

Merged
merged 9 commits into from
Oct 6, 2021
Merged

demux #1150

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Tantivy 0.16.1
========================
- Major bugfix on multivalued fast fields. #1151
- Demux operation (@PSeitz)

Tantivy 0.16.0
=========================
Expand Down
59 changes: 57 additions & 2 deletions common/src/bitset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ impl BitSet {

/// Deserialize a `BitSet`.
///
#[cfg(test)]
pub fn deserialize(mut data: &[u8]) -> io::Result<Self> {
let max_value: u32 = u32::from_le_bytes(data[..4].try_into().unwrap());
data = &data[4..];
Expand Down Expand Up @@ -247,7 +246,22 @@ impl BitSet {
}
}

/// Intersects this `BitSet` in place with a serialized bitset.
///
/// NOTE(review): assumes `other` covers the same value range (i.e. yields the
/// same number of tinysets) as `self` — TODO confirm; see the zip-truncation
/// caveat on `intersect_update_with_iter`.
pub fn intersect_update(&mut self, other: &ReadSerializedBitSet) {
self.intersect_update_with_iter(other.iter_tinysets());
}

/// Intersects this `BitSet` in place with an iterator of `TinySet`s,
/// recomputing the cached cardinality (`self.len`) as it goes.
///
/// NOTE(review): `zip` stops at the shorter side — if `other` yields fewer
/// tinysets than `self.tinysets`, the trailing tinysets keep their old bits
/// yet are excluded from the recomputed `len`, leaving the set inconsistent.
/// Callers are expected to supply equally sized inputs — TODO confirm.
fn intersect_update_with_iter(&mut self, other: impl Iterator<Item = TinySet>) {
// Recount cardinality from scratch while intersecting block by block.
self.len = 0;
for (left, right) in self.tinysets.iter_mut().zip(other) {
*left = left.intersect(right);
self.len += left.len() as u64;
}
}

/// Returns the number of elements in the `BitSet`.
///
/// O(1): reads the cardinality cached in `self.len`, which is maintained by
/// the mutating operations rather than recounted here.
#[inline]
pub fn len(&self) -> usize {
self.len as usize
}
Expand Down Expand Up @@ -297,6 +311,7 @@ impl BitSet {
.map(|delta_bucket| bucket + delta_bucket as u32)
}

/// Returns the `max_value` this `BitSet` was created with — presumably the
/// exclusive upper bound of storable values (see `with_max_value`); TODO
/// confirm inclusivity against the constructor.
#[inline]
pub fn max_value(&self) -> u32 {
self.max_value
}
Expand Down Expand Up @@ -334,7 +349,7 @@ impl ReadSerializedBitSet {
/// Iterate the tinyset on the fly from serialized data.
///
#[inline]
fn iter_tinysets<'a>(&'a self) -> impl Iterator<Item = TinySet> + 'a {
fn iter_tinysets(&self) -> impl Iterator<Item = TinySet> + '_ {
assert!((self.data.len()) % 8 == 0);
self.data.chunks_exact(8).map(move |chunk| {
let tinyset: TinySet = TinySet::deserialize(chunk.try_into().unwrap()).unwrap();
Expand Down Expand Up @@ -401,6 +416,46 @@ mod tests {
assert_eq!(bitset.len(), 4);
}

#[test]
fn test_bitset_intersect() {
    // Serialize {0, 2, 4} (a full bitset over [0, 5) minus 1 and 3).
    let read_bitset = {
        let mut full = BitSet::with_max_value_and_full(5);
        full.remove(1);
        full.remove(3);
        let mut buffer = vec![];
        full.serialize(&mut buffer).unwrap();
        ReadSerializedBitSet::open(OwnedBytes::new(buffer))
    };

    // {0, 2, 3, 4} ∩ {0, 2, 4} == {0, 2, 4}
    let mut bitset = BitSet::with_max_value_and_full(5);
    bitset.remove(1);
    bitset.intersect_update(&read_bitset);
    for &(el, expected) in [(0, true), (1, false), (2, true), (3, false), (4, true)].iter() {
        assert_eq!(bitset.contains(el), expected);
    }

    // Intersect with the singleton {0}: only 0 survives.
    bitset.intersect_update_with_iter(vec![TinySet::singleton(0)].into_iter());
    for &(el, expected) in [(0, true), (1, false), (2, false), (3, false), (4, false)].iter() {
        assert_eq!(bitset.contains(el), expected);
    }
    assert_eq!(bitset.len(), 1);

    // Intersect with the disjoint singleton {1}: the set becomes empty.
    bitset.intersect_update_with_iter(vec![TinySet::singleton(1)].into_iter());
    for el in 0u32..5 {
        assert!(!bitset.contains(el));
    }
    assert_eq!(bitset.len(), 0);
}

#[test]
fn test_read_serialized_bitset_empty() {
let mut bitset = BitSet::with_max_value(5);
Expand Down
2 changes: 1 addition & 1 deletion src/core/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ impl Index {
/// Creates a multithreaded writer
///
/// Tantivy will automatically define the number of threads to use, but
/// no more than [`MAX_NUM_THREAD`] threads.
/// no more than 8 threads.
/// `overall_heap_size_in_bytes` is the total target memory usage that will be split
/// between a given number of threads.
///
Expand Down
2 changes: 1 addition & 1 deletion src/core/searcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Searcher {
&self.segment_readers
}

/// Returns the segment_reader associated with the given segment_ordinal
/// Returns the segment_reader associated with the given segment_ord
pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader {
&self.segment_readers[segment_ord as usize]
}
Expand Down
34 changes: 28 additions & 6 deletions src/core/segment_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::core::SegmentId;
use crate::directory::CompositeFile;
use crate::directory::FileSlice;
use crate::error::DataCorruption;
use crate::fastfield::union_alive_bitset;
use crate::fastfield::AliveBitSet;
use crate::fastfield::FacetReader;
use crate::fastfield::FastFieldReaders;
Expand Down Expand Up @@ -140,6 +141,14 @@ impl SegmentReader {

/// Open a new segment for reading.
///
/// Equivalent to `open_with_custom_alive_set` with no additional
/// alive bitset applied.
pub fn open(segment: &Segment) -> crate::Result<SegmentReader> {
Self::open_with_custom_alive_set(segment, None)
}

/// Open a new segment for reading.
pub fn open_with_custom_alive_set(
segment: &Segment,
custom_bitset: Option<AliveBitSet>,
) -> crate::Result<SegmentReader> {
let termdict_file = segment.open_read(SegmentComponent::Terms)?;
let termdict_composite = CompositeFile::open(&termdict_file)?;

Expand All @@ -164,22 +173,35 @@ impl SegmentReader {
let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
let fast_field_readers =
Arc::new(FastFieldReaders::new(schema.clone(), fast_fields_composite));

let fieldnorm_data = segment.open_read(SegmentComponent::FieldNorms)?;
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;

let mut num_docs = segment.meta().num_docs();
let max_doc = segment.meta().max_doc();

let alive_bitset_opt = if segment.meta().has_deletes() {
let alive_bitset_bytes = segment.open_read(SegmentComponent::Delete)?.read_bytes()?;
let alive_bitset = AliveBitSet::open(alive_bitset_bytes);
let delete_data = segment.open_read(SegmentComponent::Delete)?;
let mut alive_bitset = AliveBitSet::open(delete_data.read_bytes()?);

if let Some(provided_bitset) = custom_bitset {
assert_eq!(max_doc, provided_bitset.bitset().max_value());
alive_bitset = union_alive_bitset(&alive_bitset, &provided_bitset)?;
num_docs = alive_bitset.num_alive_docs() as u32;
}
Some(alive_bitset)
} else {
None
if let Some(provided_bitset) = custom_bitset {
num_docs = provided_bitset.num_alive_docs() as u32;
Some(provided_bitset)
} else {
None
}
};

Ok(SegmentReader {
inv_idx_reader_cache: Default::default(),
max_doc: segment.meta().max_doc(),
num_docs: segment.meta().num_docs(),
num_docs,
max_doc,
termdict_composite,
postings_composite,
fast_fields_readers: fast_field_readers,
Expand Down
31 changes: 29 additions & 2 deletions src/fastfield/alive_bitset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ use ownedbytes::OwnedBytes;
use std::io;
use std::io::Write;

/// Merges (intersects) two `AliveBitSet`s into a new one.
/// The two bitsets need to have the same `max_value` (asserted below).
///
/// NOTE(review): the name says "union" but the implementation intersects —
/// a doc is alive in the result only if it is alive in both inputs.
/// Consider renaming to `intersect_alive_bitsets` in a follow-up; the name
/// is re-exported and used by `SegmentReader`, so renaming here would break
/// callers.
pub fn union_alive_bitset(left: &AliveBitSet, right: &AliveBitSet) -> crate::Result<AliveBitSet> {
assert_eq!(left.bitset().max_value(), right.bitset().max_value());

// Materialize `left` as a mutable `BitSet`, then intersect with `right`.
let mut merged_bitset = BitSet::deserialize(left.data().as_slice())?;
merged_bitset.intersect_update(right.bitset());

// Re-serialize so the resulting `AliveBitSet` owns its own buffer.
let mut alive_bitset_buffer = vec![];
write_alive_bitset(&merged_bitset, &mut alive_bitset_buffer)?;

Ok(AliveBitSet::open(OwnedBytes::new(alive_bitset_buffer)))
}

/// Write a alive `BitSet`
///
/// where `alive_bitset` is the set of alive `DocId`.
Expand All @@ -22,6 +36,7 @@ pub struct AliveBitSet {
num_alive_docs: usize,
bitset: ReadSerializedBitSet,
num_bytes: ByteCount,
data: OwnedBytes,
}

impl AliveBitSet {
Expand All @@ -38,14 +53,21 @@ impl AliveBitSet {
Self::open(alive_bitset_bytes)
}

/// Builds an `AliveBitSet` by serializing `bitset` into an in-memory buffer.
pub(crate) fn from_bitset(bitset: &BitSet) -> AliveBitSet {
let mut out = vec![];
// Writing into a `Vec` cannot fail with an I/O error, hence the unwrap.
write_alive_bitset(bitset, &mut out).unwrap();
AliveBitSet::open(OwnedBytes::new(out))
}

/// Opens an alive bitset from its serialized bytes.
pub fn open(bytes: OwnedBytes) -> AliveBitSet {
let num_bytes = bytes.len();
// Keep a second handle on the raw bytes (`data`) so they can be re-read
// later, e.g. by `union_alive_bitset`. `OwnedBytes::clone` presumably
// shares the underlying buffer rather than copying it — TODO confirm
// against the `ownedbytes` crate.
let bitset = ReadSerializedBitSet::open(bytes.clone());
AliveBitSet {
num_alive_docs: bitset.len(),
bitset,
num_bytes,
data: bytes,
}
}

Expand All @@ -61,7 +83,7 @@ impl AliveBitSet {
!self.is_alive(doc)
}

/// Iterate over the alive docids.
/// Iterate over the alive doc_ids.
#[inline]
pub fn iter_alive(&self) -> impl Iterator<Item = DocId> + '_ {
self.bitset.iter()
Expand All @@ -82,6 +104,11 @@ impl AliveBitSet {
/// Returns the space usage of the bitset: the size in bytes of its
/// serialized form, captured when it was opened.
pub fn space_usage(&self) -> ByteCount {
self.num_bytes
}

/// Get underlying bytes.
///
/// NOTE(review): returns a clone of the `OwnedBytes` handle — presumably a
/// cheap, shared-buffer copy rather than a byte-for-byte copy; confirm
/// against the `ownedbytes` crate before relying on this in hot paths.
pub(crate) fn data(&self) -> OwnedBytes {
self.data.clone()
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/fastfield/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ values stored.
Read access performance is comparable to that of an array lookup.
*/

pub use self::alive_bitset::union_alive_bitset;
pub use self::alive_bitset::write_alive_bitset;
pub use self::alive_bitset::AliveBitSet;
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
Expand Down
Loading