Skip to content

Commit

Permalink
Assign event loop to tasks and run methods from a task in the same ev…
Browse files Browse the repository at this point in the history
…ent loop
  • Loading branch information
shangm2 authored and arhimondr committed Jan 15, 2025
1 parent 1b57e1c commit b511e7a
Show file tree
Hide file tree
Showing 7 changed files with 588 additions and 578 deletions.
22 changes: 18 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@
<scope>import</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${dep.netty.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-testing-docker</artifactId>
Expand Down Expand Up @@ -2660,7 +2666,9 @@
<executions>
<execution>
<id>unlink-out-of-tree-build-directory</id>
<goals><goal>exec</goal></goals>
<goals>
<goal>exec</goal>
</goals>
<phase>pre-clean</phase>
<configuration>
<executable>rm</executable>
Expand All @@ -2672,7 +2680,9 @@
</execution>
<execution>
<id>remove-out-of-tree-build-directory</id>
<goals><goal>exec</goal></goals>
<goals>
<goal>exec</goal>
</goals>
<phase>pre-clean</phase>
<configuration>
<executable>rm</executable>
Expand All @@ -2684,7 +2694,9 @@
</execution>
<execution>
<id>create-out-of-tree-build-directory</id>
<goals><goal>exec</goal></goals>
<goals>
<goal>exec</goal>
</goals>
<phase>validate</phase>
<configuration>
<executable>mkdir</executable>
Expand All @@ -2696,7 +2708,9 @@
</execution>
<execution>
<id>link-out-of-tree-build-directory</id>
<goals><goal>exec</goal></goals>
<goals>
<goal>exec</goal>
</goals>
<phase>validate</phase>
<configuration>
<executable>ln</executable>
Expand Down
5 changes: 5 additions & 0 deletions presto-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,11 @@
<artifactId>ratis-common</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package com.facebook.presto.server.remotetask;

import com.facebook.airlift.concurrent.SetThreadName;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.ResponseHandler;
Expand All @@ -38,13 +37,9 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import io.netty.channel.EventLoop;

import javax.annotation.concurrent.GuardedBy;

import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
Expand All @@ -60,6 +55,7 @@
import static com.facebook.presto.spi.StandardErrorCode.REMOTE_TASK_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.REMOTE_TASK_MISMATCH;
import static com.facebook.presto.util.Failures.REMOTE_TASK_MISMATCH_ERROR;
import static com.google.common.base.Verify.verify;
import static io.airlift.units.Duration.nanosSince;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand All @@ -75,20 +71,16 @@ class ContinuousTaskStatusFetcher
private final Codec<TaskStatus> taskStatusCodec;

private final Duration refreshMaxWait;
private final Executor executor;
private final EventLoop taskEventLoop;
private final HttpClient httpClient;
private final RequestErrorTracker errorTracker;
private final RemoteTaskStats stats;
private final boolean binaryTransportEnabled;
private final boolean thriftTransportEnabled;
private final Protocol thriftProtocol;

private final AtomicLong currentRequestStartNanos = new AtomicLong();

@GuardedBy("this")
private long currentRequestStartNanos;
private boolean running;

@GuardedBy("this")
private ListenableFuture<BaseResponse<TaskStatus>> future;

public ContinuousTaskStatusFetcher(
Expand All @@ -97,10 +89,9 @@ public ContinuousTaskStatusFetcher(
TaskStatus initialTaskStatus,
Duration refreshMaxWait,
Codec<TaskStatus> taskStatusCodec,
Executor executor,
EventLoop taskEventLoop,
HttpClient httpClient,
Duration maxErrorDuration,
ScheduledExecutorService errorScheduledExecutor,
RemoteTaskStats stats,
boolean binaryTransportEnabled,
boolean thriftTransportEnabled,
Expand All @@ -110,23 +101,25 @@ public ContinuousTaskStatusFetcher(

this.taskId = requireNonNull(taskId, "taskId is null");
this.onFail = requireNonNull(onFail, "onFail is null");
this.taskStatus = new StateMachine<>("task-" + taskId, executor, initialTaskStatus);
this.taskStatus = new StateMachine<>("task-" + taskId, taskEventLoop, initialTaskStatus);

this.refreshMaxWait = requireNonNull(refreshMaxWait, "refreshMaxWait is null");
this.taskStatusCodec = requireNonNull(taskStatusCodec, "taskStatusCodec is null");

this.executor = requireNonNull(executor, "executor is null");
this.taskEventLoop = requireNonNull(taskEventLoop, "taskEventLoop is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");

this.errorTracker = taskRequestErrorTracker(taskId, initialTaskStatus.getSelf(), maxErrorDuration, errorScheduledExecutor, "getting task status");
this.errorTracker = taskRequestErrorTracker(taskId, initialTaskStatus.getSelf(), maxErrorDuration, taskEventLoop, "getting task status");
this.stats = requireNonNull(stats, "stats is null");
this.binaryTransportEnabled = binaryTransportEnabled;
this.thriftTransportEnabled = thriftTransportEnabled;
this.thriftProtocol = requireNonNull(thriftProtocol, "thriftProtocol is null");
}

public synchronized void start()
public void start()
{
verify(taskEventLoop.inEventLoop());

if (running) {
// already running
return;
Expand All @@ -135,8 +128,10 @@ public synchronized void start()
scheduleNextRequest();
}

public synchronized void stop()
public void stop()
{
verify(taskEventLoop.inEventLoop());

running = false;
if (future != null) {
// do not terminate if the request is already running to avoid closing pooled connections
Expand All @@ -145,8 +140,10 @@ public synchronized void stop()
}
}

private synchronized void scheduleNextRequest()
private void scheduleNextRequest()
{
verify(taskEventLoop.inEventLoop());

// stopped or done?
TaskStatus taskStatus = getTaskStatus();
if (!running || taskStatus.getState().isDone()) {
Expand All @@ -163,7 +160,7 @@ private synchronized void scheduleNextRequest()
// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<?> errorRateLimit = errorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::scheduleNextRequest, executor);
errorRateLimit.addListener(this::scheduleNextRequest, taskEventLoop);
return;
}

Expand All @@ -189,7 +186,7 @@ else if (binaryTransportEnabled) {

errorTracker.startRequest();
future = httpClient.executeAsync(request, responseHandler);
currentRequestStartNanos.set(System.nanoTime());
currentRequestStartNanos = System.nanoTime();
FutureCallback callback;
if (thriftTransportEnabled) {
callback = new ThriftHttpResponseHandler(this, request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR);
Expand All @@ -201,7 +198,7 @@ else if (binaryTransportEnabled) {
Futures.addCallback(
future,
callback,
executor);
taskEventLoop);
}

TaskStatus getTaskStatus()
Expand All @@ -212,59 +209,62 @@ TaskStatus getTaskStatus()
@Override
public void success(TaskStatus value)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
updateTaskStatus(value);
errorTracker.requestSucceeded();
}
finally {
scheduleNextRequest();
}
verify(taskEventLoop.inEventLoop());

updateStats(currentRequestStartNanos);
try {
updateTaskStatus(value);
errorTracker.requestSucceeded();
}
finally {
scheduleNextRequest();
}
}

@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
// if task not already done, record error
TaskStatus taskStatus = getTaskStatus();
if (!taskStatus.getState().isDone()) {
errorTracker.requestFailed(cause);
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
scheduleNextRequest();
verify(taskEventLoop.inEventLoop());

updateStats(currentRequestStartNanos);
try {
// if task not already done, record error
TaskStatus taskStatus = getTaskStatus();
if (!taskStatus.getState().isDone()) {
errorTracker.requestFailed(cause);
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
scheduleNextRequest();
}
}

@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
onFail.accept(cause);
}
verify(taskEventLoop.inEventLoop());

updateStats(currentRequestStartNanos);
onFail.accept(cause);
}

void updateTaskStatus(TaskStatus newValue)
{
verify(taskEventLoop.inEventLoop());

// change to new value if old value is not changed and new value has a newer version
AtomicBoolean taskMismatch = new AtomicBoolean();
taskStatus.setIf(newValue, oldValue -> {
// did the task instance id change
boolean isEmpty = oldValue.getTaskInstanceIdLeastSignificantBits() == 0 && oldValue.getTaskInstanceIdMostSignificantBits() == 0;
boolean isEmpty = (oldValue.getTaskInstanceIdLeastSignificantBits() == 0 && oldValue.getTaskInstanceIdMostSignificantBits() == 0)
|| (newValue.getTaskInstanceIdLeastSignificantBits() == 0 && newValue.getTaskInstanceIdMostSignificantBits() == 0);
if (!isEmpty &&
!(oldValue.getTaskInstanceIdLeastSignificantBits() == newValue.getTaskInstanceIdLeastSignificantBits() &&
oldValue.getTaskInstanceIdMostSignificantBits() == newValue.getTaskInstanceIdMostSignificantBits())) {
Expand All @@ -291,11 +291,6 @@ void updateTaskStatus(TaskStatus newValue)
}
}

public synchronized boolean isRunning()
{
return running;
}

/**
* Listener is always notified asynchronously using a dedicated notification thread pool so, care should
* be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
Expand All @@ -308,6 +303,8 @@ public void addStateChangeListener(StateMachine.StateChangeListener<TaskStatus>

private void updateStats(long currentRequestStartNanos)
{
verify(taskEventLoop.inEventLoop());

stats.statusRoundTripMillis(nanosSince(currentRequestStartNanos).toMillis());
}
}
Loading

0 comments on commit b511e7a

Please sign in to comment.