Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supports force merge based on specified segments. #14163

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ public MergeSpecification findForcedDeletesMerges(
return in.findForcedDeletesMerges(segmentInfos, mergeContext);
}

/**
 * Forwards the by-name merge lookup to the wrapped policy {@code in} and returns its answer
 * unchanged.
 */
@Override
public MergeSpecification findMergesBySegmentNames(
    SegmentInfos segmentInfos, MergeContext mergeContext, String[] segmentNames)
    throws IOException {
  // Pure delegation: no filtering or decoration happens at this level.
  final MergeSpecification delegated =
      in.findMergesBySegmentNames(segmentInfos, mergeContext, segmentNames);
  return delegated;
}

@Override
public MergeSpecification findFullFlushMerges(
MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
Expand Down
66 changes: 66 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2275,6 +2275,72 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
// background threads accomplish the merging
}

/**
 * Forces a merge of the segments identified by name. This follows the same flow as {@link
 * #forceMergeDeletes(boolean)}; the only difference is that the merges are selected by the
 * {@link MergePolicy} from the given segment names rather than from segments with deletions.
 *
 * <p>The writer is flushed first, then the merge policy is asked for merges over the named
 * segments, the returned merges are registered, and the merge scheduler is kicked.
 *
 * @param doWait if {@code true}, block until every merge registered by this call is neither
 *     pending nor running; if {@code false}, return right after the scheduler has been invoked
 * @param segmentNames names of the segments to merge; passed as-is to the merge policy
 * @throws IOException if one of the merges requested by this call recorded an exception
 * @throws IllegalStateException if the writer hit an unrecoverable error while waiting
 */
public void forceMergeBySegmentNames(boolean doWait, String[] segmentNames) throws IOException {
  ensureOpen();

  flush(true, true);

  if (infoStream.isEnabled("IW")) {
    infoStream.message("IW", "forceMergeBySegmentNames: index now " + segString());
  }

  final MergePolicy mergePolicy = config.getMergePolicy();
  final CachingMergeContext cachingMergeContext = new CachingMergeContext(this);
  final MergePolicy.MergeSpecification spec;
  synchronized (this) {
    // Ask the policy and register its merges under the writer lock so the segment list
    // cannot change between selection and registration.
    spec = mergePolicy.findMergesBySegmentNames(segmentInfos, cachingMergeContext, segmentNames);
    if (spec != null) {
      for (MergePolicy.OneMerge merge : spec.merges) {
        registerMerge(merge);
      }
    }
  }

  // Always kick the scheduler, even when the policy returned nothing.
  mergeScheduler.merge(mergeSource, MergeTrigger.EXPLICIT);

  if (spec != null && doWait) {
    synchronized (this) {
      boolean running = true;
      while (running) {

        if (tragedy.get() != null) {
          throw new IllegalStateException(
              "this writer hit an unrecoverable error; cannot complete forceMergeBySegmentNames",
              tragedy.get());
        }

        // Check each merge that MergePolicy asked us to
        // do, to see if any of them are still running and
        // if any of them have hit an exception.
        running = false;
        for (MergePolicy.OneMerge merge : spec.merges) {
          if (pendingMerges.contains(merge) || runningMerges.contains(merge)) {
            running = true;
          }
          Throwable t = merge.getException();
          if (t != null) {
            throw new IOException("background merge hit exception: " + merge.segString(), t);
          }
        }

        // If any of our merges are still running, wait:
        if (running) {
          doWait();
        }
      }
    }
  }

  // NOTE: in the ConcurrentMergeScheduler case, when
  // doWait is false, we can return immediately while
  // background threads accomplish the merging
}

/**
* Forces merging of all segments that have deleted documents. The actual merges to be executed
* are determined by the {@link MergePolicy}. For example, the default {@link TieredMergePolicy}
Expand Down
40 changes: 40 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -676,6 +677,45 @@ public abstract MergeSpecification findForcedMerges(
public abstract MergeSpecification findForcedDeletesMerges(
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;

/**
 * Builds a single merge over the segments whose names appear in {@code segmentNames}.
 *
 * <p>Segments reported by {@code mergeContext.getMergingSegments()} are skipped, so the returned
 * merge never contains a segment that is already taking part in another merge. Names that do
 * not match any segment in {@code segmentInfos} are ignored.
 *
 * @param segmentInfos the total set of segments in the index
 * @param mergeContext the MergeContext to find the merges on
 * @param segmentNames names of the segments to merge; {@code null} or an empty array yields no
 *     merge
 * @return a specification holding one {@link OneMerge} over the matching segments, or {@code
 *     null} if no eligible segment matches
 */
public MergeSpecification findMergesBySegmentNames(
    SegmentInfos segmentInfos, MergeContext mergeContext, String[] segmentNames)
    throws IOException {
  // Nothing was asked for: report "no merges" instead of failing on a null array.
  if (segmentNames == null || segmentNames.length == 0) {
    return null;
  }

  final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
  final List<String> requestedNames = Arrays.asList(segmentNames);

  final List<SegmentCommitInfo> candidates = new ArrayList<>();
  for (SegmentCommitInfo segment : segmentInfos) {
    // Skip segments that are already merging; including one would make the whole
    // proposed merge invalid.
    if (merging.contains(segment) == false && requestedNames.contains(segment.info.name)) {
      candidates.add(segment);
    }
  }

  // Covers both "no name matched" and "every matching segment is already merging".
  if (candidates.isEmpty()) {
    return null;
  }

  final MergeSpecification spec = new MergeSpecification();
  spec.add(new OneMerge(candidates));
  return spec;
}

/**
* Identifies merges that we want to execute (synchronously) on commit. By default, this will
* return {@link #findMerges natural merges} whose segments are all less than the {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public MergeSpecification findForcedDeletesMerges(
return null;
}

/**
 * Never proposes a merge for the named segments: this implementation ignores all of its
 * arguments and always returns {@code null}.
 */
@Override
public MergeSpecification findMergesBySegmentNames(
SegmentInfos segmentInfos, MergeContext mergeContext, String[] segmentNames)
throws IOException {
// null means "no merges" to the caller.
return null;
}

@Override
public MergeSpecification findFullFlushMerges(
MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public MergeSpecification findForcedDeletesMerges(
return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
}

/**
 * Asks the wrapped policy {@code in} for merges over the named segments and passes whatever it
 * returns through {@code wrapSpec} before handing it back.
 */
@Override
public MergeSpecification findMergesBySegmentNames(
    SegmentInfos segmentInfos, MergeContext mergeContext, String[] segmentNames)
    throws IOException {
  final MergeSpecification fromDelegate =
      in.findMergesBySegmentNames(segmentInfos, mergeContext, segmentNames);
  return wrapSpec(fromDelegate);
}

@Override
public MergeSpecification findFullFlushMerges(
MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
Expand Down