Skip to content

Commit

Permalink
Cleanup flushing logic in DocumentsWriter (#12647)
Browse files Browse the repository at this point in the history
DocumentsWriter had some duplicate logic for iterating over
segments to be flushed. This change simplifies some of the loops
and moves common code into one place. This also adds tests to ensure
we actually freeze and apply deletes on segment flush.

Relates to #12572
  • Loading branch information
s1monw committed Oct 12, 2023
1 parent 601e5a5 commit c2331b4
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 91 deletions.
115 changes: 47 additions & 68 deletions lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -169,11 +168,11 @@ private boolean applyAllDeletes() throws IOException {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;

if (flushControl.isFullFlush() == false
// never apply deletes during full flush this breaks happens before relationship
// never apply deletes during full flush this breaks happens before relationship.
&& deleteQueue.isOpen()
// if it's closed then it's already fully applied and we have a new delete queue
&& flushControl.getAndResetApplyAllDeletes()) {
if (ticketQueue.addDeletes(deleteQueue)) {
if (ticketQueue.addTicket(() -> maybeFreezeGlobalBuffer(deleteQueue)) != null) {
flushNotifications.onDeletesApplied(); // apply deletes event forces a purge
return true;
}
Expand Down Expand Up @@ -241,15 +240,16 @@ final boolean flushOneDWPT() throws IOException {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "startFlushOneDWPT");
}
// first check if there is one pending
DocumentsWriterPerThread documentsWriterPerThread = flushControl.nextPendingFlush();
if (documentsWriterPerThread == null) {
documentsWriterPerThread = flushControl.checkoutLargestNonPendingWriter();
}
if (documentsWriterPerThread != null) {
return doFlush(documentsWriterPerThread);
if (maybeFlush() == false) {
DocumentsWriterPerThread documentsWriterPerThread =
flushControl.checkoutLargestNonPendingWriter();
if (documentsWriterPerThread != null) {
doFlush(documentsWriterPerThread);
return true;
}
return false;
}
return false; // we didn't flush anything here
return true;
}

/**
Expand Down Expand Up @@ -388,11 +388,8 @@ private boolean preUpdate() throws IOException {
|| (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
// Help out flushing any queued DWPTs so we can un-stall:
// Try pick up pending threads here if possible
DocumentsWriterPerThread flushingDWPT;
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
// Don't push the delete here since the update could fail!
hasEvents |= doFlush(flushingDWPT);
}
// no need to loop over the next pending flushes... doFlush will take care of this
hasEvents |= maybeFlush();
flushControl.waitIfStalled(); // block if stalled
}
return hasEvents;
Expand All @@ -402,14 +399,11 @@ private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEve
throws IOException {
hasEvents |= applyAllDeletes();
if (flushingDWPT != null) {
hasEvents |= doFlush(flushingDWPT);
doFlush(flushingDWPT);
hasEvents = true;
} else if (config.checkPendingFlushOnUpdate) {
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
if (nextPendingFlush != null) {
hasEvents |= doFlush(nextPendingFlush);
}
hasEvents |= maybeFlush();
}

return hasEvents;
}

Expand Down Expand Up @@ -451,11 +445,19 @@ long updateDocuments(
return seqNo;
}

private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
boolean hasEvents = false;
while (flushingDWPT != null) {
/**
 * Asks the flush control for the next pending flush and, if one exists, hands it to {@link
 * #doFlush(DocumentsWriterPerThread)}.
 *
 * @return {@code true} if a pending DWPT was picked up and flushed, {@code false} if nothing was
 *     pending
 * @throws IOException propagated from the flush
 */
private boolean maybeFlush() throws IOException {
  final DocumentsWriterPerThread pending = flushControl.nextPendingFlush();
  if (pending == null) {
    // nothing queued for flushing
    return false;
  }
  doFlush(pending);
  return true;
}

private void doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
assert flushingDWPT != null : "Flushing DWPT must not be null";
do {
assert flushingDWPT.hasFlushed() == false;
hasEvents = true;
boolean success = false;
DocumentsWriterFlushQueue.FlushTicket ticket = null;
try {
Expand Down Expand Up @@ -483,8 +485,11 @@ private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOExceptio
*/
try {
assert assertTicketQueueModification(flushingDWPT.deleteQueue);
final DocumentsWriterPerThread dwpt = flushingDWPT;
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = ticketQueue.addFlushTicket(flushingDWPT);
ticket =
ticketQueue.addTicket(
() -> new DocumentsWriterFlushQueue.FlushTicket(dwpt.prepareFlush(), true));
final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
boolean dwptSuccess = false;
try {
Expand All @@ -497,11 +502,9 @@ private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOExceptio
if (flushingDWPT.pendingFilesToDelete().isEmpty() == false) {
Set<String> files = flushingDWPT.pendingFilesToDelete();
flushNotifications.deleteUnusedFiles(files);
hasEvents = true;
}
if (dwptSuccess == false) {
flushNotifications.flushFailed(flushingDWPT.getSegmentInfo());
hasEvents = true;
}
}
// flush was successful once we reached this point - new seg. has been assigned to the
Expand All @@ -525,42 +528,12 @@ private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOExceptio
// other threads flushing segments. In this case
// we forcefully stall the producers.
flushNotifications.onTicketBacklog();
break;
}
} finally {
flushControl.doAfterFlush(flushingDWPT);
}

flushingDWPT = flushControl.nextPendingFlush();
}

if (hasEvents) {
flushNotifications.afterSegmentsFlushed();
}

// If deletes alone are consuming > 1/2 our RAM
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
// tiny segments:
final double ramBufferSizeMB = config.getRAMBufferSizeMB();
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH
&& flushControl.getDeleteBytesUsed() > (1024 * 1024 * ramBufferSizeMB / 2)) {
hasEvents = true;
if (applyAllDeletes() == false) {
if (infoStream.isEnabled("DW")) {
infoStream.message(
"DW",
String.format(
Locale.ROOT,
"force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
flushControl.getDeleteBytesUsed() / (1024. * 1024.),
ramBufferSizeMB));
}
flushNotifications.onDeletesApplied();
}
}

return hasEvents;
} while ((flushingDWPT = flushControl.nextPendingFlush()) != null);
flushNotifications.afterSegmentsFlushed();
}

synchronized long getNextSequenceNumber() {
Expand Down Expand Up @@ -665,11 +638,7 @@ long flushAllThreads() throws IOException {

boolean anythingFlushed = false;
try {
DocumentsWriterPerThread flushingDWPT;
// Help out with flushing:
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
anythingFlushed |= doFlush(flushingDWPT);
}
anythingFlushed |= maybeFlush();
// If a concurrent flush is still in flight wait for it
flushControl.waitForFlush();
if (anythingFlushed == false
Expand All @@ -679,9 +648,9 @@ long flushAllThreads() throws IOException {
"DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
assert assertTicketQueueModification(flushingDeleteQueue);
ticketQueue.addDeletes(flushingDeleteQueue);
ticketQueue.addTicket(() -> maybeFreezeGlobalBuffer(flushingDeleteQueue));
}
// we can't assert that we don't have any tickets in teh queue since we might add a
// we can't assert that we don't have any tickets in the queue since we might add a
// DocumentsWriterDeleteQueue
// concurrently if we have very small ram buffers this happens quite frequently
assert !flushingDeleteQueue.anyChanges();
Expand All @@ -698,6 +667,16 @@ long flushAllThreads() throws IOException {
}
}

/**
 * Freezes the global buffer of the given delete queue and wraps the frozen updates in a flush
 * ticket.
 *
 * @param deleteQueue the delete queue whose global buffer should be frozen
 * @return a ticket holding the frozen updates, or {@code null} if the queue had nothing to freeze
 */
private DocumentsWriterFlushQueue.FlushTicket maybeFreezeGlobalBuffer(
    DocumentsWriterDeleteQueue deleteQueue) {
  final FrozenBufferedUpdates frozen = deleteQueue.maybeFreezeGlobalBuffer();
  if (frozen == null) {
    // no need to publish anything if we don't have any frozen updates
    return null;
  }
  return new DocumentsWriterFlushQueue.FlushTicket(frozen, false);
}

void finishFullFlush(boolean success) throws IOException {
try {
if (infoStream.isEnabled("DW")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.util.IOConsumer;

Expand All @@ -34,23 +35,23 @@ final class DocumentsWriterFlushQueue {
private final AtomicInteger ticketCount = new AtomicInteger();
private final ReentrantLock purgeLock = new ReentrantLock();

synchronized boolean addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
synchronized FlushTicket addTicket(Supplier<FlushTicket> ticketSupplier) throws IOException {
// first inc the ticket count - freeze opens a window for #anyChanges to fail
incTickets();
boolean success = false;
try {
FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer();
if (frozenBufferedUpdates != null) {
FlushTicket ticket = ticketSupplier.get();
if (ticket != null) {
// no need to publish anything if we don't have any frozen updates
queue.add(new FlushTicket(frozenBufferedUpdates, false));
queue.add(ticket);
success = true;
}
return ticket;
} finally {
if (!success) {
decTickets();
}
}
return success;
}

private void incTickets() {
Expand All @@ -63,24 +64,6 @@ private void decTickets() {
assert numTickets >= 0;
}

synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
// Each flush is assigned a ticket in the order they acquire the ticketQueue
// lock
incTickets();
boolean success = false;
try {
// prepare flush freezes the global deletes - do in synced block!
final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true);
queue.add(ticket);
success = true;
return ticket;
} finally {
if (!success) {
decTickets();
}
}
}

synchronized void addSegment(FlushTicket ticket, FlushedSegment segment) {
assert ticket.hasSegment;
// the actual flush is done asynchronously and once done the FlushedSegment
Expand Down
86 changes: 86 additions & 0 deletions lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3143,6 +3143,92 @@ public void testNeverCheckOutOnFullFlush() throws IOException, InterruptedExcept
dir.close();
}

/**
 * Buffers delete-by-term operations without indexing any document, then switches on a flush
 * policy that requests applying all deletes and checks that the buffered delete bytes drop to
 * zero and the flush-deletes counter is bumped exactly once.
 */
public void testApplyDeletesWithoutFlushes() throws IOException {
  try (Directory dir = newDirectory()) {
    final AtomicBoolean applyDeletes = new AtomicBoolean();
    // Policy that requests "apply all deletes" on every change once the flag is raised.
    final FlushPolicy policy =
        new FlushPolicy() {
          @Override
          public void onChange(
              DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
            if (applyDeletes.get()) {
              control.setApplyAllDeletes();
            }
          }
        };
    final IndexWriterConfig config = new IndexWriterConfig();
    config.setFlushPolicy(policy);

    try (IndexWriter writer = new IndexWriter(dir, config)) {
      // nothing buffered yet
      assertEquals(0, writer.docWriter.flushControl.getDeleteBytesUsed());

      writer.deleteDocuments(new Term("foo", "bar"));
      final long afterFirstDelete = writer.docWriter.flushControl.getDeleteBytesUsed();
      assertTrue(afterFirstDelete + " > 0", afterFirstDelete > 0);

      writer.deleteDocuments(new Term("foo", "baz"));
      final long afterSecondDelete = writer.docWriter.flushControl.getDeleteBytesUsed();
      assertTrue(afterSecondDelete + " > 0", afterSecondDelete > 0);

      // both terms are still buffered and no deletes have been flushed so far
      assertEquals(2, writer.getBufferedDeleteTermsSize());
      assertEquals(0, writer.getFlushDeletesCount());

      // from now on the policy asks for deletes to be applied on the next change
      applyDeletes.set(true);
      writer.deleteDocuments(new Term("foo", "bar"));
      assertEquals(0, writer.docWriter.flushControl.getDeleteBytesUsed());
      assertEquals(1, writer.getFlushDeletesCount());
    }
  }
}

/**
 * Checks that flushing a segment via {@code flushNextBuffer()} also applies the buffered deletes:
 * after each flush the delete bytes reported by the flush control must be back at zero and the
 * flush-deletes counter must have advanced.
 *
 * <p>The second half repeats the check with a {@code RandomIndexWriter} and a random mix of adds
 * and updates; it only asserts when some delete bytes are actually buffered.
 */
public void testDeletesAppliedOnFlush() throws IOException {
  try (Directory dir = newDirectory()) {
    try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig())) {
      Document doc = new Document();
      doc.add(newField("id", "1", storedTextType));
      w.addDocument(doc);
      // the update buffers a delete for the term
      w.updateDocument(new Term("id", "1"), doc);
      long deleteBytesUsed = w.docWriter.flushControl.getDeleteBytesUsed();
      assertTrue("deletedBytesUsed: " + deleteBytesUsed, deleteBytesUsed > 0);
      assertEquals(0, w.getFlushDeletesCount());
      assertTrue(w.flushNextBuffer());
      assertEquals(1, w.getFlushDeletesCount());
      assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed());

      w.deleteAll();
      w.commit();
      assertEquals(2, w.getFlushDeletesCount());

      // buffer either a plain delete or a doc-values update, both must count as delete bytes
      if (random().nextBoolean()) {
        w.deleteDocuments(new Term("id", "1"));
      } else {
        // upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit '1'
        w.updateDocValues(new Term("id", "1"), new NumericDocValuesField("foo", 1L));
      }
      deleteBytesUsed = w.docWriter.flushControl.getDeleteBytesUsed();
      assertTrue("deletedBytesUsed: " + deleteBytesUsed, deleteBytesUsed > 0);
      doc = new Document();
      doc.add(newField("id", "5", storedTextType));
      w.addDocument(doc);
      assertTrue(w.flushNextBuffer());
      assertEquals(0, w.docWriter.flushControl.getDeleteBytesUsed());
      assertEquals(3, w.getFlushDeletesCount());
    }
    try (RandomIndexWriter w = new RandomIndexWriter(random(), dir, new IndexWriterConfig())) {
      int numDocs = 1 + random().nextInt(100);
      for (int i = 0; i < numDocs; i++) {
        Document doc = new Document();
        doc.add(newField("id", "" + i, storedTextType));
        w.addDocument(doc);
      }
      for (int i = 0; i < numDocs; i++) {
        if (random().nextBoolean()) {
          Document doc = new Document();
          doc.add(newField("id", "" + i, storedTextType));
          w.updateDocument(new Term("id", "" + i), doc);
        }
      }

      // only meaningful if deletes are still buffered at this point
      long deleteBytesUsed = w.w.docWriter.flushControl.getDeleteBytesUsed();
      if (deleteBytesUsed > 0) {
        assertTrue(w.w.flushNextBuffer());
        assertEquals(0, w.w.docWriter.flushControl.getDeleteBytesUsed());
      }
    }
  }
}

public void testHoldLockOnLargestWriter() throws IOException, InterruptedException {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
Expand Down

0 comments on commit c2331b4

Please sign in to comment.