76
76
import com .sun .management .ThreadMXBean ;
77
77
import io .airlift .units .DataSize ;
78
78
import io .airlift .units .Duration ;
79
+ import io .netty .channel .EventLoop ;
79
80
import it .unimi .dsi .fastutil .longs .LongArrayList ;
80
81
import org .joda .time .DateTime ;
81
82
142
143
* This class now uses an event loop concurrency model to eliminate the need for explicit synchronization:
143
144
* <ul>
144
145
* <li>All mutable state access and modifications are performed on a single dedicated event loop thread</li>
145
- * <li>External threads submit operations to the event loop using {@code safeExecuteOnEventLoop ()}</li>
146
+ * <li>External threads submit operations to the event loop using {@code taskEventLoop.execute ()}</li>
146
147
* <li>The event loop serializes all operations, eliminating race conditions without using locks</li>
147
148
* </ul>
148
149
* <p>
@@ -230,9 +231,9 @@ public final class HttpRemoteTask
230
231
private final boolean taskUpdateSizeTrackingEnabled ;
231
232
private final SchedulerStatsTracker schedulerStatsTracker ;
232
233
233
- private final HttpRemoteTaskFactory . SafeEventLoop taskEventLoop ;
234
+ private final EventLoop taskEventLoop ;
234
235
235
- public static HttpRemoteTask createHttpRemoteTask (
236
+ public HttpRemoteTask (
236
237
Session session ,
237
238
TaskId taskId ,
238
239
String nodeId ,
@@ -268,84 +269,7 @@ public static HttpRemoteTask createHttpRemoteTask(
268
269
HandleResolver handleResolver ,
269
270
ConnectorTypeSerdeManager connectorTypeSerdeManager ,
270
271
SchedulerStatsTracker schedulerStatsTracker ,
271
- HttpRemoteTaskFactory .SafeEventLoop taskEventLoop )
272
- {
273
- HttpRemoteTask task = new HttpRemoteTask (session ,
274
- taskId ,
275
- nodeId ,
276
- location ,
277
- remoteLocation ,
278
- planFragment ,
279
- initialSplits ,
280
- outputBuffers ,
281
- httpClient ,
282
- maxErrorDuration ,
283
- taskStatusRefreshMaxWait ,
284
- taskInfoRefreshMaxWait ,
285
- taskInfoUpdateInterval ,
286
- summarizeTaskInfo ,
287
- taskStatusCodec ,
288
- taskInfoCodec ,
289
- taskInfoJsonCodec ,
290
- taskUpdateRequestCodec ,
291
- planFragmentCodec ,
292
- metadataUpdatesCodec ,
293
- nodeStatsTracker ,
294
- stats ,
295
- binaryTransportEnabled ,
296
- thriftTransportEnabled ,
297
- taskInfoThriftTransportEnabled ,
298
- thriftProtocol ,
299
- tableWriteInfo ,
300
- maxTaskUpdateSizeInBytes ,
301
- metadataManager ,
302
- queryManager ,
303
- taskUpdateRequestSize ,
304
- taskUpdateSizeTrackingEnabled ,
305
- handleResolver ,
306
- connectorTypeSerdeManager ,
307
- schedulerStatsTracker ,
308
- taskEventLoop );
309
- task .initialize ();
310
- return task ;
311
- }
312
-
313
- private HttpRemoteTask (Session session ,
314
- TaskId taskId ,
315
- String nodeId ,
316
- URI location ,
317
- URI remoteLocation ,
318
- PlanFragment planFragment ,
319
- Multimap <PlanNodeId , Split > initialSplits ,
320
- OutputBuffers outputBuffers ,
321
- HttpClient httpClient ,
322
- Duration maxErrorDuration ,
323
- Duration taskStatusRefreshMaxWait ,
324
- Duration taskInfoRefreshMaxWait ,
325
- Duration taskInfoUpdateInterval ,
326
- boolean summarizeTaskInfo ,
327
- Codec <TaskStatus > taskStatusCodec ,
328
- Codec <TaskInfo > taskInfoCodec ,
329
- Codec <TaskInfo > taskInfoJsonCodec ,
330
- Codec <TaskUpdateRequest > taskUpdateRequestCodec ,
331
- Codec <PlanFragment > planFragmentCodec ,
332
- Codec <MetadataUpdates > metadataUpdatesCodec ,
333
- NodeStatsTracker nodeStatsTracker ,
334
- RemoteTaskStats stats ,
335
- boolean binaryTransportEnabled ,
336
- boolean thriftTransportEnabled ,
337
- boolean taskInfoThriftTransportEnabled ,
338
- Protocol thriftProtocol ,
339
- TableWriteInfo tableWriteInfo ,
340
- int maxTaskUpdateSizeInBytes ,
341
- MetadataManager metadataManager ,
342
- QueryManager queryManager ,
343
- DecayCounter taskUpdateRequestSize ,
344
- boolean taskUpdateSizeTrackingEnabled ,
345
- HandleResolver handleResolver ,
346
- ConnectorTypeSerdeManager connectorTypeSerdeManager ,
347
- SchedulerStatsTracker schedulerStatsTracker ,
348
- HttpRemoteTaskFactory .SafeEventLoop taskEventLoop )
272
+ EventLoop taskEventLoop )
349
273
{
350
274
requireNonNull (session , "session is null" );
351
275
requireNonNull (taskId , "taskId is null" );
@@ -468,11 +392,7 @@ private HttpRemoteTask(Session session,
468
392
handleResolver ,
469
393
connectorTypeSerdeManager ,
470
394
thriftProtocol );
471
- }
472
395
473
- // this is a separate method to ensure that the `this` reference is not leaked during construction
474
- private void initialize ()
475
- {
476
396
taskStatusFetcher .addStateChangeListener (newStatus -> {
477
397
verify (taskEventLoop .inEventLoop ());
478
398
@@ -487,7 +407,7 @@ private void initialize()
487
407
});
488
408
489
409
updateTaskStats ();
490
- safeExecuteOnEventLoop (this ::updateSplitQueueSpace );
410
+ taskEventLoop . execute (this ::updateSplitQueueSpace );
491
411
}
492
412
493
413
public PlanFragment getPlanFragment ()
@@ -528,7 +448,7 @@ public URI getRemoteTaskLocation()
528
448
@ Override
529
449
public void start ()
530
450
{
531
- safeExecuteOnEventLoop (() -> {
451
+ taskEventLoop . execute (() -> {
532
452
// to start we just need to trigger an update
533
453
started = true ;
534
454
scheduleUpdate ();
@@ -548,7 +468,7 @@ public void addSplits(Multimap<PlanNodeId, Split> splitsBySource)
548
468
return ;
549
469
}
550
470
551
- safeExecuteOnEventLoop (() -> {
471
+ taskEventLoop . execute (() -> {
552
472
boolean updateNeeded = false ;
553
473
for (Entry <PlanNodeId , Collection <Split >> entry : splitsBySource .asMap ().entrySet ()) {
554
474
PlanNodeId sourceId = entry .getKey ();
@@ -585,7 +505,7 @@ public void addSplits(Multimap<PlanNodeId, Split> splitsBySource)
585
505
@ Override
586
506
public void noMoreSplits (PlanNodeId sourceId )
587
507
{
588
- safeExecuteOnEventLoop (() -> {
508
+ taskEventLoop . execute (() -> {
589
509
if (noMoreSplits .containsKey (sourceId )) {
590
510
return ;
591
511
}
@@ -599,7 +519,7 @@ public void noMoreSplits(PlanNodeId sourceId)
599
519
@ Override
600
520
public void noMoreSplits (PlanNodeId sourceId , Lifespan lifespan )
601
521
{
602
- safeExecuteOnEventLoop (() -> {
522
+ taskEventLoop . execute (() -> {
603
523
if (pendingNoMoreSplitsForLifespan .put (sourceId , lifespan )) {
604
524
needsUpdate = true ;
605
525
scheduleUpdate ();
@@ -614,7 +534,7 @@ public void setOutputBuffers(OutputBuffers newOutputBuffers)
614
534
return ;
615
535
}
616
536
617
- safeExecuteOnEventLoop (() -> {
537
+ taskEventLoop . execute (() -> {
618
538
if (newOutputBuffers .getVersion () > outputBuffers .getVersion ()) {
619
539
outputBuffers = newOutputBuffers ;
620
540
needsUpdate = true ;
@@ -779,7 +699,7 @@ public ListenableFuture<?> whenSplitQueueHasSpace(long weightThreshold)
779
699
return immediateFuture (null );
780
700
}
781
701
SettableFuture <?> future = SettableFuture .create ();
782
- safeExecuteOnEventLoop (() -> {
702
+ taskEventLoop . execute (() -> {
783
703
if (whenSplitQueueHasSpaceThreshold .isPresent ()) {
784
704
checkArgument (weightThreshold == whenSplitQueueHasSpaceThreshold .getAsLong (), "Multiple split queue space notification thresholds not supported" );
785
705
}
@@ -949,7 +869,7 @@ private void scheduleUpdate()
949
869
950
870
private void sendUpdate ()
951
871
{
952
- safeExecuteOnEventLoop (() -> {
872
+ taskEventLoop . execute (() -> {
953
873
TaskStatus taskStatus = getTaskStatus ();
954
874
// don't update if the task hasn't been started yet or if it is already finished
955
875
if (!started || !needsUpdate || taskStatus .getState ().isDone ()) {
@@ -1072,7 +992,7 @@ private TaskSource getSource(PlanNodeId planNodeId)
1072
992
@ Override
1073
993
public void cancel ()
1074
994
{
1075
- safeExecuteOnEventLoop (() -> {
995
+ taskEventLoop . execute (() -> {
1076
996
TaskStatus taskStatus = getTaskStatus ();
1077
997
if (taskStatus .getState ().isDone ()) {
1078
998
return ;
@@ -1092,7 +1012,7 @@ public void cancel()
1092
1012
1093
1013
private void cleanUpTask ()
1094
1014
{
1095
- safeExecuteOnEventLoop (() -> {
1015
+ taskEventLoop . execute (() -> {
1096
1016
checkState (getTaskStatus ().getState ().isDone (), "attempt to clean up a task that is not done yet" );
1097
1017
1098
1018
// clear pending splits to free memory
@@ -1140,7 +1060,7 @@ public void abort()
1140
1060
1141
1061
private void abort (TaskStatus status )
1142
1062
{
1143
- safeExecuteOnEventLoop (() -> {
1063
+ taskEventLoop . execute (() -> {
1144
1064
checkState (status .getState ().isDone (), "cannot abort task with an incomplete status" );
1145
1065
1146
1066
taskStatusFetcher .updateTaskStatus (status );
@@ -1402,19 +1322,4 @@ public void onFailure(Throwable throwable)
1402
1322
onFailureTaskInfo (throwable , this .action , this .request , this .cleanupBackoff );
1403
1323
}
1404
1324
}
1405
-
1406
- /***
1407
- * Wrap the task execution on event loop to fail the entire task on any failure.
1408
- */
1409
- private void safeExecuteOnEventLoop (Runnable r )
1410
- {
1411
- taskEventLoop .execute (() -> {
1412
- try {
1413
- r .run ();
1414
- }
1415
- catch (Throwable t ) {
1416
- failTask (t );
1417
- }
1418
- });
1419
- }
1420
1325
}
0 commit comments