Skip to content

Commit

Permalink
Adjust DWPT pool concurrency to the number of cores. (#12216)
Browse files Browse the repository at this point in the history
After upgrading Elasticsearch to a recent Lucene snapshot, we observed a few
indexing slowdowns when indexing with low numbers of cores. This appears to be
due to the fact that we lost too much of the bias towards larger DWPTs in
#12199. This change tries to add back more ordering by adjusting
the concurrency of `DWPTPool` to the number of cores that are available on the
local node.
  • Loading branch information
jpountz authored Mar 31, 2023
1 parent 172dfaf commit 56e6591
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,49 @@

/**
* Concurrent version of {@link ApproximatePriorityQueue}, which trades a bit more of ordering for
* better concurrency by maintaining 8 sub {@link ApproximatePriorityQueue}s that are locked
* independently.
* better concurrency by maintaining multiple sub {@link ApproximatePriorityQueue}s that are locked
* independently. The number of subs is computed dynamically based on hardware concurrency.
*/
final class ConcurrentApproximatePriorityQueue<T> {

/** Keeping 8 queues should already help a lot compared to a single one. */
static final int CONCURRENCY = 8;
static final int MIN_CONCURRENCY = 1;
static final int MAX_CONCURRENCY = 256;

private static final int MASK = 0x07;
private static final int getConcurrency() {
int coreCount = Runtime.getRuntime().availableProcessors();
// Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is
// that if we set the concurrency too high then we'll completely lose the bias towards larger
// DWPTs. And if we set it too low then we risk seeing contention.
int concurrency = coreCount / 4;
concurrency = Math.max(MIN_CONCURRENCY, concurrency);
concurrency = Math.min(MAX_CONCURRENCY, concurrency);
return concurrency;
}

final int concurrency;
final Lock[] locks;
final ApproximatePriorityQueue<T>[] queues;

ConcurrentApproximatePriorityQueue() {
locks = new Lock[CONCURRENCY];
this(getConcurrency());
}

ConcurrentApproximatePriorityQueue(int concurrency) {
if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
throw new IllegalArgumentException(
"concurrency must be in ["
+ MIN_CONCURRENCY
+ ", "
+ MAX_CONCURRENCY
+ "], got "
+ concurrency);
}
this.concurrency = concurrency;
locks = new Lock[concurrency];
@SuppressWarnings({"rawtypes", "unchecked"})
ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[CONCURRENCY];
ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[concurrency];
this.queues = queues;
for (int i = 0; i < CONCURRENCY; ++i) {
for (int i = 0; i < concurrency; ++i) {
locks[i] = new ReentrantLock();
queues[i] = new ApproximatePriorityQueue<>();
}
Expand All @@ -50,9 +74,9 @@ void add(T entry, long weight) {
// Seed the order in which to look at entries based on the current thread. This helps distribute
// entries across queues and gives a bit of thread affinity between entries and threads, which
// can't hurt.
final int threadHash = Thread.currentThread().hashCode();
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
if (lock.tryLock()) {
Expand All @@ -64,7 +88,7 @@ void add(T entry, long weight) {
}
}
}
final int index = threadHash & MASK;
final int index = threadHash % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
lock.lock();
Expand All @@ -76,9 +100,9 @@ void add(T entry, long weight) {
}

T poll(Predicate<T> predicate) {
final int threadHash = Thread.currentThread().hashCode();
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
if (lock.tryLock()) {
Expand All @@ -92,8 +116,8 @@ T poll(Predicate<T> predicate) {
}
}
}
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
lock.lock();
Expand All @@ -117,7 +141,7 @@ boolean contains(Object o) {
throw new AssertionError("contains should only be used for assertions");
}

for (int i = 0; i < CONCURRENCY; ++i) {
for (int i = 0; i < concurrency; ++i) {
final Lock lock = locks[i];
final ApproximatePriorityQueue<T> queue = queues[i];
lock.lock();
Expand All @@ -133,7 +157,7 @@ boolean contains(Object o) {
}

boolean remove(Object o) {
for (int i = 0; i < CONCURRENCY; ++i) {
for (int i = 0; i < concurrency; ++i) {
final Lock lock = locks[i];
final ApproximatePriorityQueue<T> queue = queues[i];
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@

import java.util.concurrent.CountDownLatch;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.ThreadInterruptedException;

public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {

public void testPollFromSameThread() {
ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
ConcurrentApproximatePriorityQueue<Integer> pq =
new ConcurrentApproximatePriorityQueue<>(
TestUtil.nextInt(
random(),
ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY,
ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
pq.add(3, 3);
pq.add(10, 10);
pq.add(7, 7);
Expand All @@ -34,7 +40,12 @@ public void testPollFromSameThread() {
}

public void testPollFromDifferentThread() throws Exception {
ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
ConcurrentApproximatePriorityQueue<Integer> pq =
new ConcurrentApproximatePriorityQueue<>(
TestUtil.nextInt(
random(),
ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY,
ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
pq.add(3, 3);
pq.add(10, 10);
pq.add(7, 7);
Expand All @@ -53,7 +64,10 @@ public void run() {
}

public void testCurrentLockIsBusy() throws Exception {
ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
// This test needs a concurrency of 2 or more.
ConcurrentApproximatePriorityQueue<Integer> pq =
new ConcurrentApproximatePriorityQueue<>(
TestUtil.nextInt(random(), 2, ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
pq.add(3, 3);
CountDownLatch takeLock = new CountDownLatch(1);
CountDownLatch releaseLock = new CountDownLatch(1);
Expand Down

0 comments on commit 56e6591

Please sign in to comment.