Make BackgroundIndexer more Efficient (elastic#57781) (elastic#57789)
Improve the efficiency of BackgroundIndexer by allowing callers to register
an assertion that runs on failures as they are produced, which prevents
failures from queuing up in memory.
Also, add a non-blocking stop to the background indexer so that when
stopping multiple indexers we don't needlessly continue indexing on some
indexers while waiting for another one to stop.

Closes elastic#57766
original-brownbear authored Jun 8, 2020
1 parent 6c93fed commit 619e4f8
Showing 6 changed files with 69 additions and 46 deletions.
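
In short, the commit replaces the setAssertNoFailuresOnStop(boolean) flag and the getFailures() accessor with a failure-assertion callback, and splits the old blocking stop() into a non-blocking stop() plus awaitStopped(). A condensed sketch of the resulting usage, assembled from the test changes below (indexName, client(), MAX_DOCS, waitForDocs, and assertException are existing test fixtures, not new API):

    try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), MAX_DOCS)) {
        // Runs on every failure as it is produced; expected failures are
        // asserted immediately instead of accumulating in a list.
        indexer.setFailureAssertion(t -> assertException(t, indexName));
        waitForDocs(randomIntBetween(10, 50), indexer);
        // Sets the stop flag, then joins the writer threads.
        indexer.stopAndAwaitStopped();
    }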
@@ -137,7 +137,7 @@ public void testGlobalCheckpointIsSafe() throws Exception {
             false, random())) {
             indexer.setRequestTimeout(TimeValue.ZERO);
             indexer.setIgnoreIndexingFailures(true);
-            indexer.setAssertNoFailuresOnStop(false);
+            indexer.setFailureAssertion(e -> {});
             indexer.start(-1);
 
             waitForDocs(randomIntBetween(1, 100), indexer);

@@ -201,19 +201,12 @@ public void testCloseWhileIndexingDocuments() throws Exception {
 
         int nbDocs = 0;
         try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), MAX_DOCS)) {
-            indexer.setAssertNoFailuresOnStop(false);
+            indexer.setFailureAssertion(t -> assertException(t, indexName));
 
            waitForDocs(randomIntBetween(10, 50), indexer);
            assertBusy(() -> closeIndices(indexName));
-            indexer.stop();
+            indexer.stopAndAwaitStopped();
            nbDocs += indexer.totalIndexedDocs();
-
-            final Throwable[] failures = indexer.getFailures();
-            if (failures != null) {
-                for (Throwable failure : failures) {
-                    assertException(failure, indexName);
-                }
-            }
         }
 
         assertIndexIsClosed(indexName);
@@ -280,6 +273,7 @@ public void testConcurrentClosesAndOpens() throws Exception {
         createIndex(indexName);
 
         final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), MAX_DOCS);
+        indexer.setFailureAssertion(e -> {});
         waitForDocs(1, indexer);
 
         final CountDownLatch latch = new CountDownLatch(1);
@@ -321,8 +315,7 @@ public void testConcurrentClosesAndOpens() throws Exception {
             thread.join();
         }
 
-        indexer.setAssertNoFailuresOnStop(false);
-        indexer.stop();
+        indexer.stopAndAwaitStopped();
 
         final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
         if (clusterState.metadata().indices().get(indexName).getState() == IndexMetadata.State.CLOSE) {

@@ -112,6 +112,7 @@ public void testCloseWhileRelocatingShards() throws Exception {
                 logger.debug("creating index {} with background indexing", indexName);
                 final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), -1, 1);
                 indexers.put(indexName, indexer);
+                indexer.setFailureAssertion(t -> assertException(t, indexName));
                 waitForDocs(1, indexer);
             }
             docsPerIndex.put(indexName, (long) nbDocs);
@@ -225,20 +226,15 @@ public void testCloseWhileRelocatingShards() throws Exception {
                 thread.join();
             }
 
+            // stop all indexers first without waiting, so we don't needlessly keep indexing on some while waiting for another one to stop
+            for (BackgroundIndexer indexer : indexers.values()) {
+                indexer.stop();
+            }
             for (Map.Entry<String, BackgroundIndexer> entry : indexers.entrySet()) {
                 final BackgroundIndexer indexer = entry.getValue();
-                indexer.setAssertNoFailuresOnStop(false);
-                indexer.stop();
-
+                indexer.awaitStopped();
                 final String indexName = entry.getKey();
                 docsPerIndex.computeIfPresent(indexName, (key, value) -> value + indexer.totalIndexedDocs());
-
-                final Throwable[] failures = indexer.getFailures();
-                if (failures != null) {
-                    for (Throwable failure : failures) {
-                        assertException(failure, indexName);
-                    }
-                }
             }
 
             for (String index : indices) {
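
The two-phase shutdown above generalizes to any number of indexers: signal all of them before joining any, so the total wait is bounded by the slowest indexer rather than the sum of all stop times. A minimal sketch of the pattern (the indexers map comes from the test above):

    // Phase 1: non-blocking; just flips each indexer's stop flag so every
    // writer thread starts winding down at the same time.
    for (BackgroundIndexer indexer : indexers.values()) {
        indexer.stop();
    }
    // Phase 2: join each indexer's writer threads in turn.
    for (BackgroundIndexer indexer : indexers.values()) {
        indexer.awaitStopped();
    }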

@@ -128,7 +128,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasTest() throws Exception {
             logger.info("--> {} docs indexed", totalNumDocs);
 
             logger.info("--> marking and waiting for indexing threads to stop ...");
-            indexer.stop();
+            indexer.stopAndAwaitStopped();
             logger.info("--> indexing threads stopped");
 
             logger.info("--> refreshing the index");
@@ -183,7 +183,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() throws Exception {
             logger.info("--> {} docs indexed", totalNumDocs);
 
             logger.info("--> marking and waiting for indexing threads to stop ...");
-            indexer.stop();
+            indexer.stopAndAwaitStopped();
             logger.info("--> indexing threads stopped");
 
             logger.info("--> refreshing the index");
@@ -261,7 +261,7 @@ public void testRecoverWhileUnderLoadWithReducedAllowedNodes() throws Exception {
                 .setWaitForNoRelocatingShards(true));
 
             logger.info("--> marking and waiting for indexing threads to stop ...");
-            indexer.stop();
+            indexer.stopAndAwaitStopped();
             logger.info("--> indexing threads stopped");
 
             assertNoTimeout(client().admin().cluster().prepareHealth()
@@ -302,7 +302,7 @@ public void testRecoverWhileRelocating() throws Exception {
             }
 
             logger.info("--> marking and waiting for indexing threads to stop ...");
-            indexer.stop();
+            indexer.stopAndAwaitStopped();
 
             logger.info("--> indexing threads stopped");
             logger.info("--> bump up number of replicas to 1 and allow all nodes to hold the index");

@@ -236,7 +236,7 @@ public void testRelocationWhileIndexingRandom() throws Exception {
             }
             logger.info("--> done relocations");
             logger.info("--> waiting for indexing threads to stop ...");
-            indexer.stop();
+            indexer.stopAndAwaitStopped();
             logger.info("--> indexing threads stopped");
 
             logger.info("--> refreshing the index");

@@ -39,14 +39,16 @@
 import org.junit.Assert;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
@@ -58,15 +60,15 @@ public class BackgroundIndexer implements AutoCloseable {
     final Thread[] writers;
     final Client client;
     final CountDownLatch stopLatch;
-    final CopyOnWriteArrayList<Exception> failures;
+    final Collection<Exception> failures = new ArrayList<>();
     final AtomicBoolean stop = new AtomicBoolean(false);
     final AtomicLong idGenerator = new AtomicLong();
     final CountDownLatch startLatch = new CountDownLatch(1);
     final AtomicBoolean hasBudget = new AtomicBoolean(false); // when set to true, writers will acquire writes from a semaphore
     final Semaphore availableBudget = new Semaphore(0);
     final boolean useAutoGeneratedIDs;
     private final Set<String> ids = ConcurrentCollections.newConcurrentSet();
-    private boolean assertNoFailuresOnStop = true;
+    private volatile Consumer<Exception> failureAssertion = null;
 
     volatile int minFieldSize = 10;
     volatile int maxFieldSize = 140;
@@ -118,7 +120,6 @@ public BackgroundIndexer(final String index, final String type, final Client client,
         }
         this.client = client;
         useAutoGeneratedIDs = random.nextBoolean();
-        failures = new CopyOnWriteArrayList<>();
         writers = new Thread[writerCount];
         stopLatch = new CountDownLatch(writers.length);
         logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs);
@@ -162,7 +163,7 @@ public void run() {
                                 boolean add = ids.add(bulkItemResponse.getId());
                                 assert add : "ID: " + bulkItemResponse.getId() + " already used";
                             } else {
-                                failures.add(bulkItemResponse.getFailure().getCause());
+                                trackFailure(bulkItemResponse.getFailure().getCause());
                             }
                         }
                     } catch (Exception e) {
@@ -204,7 +205,7 @@ public void run() {
                     }
                     logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {}", indexerId, stop.get(), ids.size());
                 } catch (Exception e) {
-                    failures.add(e);
+                    trackFailure(e);
                     final long docId = id;
                     logger.warn(
                         (Supplier<?>)
@@ -222,6 +223,16 @@
                 }
             }
 
+    private void trackFailure(Exception e) {
+        synchronized (failures) {
+            if (failureAssertion != null) {
+                failureAssertion.accept(e);
+            } else {
+                failures.add(e);
+            }
+        }
+    }
+
     private XContentBuilder generateSource(long id, Random random) throws IOException {
         int contentLength = RandomNumbers.randomIntBetween(random, minFieldSize, maxFieldSize);
         StringBuilder text = new StringBuilder(contentLength);
@@ -287,32 +298,55 @@ public void continueIndexing(int numOfDocs) {
         setBudget(numOfDocs);
     }
 
-    /** Stop all background threads * */
-    public void stop() throws InterruptedException {
-        if (stop.get()) {
-            return;
-        }
+    /** Stop all background threads but don't wait for ongoing indexing operations to finish * */
+    public void stop() {
         stop.set(true);
+    }
+
+    public void awaitStopped() throws InterruptedException {
+        assert stop.get();
         Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true));
-        if (assertNoFailuresOnStop) {
+        if (failureAssertion == null) {
             assertNoFailures();
         }
     }
 
-    public long totalIndexedDocs() {
-        return ids.size();
+    /** Stop all background threads and wait for ongoing indexing operations to finish * */
+    public void stopAndAwaitStopped() throws InterruptedException {
+        stop();
+        awaitStopped();
     }
 
-    public Throwable[] getFailures() {
-        return failures.toArray(new Throwable[failures.size()]);
+    public long totalIndexedDocs() {
+        return ids.size();
     }
 
     public void assertNoFailures() {
-        Assert.assertThat(failures, emptyIterable());
+        synchronized (failures) {
+            Assert.assertThat(failures, emptyIterable());
+        }
     }
 
-    public void setAssertNoFailuresOnStop(final boolean assertNoFailuresOnStop) {
-        this.assertNoFailuresOnStop = assertNoFailuresOnStop;
+    /**
+     * Set a consumer that can be used to run assertions on failures during indexing. If such a consumer is set then it disables adding
+     * failures to {@link #failures}. Should be used if the number of expected failures during indexing could become very large.
+     */
+    public void setFailureAssertion(Consumer<Exception> failureAssertion) {
+        synchronized (failures) {
+            this.failureAssertion = failureAssertion;
+            boolean success = false;
+            try {
+                for (Exception failure : failures) {
+                    failureAssertion.accept(failure);
+                }
+                failures.clear();
+                success = true;
+            } finally {
+                if (success == false) {
+                    stop();
+                }
+            }
+        }
+    }
 
     @Override
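
One subtlety in setFailureAssertion is worth spelling out: trackFailure and setFailureAssertion both synchronize on the failures list, so a consumer installed mid-run first replays any failures that were queued before it existed and then receives all later ones directly; no failure is dropped or handled twice during the switch-over. A self-contained sketch of that hand-off pattern outside the test framework (class and method names here are illustrative, not from the commit):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    final class FailureTracker {
        private final List<Exception> queued = new ArrayList<>();
        private Consumer<Exception> assertion; // null => queue failures; guarded by `queued`

        // Called by producer threads for every failure.
        void track(Exception e) {
            synchronized (queued) {
                if (assertion != null) {
                    assertion.accept(e); // assert right away, keep memory bounded
                } else {
                    queued.add(e);       // no consumer installed yet: remember it
                }
            }
        }

        // Called by the test thread; replays anything queued so far under the
        // same lock, so no concurrent track() call can slip between the two steps.
        void setAssertion(Consumer<Exception> assertion) {
            synchronized (queued) {
                this.assertion = assertion;
                for (Exception e : queued) {
                    assertion.accept(e); // may throw and fail the test
                }
                queued.clear();
            }
        }
    }

BackgroundIndexer additionally wraps the replay in a try/finally that calls stop() if a replayed assertion throws, so the writer threads don't keep producing failures after the test has already failed.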
