76
76
import com .sun .management .ThreadMXBean ;
77
77
import io .airlift .units .DataSize ;
78
78
import io .airlift .units .Duration ;
79
- import io .netty .channel .EventLoop ;
80
79
import it .unimi .dsi .fastutil .longs .LongArrayList ;
81
80
import org .joda .time .DateTime ;
82
81
143
142
* This class now uses an event loop concurrency model to eliminate the need for explicit synchronization:
144
143
* <ul>
145
144
* <li>All mutable state access and modifications are performed on a single dedicated event loop thread</li>
146
- * <li>External threads submit operations to the event loop using {@code taskEventLoop.execute ()}</li>
145
+ * <li>External threads submit operations to the event loop using {@code safeExecuteOnEventLoop ()}</li>
147
146
* <li>The event loop serializes all operations, eliminating race conditions without using locks</li>
148
147
* </ul>
149
148
* <p>
@@ -230,9 +229,9 @@ public final class HttpRemoteTask
230
229
private final DecayCounter taskUpdateRequestSize ;
231
230
private final SchedulerStatsTracker schedulerStatsTracker ;
232
231
233
- private final EventLoop taskEventLoop ;
232
+ private final HttpRemoteTaskFactory . SafeEventLoop taskEventLoop ;
234
233
235
- public HttpRemoteTask (
234
+ public static HttpRemoteTask createHttpRemoteTask (
236
235
Session session ,
237
236
TaskId taskId ,
238
237
String nodeId ,
@@ -267,7 +266,82 @@ public HttpRemoteTask(
267
266
HandleResolver handleResolver ,
268
267
ConnectorTypeSerdeManager connectorTypeSerdeManager ,
269
268
SchedulerStatsTracker schedulerStatsTracker ,
270
- EventLoop taskEventLoop )
269
+ HttpRemoteTaskFactory .SafeEventLoop taskEventLoop )
270
+ {
271
+ HttpRemoteTask task = new HttpRemoteTask (session ,
272
+ taskId ,
273
+ nodeId ,
274
+ location ,
275
+ remoteLocation ,
276
+ planFragment ,
277
+ initialSplits ,
278
+ outputBuffers ,
279
+ httpClient ,
280
+ maxErrorDuration ,
281
+ taskStatusRefreshMaxWait ,
282
+ taskInfoRefreshMaxWait ,
283
+ taskInfoUpdateInterval ,
284
+ summarizeTaskInfo ,
285
+ taskStatusCodec ,
286
+ taskInfoCodec ,
287
+ taskInfoJsonCodec ,
288
+ taskUpdateRequestCodec ,
289
+ planFragmentCodec ,
290
+ metadataUpdatesCodec ,
291
+ nodeStatsTracker ,
292
+ stats ,
293
+ binaryTransportEnabled ,
294
+ thriftTransportEnabled ,
295
+ taskInfoThriftTransportEnabled ,
296
+ thriftProtocol ,
297
+ tableWriteInfo ,
298
+ maxTaskUpdateSizeInBytes ,
299
+ metadataManager ,
300
+ queryManager ,
301
+ taskUpdateRequestSize ,
302
+ handleResolver ,
303
+ connectorTypeSerdeManager ,
304
+ schedulerStatsTracker ,
305
+ taskEventLoop );
306
+ task .initialize ();
307
+ return task ;
308
+ }
309
+
310
+ private HttpRemoteTask (Session session ,
311
+ TaskId taskId ,
312
+ String nodeId ,
313
+ URI location ,
314
+ URI remoteLocation ,
315
+ PlanFragment planFragment ,
316
+ Multimap <PlanNodeId , Split > initialSplits ,
317
+ OutputBuffers outputBuffers ,
318
+ HttpClient httpClient ,
319
+ Duration maxErrorDuration ,
320
+ Duration taskStatusRefreshMaxWait ,
321
+ Duration taskInfoRefreshMaxWait ,
322
+ Duration taskInfoUpdateInterval ,
323
+ boolean summarizeTaskInfo ,
324
+ Codec <TaskStatus > taskStatusCodec ,
325
+ Codec <TaskInfo > taskInfoCodec ,
326
+ Codec <TaskInfo > taskInfoJsonCodec ,
327
+ Codec <TaskUpdateRequest > taskUpdateRequestCodec ,
328
+ Codec <PlanFragment > planFragmentCodec ,
329
+ Codec <MetadataUpdates > metadataUpdatesCodec ,
330
+ NodeStatsTracker nodeStatsTracker ,
331
+ RemoteTaskStats stats ,
332
+ boolean binaryTransportEnabled ,
333
+ boolean thriftTransportEnabled ,
334
+ boolean taskInfoThriftTransportEnabled ,
335
+ Protocol thriftProtocol ,
336
+ TableWriteInfo tableWriteInfo ,
337
+ int maxTaskUpdateSizeInBytes ,
338
+ MetadataManager metadataManager ,
339
+ QueryManager queryManager ,
340
+ DecayCounter taskUpdateRequestSize ,
341
+ HandleResolver handleResolver ,
342
+ ConnectorTypeSerdeManager connectorTypeSerdeManager ,
343
+ SchedulerStatsTracker schedulerStatsTracker ,
344
+ HttpRemoteTaskFactory .SafeEventLoop taskEventLoop )
271
345
{
272
346
requireNonNull (session , "session is null" );
273
347
requireNonNull (taskId , "taskId is null" );
@@ -389,7 +463,11 @@ public HttpRemoteTask(
389
463
handleResolver ,
390
464
connectorTypeSerdeManager ,
391
465
thriftProtocol );
466
+ }
392
467
468
+ // this is a separate method to ensure that the `this` reference is not leaked during construction
469
+ private void initialize ()
470
+ {
393
471
taskStatusFetcher .addStateChangeListener (newStatus -> {
394
472
verify (taskEventLoop .inEventLoop ());
395
473
@@ -404,7 +482,7 @@ public HttpRemoteTask(
404
482
});
405
483
406
484
updateTaskStats ();
407
- taskEventLoop . execute (this ::updateSplitQueueSpace );
485
+ safeExecuteOnEventLoop (this ::updateSplitQueueSpace );
408
486
}
409
487
410
488
public PlanFragment getPlanFragment ()
@@ -445,7 +523,7 @@ public URI getRemoteTaskLocation()
445
523
@ Override
446
524
public void start ()
447
525
{
448
- taskEventLoop . execute (() -> {
526
+ safeExecuteOnEventLoop (() -> {
449
527
// to start we just need to trigger an update
450
528
started = true ;
451
529
scheduleUpdate ();
@@ -465,7 +543,7 @@ public void addSplits(Multimap<PlanNodeId, Split> splitsBySource)
465
543
return ;
466
544
}
467
545
468
- taskEventLoop . execute (() -> {
546
+ safeExecuteOnEventLoop (() -> {
469
547
boolean updateNeeded = false ;
470
548
for (Entry <PlanNodeId , Collection <Split >> entry : splitsBySource .asMap ().entrySet ()) {
471
549
PlanNodeId sourceId = entry .getKey ();
@@ -502,7 +580,7 @@ public void addSplits(Multimap<PlanNodeId, Split> splitsBySource)
502
580
@ Override
503
581
public void noMoreSplits (PlanNodeId sourceId )
504
582
{
505
- taskEventLoop . execute (() -> {
583
+ safeExecuteOnEventLoop (() -> {
506
584
if (noMoreSplits .containsKey (sourceId )) {
507
585
return ;
508
586
}
@@ -516,7 +594,7 @@ public void noMoreSplits(PlanNodeId sourceId)
516
594
@ Override
517
595
public void noMoreSplits (PlanNodeId sourceId , Lifespan lifespan )
518
596
{
519
- taskEventLoop . execute (() -> {
597
+ safeExecuteOnEventLoop (() -> {
520
598
if (pendingNoMoreSplitsForLifespan .put (sourceId , lifespan )) {
521
599
needsUpdate = true ;
522
600
scheduleUpdate ();
@@ -531,7 +609,7 @@ public void setOutputBuffers(OutputBuffers newOutputBuffers)
531
609
return ;
532
610
}
533
611
534
- taskEventLoop . execute (() -> {
612
+ safeExecuteOnEventLoop (() -> {
535
613
if (newOutputBuffers .getVersion () > outputBuffers .getVersion ()) {
536
614
outputBuffers = newOutputBuffers ;
537
615
needsUpdate = true ;
@@ -696,7 +774,7 @@ public ListenableFuture<?> whenSplitQueueHasSpace(long weightThreshold)
696
774
return immediateFuture (null );
697
775
}
698
776
SettableFuture <?> future = SettableFuture .create ();
699
- taskEventLoop . execute (() -> {
777
+ safeExecuteOnEventLoop (() -> {
700
778
if (whenSplitQueueHasSpaceThreshold .isPresent ()) {
701
779
checkArgument (weightThreshold == whenSplitQueueHasSpaceThreshold .getAsLong (), "Multiple split queue space notification thresholds not supported" );
702
780
}
@@ -866,7 +944,7 @@ private void scheduleUpdate()
866
944
867
945
private void sendUpdate ()
868
946
{
869
- taskEventLoop . execute (() -> {
947
+ safeExecuteOnEventLoop (() -> {
870
948
TaskStatus taskStatus = getTaskStatus ();
871
949
// don't update if the task hasn't been started yet or if it is already finished
872
950
if (!started || !needsUpdate || taskStatus .getState ().isDone ()) {
@@ -987,7 +1065,7 @@ private TaskSource getSource(PlanNodeId planNodeId)
987
1065
@ Override
988
1066
public void cancel ()
989
1067
{
990
- taskEventLoop . execute (() -> {
1068
+ safeExecuteOnEventLoop (() -> {
991
1069
TaskStatus taskStatus = getTaskStatus ();
992
1070
if (taskStatus .getState ().isDone ()) {
993
1071
return ;
@@ -1007,7 +1085,7 @@ public void cancel()
1007
1085
1008
1086
private void cleanUpTask ()
1009
1087
{
1010
- taskEventLoop . execute (() -> {
1088
+ safeExecuteOnEventLoop (() -> {
1011
1089
checkState (getTaskStatus ().getState ().isDone (), "attempt to clean up a task that is not done yet" );
1012
1090
1013
1091
// clear pending splits to free memory
@@ -1055,7 +1133,7 @@ public void abort()
1055
1133
1056
1134
private void abort (TaskStatus status )
1057
1135
{
1058
- taskEventLoop . execute (() -> {
1136
+ safeExecuteOnEventLoop (() -> {
1059
1137
checkState (status .getState ().isDone (), "cannot abort task with an incomplete status" );
1060
1138
1061
1139
taskStatusFetcher .updateTaskStatus (status );
@@ -1317,4 +1395,19 @@ public void onFailure(Throwable throwable)
1317
1395
onFailureTaskInfo (throwable , this .action , this .request , this .cleanupBackoff );
1318
1396
}
1319
1397
}
1398
+
1399
+ /***
1400
+ * Wrap the task execution on event loop to fail the entire task on any failure.
1401
+ */
1402
+ private void safeExecuteOnEventLoop (Runnable r )
1403
+ {
1404
+ taskEventLoop .execute (() -> {
1405
+ try {
1406
+ r .run ();
1407
+ }
1408
+ catch (Throwable t ) {
1409
+ failTask (t );
1410
+ }
1411
+ });
1412
+ }
1320
1413
}
0 commit comments