Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare commit #1202

Merged
merged 2 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,8 @@ impl IndexWriter {
}

/// Detects and removes the files that are not used by the index anymore.
pub fn garbage_collect_files(
&self,
) -> impl Future<Output = crate::Result<GarbageCollectionResult>> {
self.segment_updater.schedule_garbage_collect()
pub async fn garbage_collect_files(&self) -> crate::Result<GarbageCollectionResult> {
self.segment_updater.schedule_garbage_collect().await
}

/// Deletes all documents from the index
Expand Down Expand Up @@ -607,7 +605,7 @@ impl IndexWriter {
/// It is also possible to add a payload to the `commit`
/// using this API.
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
pub(crate) fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> {
pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> {
// Here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use self::index_writer::IndexWriter;
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
pub(crate) use self::prepared_commit::PreparedCommit;
pub use self::prepared_commit::PreparedCommit;
pub use self::segment_entry::SegmentEntry;
pub use self::segment_manager::SegmentManager;
pub use self::segment_serializer::SegmentSerializer;
Expand Down
33 changes: 23 additions & 10 deletions src/indexer/prepared_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::Opstamp;
use futures::executor::block_on;

/// A prepared commit
pub(crate) struct PreparedCommit<'a> {
pub struct PreparedCommit<'a> {
index_writer: &'a mut IndexWriter,
payload: Option<String>,
opstamp: Opstamp,
Expand All @@ -18,25 +18,38 @@ impl<'a> PreparedCommit<'a> {
}
}

pub(crate) fn opstamp(&self) -> Opstamp {
/// Returns the opstamp associated to the prepared commit.
pub fn opstamp(&self) -> Opstamp {
self.opstamp
}

pub(crate) fn set_payload(&mut self, payload: &str) {
/// Adds an arbitrary payload to the commit.
pub fn set_payload(&mut self, payload: &str) {
self.payload = Some(payload.to_string())
}

pub(crate) fn abort(self) -> crate::Result<Opstamp> {
/// Rollbacks any change.
pub fn abort(self) -> crate::Result<Opstamp> {
self.index_writer.rollback()
}

pub(crate) fn commit(self) -> crate::Result<Opstamp> {
/// Proceeds to commit.
/// See `.commit_async()`.
pub fn commit(self) -> crate::Result<Opstamp> {
block_on(self.commit_async())
}

/// Proceeds to commit.
///
/// Unfortunately, contrary to what `PrepareCommit` may suggests,
/// this operation is not at all really light.
/// At this point deletes have not been flushed yet.
pub async fn commit_async(self) -> crate::Result<Opstamp> {
info!("committing {}", self.opstamp);
block_on(
self.index_writer
.segment_updater()
.schedule_commit(self.opstamp, self.payload),
)?;
self.index_writer
.segment_updater()
.schedule_commit(self.opstamp, self.payload)
.await?;
Ok(self.opstamp)
}
}
60 changes: 31 additions & 29 deletions src/indexer/segment_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,37 +351,39 @@ impl SegmentUpdater {
*self.merge_policy.write().unwrap() = arc_merge_policy;
}

fn schedule_future<T: 'static + Send, F: Future<Output = crate::Result<T>> + 'static + Send>(
async fn schedule_task<
T: 'static + Send,
F: Future<Output = crate::Result<T>> + 'static + Send,
>(
&self,
f: F,
) -> impl Future<Output = crate::Result<T>> {
let (sender, receiver) = oneshot::channel();
if self.is_alive() {
self.pool.spawn_ok(async move {
let _ = sender.send(f.await);
});
} else {
let _ = sender.send(Err(crate::TantivyError::SystemError(
task: F,
) -> crate::Result<T> {
if !self.is_alive() {
return Err(crate::TantivyError::SystemError(
"Segment updater killed".to_string(),
)));
));
}
receiver.unwrap_or_else(|_| {
let (sender, receiver) = oneshot::channel();
self.pool.spawn_ok(async move {
let task_result = task.await;
let _ = sender.send(task_result);
});
let task_result = receiver.await;
task_result.unwrap_or_else(|_| {
let err_msg =
"A segment_updater future did not success. This should never happen.".to_string();
Err(crate::TantivyError::SystemError(err_msg))
})
}

pub fn schedule_add_segment(
&self,
segment_entry: SegmentEntry,
) -> impl Future<Output = crate::Result<()>> {
pub async fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> crate::Result<()> {
let segment_updater = self.clone();
self.schedule_future(async move {
self.schedule_task(async move {
segment_updater.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options().await;
Ok(())
})
.await
}

/// Orders `SegmentManager` to remove all segments
Expand Down Expand Up @@ -448,11 +450,9 @@ impl SegmentUpdater {
Ok(())
}

pub fn schedule_garbage_collect(
&self,
) -> impl Future<Output = crate::Result<GarbageCollectionResult>> {
pub async fn schedule_garbage_collect(&self) -> crate::Result<GarbageCollectionResult> {
let garbage_collect_future = garbage_collect_files(self.clone());
self.schedule_future(garbage_collect_future)
self.schedule_task(garbage_collect_future).await
}

/// List the files that are useful to the index.
Expand All @@ -470,20 +470,21 @@ impl SegmentUpdater {
files
}

pub fn schedule_commit(
pub(crate) async fn schedule_commit(
&self,
opstamp: Opstamp,
payload: Option<String>,
) -> impl Future<Output = crate::Result<()>> {
) -> crate::Result<()> {
let segment_updater: SegmentUpdater = self.clone();
self.schedule_future(async move {
self.schedule_task(async move {
let segment_entries = segment_updater.purge_deletes(opstamp)?;
segment_updater.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload)?;
let _ = garbage_collect_files(segment_updater.clone()).await;
segment_updater.consider_merge_options().await;
Ok(())
})
.await
}

fn store_meta(&self, index_meta: &IndexMeta) {
Expand Down Expand Up @@ -611,14 +612,14 @@ impl SegmentUpdater {
}
}

fn end_merge(
async fn end_merge(
&self,
merge_operation: MergeOperation,
mut after_merge_segment_entry: SegmentEntry,
) -> impl Future<Output = crate::Result<SegmentMeta>> {
) -> crate::Result<SegmentMeta> {
let segment_updater = self.clone();
let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
let end_merge_future = self.schedule_future(async move {
self.schedule_task(async move {
info!("End merge {:?}", after_merge_segment_entry.meta());
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
Expand Down Expand Up @@ -661,8 +662,9 @@ impl SegmentUpdater {

let _ = garbage_collect_files(segment_updater).await;
Ok(())
});
end_merge_future.map_ok(|_| after_merge_segment_meta)
})
.await?;
Ok(after_merge_segment_meta)
}

/// Wait for current merging threads.
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ pub use crate::indexer::demuxer::*;
pub use crate::indexer::merge_filtered_segments;
pub use crate::indexer::merge_indices;
pub use crate::indexer::operation::UserOperation;
pub use crate::indexer::IndexWriter;
pub use crate::indexer::{IndexWriter, PreparedCommit};
pub use crate::postings::Postings;
pub use crate::reader::LeasedItem;
pub use crate::schema::{Document, Term};
Expand Down