From ed428ca5e1f07a5531ec5904cb0b09b4a78f34b0 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 29 Mar 2023 09:20:31 +0200 Subject: [PATCH 1/7] Adjust DWPT pool concurrency to the number of cores. After upgrading Elasticsearch to a recent Lucene snapshot, we observed a few indexing slowdowns when indexing with low numbers of cores. This appears to be due to the fact that we lost too much of the bias towards larger DWPTs in apache/lucene#12199. This change tries to add back more ordering by adjusting the concurrency of `DWPTPool` to the number of cores that are available on the local node. --- .../ConcurrentApproximatePriorityQueue.java | 56 +++++++++++++------ ...estConcurrentApproximatePriorityQueue.java | 16 ++++++ 2 files changed, 55 insertions(+), 17 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java index 036ba24c962a..c7c5b74802be 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java @@ -22,25 +22,47 @@ /** * Concurrent version of {@link ApproximatePriorityQueue}, which trades a bit more of ordering for - * better concurrency by maintaining 8 sub {@link ApproximatePriorityQueue}s that are locked - * independently. + * better concurrency by maintaining multiple sub {@link ApproximatePriorityQueue}s that are locked + * independently. The number of subs is computed dynamically based on hardware concurrency. */ final class ConcurrentApproximatePriorityQueue { - /** Keeping 8 queues should already help a lot compared to a single one. */ - static final int CONCURRENCY = 8; + /** + * Used for testing. + * + * @lucene.internal + */ + static final String CONCURRENCY_OVERRIDE_PROPERTY = "lucene.dwptpool.concurrency_override"; - private static final int MASK = 0x07; + private static final int getConcurrency() { + String value = System.getProperty(CONCURRENCY_OVERRIDE_PROPERTY); + if (value != null) { + try { + return Integer.parseInt(value); + } catch ( + @SuppressWarnings("unused") + NumberFormatException e) { + // ignore + } + } + int coreCount = Runtime.getRuntime().availableProcessors(); + // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is that + // if we set the concurrency too high then we'll completely lose the bias towards larger DWPTs. + // And if we set it too low then we risk seeing contention. + return Math.max(1, coreCount / 4); + } + final int concurrency; final Lock[] locks; final ApproximatePriorityQueue[] queues; ConcurrentApproximatePriorityQueue() { - locks = new Lock[CONCURRENCY]; + concurrency = getConcurrency(); + locks = new Lock[concurrency]; @SuppressWarnings({"rawtypes", "unchecked"}) - ApproximatePriorityQueue[] queues = new ApproximatePriorityQueue[CONCURRENCY]; + ApproximatePriorityQueue[] queues = new ApproximatePriorityQueue[concurrency]; this.queues = queues; - for (int i = 0; i < CONCURRENCY; ++i) { + for (int i = 0; i < concurrency; ++i) { locks[i] = new ReentrantLock(); queues[i] = new ApproximatePriorityQueue<>(); } @@ -51,8 +73,8 @@ void add(T entry, long weight) { // entries across queues and gives a bit of thread affinity between entries and threads, which // can't hurt. final int threadHash = Thread.currentThread().hashCode(); - for (int i = 0; i < CONCURRENCY; ++i) { - final int index = (threadHash + i) & MASK; + for (int i = 0; i < concurrency; ++i) { + final int index = Math.floorMod(threadHash + i, concurrency); final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; if (lock.tryLock()) { @@ -64,7 +86,7 @@ void add(T entry, long weight) { } } } - final int index = threadHash & MASK; + final int index = Math.floorMod(threadHash, concurrency); final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; lock.lock(); @@ -77,8 +99,8 @@ void add(T entry, long weight) { T poll(Predicate predicate) { final int threadHash = Thread.currentThread().hashCode(); - for (int i = 0; i < CONCURRENCY; ++i) { - final int index = (threadHash + i) & MASK; + for (int i = 0; i < concurrency; ++i) { + final int index = Math.floorMod(threadHash + i, concurrency); final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; if (lock.tryLock()) { @@ -92,8 +114,8 @@ T poll(Predicate predicate) { } } } - for (int i = 0; i < CONCURRENCY; ++i) { - final int index = (threadHash + i) & MASK; + for (int i = 0; i < concurrency; ++i) { + final int index = Math.floorMod(threadHash + i, concurrency); final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; lock.lock(); @@ -117,7 +139,7 @@ boolean contains(Object o) { throw new AssertionError("contains should only be used for assertions"); } - for (int i = 0; i < CONCURRENCY; ++i) { + for (int i = 0; i < concurrency; ++i) { final Lock lock = locks[i]; final ApproximatePriorityQueue queue = queues[i]; lock.lock(); @@ -133,7 +155,7 @@ boolean contains(Object o) { } boolean remove(Object o) { - for (int i = 0; i < CONCURRENCY; ++i) { + for (int i = 0; i < concurrency; ++i) { final Lock lock = locks[i]; final ApproximatePriorityQueue queue = queues[i]; lock.lock(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java index 49584d9e6978..53ff655aed17 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java @@ -18,10 +18,26 @@ import java.util.concurrent.CountDownLatch; import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.ThreadInterruptedException; +import org.junit.AfterClass; +import org.junit.BeforeClass; public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase { + @BeforeClass + public static void beforeClass() { + final int concurrency = TestUtil.nextInt(random(), 2, 8); + System.setProperty( + ConcurrentApproximatePriorityQueue.CONCURRENCY_OVERRIDE_PROPERTY, + Integer.toString(concurrency)); + } + + @AfterClass + public static void afterClass() { + System.clearProperty(ConcurrentApproximatePriorityQueue.CONCURRENCY_OVERRIDE_PROPERTY); + } + public void testPollFromSameThread() { ConcurrentApproximatePriorityQueue pq = new ConcurrentApproximatePriorityQueue<>(); pq.add(3, 3); From 16e7cfb5ced3fdfb010610318d90708d4c2ff6a2 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 29 Mar 2023 09:30:24 +0200 Subject: [PATCH 2/7] Remove unnecessary leniency. --- .../lucene/index/ConcurrentApproximatePriorityQueue.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java index c7c5b74802be..cdc097aa2874 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java @@ -37,13 +37,7 @@ final class ConcurrentApproximatePriorityQueue { private static final int getConcurrency() { String value = System.getProperty(CONCURRENCY_OVERRIDE_PROPERTY); if (value != null) { - try { - return Integer.parseInt(value); - } catch ( - @SuppressWarnings("unused") - NumberFormatException e) { - // ignore - } + return Integer.parseInt(value); } int coreCount = Runtime.getRuntime().availableProcessors(); // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is that From c6f19e3101b36b7f4e88a558d98299a933e9e4c2 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 29 Mar 2023 10:13:02 +0200 Subject: [PATCH 3/7] Simplify. --- .../ConcurrentApproximatePriorityQueue.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java index cdc097aa2874..2cdfbc11de7d 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java @@ -36,14 +36,19 @@ final class ConcurrentApproximatePriorityQueue { private static final int getConcurrency() { String value = System.getProperty(CONCURRENCY_OVERRIDE_PROPERTY); + int concurrency; if (value != null) { - return Integer.parseInt(value); + concurrency = Integer.parseInt(value); + } else { + int coreCount = Runtime.getRuntime().availableProcessors(); + // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is that + // if we set the concurrency too high then we'll completely lose the bias towards larger DWPTs. + // And if we set it too low then we risk seeing contention. + concurrency = coreCount / 4; } - int coreCount = Runtime.getRuntime().availableProcessors(); - // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is that - // if we set the concurrency too high then we'll completely lose the bias towards larger DWPTs. - // And if we set it too low then we risk seeing contention. - return Math.max(1, coreCount / 4); + concurrency = Math.max(1, concurrency); + concurrency = Math.min(256, concurrency); + return concurrency; } final int concurrency; @@ -66,9 +71,9 @@ void add(T entry, long weight) { // Seed the order in which to look at entries based on the current thread. This helps distribute // entries across queues and gives a bit of thread affinity between entries and threads, which // can't hurt. - final int threadHash = Thread.currentThread().hashCode(); + final int threadHash = Thread.currentThread().hashCode() & 0xFFFF; for (int i = 0; i < concurrency; ++i) { - final int index = Math.floorMod(threadHash + i, concurrency); + final int index = (threadHash + i) % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; if (lock.tryLock()) { @@ -80,7 +85,7 @@ void add(T entry, long weight) { } } } - final int index = Math.floorMod(threadHash, concurrency); + final int index = threadHash % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; lock.lock(); @@ -92,9 +97,9 @@ void add(T entry, long weight) { } T poll(Predicate predicate) { - final int threadHash = Thread.currentThread().hashCode(); + final int threadHash = Thread.currentThread().hashCode() & 0xFFFF; for (int i = 0; i < concurrency; ++i) { - final int index = Math.floorMod(threadHash + i, concurrency); + final int index = (threadHash + i) % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; if (lock.tryLock()) { @@ -109,7 +114,7 @@ T poll(Predicate predicate) { } } for (int i = 0; i < concurrency; ++i) { - final int index = Math.floorMod(threadHash + i, concurrency); + final int index = (threadHash + i) % concurrency; final Lock lock = locks[index]; final ApproximatePriorityQueue queue = queues[index]; lock.lock(); From 9f82d0b6ab727b633681cc9499524595832c9bea Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 29 Mar 2023 10:46:12 +0200 Subject: [PATCH 4/7] tidy --- .../lucene/index/ConcurrentApproximatePriorityQueue.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java index 2cdfbc11de7d..bebe2abeb2a6 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java @@ -41,9 +41,9 @@ private static final int getConcurrency() { concurrency = Integer.parseInt(value); } else { int coreCount = Runtime.getRuntime().availableProcessors(); - // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is that - // if we set the concurrency too high then we'll completely lose the bias towards larger DWPTs. - // And if we set it too low then we risk seeing contention. + // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is + // that if we set the concurrency too high then we'll completely lose the bias towards larger + // DWPTs. And if we set it too low then we risk seeing contention. concurrency = coreCount / 4; } concurrency = Math.max(1, concurrency); From 5f17c9c4710c27cc4650ddaf270efcef53733c6a Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 29 Mar 2023 11:16:10 +0200 Subject: [PATCH 5/7] Fix permissions. --- .../index/ConcurrentApproximatePriorityQueue.java | 12 +++--------- .../TestConcurrentApproximatePriorityQueue.java | 7 ++----- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java index bebe2abeb2a6..a97c65c917db 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java @@ -27,18 +27,12 @@ */ final class ConcurrentApproximatePriorityQueue { - /** - * Used for testing. - * - * @lucene.internal - */ - static final String CONCURRENCY_OVERRIDE_PROPERTY = "lucene.dwptpool.concurrency_override"; + static Integer CONCURRENCY_OVERRIDE; private static final int getConcurrency() { - String value = System.getProperty(CONCURRENCY_OVERRIDE_PROPERTY); int concurrency; - if (value != null) { - concurrency = Integer.parseInt(value); + if (CONCURRENCY_OVERRIDE != null) { + concurrency = CONCURRENCY_OVERRIDE; } else { int coreCount = Runtime.getRuntime().availableProcessors(); // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java index 53ff655aed17..42b5e135bd2c 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java @@ -27,15 +27,12 @@ public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase { @BeforeClass public static void beforeClass() { - final int concurrency = TestUtil.nextInt(random(), 2, 8); - System.setProperty( - ConcurrentApproximatePriorityQueue.CONCURRENCY_OVERRIDE_PROPERTY, - Integer.toString(concurrency)); + ConcurrentApproximatePriorityQueue.CONCURRENCY_OVERRIDE = TestUtil.nextInt(random(), 2, 8); } @AfterClass public static void afterClass() { - System.clearProperty(ConcurrentApproximatePriorityQueue.CONCURRENCY_OVERRIDE_PROPERTY); + ConcurrentApproximatePriorityQueue.CONCURRENCY_OVERRIDE = null; } public void testPollFromSameThread() { From bd30cf1c32df4b7415f2c6f2db9361e26fb5a2cf Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 31 Mar 2023 13:46:34 +0200 Subject: [PATCH 6/7] feedback --- .../ConcurrentApproximatePriorityQueue.java | 33 ++++++++++++------- ...estConcurrentApproximatePriorityQueue.java | 31 ++++++++--------- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java index a97c65c917db..ca9773b68c12 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java @@ -27,19 +27,15 @@ */ final class ConcurrentApproximatePriorityQueue { - static Integer CONCURRENCY_OVERRIDE; + static final int MIN_CONCURRENCY = 1; + static final int MAX_CONCURRENCY = 256; private static final int getConcurrency() { - int concurrency; - if (CONCURRENCY_OVERRIDE != null) { - concurrency = CONCURRENCY_OVERRIDE; - } else { - int coreCount = Runtime.getRuntime().availableProcessors(); - // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is - // that if we set the concurrency too high then we'll completely lose the bias towards larger - // DWPTs. And if we set it too low then we risk seeing contention. - concurrency = coreCount / 4; - } + int coreCount = Runtime.getRuntime().availableProcessors(); + // Aim for ~4 entries per slot when indexing with one thread per CPU core. The trade-off is + // that if we set the concurrency too high then we'll completely lose the bias towards larger + // DWPTs. And if we set it too low then we risk seeing contention. + int concurrency = coreCount / 4; concurrency = Math.max(1, concurrency); concurrency = Math.min(256, concurrency); return concurrency; @@ -50,7 +46,20 @@ private static final int getConcurrency() { final ApproximatePriorityQueue[] queues; ConcurrentApproximatePriorityQueue() { - concurrency = getConcurrency(); + this(getConcurrency()); + } + + ConcurrentApproximatePriorityQueue(int concurrency) { + if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) { + throw new IllegalArgumentException( + "concurrency must be in [" + + MIN_CONCURRENCY + + ", " + + MAX_CONCURRENCY + + "], got " + + concurrency); + } + this.concurrency = concurrency; locks = new Lock[concurrency]; @SuppressWarnings({"rawtypes", "unchecked"}) ApproximatePriorityQueue[] queues = new ApproximatePriorityQueue[concurrency]; diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java index 42b5e135bd2c..2656e4a38855 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentApproximatePriorityQueue.java @@ -20,23 +20,16 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.ThreadInterruptedException; -import org.junit.AfterClass; -import org.junit.BeforeClass; public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase { - @BeforeClass - public static void beforeClass() { - ConcurrentApproximatePriorityQueue.CONCURRENCY_OVERRIDE = TestUtil.nextInt(random(), 2, 8); - } - - @AfterClass - public static void afterClass() { - ConcurrentApproximatePriorityQueue.CONCURRENCY_OVERRIDE = null; - } - public void testPollFromSameThread() { - ConcurrentApproximatePriorityQueue pq = new ConcurrentApproximatePriorityQueue<>(); + ConcurrentApproximatePriorityQueue pq = + new ConcurrentApproximatePriorityQueue<>( + TestUtil.nextInt( + random(), + ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY, + ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY)); pq.add(3, 3); pq.add(10, 10); pq.add(7, 7); @@ -47,7 +40,12 @@ public void testPollFromSameThread() { } public void testPollFromDifferentThread() throws Exception { - ConcurrentApproximatePriorityQueue pq = new ConcurrentApproximatePriorityQueue<>(); + ConcurrentApproximatePriorityQueue pq = + new ConcurrentApproximatePriorityQueue<>( + TestUtil.nextInt( + random(), + ConcurrentApproximatePriorityQueue.MIN_CONCURRENCY, + ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY)); pq.add(3, 3); pq.add(10, 10); pq.add(7, 7); @@ -66,7 +64,10 @@ public void run() { } public void testCurrentLockIsBusy() throws Exception { - ConcurrentApproximatePriorityQueue pq = new ConcurrentApproximatePriorityQueue<>(); + // This test needs a concurrency of 2 or more. + ConcurrentApproximatePriorityQueue pq = + new ConcurrentApproximatePriorityQueue<>( + TestUtil.nextInt(random(), 2, ConcurrentApproximatePriorityQueue.MAX_CONCURRENCY)); pq.add(3, 3); CountDownLatch takeLock = new CountDownLatch(1); CountDownLatch releaseLock = new CountDownLatch(1); From 0a6968a81d3f50fd12ee907001443f8fd210ee7a Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 31 Mar 2023 14:05:00 +0200 Subject: [PATCH 7/7] use constants --- .../lucene/index/ConcurrentApproximatePriorityQueue.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java index ca9773b68c12..8a8fc72ab4c3 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentApproximatePriorityQueue.java @@ -36,8 +36,8 @@ private static final int getConcurrency() { // that if we set the concurrency too high then we'll completely lose the bias towards larger // DWPTs. And if we set it too low then we risk seeing contention. int concurrency = coreCount / 4; - concurrency = Math.max(1, concurrency); - concurrency = Math.min(256, concurrency); + concurrency = Math.max(MIN_CONCURRENCY, concurrency); + concurrency = Math.min(MAX_CONCURRENCY, concurrency); return concurrency; }