Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust DWPT pool concurrency to the number of cores. #12216

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,49 @@

/**
* Concurrent version of {@link ApproximatePriorityQueue}, which trades a bit more of ordering for
* better concurrency by maintaining 8 sub {@link ApproximatePriorityQueue}s that are locked
* independently.
* better concurrency by maintaining multiple sub {@link ApproximatePriorityQueue}s that are locked
* independently. The number of subs is computed dynamically based on hardware concurrency.
*/
final class ConcurrentApproximatePriorityQueue<T> {

/** Keeping 8 queues should already help a lot compared to a single one. */
static final int CONCURRENCY = 8;
static final int MIN_CONCURRENCY = 1;
static final int MAX_CONCURRENCY = 256;

private static final int MASK = 0x07;
/**
 * Computes the default number of sub-queues from the processor count reported by the JVM.
 *
 * <p>The value is a quarter of {@link Runtime#availableProcessors()}, clamped to the inclusive
 * range [{@link #MIN_CONCURRENCY}, {@link #MAX_CONCURRENCY}].
 *
 * @return the default concurrency, always within [MIN_CONCURRENCY, MAX_CONCURRENCY]
 */
private static int getConcurrency() {
  final int coreCount = Runtime.getRuntime().availableProcessors();
  // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is
  // that if we set the concurrency too high then we'll completely lose the bias towards larger
  // DWPTs. And if we set it too low then we risk seeing contention.
  final int target = coreCount / 4;
  // Clamp: raise to the lower bound first, then cap at the upper bound.
  return Math.min(MAX_CONCURRENCY, Math.max(MIN_CONCURRENCY, target));
}

final int concurrency;
final Lock[] locks;
final ApproximatePriorityQueue<T>[] queues;

ConcurrentApproximatePriorityQueue() {
locks = new Lock[CONCURRENCY];
this(getConcurrency());
}

ConcurrentApproximatePriorityQueue(int concurrency) {
if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
throw new IllegalArgumentException(
"concurrency must be in ["
+ MIN_CONCURRENCY
+ ", "
+ MAX_CONCURRENCY
+ "], got "
+ concurrency);
}
this.concurrency = concurrency;
locks = new Lock[concurrency];
@SuppressWarnings({"rawtypes", "unchecked"})
ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[CONCURRENCY];
ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[concurrency];
this.queues = queues;
for (int i = 0; i < CONCURRENCY; ++i) {
for (int i = 0; i < concurrency; ++i) {
locks[i] = new ReentrantLock();
queues[i] = new ApproximatePriorityQueue<>();
}
Expand All @@ -50,9 +74,9 @@ void add(T entry, long weight) {
// Seed the order in which to look at entries based on the current thread. This helps distribute
// entries across queues and gives a bit of thread affinity between entries and threads, which
// can't hurt.
final int threadHash = Thread.currentThread().hashCode();
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
if (lock.tryLock()) {
Expand All @@ -64,7 +88,7 @@ void add(T entry, long weight) {
}
}
}
final int index = threadHash & MASK;
final int index = threadHash % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
lock.lock();
Expand All @@ -76,9 +100,9 @@ void add(T entry, long weight) {
}

T poll(Predicate<T> predicate) {
final int threadHash = Thread.currentThread().hashCode();
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
if (lock.tryLock()) {
Expand All @@ -92,8 +116,8 @@ T poll(Predicate<T> predicate) {
}
}
}
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
for (int i = 0; i < concurrency; ++i) {
final int index = (threadHash + i) % concurrency;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
lock.lock();
Expand All @@ -117,7 +141,7 @@ boolean contains(Object o) {
throw new AssertionError("contains should only be used for assertions");
}

for (int i = 0; i < CONCURRENCY; ++i) {
for (int i = 0; i < concurrency; ++i) {
final Lock lock = locks[i];
final ApproximatePriorityQueue<T> queue = queues[i];
lock.lock();
Expand All @@ -133,7 +157,7 @@ boolean contains(Object o) {
}

boolean remove(Object o) {
for (int i = 0; i < CONCURRENCY; ++i) {
for (int i = 0; i < concurrency; ++i) {
final Lock lock = locks[i];
final ApproximatePriorityQueue<T> queue = queues[i];
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@

import java.util.concurrent.CountDownLatch;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.ThreadInterruptedException;

public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {

public void testPollFromSameThread() {
ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
ConcurrentApproximatePriorityQueue<Integer> pq =
new ConcurrentApproximatePriorityQueue<>(
TestUtil.nextInt(
random(),
ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY,
ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
pq.add(3, 3);
pq.add(10, 10);
pq.add(7, 7);
Expand All @@ -34,7 +40,12 @@ public void testPollFromSameThread() {
}

public void testPollFromDifferentThread() throws Exception {
ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
ConcurrentApproximatePriorityQueue<Integer> pq =
new ConcurrentApproximatePriorityQueue<>(
TestUtil.nextInt(
random(),
ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY,
ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
pq.add(3, 3);
pq.add(10, 10);
pq.add(7, 7);
Expand All @@ -53,7 +64,10 @@ public void run() {
}

public void testCurrentLockIsBusy() throws Exception {
ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
// This test needs a concurrency of 2 or more.
ConcurrentApproximatePriorityQueue<Integer> pq =
new ConcurrentApproximatePriorityQueue<>(
TestUtil.nextInt(random(), 2, ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY));
pq.add(3, 3);
CountDownLatch takeLock = new CountDownLatch(1);
CountDownLatch releaseLock = new CountDownLatch(1);
Expand Down