Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in DirectExchangeClient #11088

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Sets.newConcurrentHashSet;
Expand Down Expand Up @@ -77,8 +76,8 @@ public class DirectExchangeClient
private long successfulRequests;
@GuardedBy("this")
private long averageBytesPerRequest;

private final AtomicBoolean closed = new AtomicBoolean();
@GuardedBy("this")
private boolean closed;

private final LocalMemoryContext memoryContext;
private final Executor pageBufferClientCallbackExecutor;
Expand Down Expand Up @@ -144,7 +143,7 @@ public synchronized void addLocation(TaskId taskId, URI location)

// Ignore new locations after close
// NOTE: this MUST happen before no more locations is checked
if (closed.get()) {
if (closed) {
return;
}

Expand Down Expand Up @@ -212,19 +211,22 @@ public Slice pollPage()
{
assertNotHoldsLock();

if (closed.get()) {
return null;
}

Slice page = buffer.pollPage();

if (page == null) {
return null;
}

memoryContext.setBytes(buffer.getRetainedSizeInBytes());
scheduleRequestIfNecessary();
synchronized (this) {
if (!closed) {
memoryContext.setBytes(buffer.getRetainedSizeInBytes());
scheduleRequestIfNecessary();
}
}

// Return the page even if the client is closed.
// A concurrent thread may have responded to the `isFinished` change
// triggered by polling this page and may have closed the client.
return page;
}

Expand All @@ -236,9 +238,10 @@ public boolean isFinished()
@Override
public synchronized void close()
{
if (!closed.compareAndSet(false, true)) {
if (closed) {
return;
}
closed = true;

for (HttpPageBufferClient client : allClients.values()) {
closeQuietly(client);
Expand Down Expand Up @@ -291,23 +294,23 @@ private boolean addPages(HttpPageBufferClient client, List<Slice> pages)
checkState(!completedClients.contains(client), "client is already marked as completed");
// Compute stats before acquiring the lock
long responseSize = 0;
for (Slice page : pages) {
responseSize += page.length();
if (!pages.isEmpty()) {
for (Slice page : pages) {
responseSize += page.length();
}
// Buffer may already be closed at this point. In such a situation the buffer is expected to simply ignore this call.
buffer.addPages(client.getRemoteTaskId(), pages);
arhimondr marked this conversation as resolved.
Show resolved Hide resolved
}

synchronized (this) {
if (closed.get() || buffer.isFinished() || buffer.isFailed()) {
if (closed || buffer.isFinished() || buffer.isFailed()) {
return false;
}

successfulRequests++;
// AVG_n = AVG_(n-1) * (n-1)/n + VALUE_n / n
averageBytesPerRequest = (long) (1.0 * averageBytesPerRequest * (successfulRequests - 1) / successfulRequests + responseSize / successfulRequests);
}

// add pages outside of the lock
if (!pages.isEmpty()) {
buffer.addPages(client.getRemoteTaskId(), pages);
memoryContext.setBytes(buffer.getRetainedSizeInBytes());
}

Expand Down