@@ -227,6 +227,7 @@ public final class HttpRemoteTask
227
227
private final TableWriteInfo tableWriteInfo ;
228
228
229
229
private final DecayCounter taskUpdateRequestSize ;
230
+ private final boolean taskUpdateSizeTrackingEnabled ;
230
231
private final SchedulerStatsTracker schedulerStatsTracker ;
231
232
232
233
private final HttpRemoteTaskFactory .SafeEventLoop taskEventLoop ;
@@ -263,6 +264,7 @@ public static HttpRemoteTask createHttpRemoteTask(
263
264
MetadataManager metadataManager ,
264
265
QueryManager queryManager ,
265
266
DecayCounter taskUpdateRequestSize ,
267
+ boolean taskUpdateSizeTrackingEnabled ,
266
268
HandleResolver handleResolver ,
267
269
ConnectorTypeSerdeManager connectorTypeSerdeManager ,
268
270
SchedulerStatsTracker schedulerStatsTracker ,
@@ -299,6 +301,7 @@ public static HttpRemoteTask createHttpRemoteTask(
299
301
metadataManager ,
300
302
queryManager ,
301
303
taskUpdateRequestSize ,
304
+ taskUpdateSizeTrackingEnabled ,
302
305
handleResolver ,
303
306
connectorTypeSerdeManager ,
304
307
schedulerStatsTracker ,
@@ -338,6 +341,7 @@ private HttpRemoteTask(Session session,
338
341
MetadataManager metadataManager ,
339
342
QueryManager queryManager ,
340
343
DecayCounter taskUpdateRequestSize ,
344
+ boolean taskUpdateSizeTrackingEnabled ,
341
345
HandleResolver handleResolver ,
342
346
ConnectorTypeSerdeManager connectorTypeSerdeManager ,
343
347
SchedulerStatsTracker schedulerStatsTracker ,
@@ -404,6 +408,7 @@ private HttpRemoteTask(Session session,
404
408
.map (PlanNode ::getId )
405
409
.collect (toImmutableSet ());
406
410
this .taskUpdateRequestSize = taskUpdateRequestSize ;
411
+ this .taskUpdateSizeTrackingEnabled = taskUpdateSizeTrackingEnabled ;
407
412
this .schedulerStatsTracker = schedulerStatsTracker ;
408
413
409
414
for (Entry <PlanNodeId , Split > entry : requireNonNull (initialSplits , "initialSplits is null" ).entries ()) {
@@ -983,20 +988,22 @@ private void sendUpdate()
983
988
byte [] taskUpdateRequestJson = taskUpdateRequestCodec .toBytes (updateRequest );
984
989
schedulerStatsTracker .recordTaskUpdateSerializedCpuTime (THREAD_MX_BEAN .getCurrentThreadCpuTime () - serializeStartCpuTimeNanos );
985
990
986
- taskUpdateRequestSize .add (taskUpdateRequestJson .length );
987
-
988
991
if (taskUpdateRequestJson .length > maxTaskUpdateSizeInBytes ) {
989
992
failTask (new PrestoException (EXCEEDED_TASK_UPDATE_SIZE_LIMIT , getExceededTaskUpdateSizeMessage (taskUpdateRequestJson )));
990
993
return ;
991
994
}
992
995
993
- if (fragment .isPresent ()) {
994
- stats .updateWithPlanSize (taskUpdateRequestJson .length );
995
- }
996
- else {
997
- if (ThreadLocalRandom .current ().nextDouble () < UPDATE_WITHOUT_PLAN_STATS_SAMPLE_RATE ) {
998
- // This is to keep track of the task update size even when the plan fragment is NOT present
999
- stats .updateWithoutPlanSize (taskUpdateRequestJson .length );
996
+ if (taskUpdateSizeTrackingEnabled ) {
997
+ taskUpdateRequestSize .add (taskUpdateRequestJson .length );
998
+
999
+ if (fragment .isPresent ()) {
1000
+ stats .updateWithPlanSize (taskUpdateRequestJson .length );
1001
+ }
1002
+ else {
1003
+ if (ThreadLocalRandom .current ().nextDouble () < UPDATE_WITHOUT_PLAN_STATS_SAMPLE_RATE ) {
1004
+ // This is to keep track of the task update size even when the plan fragment is NOT present
1005
+ stats .updateWithoutPlanSize (taskUpdateRequestJson .length );
1006
+ }
1000
1007
}
1001
1008
}
1002
1009
0 commit comments