13
13
*/
14
14
package com .facebook .presto .server .remotetask ;
15
15
16
+ import com .facebook .airlift .concurrent .SetThreadName ;
16
17
import com .facebook .airlift .http .client .HttpClient ;
17
18
import com .facebook .airlift .http .client .Request ;
18
19
import com .facebook .airlift .http .client .ResponseHandler ;
37
38
import com .google .common .util .concurrent .Futures ;
38
39
import com .google .common .util .concurrent .ListenableFuture ;
39
40
import io .airlift .units .Duration ;
40
- import io .netty .channel .EventLoop ;
41
41
42
+ import javax .annotation .concurrent .GuardedBy ;
43
+
44
+ import java .util .concurrent .Executor ;
45
+ import java .util .concurrent .ScheduledExecutorService ;
42
46
import java .util .concurrent .atomic .AtomicBoolean ;
47
+ import java .util .concurrent .atomic .AtomicLong ;
43
48
import java .util .function .Consumer ;
44
49
45
50
import static com .facebook .airlift .http .client .HttpUriBuilder .uriBuilderFrom ;
55
60
import static com .facebook .presto .spi .StandardErrorCode .REMOTE_TASK_ERROR ;
56
61
import static com .facebook .presto .spi .StandardErrorCode .REMOTE_TASK_MISMATCH ;
57
62
import static com .facebook .presto .util .Failures .REMOTE_TASK_MISMATCH_ERROR ;
58
- import static com .google .common .base .Verify .verify ;
59
63
import static io .airlift .units .Duration .nanosSince ;
60
64
import static java .lang .String .format ;
61
65
import static java .util .Objects .requireNonNull ;
@@ -71,16 +75,20 @@ class ContinuousTaskStatusFetcher
71
75
private final Codec <TaskStatus > taskStatusCodec ;
72
76
73
77
private final Duration refreshMaxWait ;
74
- private final EventLoop taskEventLoop ;
78
+ private final Executor executor ;
75
79
private final HttpClient httpClient ;
76
80
private final RequestErrorTracker errorTracker ;
77
81
private final RemoteTaskStats stats ;
78
82
private final boolean binaryTransportEnabled ;
79
83
private final boolean thriftTransportEnabled ;
80
84
private final Protocol thriftProtocol ;
81
- private long currentRequestStartNanos ;
85
+
86
+ private final AtomicLong currentRequestStartNanos = new AtomicLong ();
87
+
88
+ @ GuardedBy ("this" )
82
89
private boolean running ;
83
90
91
+ @ GuardedBy ("this" )
84
92
private ListenableFuture <BaseResponse <TaskStatus >> future ;
85
93
86
94
public ContinuousTaskStatusFetcher (
@@ -89,9 +97,10 @@ public ContinuousTaskStatusFetcher(
89
97
TaskStatus initialTaskStatus ,
90
98
Duration refreshMaxWait ,
91
99
Codec <TaskStatus > taskStatusCodec ,
92
- EventLoop taskEventLoop ,
100
+ Executor executor ,
93
101
HttpClient httpClient ,
94
102
Duration maxErrorDuration ,
103
+ ScheduledExecutorService errorScheduledExecutor ,
95
104
RemoteTaskStats stats ,
96
105
boolean binaryTransportEnabled ,
97
106
boolean thriftTransportEnabled ,
@@ -101,25 +110,23 @@ public ContinuousTaskStatusFetcher(
101
110
102
111
this .taskId = requireNonNull (taskId , "taskId is null" );
103
112
this .onFail = requireNonNull (onFail , "onFail is null" );
104
- this .taskStatus = new StateMachine <>("task-" + taskId , taskEventLoop , initialTaskStatus );
113
+ this .taskStatus = new StateMachine <>("task-" + taskId , executor , initialTaskStatus );
105
114
106
115
this .refreshMaxWait = requireNonNull (refreshMaxWait , "refreshMaxWait is null" );
107
116
this .taskStatusCodec = requireNonNull (taskStatusCodec , "taskStatusCodec is null" );
108
117
109
- this .taskEventLoop = requireNonNull (taskEventLoop , "taskEventLoop is null" );
118
+ this .executor = requireNonNull (executor , "executor is null" );
110
119
this .httpClient = requireNonNull (httpClient , "httpClient is null" );
111
120
112
- this .errorTracker = taskRequestErrorTracker (taskId , initialTaskStatus .getSelf (), maxErrorDuration , taskEventLoop , "getting task status" );
121
+ this .errorTracker = taskRequestErrorTracker (taskId , initialTaskStatus .getSelf (), maxErrorDuration , errorScheduledExecutor , "getting task status" );
113
122
this .stats = requireNonNull (stats , "stats is null" );
114
123
this .binaryTransportEnabled = binaryTransportEnabled ;
115
124
this .thriftTransportEnabled = thriftTransportEnabled ;
116
125
this .thriftProtocol = requireNonNull (thriftProtocol , "thriftProtocol is null" );
117
126
}
118
127
119
- public void start ()
128
+ public synchronized void start ()
120
129
{
121
- verify (taskEventLoop .inEventLoop ());
122
-
123
130
if (running ) {
124
131
// already running
125
132
return ;
@@ -128,10 +135,8 @@ public void start()
128
135
scheduleNextRequest ();
129
136
}
130
137
131
- public void stop ()
138
+ public synchronized void stop ()
132
139
{
133
- verify (taskEventLoop .inEventLoop ());
134
-
135
140
running = false ;
136
141
if (future != null ) {
137
142
// do not terminate if the request is already running to avoid closing pooled connections
@@ -140,10 +145,8 @@ public void stop()
140
145
}
141
146
}
142
147
143
- private void scheduleNextRequest ()
148
+ private synchronized void scheduleNextRequest ()
144
149
{
145
- verify (taskEventLoop .inEventLoop ());
146
-
147
150
// stopped or done?
148
151
TaskStatus taskStatus = getTaskStatus ();
149
152
if (!running || taskStatus .getState ().isDone ()) {
@@ -160,7 +163,7 @@ private void scheduleNextRequest()
160
163
// if throttled due to error, asynchronously wait for timeout and try again
161
164
ListenableFuture <?> errorRateLimit = errorTracker .acquireRequestPermit ();
162
165
if (!errorRateLimit .isDone ()) {
163
- errorRateLimit .addListener (this ::scheduleNextRequest , taskEventLoop );
166
+ errorRateLimit .addListener (this ::scheduleNextRequest , executor );
164
167
return ;
165
168
}
166
169
@@ -186,7 +189,7 @@ else if (binaryTransportEnabled) {
186
189
187
190
errorTracker .startRequest ();
188
191
future = httpClient .executeAsync (request , responseHandler );
189
- currentRequestStartNanos = System .nanoTime ();
192
+ currentRequestStartNanos . set ( System .nanoTime () );
190
193
FutureCallback callback ;
191
194
if (thriftTransportEnabled ) {
192
195
callback = new ThriftHttpResponseHandler (this , request .getUri (), stats .getHttpResponseStats (), REMOTE_TASK_ERROR );
@@ -198,7 +201,7 @@ else if (binaryTransportEnabled) {
198
201
Futures .addCallback (
199
202
future ,
200
203
callback ,
201
- taskEventLoop );
204
+ executor );
202
205
}
203
206
204
207
TaskStatus getTaskStatus ()
@@ -209,62 +212,59 @@ TaskStatus getTaskStatus()
209
212
@ Override
210
213
public void success (TaskStatus value )
211
214
{
212
- verify ( taskEventLoop . inEventLoop ());
213
-
214
- updateStats ( currentRequestStartNanos );
215
- try {
216
- updateTaskStatus ( value );
217
- errorTracker . requestSucceeded ();
218
- }
219
- finally {
220
- scheduleNextRequest ();
215
+ try ( SetThreadName ignored = new SetThreadName ( "ContinuousTaskStatusFetcher-%s" , taskId )) {
216
+ updateStats ( currentRequestStartNanos . get ());
217
+ try {
218
+ updateTaskStatus ( value );
219
+ errorTracker . requestSucceeded ( );
220
+ }
221
+ finally {
222
+ scheduleNextRequest ();
223
+ }
221
224
}
222
225
}
223
226
224
227
@ Override
225
228
public void failed (Throwable cause )
226
229
{
227
- verify (taskEventLoop .inEventLoop ());
228
-
229
- updateStats (currentRequestStartNanos );
230
- try {
231
- // if task not already done, record error
232
- TaskStatus taskStatus = getTaskStatus ();
233
- if (!taskStatus .getState ().isDone ()) {
234
- errorTracker .requestFailed (cause );
230
+ try (SetThreadName ignored = new SetThreadName ("ContinuousTaskStatusFetcher-%s" , taskId )) {
231
+ updateStats (currentRequestStartNanos .get ());
232
+ try {
233
+ // if task not already done, record error
234
+ TaskStatus taskStatus = getTaskStatus ();
235
+ if (!taskStatus .getState ().isDone ()) {
236
+ errorTracker .requestFailed (cause );
237
+ }
238
+ }
239
+ catch (Error e ) {
240
+ onFail .accept (e );
241
+ throw e ;
242
+ }
243
+ catch (RuntimeException e ) {
244
+ onFail .accept (e );
245
+ }
246
+ finally {
247
+ scheduleNextRequest ();
235
248
}
236
- }
237
- catch (Error e ) {
238
- onFail .accept (e );
239
- throw e ;
240
- }
241
- catch (RuntimeException e ) {
242
- onFail .accept (e );
243
- }
244
- finally {
245
- scheduleNextRequest ();
246
249
}
247
250
}
248
251
249
252
@ Override
250
253
public void fatal (Throwable cause )
251
254
{
252
- verify ( taskEventLoop . inEventLoop ());
253
-
254
- updateStats ( currentRequestStartNanos );
255
- onFail . accept ( cause );
255
+ try ( SetThreadName ignored = new SetThreadName ( "ContinuousTaskStatusFetcher-%s" , taskId )) {
256
+ updateStats ( currentRequestStartNanos . get ());
257
+ onFail . accept ( cause );
258
+ }
256
259
}
257
260
258
261
void updateTaskStatus (TaskStatus newValue )
259
262
{
260
- verify (taskEventLoop .inEventLoop ());
261
-
262
263
// change to new value if old value is not changed and new value has a newer version
263
264
AtomicBoolean taskMismatch = new AtomicBoolean ();
264
265
taskStatus .setIf (newValue , oldValue -> {
265
266
// did the task instance id change
266
- boolean isEmpty = (oldValue .getTaskInstanceIdLeastSignificantBits () == 0 && oldValue .getTaskInstanceIdMostSignificantBits () == 0 )
267
- || (newValue .getTaskInstanceIdLeastSignificantBits () == 0 && newValue .getTaskInstanceIdMostSignificantBits () == 0 );
267
+ boolean isEmpty = oldValue .getTaskInstanceIdLeastSignificantBits () == 0 && oldValue .getTaskInstanceIdMostSignificantBits () == 0 ;
268
268
if (!isEmpty &&
269
269
!(oldValue .getTaskInstanceIdLeastSignificantBits () == newValue .getTaskInstanceIdLeastSignificantBits () &&
270
270
oldValue .getTaskInstanceIdMostSignificantBits () == newValue .getTaskInstanceIdMostSignificantBits ())) {
@@ -291,6 +291,11 @@ void updateTaskStatus(TaskStatus newValue)
291
291
}
292
292
}
293
293
294
+ public synchronized boolean isRunning ()
295
+ {
296
+ return running ;
297
+ }
298
+
294
299
/**
295
300
* Listener is always notified asynchronously using a dedicated notification thread pool so, care should
296
301
* be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
@@ -303,8 +308,6 @@ public void addStateChangeListener(StateMachine.StateChangeListener<TaskStatus>
303
308
304
309
private void updateStats (long currentRequestStartNanos )
305
310
{
306
- verify (taskEventLoop .inEventLoop ());
307
-
308
311
stats .statusRoundTripMillis (nanosSince (currentRequestStartNanos ).toMillis ());
309
312
}
310
313
}
0 commit comments