Sort Index/Docids By Field (#1026)
* sort index by field

add sort info to IndexSettings
generate docid mapping for sorted field (only fastfield)
remap singlevalue fastfield
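
The new setting is exercised by the updated doc example in src/core/index.rs further down in this diff; condensed here (fields and values taken from that example, not a full standalone program):

```rust
use tantivy::schema::*;
use tantivy::{Index, IndexSettings, IndexSortByField, Order};

let mut schema_builder = Schema::builder();
schema_builder.add_u64_field(
    "number",
    IntOptions::default().set_fast(Cardinality::SingleValue),
);
let schema = schema_builder.build();
let settings = IndexSettings {
    sort_by_field: Some(IndexSortByField {
        field: "number".to_string(),
        order: Order::Asc,
    }),
};
let index = Index::builder().schema(schema).settings(settings).create_in_ram();
```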

* support docid mapping in multivalue fastfield

move docid mapping to serialization step (less intermediate data for mapping)
add support for docid mapping in multivalue fastfield
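
A minimal sketch of the idea, assuming a simplified in-memory representation (illustrative names, not the actual writer code): the multi-valued entries are written per document in the new doc order given by the new->old mapping, so no remapped intermediate copy is needed.

```rust
/// `vals_per_old_doc[old_doc_id]` holds the values of that document.
/// Returns (offset index, flattened values) in the new, sorted doc order.
fn serialize_multivalue_in_new_order(
    vals_per_old_doc: &[Vec<u64>],
    new_to_old: &[u32],
) -> (Vec<u64>, Vec<u64>) {
    let mut offsets = Vec::with_capacity(new_to_old.len() + 1);
    let mut values = Vec::new();
    let mut offset = 0u64;
    for &old_doc in new_to_old {
        offsets.push(offset);
        let doc_vals = &vals_per_old_doc[old_doc as usize];
        values.extend_from_slice(doc_vals);
        offset += doc_vals.len() as u64;
    }
    offsets.push(offset); // final end offset
    (offsets, values)
}
```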

* handle docid map in bytes fastfield

* forward docid mapping, remap postings

* fix merge conflicts

* move test to index_sorter

* add docid index mapping old->new

add docid mapping for both directions old->new (used in postings) and new->old (used in fast field)
handle mapping in postings recorder
warn instead of info for MAX_TOKEN_LEN
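
The two directions are derivable from each other; a small sketch of the relationship (illustrative code, not the exact tantivy types):

```rust
type DocId = u32;

/// Given new->old (the order in which fast field serialization visits
/// documents), build old->new (used to rewrite doc ids in the postings
/// recorder).
fn invert_mapping(new_to_old: &[DocId]) -> Vec<DocId> {
    let mut old_to_new = vec![0 as DocId; new_to_old.len()];
    for (new_doc, &old_doc) in new_to_old.iter().enumerate() {
        old_to_new[old_doc as usize] = new_doc as DocId;
    }
    old_to_new
}
```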

* remap docid in fieldnorm
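
Fieldnorms are a single byte per document, so the remap is a plain gather through the new->old mapping (sketch with illustrative names):

```rust
/// `fieldnorm_ids[old_doc_id]` is the encoded fieldnorm of that document.
fn remap_fieldnorms(fieldnorm_ids: &[u8], new_to_old: &[u32]) -> Vec<u8> {
    new_to_old
        .iter()
        .map(|&old_doc| fieldnorm_ids[old_doc as usize])
        .collect()
}
```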

* resort docids in recorder, more extensive tests

* handle index sorting in docstore

handle index sort in docstore by saving all the docs in a temp docstore file (SegmentComponent::TempStore). On serialization the docid mapping is used to create a docstore in the correct order by reading the old docstore.

add docstore sort tests
refactor tests
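
A sketch of the serialization step described above (simplified types; the real code streams documents through the store reader instead of materializing them all):

```rust
/// `temp_store_docs[old_doc_id]` is the serialized document as appended to
/// the temporary doc store; the output is the payload in sorted doc order.
fn reorder_doc_store(temp_store_docs: &[Vec<u8>], new_to_old: &[u32]) -> Vec<Vec<u8>> {
    new_to_old
        .iter()
        .map(|&old_doc| temp_store_docs[old_doc as usize].clone())
        .collect()
}
```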

* refactor

rename docid to doc_id
rename docid_map to doc_id_map
rename DocidMapping to DocIdMapping
fix typo

* u32 to DocId

* better doc_id_map creation

remove unstable sort
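
The mapping creation boils down to a stable sort of doc ids by their fast field value (sketch; the real code reads the values from the fast field writer):

```rust
use std::cmp::Reverse;

/// `values[old_doc_id]` is the single-value fast field of that document.
/// Returns new->old: doc ids in the order the sorted segment stores them.
fn create_doc_id_mapping(values: &[u64], ascending: bool) -> Vec<u32> {
    let mut new_to_old: Vec<u32> = (0..values.len() as u32).collect();
    if ascending {
        // stable sort keeps the original order of equal values
        new_to_old.sort_by_key(|&old_doc| values[old_doc as usize]);
    } else {
        new_to_old.sort_by_key(|&old_doc| Reverse(values[old_doc as usize]));
    }
    new_to_old
}

#[test]
fn doc_id_mapping_sketch() {
    // values of docs 0, 1, 2 -> ascending order is doc 1, 2, 0
    assert_eq!(create_doc_id_mapping(&[30, 10, 20], true), vec![1, 2, 0]);
}
```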

* add non-mut method to FastFieldWriters

add _mut suffix to &mut methods

* remove sort_index

* fix clippy issues

* fix SegmentComponent iterator

use std::mem::replace

* fix test

* fmt

* handle IndexSettings deserialization

* add reading, writing bytes to doc store

get the bytes of a document from the doc store
add a store_bytes method to the doc writer to accept a serialized document
add an index settings serialization test

* rename index_sorter to doc_id_mapping

use BufferLender in recorder

* fix compile issue, make sort_by_field optional

* fix test compile

* validate index settings on merge

validate index settings on merge
forward merge info to SegmentSerializer (for TempStore)

* fix doctest

* add itertools, use kmerge

add itertools, use kmerge
push because rustfmt fails
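
kmerge combines per-segment streams that are already sorted; a small illustration of the call (not the merger code itself):

```rust
use itertools::Itertools;

/// Each inner Vec is a segment's values in sorted order; `kmerge` yields the
/// globally sorted stream without concatenating and re-sorting.
fn merge_sorted_streams(segments: Vec<Vec<u64>>) -> Vec<u64> {
    segments.into_iter().kmerge().collect()
}

#[test]
fn kmerge_sketch() {
    assert_eq!(
        merge_sorted_streams(vec![vec![1, 4, 7], vec![2, 3, 9]]),
        vec![1, 2, 3, 4, 7, 9]
    );
}
```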

* implement/test merge for fastfield

implement/test merge for fastfield
rename len to num_deleted in DeleteBitSet

* Use precalculated docid mapping in merger

Use the precalculated docid mapping in the merger for sorted indices instead of calculating it on the fly.
Add an index creation macro benchmark, commented out for now since it is not really usable due to long runtimes and extreme fluctuations; it may be better suited to criterion or an external bench binary.

* fix fast field reader docs

fix fast field reader docs: an Error is returned instead of None
add u64s_lenient to the fast field reader
add a docid mapping creation benchmark

* add test for multifast field merge

refactor test 
add test for multifast field merge

* add num_bytes to BytesFastFieldReader

equivalent to num_vals in MultiValuedFastFieldReader

* add MultiValueLength trait

add MultiValueLength trait in order to unify index creation for BytesFastFieldReader and MultiValuedFastFieldReader in merger
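
Roughly, the trait only has to expose per-document and total entry counts so the merger can build the offset index the same way for both readers (a sketch of the shape; method names may differ from the actual code):

```rust
/// Unifies "how many entries does this doc have" across the bytes fast field
/// reader (entries = bytes) and the multi-valued fast field reader
/// (entries = values).
trait MultiValueLength {
    fn get_len(&self, doc_id: u32) -> u64;
    fn get_total_len(&self) -> u64;
}
```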

* Add ReaderWithOrdinal, fix bytes offset index creation

Add ReaderWithOrdinal to associate data to a reader in merger
Fix bytes offset index creation in merger

* add test for merging bytes with sorted docids

* Merge fieldnorm for sorted index

* handle posting list in merge in sorted index

handle posting lists during merge of a sorted index by using the doc id mapping for sorting
reuse SegmentOrdinal type

* handle doc store order in merge in sorted index

* fix typo, cleanup

* make IndexSettings non-optional

* fix type, rename test file

fix type
rename test file
add type

* remove SegmentReaderWithOrdinal accessors

* cargo fmt

* add index sort & merge test to include deletes

* Fix posting list merge issue

Fix posting list merge issue - ensure serializer always gets monotonically increasing doc ids
handle sorting and merging for facets field
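
The invariant is that the postings serializer must receive doc ids in increasing order, so after remapping through old->new the hits are re-sorted before being written (sketch, simplified to (doc, term_freq) pairs):

```rust
fn remap_and_sort_postings(postings: &[(u32, u32)], old_to_new: &[u32]) -> Vec<(u32, u32)> {
    let mut remapped: Vec<(u32, u32)> = postings
        .iter()
        .map(|&(old_doc, term_freq)| (old_to_new[old_doc as usize], term_freq))
        .collect();
    // the serializer requires monotonically increasing doc ids
    remapped.sort_by_key(|&(doc, _)| doc);
    remapped
}
```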

* performance: cache field readers, use bytes for doc store merge

* change facet merge test to cover index sorting

* add RawDocument abstraction to access bytes in doc store

* fix deserialization, update changelog

fix deserialization
update changelog
forward error when a merge fails

* cache store readers to utilize lru cache (4x performance)

cache store readers to utilize the LRU cache (4x faster due to fewer decompress calls on the blocks)
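
The gain comes from reusing each segment's store reader, and therefore its LRU block cache, across documents instead of reopening it per document; roughly (placeholder StoreReader type, not tantivy's API):

```rust
use std::collections::HashMap;

struct StoreReader; // stand-in for the real reader and its LRU block cache

impl StoreReader {
    fn open(_segment_ord: u32) -> StoreReader {
        StoreReader
    }
}

#[derive(Default)]
struct CachedStoreReaders {
    readers: HashMap<u32, StoreReader>,
}

impl CachedStoreReaders {
    /// Reuse an already opened reader so repeated doc fetches from the same
    /// segment hit its block cache instead of decompressing blocks again.
    fn get(&mut self, segment_ord: u32) -> &StoreReader {
        self.readers
            .entry(segment_ord)
            .or_insert_with(|| StoreReader::open(segment_ord))
    }
}
```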

* add include_temp_doc_store flag in InnerSegmentMeta

unset flag on deserialization and after finalize of a segment
set flag when creating new instances
PSeitz authored May 17, 2021
1 parent 6ca27b6 commit d523543
Showing 38 changed files with 1,945 additions and 351 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ Tantivy 0.15.0
- Simplified positions index format (@fulmicoton) #1022
- Moved bitpacking to bitpacker subcrate and add BlockedBitpacker, which bitpacks blocks of 128 elements (@PSeitz) #1030
- Added support for more-like-this query in tantivy (@evanxg852000) #1011
- Added support for sorting an index, e.g. presorting documents in an index by a timestamp field. This can heavily improve performance for certain scenarios by utilizing the sorted data (Top-n optimizations). #1026

Tantivy 0.14.0
=========================
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -51,6 +51,7 @@ smallvec = "1"
rayon = "1"
lru = "0.6"
fastdivide = "0.3"
itertools = "0.10.0"

[target.'cfg(windows)'.dependencies]
winapi = "0.3"
6 changes: 5 additions & 1 deletion bitpacker/src/bitpacker.rs
Expand Up @@ -4,7 +4,11 @@ pub struct BitPacker {
mini_buffer: u64,
mini_buffer_written: usize,
}

impl Default for BitPacker {
fn default() -> Self {
BitPacker::new()
}
}
impl BitPacker {
pub fn new() -> BitPacker {
BitPacker {
7 changes: 5 additions & 2 deletions bitpacker/src/blocked_bitpacker.rs
Expand Up @@ -15,6 +15,11 @@ pub struct BlockedBitpacker {
buffer: Vec<u64>,
offset_and_bits: Vec<BlockedBitpackerEntryMetaData>,
}
impl Default for BlockedBitpacker {
fn default() -> Self {
BlockedBitpacker::new()
}
}

/// `BlockedBitpackerEntryMetaData` encodes the
/// offset and bit_width into a u64 bit field
@@ -115,8 +120,6 @@ impl BlockedBitpacker {
self.buffer.clear();
self.compressed_blocks
.resize(self.compressed_blocks.len() + 8, 0); // add padding for bitpacker
} else {
return;
}
}
pub fn get(&self, idx: usize) -> u64 {
87 changes: 60 additions & 27 deletions src/core/index.rs
@@ -64,31 +64,42 @@ fn load_metas(
///
/// ```
/// use tantivy::schema::*;
/// use tantivy::{Index, IndexSettings};
/// use tantivy::{Index, IndexSettings, IndexSortByField, Order};
///
/// let mut schema_builder = Schema::builder();
/// let id_field = schema_builder.add_text_field("id", STRING);
/// let title_field = schema_builder.add_text_field("title", TEXT);
/// let body_field = schema_builder.add_text_field("body", TEXT);
/// let number_field = schema_builder.add_u64_field(
/// "number",
/// IntOptions::default().set_fast(Cardinality::SingleValue),
/// );
///
/// let schema = schema_builder.build();
/// let settings = IndexSettings::default();
/// let settings = IndexSettings{sort_by_field: Some(IndexSortByField{field:"number".to_string(), order:Order::Asc})};
/// let index = Index::builder().schema(schema).settings(settings).create_in_ram();
///
/// ```
pub struct IndexBuilder {
schema: Option<Schema>,
index_settings: Option<IndexSettings>,
index_settings: IndexSettings,
}
impl Default for IndexBuilder {
fn default() -> Self {
IndexBuilder::new()
}
}
impl IndexBuilder {
/// Creates a new `IndexBuilder`
pub fn new() -> Self {
Self {
schema: None,
index_settings: None,
index_settings: IndexSettings::default(),
}
}
/// Set the settings
pub fn settings(mut self, settings: IndexSettings) -> Self {
self.index_settings = Some(settings);
self.index_settings = settings;
self
}
/// Set the schema
@@ -131,15 +142,11 @@ impl IndexBuilder {
let mmap_directory = MmapDirectory::create_from_tempdir()?;
self.create(mmap_directory)
}
fn get_settings_or_default(&self) -> Option<IndexSettings> {
self.index_settings.as_ref().cloned()
}
fn get_expect_schema(&self) -> crate::Result<Schema> {
Ok(self
.schema
self.schema
.as_ref()
.cloned()
.ok_or_else(|| TantivyError::IndexBuilderMissingArgument("schema"))?)
.ok_or(TantivyError::IndexBuilderMissingArgument("schema"))
}
/// Opens or creates a new index in the provided directory
pub fn open_or_create<Dir: Directory>(self, dir: Dir) -> crate::Result<Index> {
@@ -162,11 +169,11 @@ impl IndexBuilder {
let directory = ManagedDirectory::wrap(dir)?;
save_new_metas(
self.get_expect_schema()?,
self.get_settings_or_default(),
self.index_settings.clone(),
&directory,
)?;
let mut metas = IndexMeta::with_schema(self.get_expect_schema()?);
metas.index_settings = self.get_settings_or_default();
metas.index_settings = self.index_settings.clone();
let index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default());
Ok(index)
}
@@ -177,7 +184,7 @@ impl IndexBuilder {
pub struct Index {
directory: ManagedDirectory,
schema: Schema,
settings: Option<IndexSettings>,
settings: IndexSettings,
executor: Arc<Executor>,
tokenizers: TokenizerManager,
inventory: SegmentMetaInventory,
@@ -265,12 +272,10 @@ impl Index {
pub fn create<Dir: Directory>(
dir: Dir,
schema: Schema,
settings: Option<IndexSettings>,
settings: IndexSettings,
) -> crate::Result<Index> {
let mut builder = IndexBuilder::new().schema(schema);
if let Some(settings) = settings {
builder = builder.settings(settings);
}
builder = builder.settings(settings);
builder.create(dir)
}

@@ -423,7 +428,7 @@ impl Index {

/// Helper to create an index writer for tests.
///
/// That index writer only simply has a single thread and a heap of 5 MB.
/// That index writer only simply has a single thread and a heap of 10 MB.
/// Using a single thread gives us a deterministic allocation of DocId.
#[cfg(test)]
pub fn writer_for_tests(&self) -> crate::Result<IndexWriter> {
@@ -452,7 +457,7 @@ impl Index {

/// Accessor to the index settings
///
pub fn settings(&self) -> &Option<IndexSettings> {
pub fn settings(&self) -> &IndexSettings {
&self.settings
}
/// Accessor to the index schema
@@ -523,11 +528,14 @@ impl fmt::Debug for Index {

#[cfg(test)]
mod tests {
use crate::directory::{RamDirectory, WatchCallback};
use crate::schema::Field;
use crate::schema::{Schema, INDEXED, TEXT};
use crate::IndexReader;
use crate::ReloadPolicy;
use crate::{
directory::{RamDirectory, WatchCallback},
IndexSettings,
};
use crate::{Directory, Index};

#[test]
@@ -548,7 +556,12 @@
fn test_index_exists() {
let directory = RamDirectory::create();
assert!(!Index::exists(&directory).unwrap());
assert!(Index::create(directory.clone(), throw_away_schema(), None).is_ok());
assert!(Index::create(
directory.clone(),
throw_away_schema(),
IndexSettings::default()
)
.is_ok());
assert!(Index::exists(&directory).unwrap());
}

@@ -563,23 +576,43 @@
#[test]
fn open_or_create_should_open() {
let directory = RamDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema(), None).is_ok());
assert!(Index::create(
directory.clone(),
throw_away_schema(),
IndexSettings::default()
)
.is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::open_or_create(directory, throw_away_schema()).is_ok());
}

#[test]
fn create_should_wipeoff_existing() {
let directory = RamDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema(), None).is_ok());
assert!(Index::create(
directory.clone(),
throw_away_schema(),
IndexSettings::default()
)
.is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::create(directory.clone(), Schema::builder().build(), None).is_ok());
assert!(Index::create(
directory.clone(),
Schema::builder().build(),
IndexSettings::default()
)
.is_ok());
}

#[test]
fn open_or_create_exists_but_schema_does_not_match() {
let directory = RamDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema(), None).is_ok());
assert!(Index::create(
directory.clone(),
throw_away_schema(),
IndexSettings::default()
)
.is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
let err = Index::open_or_create(directory, Schema::builder().build());
@@ -714,7 +747,7 @@ mod tests {
let directory = RamDirectory::create();
let schema = throw_away_schema();
let field = schema.get_field("num_likes").unwrap();
let index = Index::create(directory.clone(), schema, None).unwrap();
let index = Index::create(directory.clone(), schema, IndexSettings::default()).unwrap();

let mut writer = index.writer_with_num_threads(8, 24_000_000).unwrap();
for i in 0u64..8_000u64 {

