From 6ffc89ff128f85a6b7ae1752f5314e7d4a293af4 Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Thu, 17 Feb 2022 11:39:31 -0500 Subject: [PATCH] Fix race condition in DirectExchangeClient A race condition can occur when the client is closed while still being active (e.g.: early termination due to LIMIT). It may happen that the memory context is updated after close and then never set back to 0. --- .../trino/operator/DirectExchangeClient.java | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java index d8aa54c3f15f..5808845c96a5 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java +++ b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClient.java @@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Sets.newConcurrentHashSet; @@ -77,8 +76,8 @@ public class DirectExchangeClient private long successfulRequests; @GuardedBy("this") private long averageBytesPerRequest; - - private final AtomicBoolean closed = new AtomicBoolean(); + @GuardedBy("this") + private boolean closed; private final LocalMemoryContext memoryContext; private final Executor pageBufferClientCallbackExecutor; @@ -144,7 +143,7 @@ public synchronized void addLocation(TaskId taskId, URI location) // Ignore new locations after close // NOTE: this MUST happen before checking no more locations is checked - if (closed.get()) { + if (closed) { return; } @@ -212,19 +211,22 @@ public Slice pollPage() { assertNotHoldsLock(); - if (closed.get()) { - return null; - } - Slice page = buffer.pollPage(); if (page == null) { return null; } - memoryContext.setBytes(buffer.getRetainedSizeInBytes()); - scheduleRequestIfNecessary(); + synchronized (this) { + if (!closed) { + memoryContext.setBytes(buffer.getRetainedSizeInBytes()); + scheduleRequestIfNecessary(); + } + } + // Return the page even if the client is closed. + // A concurrent thread may have responded to the `isFinished` change + // triggered by polling this page and may have closed the client. return page; } @@ -236,9 +238,10 @@ public boolean isFinished() @Override public synchronized void close() { - if (!closed.compareAndSet(false, true)) { + if (closed) { return; } + closed = true; for (HttpPageBufferClient client : allClients.values()) { closeQuietly(client); @@ -291,23 +294,23 @@ private boolean addPages(HttpPageBufferClient client, List pages) checkState(!completedClients.contains(client), "client is already marked as completed"); // Compute stats before acquiring the lock long responseSize = 0; - for (Slice page : pages) { - responseSize += page.length(); + if (!pages.isEmpty()) { + for (Slice page : pages) { + responseSize += page.length(); + } + // Buffer may already be closed at this point. In such situation the buffer is expected to simply ignore this call. + buffer.addPages(client.getRemoteTaskId(), pages); } synchronized (this) { - if (closed.get() || buffer.isFinished() || buffer.isFailed()) { + if (closed || buffer.isFinished() || buffer.isFailed()) { return false; } successfulRequests++; // AVG_n = AVG_(n-1) * (n-1)/n + VALUE_n / n averageBytesPerRequest = (long) (1.0 * averageBytesPerRequest * (successfulRequests - 1) / successfulRequests + responseSize / successfulRequests); - } - // add pages outside of the lock - if (!pages.isEmpty()) { - buffer.addPages(client.getRemoteTaskId(), pages); memoryContext.setBytes(buffer.getRetainedSizeInBytes()); }