diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java index 036ba24c962a..8a8fc72ab4c3 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java @@ -22,25 +22,49 @@ /** * Concurrent version of {@link ApproximatePriorityQueue}, which trades a bit more of ordering for - * better concurrency by maintaining 8 sub {@link ApproximatePriorityQueue}s that are locked - * independently. + * better concurrency by maintaining multiple sub {@link ApproximatePriorityQueue}s that are locked + * independently. The number of subs is computed dynamically based on hardware concurrency. */ final class ConcurrentApproximatePriorityQueue { - /** Keeping 8 queues should already help a lot compared to a single one. */ - static final int CONCURRENCY = 8; + static final int MIN_CONCURRENCY = 1; + static final int MAX_CONCURRENCY = 256; - private static final int MASK = 0x07; + private static final int getConcurrency() { + int coreCount = Runtime.getRuntime().availableProcessors(); + // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is + // that if we set the concurrency too high then we'll completely lose the bias towards larger + // DWPTs. And if we set it too low then we risk seeing contention. + int concurrency = coreCount / 4; + concurrency = Math.max(MIN_CONCURRENCY, concurrency); + concurrency = Math.min(MAX_CONCURRENCY, concurrency); + return concurrency; + } + final int concurrency; final Lock[] locks; final ApproximatePriorityQueue[] queues; ConcurrentApproximatePriorityQueue() { - locks = new Lock[CONCURRENCY]; + this(getConcurrency()); + } + + ConcurrentApproximatePriorityQueue(int concurrency) { + if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) { + throw new IllegalArgumentException( + "concurrency must be in [" + + MIN_CONCURRENCY + + ", " + + MAX_CONCURRENCY + + "], got " + + concurrency); + } + this.concurrency = concurrency; + locks = new Lock[concurrency]; @SuppressWarnings({"rawtypes", "unchecked"}) - ApproximatePriorityQueue[] queues = new ApproximatePriorityQueue[CONCURRENCY]; + ApproximatePriorityQueue[] queues = new ApproximatePriorityQueue[concurrency]; this.queues = queues; - for (int i = 0; i < CONCURRENCY; ++i) { + for (int i = 0; i < concurrency; ++i) { locks[i] = new ReentrantLock(); queues[i] = new ApproximatePriorityQueue<>(); } @@ -50,9 +74,9 @@ void add(T entry, long weight) { // Seed the order in which to look at entries based on the current thread. This helps distribute // entries across queues and gives a bit of thread affinity between entries and threads, which // can't hurt. - final int threadHash = Thread.currentThread().hashCode(); - for (int i = 0; i < CONCURRENCY; ++i) { - final int index = (threadHash + i) & MASK; + final int threadHash = Thread.currentThread().hashCode() & 0xFFFF; + for (int i = 0; i < concurrency; ++i) { + final int index = (threadHash + i) % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; if (lock.tryLock()) { @@ -64,7 +88,7 @@ void add(T entry, long weight) { } } } - final int index = threadHash & MASK; + final int index = threadHash % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; lock.lock(); @@ -76,9 +100,9 @@ void add(T entry, long weight) { } T poll(Predicate predicate) { - final int threadHash = Thread.currentThread().hashCode(); - for (int i = 0; i < CONCURRENCY; ++i) { - final int index = (threadHash + i) & MASK; + final int threadHash = Thread.currentThread().hashCode() & 0xFFFF; + for (int i = 0; i < concurrency; ++i) { + final int index = (threadHash + i) % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; if (lock.tryLock()) { @@ -92,8 +116,8 @@ T poll(Predicate predicate) { } } } - for (int i = 0; i < CONCURRENCY; ++i) { - final int index = (threadHash + i) & MASK; + for (int i = 0; i < concurrency; ++i) { + final int index = (threadHash + i) % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; lock.lock(); @@ -117,7 +141,7 @@ boolean contains(Object o) { throw new AssertionError("contains should only be used for assertions"); } - for (int i = 0; i < CONCURRENCY; ++i) { + for (int i = 0; i < concurrency; ++i) { final Lock lock = locks[i]; final ApproximatePriorityQueue queue = queues[i]; lock.lock(); @@ -133,7 +157,7 @@ boolean contains(Object o) { } boolean remove(Object o) { - for (int i = 0; i < CONCURRENCY; ++i) { + for (int i = 0; i < concurrency; ++i) { final Lock lock = locks[i]; final ApproximatePriorityQueue queue = queues[i]; lock.lock(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java index 49584d9e6978..2656e4a38855 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java @@ -18,12 +18,18 @@ import java.util.concurrent.CountDownLatch; import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.ThreadInterruptedException; public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase { public void testPollFromSameThread() { - ConcurrentApproximatePriorityQueue pq = new ConcurrentApproximatePriorityQueue<>(); + ConcurrentApproximatePriorityQueue pq = + new ConcurrentApproximatePriorityQueue<>( + TestUtil.nextInt( + random(), + ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY, + ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY)); pq.add(3, 3); pq.add(10, 10); pq.add(7, 7); @@ -34,7 +40,12 @@ public void testPollFromSameThread() { } public void testPollFromDifferentThread() throws Exception { - ConcurrentApproximatePriorityQueue pq = new ConcurrentApproximatePriorityQueue<>(); + ConcurrentApproximatePriorityQueue pq = + new ConcurrentApproximatePriorityQueue<>( + TestUtil.nextInt( + random(), + ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY, + ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY)); pq.add(3, 3); pq.add(10, 10); pq.add(7, 7); @@ -53,7 +64,10 @@ public void run() { } public void testCurrentLockIsBusy() throws Exception { - ConcurrentApproximatePriorityQueue pq = new ConcurrentApproximatePriorityQueue<>(); + // This test needs a concurrency of 2 or more. + ConcurrentApproximatePriorityQueue pq = + new ConcurrentApproximatePriorityQueue<>( + TestUtil.nextInt(random(), 2, ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY)); pq.add(3, 3); CountDownLatch takeLock = new CountDownLatch(1); CountDownLatch releaseLock = new CountDownLatch(1);