Skip to content

Commit 0711cfa

Browse files
shangm2NikhilCollooru
authored andcommitted
Convert DataSize for other files in presto-main
1 parent 369deb7 commit 0711cfa

13 files changed

+129
-138
lines changed

presto-main/src/main/java/com/facebook/presto/dispatcher/FailedDispatchQuery.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
2424
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
2525
import com.google.common.util.concurrent.ListenableFuture;
26-
import io.airlift.units.DataSize;
2726
import io.airlift.units.Duration;
2827
import org.joda.time.DateTime;
2928

@@ -34,7 +33,6 @@
3433
import static com.facebook.presto.execution.QueryState.FAILED;
3534
import static com.facebook.presto.server.BasicQueryInfo.immediateFailureQueryInfo;
3635
import static com.google.common.util.concurrent.Futures.immediateFuture;
37-
import static io.airlift.units.DataSize.Unit.BYTE;
3836
import static java.util.Objects.requireNonNull;
3937
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4038

@@ -178,15 +176,15 @@ public Duration getTotalCpuTime()
178176
}
179177

180178
@Override
181-
public DataSize getTotalMemoryReservation()
179+
public long getTotalMemoryReservationInBytes()
182180
{
183-
return new DataSize(0, BYTE);
181+
return 0L;
184182
}
185183

186184
@Override
187-
public DataSize getUserMemoryReservation()
185+
public long getUserMemoryReservationInBytes()
188186
{
189-
return new DataSize(0, BYTE);
187+
return 0L;
190188
}
191189

192190
@Override

presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.google.common.util.concurrent.Futures;
3535
import com.google.common.util.concurrent.ListenableFuture;
3636
import com.google.common.util.concurrent.SettableFuture;
37-
import io.airlift.units.DataSize;
3837
import io.airlift.units.Duration;
3938
import org.joda.time.DateTime;
4039

@@ -53,7 +52,6 @@
5352
import static com.facebook.presto.spi.StandardErrorCode.USER_CANCELED;
5453
import static com.facebook.presto.util.Failures.toFailure;
5554
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
56-
import static io.airlift.units.DataSize.Unit.BYTE;
5755
import static java.util.Objects.requireNonNull;
5856
import static java.util.concurrent.TimeUnit.MILLISECONDS;
5957

@@ -303,19 +301,19 @@ public Duration getTotalCpuTime()
303301
}
304302

305303
@Override
306-
public DataSize getTotalMemoryReservation()
304+
public long getTotalMemoryReservationInBytes()
307305
{
308306
return tryGetQueryExecution()
309-
.map(QueryExecution::getTotalMemoryReservation)
310-
.orElseGet(() -> new DataSize(0, BYTE));
307+
.map(QueryExecution::getTotalMemoryReservationInBytes)
308+
.orElse(0L);
311309
}
312310

313311
@Override
314-
public DataSize getUserMemoryReservation()
312+
public long getUserMemoryReservationInBytes()
315313
{
316314
return tryGetQueryExecution()
317-
.map(QueryExecution::getUserMemoryReservation)
318-
.orElseGet(() -> new DataSize(0, BYTE));
315+
.map(QueryExecution::getUserMemoryReservationInBytes)
316+
.orElse(0L);
319317
}
320318

321319
public int getRunningTaskCount()

presto-main/src/main/java/com/facebook/presto/event/QueryMonitor.java

+29-28
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import static com.google.common.base.Preconditions.checkArgument;
9595
import static com.google.common.collect.ImmutableList.toImmutableList;
9696
import static com.google.common.collect.ImmutableMap.toImmutableMap;
97+
import static io.airlift.units.DataSize.succinctBytes;
9798
import static java.lang.Double.NaN;
9899
import static java.lang.Math.max;
99100
import static java.lang.Math.toIntExact;
@@ -376,31 +377,31 @@ private List<OperatorStatistics> createOperatorStatistics(QueryInfo queryInfo)
376377
operatorSummary.getAddInputCalls(),
377378
operatorSummary.getAddInputWall(),
378379
operatorSummary.getAddInputCpu(),
379-
operatorSummary.getAddInputAllocation(),
380-
operatorSummary.getRawInputDataSize(),
380+
succinctBytes(operatorSummary.getAddInputAllocationInBytes()),
381+
succinctBytes(operatorSummary.getRawInputDataSizeInBytes()),
381382
operatorSummary.getRawInputPositions(),
382-
operatorSummary.getInputDataSize(),
383+
succinctBytes(operatorSummary.getInputDataSizeInBytes()),
383384
operatorSummary.getInputPositions(),
384385
operatorSummary.getSumSquaredInputPositions(),
385386
operatorSummary.getGetOutputCalls(),
386387
operatorSummary.getGetOutputWall(),
387388
operatorSummary.getGetOutputCpu(),
388-
operatorSummary.getGetOutputAllocation(),
389-
operatorSummary.getOutputDataSize(),
389+
succinctBytes(operatorSummary.getGetOutputAllocationInBytes()),
390+
succinctBytes(operatorSummary.getOutputDataSizeInBytes()),
390391
operatorSummary.getOutputPositions(),
391-
operatorSummary.getPhysicalWrittenDataSize(),
392+
succinctBytes(operatorSummary.getPhysicalWrittenDataSizeInBytes()),
392393
operatorSummary.getBlockedWall(),
393394
operatorSummary.getFinishCalls(),
394395
operatorSummary.getFinishWall(),
395396
operatorSummary.getFinishCpu(),
396-
operatorSummary.getFinishAllocation(),
397-
operatorSummary.getUserMemoryReservation(),
398-
operatorSummary.getRevocableMemoryReservation(),
399-
operatorSummary.getSystemMemoryReservation(),
400-
operatorSummary.getPeakUserMemoryReservation(),
401-
operatorSummary.getPeakSystemMemoryReservation(),
402-
operatorSummary.getPeakTotalMemoryReservation(),
403-
operatorSummary.getSpilledDataSize(),
397+
succinctBytes(operatorSummary.getFinishAllocationInBytes()),
398+
succinctBytes(operatorSummary.getUserMemoryReservationInBytes()),
399+
succinctBytes(operatorSummary.getRevocableMemoryReservationInBytes()),
400+
succinctBytes(operatorSummary.getSystemMemoryReservationInBytes()),
401+
succinctBytes(operatorSummary.getPeakUserMemoryReservationInBytes()),
402+
succinctBytes(operatorSummary.getPeakSystemMemoryReservationInBytes()),
403+
succinctBytes(operatorSummary.getPeakTotalMemoryReservationInBytes()),
404+
succinctBytes(operatorSummary.getSpilledDataSizeInBytes()),
404405
Optional.ofNullable(operatorSummary.getInfo()).map(operatorInfoCodec::toJson),
405406
operatorSummary.getRuntimeStats(),
406407
getPlanNodeEstimateOutputSize(operatorSummary.getPlanNodeId(), estimateMap, planNodeIdMap),
@@ -435,21 +436,21 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
435436
Optional.of(ofMillis(queryStats.getAnalysisTime().toMillis())),
436437
ofMillis(queryStats.getExecutionTime().toMillis()),
437438
queryStats.getPeakRunningTasks(),
438-
queryStats.getPeakUserMemoryReservation().toBytes(),
439-
queryStats.getPeakTotalMemoryReservation().toBytes(),
440-
queryStats.getPeakTaskUserMemory().toBytes(),
441-
queryStats.getPeakTaskTotalMemory().toBytes(),
442-
queryStats.getPeakNodeTotalMemory().toBytes(),
443-
queryStats.getShuffledDataSize().toBytes(),
439+
queryStats.getPeakUserMemoryReservationInBytes(),
440+
queryStats.getPeakTotalMemoryReservationInBytes(),
441+
queryStats.getPeakTaskUserMemoryInBytes(),
442+
queryStats.getPeakTaskTotalMemoryInBytes(),
443+
queryStats.getPeakNodeTotalMemoryInBytes(),
444+
queryStats.getShuffledDataSizeInBytes(),
444445
queryStats.getShuffledPositions(),
445-
queryStats.getRawInputDataSize().toBytes(),
446+
queryStats.getRawInputDataSizeInBytes(),
446447
queryStats.getRawInputPositions(),
447-
queryStats.getOutputDataSize().toBytes(),
448+
queryStats.getOutputDataSizeInBytes(),
448449
queryStats.getOutputPositions(),
449-
queryStats.getWrittenOutputLogicalDataSize().toBytes(),
450+
queryStats.getWrittenOutputLogicalDataSizeInBytes(),
450451
queryStats.getWrittenOutputPositions(),
451-
queryStats.getWrittenIntermediatePhysicalDataSize().toBytes(),
452-
queryStats.getSpilledDataSize().toBytes(),
452+
queryStats.getWrittenIntermediatePhysicalDataSizeInBytes(),
453+
queryStats.getSpilledDataSizeInBytes(),
453454
queryStats.getCumulativeUserMemory(),
454455
queryStats.getCumulativeTotalMemory(),
455456
queryStats.getCompletedDrivers(),
@@ -807,9 +808,9 @@ private static void computeStageStatistics(
807808
executionInfo.getStats().getTotalCpuTime(),
808809
executionInfo.getStats().getRetriedCpuTime(),
809810
executionInfo.getStats().getTotalBlockedTime(),
810-
executionInfo.getStats().getRawInputDataSize(),
811-
executionInfo.getStats().getProcessedInputDataSize(),
812-
executionInfo.getStats().getPhysicalWrittenDataSize(),
811+
succinctBytes(executionInfo.getStats().getRawInputDataSizeInBytes()),
812+
succinctBytes(executionInfo.getStats().getProcessedInputDataSizeInBytes()),
813+
succinctBytes(executionInfo.getStats().getPhysicalWrittenDataSizeInBytes()),
813814
executionInfo.getStats().getGcInfo(),
814815
createResourceDistribution(cpuDistribution.snapshot()),
815816
createResourceDistribution(memoryDistribution.snapshot())));

presto-main/src/main/java/com/facebook/presto/event/SplitMonitor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ private void splitCompletedEvent(TaskId taskId, DriverStats driverStats, @Nullab
8989
ofMillis(driverStats.getQueuedTime().toMillis()),
9090
ofMillis(driverStats.getRawInputReadTime().toMillis()),
9191
driverStats.getRawInputPositions(),
92-
driverStats.getRawInputDataSize().toBytes(),
92+
driverStats.getRawInputDataSizeInBytes(),
9393
timeToStart,
9494
timeToEnd),
9595
splitFailureMetadata,

presto-main/src/main/java/com/facebook/presto/memory/ClusterMemoryManager.java

+25-25
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ public class ClusterMemoryManager
118118
private final MBeanExporter exporter;
119119
private final Codec<MemoryInfo> memoryInfoCodec;
120120
private final Codec<MemoryPoolAssignmentsRequest> assignmentsRequestCodec;
121-
private final DataSize maxQueryMemory;
122-
private final DataSize maxQueryTotalMemory;
121+
private final long maxQueryMemoryInBytes;
122+
private final long maxQueryTotalMemoryInBytes;
123123
private final boolean enabled;
124124
private final LowMemoryKiller lowMemoryKiller;
125125
private final Duration killOnOutOfMemoryDelay;
@@ -177,8 +177,8 @@ public ClusterMemoryManager(
177177
this.httpClient = requireNonNull(httpClient, "httpClient is null");
178178
this.exporter = requireNonNull(exporter, "exporter is null");
179179
this.lowMemoryKiller = requireNonNull(lowMemoryKiller, "lowMemoryKiller is null");
180-
this.maxQueryMemory = config.getMaxQueryMemory();
181-
this.maxQueryTotalMemory = config.getMaxQueryTotalMemory();
180+
this.maxQueryMemoryInBytes = config.getMaxQueryMemory().toBytes();
181+
this.maxQueryTotalMemoryInBytes = config.getMaxQueryTotalMemory().toBytes();
182182
this.coordinatorId = queryIdGenerator.getCoordinatorId();
183183
this.enabled = serverConfig.isCoordinator();
184184
this.killOnOutOfMemoryDelay = config.getKillOnOutOfMemoryDelay();
@@ -193,11 +193,11 @@ public ClusterMemoryManager(
193193
this.assignmentsRequestCodec = requireNonNull(assignmentsRequestJsonCodec, "assignmentsRequestJsonCodec is null");
194194
}
195195

196-
verify(maxQueryMemory.toBytes() <= maxQueryTotalMemory.toBytes(),
196+
verify(maxQueryMemoryInBytes <= maxQueryTotalMemoryInBytes,
197197
"maxQueryMemory cannot be greater than maxQueryTotalMemory");
198-
verify(config.getSoftMaxQueryMemory().toBytes() <= maxQueryMemory.toBytes(),
198+
verify(config.getSoftMaxQueryMemory().toBytes() <= maxQueryMemoryInBytes,
199199
"Soft max query memory cannot be greater than hard limit");
200-
verify(config.getSoftMaxQueryTotalMemory().toBytes() <= maxQueryTotalMemory.toBytes(),
200+
verify(config.getSoftMaxQueryTotalMemory().toBytes() <= maxQueryTotalMemoryInBytes,
201201
"Soft max query total memory cannot be greater than hard limit");
202202

203203
this.pools = createClusterMemoryPools(nodeMemoryConfig.isReservedPoolEnabled());
@@ -253,8 +253,8 @@ public synchronized void process(Iterable<QueryExecution> runningQueries)
253253
long totalMemoryBytes = 0L;
254254
for (QueryExecution query : runningQueries) {
255255
boolean resourceOvercommit = resourceOvercommit(query.getSession());
256-
long userMemoryReservation = query.getUserMemoryReservation().toBytes();
257-
long totalMemoryReservation = query.getTotalMemoryReservation().toBytes();
256+
long userMemoryReservation = query.getUserMemoryReservationInBytes();
257+
long totalMemoryReservation = query.getTotalMemoryReservationInBytes();
258258

259259
if (resourceOvercommit && outOfMemory) {
260260
// If a query has requested resource overcommit, only kill it if the cluster has run out of memory
@@ -265,20 +265,20 @@ public synchronized void process(Iterable<QueryExecution> runningQueries)
265265
}
266266

267267
if (!resourceOvercommit) {
268-
long userMemoryLimit = min(maxQueryMemory.toBytes(), getQueryMaxMemory(query.getSession()).toBytes());
268+
long userMemoryLimit = min(maxQueryMemoryInBytes, getQueryMaxMemory(query.getSession()).toBytes());
269269
if (userMemoryReservation > userMemoryLimit) {
270270
query.fail(exceededGlobalUserLimit(succinctBytes(userMemoryLimit)));
271271
queryKilled = true;
272272
}
273-
QueryLimit<DataSize> queryTotalMemoryLimit = getMinimum(
274-
createDataSizeLimit(maxQueryTotalMemory, SYSTEM),
273+
QueryLimit<Long> queryTotalMemoryLimit = getMinimum(
274+
createDataSizeLimit(maxQueryTotalMemoryInBytes, SYSTEM),
275275
query.getResourceGroupQueryLimits()
276276
.flatMap(ResourceGroupQueryLimits::getTotalMemoryLimit)
277-
.map(rgLimit -> createDataSizeLimit(rgLimit, RESOURCE_GROUP))
277+
.map(rgLimit -> createDataSizeLimit(rgLimit.toBytes(), RESOURCE_GROUP))
278278
.orElse(null),
279-
createDataSizeLimit(getQueryMaxTotalMemory(query.getSession()), QUERY));
280-
if (totalMemoryReservation > queryTotalMemoryLimit.getLimit().toBytes()) {
281-
query.fail(exceededGlobalTotalLimit(queryTotalMemoryLimit.getLimit(), queryTotalMemoryLimit.getLimitSource().name()));
279+
createDataSizeLimit(getQueryMaxTotalMemory(query.getSession()).toBytes(), QUERY));
280+
if (totalMemoryReservation > queryTotalMemoryLimit.getLimit()) {
281+
query.fail(exceededGlobalTotalLimit(succinctBytes(queryTotalMemoryLimit.getLimit()), queryTotalMemoryLimit.getLimitSource().name()));
282282
queryKilled = true;
283283
}
284284
}
@@ -342,17 +342,17 @@ public synchronized void checkForLeaks(Supplier<List<BasicQueryInfo>> allQueryIn
342342
if (memoryManagerService.isPresent()) {
343343
// We are in the multi-coordinator codepath, and thus care about the globally running queries
344344
allRunningQueries = getClusterInfo(GENERAL_POOL)
345-
.getRunningQueries()
346-
.orElse(ImmutableList.of())
347-
.stream().collect(toImmutableMap(identity(), t -> Optional.empty()));
345+
.getRunningQueries()
346+
.orElse(ImmutableList.of())
347+
.stream().collect(toImmutableMap(identity(), t -> Optional.empty()));
348348
}
349349
else {
350350
// We are in the single coordinator setup, and thus care about the local queries. Ie, global queries
351351
// does not make sense.
352352
allRunningQueries = Maps.uniqueIndex(
353353
allQueryInfoSupplier.get().stream()
354-
.map(Optional::of)
355-
.iterator(),
354+
.map(Optional::of)
355+
.iterator(),
356356
queryInfo -> queryInfo.get().getQueryId());
357357
}
358358
memoryLeakDetector.checkForMemoryLeaks(allRunningQueries, pools.get(GENERAL_POOL).getQueryMemoryReservations());
@@ -497,8 +497,8 @@ private QueryExecution findLargestMemoryQuery(ClusterMemoryPoolInfo generalPool,
497497
// If present, this means the resource manager is determining the largest query, so do not make this determination locally
498498
if (memoryManagerService.isPresent()) {
499499
return largestMemoryQuery.flatMap(largestMemoryQueryId -> Streams.stream(queries)
500-
.filter(query -> query.getQueryId().equals(largestMemoryQueryId))
501-
.findFirst())
500+
.filter(query -> query.getQueryId().equals(largestMemoryQueryId))
501+
.findFirst())
502502
.orElse(null);
503503
}
504504
for (QueryExecution queryExecution : queries) {
@@ -519,12 +519,12 @@ private QueryExecution findLargestMemoryQuery(ClusterMemoryPoolInfo generalPool,
519519

520520
private QueryMemoryInfo createQueryMemoryInfo(QueryExecution query)
521521
{
522-
return new QueryMemoryInfo(query.getQueryId(), query.getMemoryPool().getId(), query.getTotalMemoryReservation().toBytes());
522+
return new QueryMemoryInfo(query.getQueryId(), query.getMemoryPool().getId(), query.getTotalMemoryReservationInBytes());
523523
}
524524

525525
private long getQueryMemoryReservation(QueryExecution query)
526526
{
527-
return query.getTotalMemoryReservation().toBytes();
527+
return query.getTotalMemoryReservationInBytes();
528528
}
529529

530530
private synchronized boolean allAssignmentsHavePropagated(Iterable<QueryExecution> queries)

presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public void onTimeout(AsyncEvent event)
212212
}
213213
});
214214

215-
ListenableFuture<BufferResult> bufferResultFuture = taskManager.getTaskResults(taskId, bufferId, token, maxSize);
215+
ListenableFuture<BufferResult> bufferResultFuture = taskManager.getTaskResults(taskId, bufferId, token, maxSize.toBytes());
216216
bufferResultFuture = addTimeout(
217217
bufferResultFuture,
218218
() -> BufferResult.emptyResults(

0 commit comments

Comments
 (0)