Skip to content

Commit

Permalink
An array-based task queue to address performance regressions.
Browse files Browse the repository at this point in the history
TaskFifo has higher performance than `ConcurrentLinkedQueue` because
the primary contention is resolved through an atomic increment, which
translates into `LOCK XADD` and doesn't require a CAS loop.

TaskFifo also generates negligible per-task garbage, only doing so when
there is a descheduled or unusually slow take thread.

PiperOrigin-RevId: 507600172
Change-Id: Iab60eec942a1b12e0ebe0fb3947c3b20a3bf8997
  • Loading branch information
aoeui authored and copybara-github committed Feb 6, 2023
1 parent e9316f0 commit 44782dc
Show file tree
Hide file tree
Showing 4 changed files with 695 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.devtools.build.lib.concurrent.PaddedAddresses.createPaddedBaseAddress;
import static com.google.devtools.build.lib.concurrent.PaddedAddresses.getAlignedAddress;
import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.DO_CPU_HEAVY_TASK;
import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.DO_TASK;
import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.NextWorkerActivity.IDLE;
Expand All @@ -32,7 +34,6 @@
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
Expand Down Expand Up @@ -68,12 +69,11 @@ final class PriorityWorkerPool {
*
* <p>An interesting alternative to consider is to place unprioritized tasks directly into {@link
* #pool}, which could reduce the work performed by the system. Doing this results in about a
* {@code 4%} end-to-end analysis regression in our benchmark. The reasons are not clear, but
* perhaps polling from a {@link ConcurrentLinkedQueue}, as implemented in {@link
* WorkerThread#runLoop} is more efficient than random scanning of {@link ForkJoinPool}, or it
* could be a domain specific reason having to do with differences in the resulting task ordering.
* {@code 4%} end-to-end regression in our benchmark. The likely cause for this is that FIFO
* behavior is very important for performance because it reflects the ordering of prioritized
* tasks.
*/
private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
private final TaskFifo queue;

private final ConcurrentSkipListSet<ComparableRunnable> cpuHeavyQueue =
new ConcurrentSkipListSet<>();
Expand Down Expand Up @@ -131,10 +131,14 @@ final class PriorityWorkerPool {
this.pool = newForkJoinPool();
this.errorClassifier = errorClassifier;

long baseAddress = PaddedAddresses.createPaddedBaseAddress(2);
long baseAddress = createPaddedBaseAddress(4);
cleaner.register(this, new AddressFreer(baseAddress));
this.countersAddress = PaddedAddresses.getAlignedAddress(baseAddress, /* offset= */ 0);
this.queueSizeAddress = PaddedAddresses.getAlignedAddress(baseAddress, /* offset= */ 1);
this.countersAddress = getAlignedAddress(baseAddress, /* offset= */ 0);
this.queue =
new TaskFifo(
/* sizeAddress= */ getAlignedAddress(baseAddress, /* offset= */ 1),
/* appendIndexAddress= */ getAlignedAddress(baseAddress, /* offset= */ 2),
/* takeIndexAddress= */ getAlignedAddress(baseAddress, /* offset= */ 3));

resetExecutionCounters();
}
Expand All @@ -159,7 +163,7 @@ void execute(Runnable rawTask) {
}
}

while (!tryAppend(rawTask)) {
while (!queue.tryAppend(rawTask)) {
// If the queue is full, this thread donates some work to reduce the queue.
if (!tryAcquireTask()) {
// This should be very hard to reach if the queue is full except under cancellation. It's
Expand All @@ -180,16 +184,6 @@ void execute(Runnable rawTask) {
}
}

/**
 * Attempts to enqueue {@code task}, respecting the {@link #TASKS_MAX_VALUE} size cap.
 *
 * <p>The size counter is bumped optimistically before the insert; if the cap would be
 * exceeded, the bump is rolled back and the task is not enqueued.
 *
 * @return true if the task was added, false if the queue was at capacity
 */
private boolean tryAppend(Runnable task) {
  int sizeAfterIncrement = queueSizeIncrementAndGet();
  if (sizeAfterIncrement <= TASKS_MAX_VALUE) {
    queue.add(task);
    return true;
  }
  queueSizeDecrement(); // Roll back the optimistic increment; nothing was inserted.
  return false;
}

/**
* An object to {@link Object#wait} or {@link Object#notifyAll} on for quiescence.
*
Expand Down Expand Up @@ -275,7 +269,7 @@ public String toString() {
}
return toStringHelper(this)
.add("available", formatSnapshot(getExecutionCounters()))
.add("|queue|", queueSize())
.add("|queue|", queue.size())
.add("|cpu queue|", cpuHeavyQueue.size())
.add("threads", threadStates)
.add("unhandled", unhandled.get())
Expand Down Expand Up @@ -422,8 +416,7 @@ boolean tryDoQueuedWork() {

private void dequeueTaskAndRun() {
try {
var task = queue.poll();
UNSAFE.getAndAddInt(null, queueSizeAddress, -1);
var task = queue.take();
task.run();
} catch (Throwable uncaught) {
handleUncaughtError(uncaught);
Expand Down Expand Up @@ -453,7 +446,7 @@ private void dequeueCpuHeavyTaskAndRun() {
private static final long TASKS_MASK = 0x0000_001F_FF80_0000L;
private static final int TASKS_BIT_OFFSET = 23;
private static final long ONE_TASK = 1L << TASKS_BIT_OFFSET;
@VisibleForTesting static final int TASKS_MAX_VALUE = (int) (TASKS_MASK >> TASKS_BIT_OFFSET);
static final int TASKS_MAX_VALUE = (int) (TASKS_MASK >> TASKS_BIT_OFFSET);

private static final long CPU_HEAVY_TASKS_MASK = 0x0000_0000_007F_FFFFL;
private static final int CPU_HEAVY_TASKS_BIT_OFFSET = 0;
Expand All @@ -477,8 +470,8 @@ private void dequeueCpuHeavyTaskAndRun() {
* <ol>
* <li>Canceled - (1 bit) true for cancelled.
* <li>CPU Permits - (11 bits) how many CPU heavy permits are available.
* <li>Threads - (16 bits) how many threads are available.
* <li>Tasks - (13 bits) how many non-CPU heavy tasks are inflight.
* <li>Threads - (15 bits) how many threads are available.
* <li>Tasks - (14 bits) how many non-CPU heavy tasks are inflight.
* <li>CPU Heavy Tasks - (23 bits) how many CPU heavy tasks are inflight.
* </ol>
*
Expand Down Expand Up @@ -512,7 +505,6 @@ private void resetExecutionCounters() {
countersAddress,
(((long) poolSize) << THREADS_BIT_OFFSET)
| (((long) cpuPermits) << CPU_PERMITS_BIT_OFFSET));
UNSAFE.putInt(null, queueSizeAddress, 0);
}

private boolean acquireThreadElseReleaseTask() {
Expand Down Expand Up @@ -625,31 +617,6 @@ private boolean tryUpdateExecutionCounters(long snapshot, long target) {
return UNSAFE.compareAndSwapLong(null, countersAddress, snapshot, target);
}

/**
* Address of the {@code int} queue size.
*
* <p>The queue size is used to detect when adding to the queue might overflow. Having a fixed cap
* on queue size enables using fewer bits to track its state, simplifying code paths having high
* contention.
*
 * <p>The queue size can be transiently greater than, but is never less than the actual queue
 * size. This property is maintained by always incrementing the counter before inserting into the
 * queue and always decrementing the counter only after removing from the queue.
*/
private final long queueSizeAddress;

/**
 * Returns the current queue-size counter with a volatile read of the off-heap {@code int} at
 * {@link #queueSizeAddress}.
 *
 * <p>May transiently over-report the true queue size (the counter is incremented before
 * insertion and decremented after removal), but never under-reports it.
 */
private int queueSize() {
return UNSAFE.getIntVolatile(null, queueSizeAddress);
}

/**
 * Atomically increments the off-heap queue-size counter and returns the post-increment value.
 *
 * <p>{@code getAndAddInt} returns the <em>previous</em> value, hence the {@code + 1} to report
 * the value after the increment.
 */
private int queueSizeIncrementAndGet() {
return UNSAFE.getAndAddInt(null, queueSizeAddress, 1) + 1;
}

/**
 * Atomically decrements the off-heap queue-size counter.
 *
 * <p>Used both after taking an element from the queue and to roll back an optimistic increment
 * when an append is refused for exceeding the size cap.
 */
private void queueSizeDecrement() {
UNSAFE.getAndAddInt(null, queueSizeAddress, -1);
}

private static String formatSnapshot(long snapshot) {
return String.format(
"{cancelled=%b, threads=%d, cpuPermits=%d, tasks=%d, cpuHeavyTasks=%d}",
Expand Down
Loading

0 comments on commit 44782dc

Please sign in to comment.