Skip to content

Commit e2fdc66

Browse files
authored
Put event loop behind a toggle (prestodb#24668)
1 parent 2d08ffb commit e2fdc66

10 files changed

+3023
-2
lines changed

pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,12 @@
232232
<scope>import</scope>
233233
</dependency>
234234

235+
<dependency>
236+
<groupId>io.netty</groupId>
237+
<artifactId>netty-transport</artifactId>
238+
<version>${dep.netty.version}</version>
239+
</dependency>
240+
235241
<dependency>
236242
<groupId>com.facebook.presto</groupId>
237243
<artifactId>presto-testing-docker</artifactId>

presto-main/pom.xml

+10
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,16 @@
511511
<optional>true</optional>
512512
</dependency>
513513

514+
<dependency>
515+
<groupId>io.netty</groupId>
516+
<artifactId>netty-transport</artifactId>
517+
</dependency>
518+
519+
<dependency>
520+
<groupId>io.netty</groupId>
521+
<artifactId>netty-common</artifactId>
522+
</dependency>
523+
514524
<dependency>
515525
<groupId>com.squareup.okhttp3</groupId>
516526
<artifactId>mockwebserver</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution;
15+
16+
import com.facebook.airlift.log.Logger;
17+
import io.netty.channel.DefaultEventLoop;
18+
import io.netty.channel.DefaultEventLoopGroup;
19+
import io.netty.channel.EventLoop;
20+
import io.netty.channel.EventLoopGroup;
21+
22+
import java.util.concurrent.Executor;
23+
import java.util.concurrent.ThreadFactory;
24+
import java.util.function.Consumer;
25+
import java.util.function.Supplier;
26+
27+
import static java.util.Objects.requireNonNull;
28+
29+
/***
30+
* One observation about event loop is if submitted task fails, it could kill the thread but the event loop group will not create a new one.
31+
* Here, we wrap it as safe event loop so that if any submitted job fails, we chose to log the error and fail the entire task.
32+
*/
33+
34+
public class SafeEventLoopGroup
35+
extends DefaultEventLoopGroup
36+
{
37+
private static final Logger log = Logger.get(SafeEventLoopGroup.class);
38+
39+
public SafeEventLoopGroup(int nThreads, ThreadFactory threadFactory)
40+
{
41+
super(nThreads, threadFactory);
42+
}
43+
44+
@Override
45+
protected EventLoop newChild(Executor executor, Object... args)
46+
{
47+
return new SafeEventLoop(this, executor);
48+
}
49+
50+
public static class SafeEventLoop
51+
extends DefaultEventLoop
52+
{
53+
public SafeEventLoop(EventLoopGroup parent, Executor executor)
54+
{
55+
super(parent, executor);
56+
}
57+
58+
@Override
59+
protected void run()
60+
{
61+
do {
62+
Runnable task = takeTask();
63+
if (task != null) {
64+
try {
65+
runTask(task);
66+
}
67+
catch (Throwable t) {
68+
log.error("Error running task on event loop", t);
69+
}
70+
updateLastExecutionTime();
71+
}
72+
}
73+
while (!this.confirmShutdown());
74+
}
75+
76+
public void execute(Runnable task, Consumer<Throwable> failureHandler)
77+
{
78+
requireNonNull(task, "task is null");
79+
this.execute(() -> {
80+
try {
81+
task.run();
82+
}
83+
catch (Throwable t) {
84+
log.error("Error executing task on event loop", t);
85+
if (failureHandler != null) {
86+
failureHandler.accept(t);
87+
}
88+
}
89+
});
90+
}
91+
92+
public <T> void execute(Supplier<T> task, Consumer<T> successHandler, Consumer<Throwable> failureHandler)
93+
{
94+
requireNonNull(task, "task is null");
95+
this.execute(() -> {
96+
try {
97+
T result = task.get();
98+
if (successHandler != null) {
99+
successHandler.accept(result);
100+
}
101+
}
102+
catch (Throwable t) {
103+
log.error("Error executing task on event loop", t);
104+
if (failureHandler != null) {
105+
failureHandler.accept(t);
106+
}
107+
}
108+
});
109+
}
110+
}
111+
}

presto-main/src/main/java/com/facebook/presto/execution/TaskManagerConfig.java

+13
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,19 @@ public class TaskManagerConfig
100100
private double highMemoryTaskKillerGCReclaimMemoryThreshold = 0.01;
101101
private Duration highMemoryTaskKillerFrequentFullGCDurationThreshold = new Duration(1, SECONDS);
102102
private double highMemoryTaskKillerHeapMemoryThreshold = 0.9;
103+
private boolean enableEventLoop;
104+
105+
@Config("task.enable-event-loop")
106+
public TaskManagerConfig setEventLoopEnabled(boolean enableEventLoop)
107+
{
108+
this.enableEventLoop = enableEventLoop;
109+
return this;
110+
}
111+
112+
public boolean isEventLoopEnabled()
113+
{
114+
return enableEventLoop;
115+
}
103116

104117
@MinDuration("1ms")
105118
@MaxDuration("10s")

0 commit comments

Comments
 (0)