Skip to content

Commit

Permalink
Fix leaked query count reporting in the disagg coordinators codepath
Browse files Browse the repository at this point in the history
Summary: Currently, in the disaggregated codepath, all running queries show up as leaked queries. After this change, the number of leaked queries reported by coordinators in the disaggregated codepath is at least eventually consistent. Due to the eventually consistent nature of the design, some false positives among the reported leaked queries are still expected, since the coordinator has a stale view of the running queries and of the memory state on each worker.

Also changed the leak check to log at WARN level, and moved the check out so that it runs every minute instead of every second.

Added logging of the leaked bytes in addition to the leaked queries.

Test plan - (Please fill in how you tested your changes)

Added unit tests.

Checked the metrics in a production environment and confirmed that the number of leaked queries is now usually one or two in both the disaggregated and the normal codepaths.
  • Loading branch information
agrawaldevesh authored and tdcmeehan committed Dec 21, 2022
1 parent 15788ac commit 017d65c
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,17 @@ public void start()
log.error(e, "Error enforcing query output size limits");
}
}, 1, 1, TimeUnit.SECONDS);

// Pulling out the checking of memory leaks to happen at a coarser granularity since it's a bit
// expensive and does not need to happen as frequently as enforcement.
queryManagementExecutor.scheduleWithFixedDelay(() -> {
try {
checkForMemoryLeaks();
}
catch (Throwable e) {
log.error(e, "Error checking memory leaks");
}
}, 1, 1, TimeUnit.MINUTES);
}

@PreDestroy
Expand Down Expand Up @@ -365,7 +376,12 @@ private void enforceMemoryLimits()
List<QueryExecution> runningQueries = queryTracker.getAllQueries().stream()
.filter(query -> query.getState() == RUNNING)
.collect(toImmutableList());
memoryManager.process(runningQueries, this::getQueries);
memoryManager.process(runningQueries);
}

/**
 * Asks the memory manager to run its leak check, giving it a supplier backed by
 * {@code getQueries()} so the query list is fetched only when the check needs it.
 */
private void checkForMemoryLeaks()
{
    memoryManager.checkForLeaks(() -> getQueries());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,19 @@
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.QueryId;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import io.airlift.units.DataSize;
import org.joda.time.DateTime;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;
import static org.joda.time.DateTime.now;
import static org.joda.time.Seconds.secondsBetween;

Expand All @@ -48,76 +45,46 @@ public class ClusterMemoryLeakDetector
@GuardedBy("this")
private Set<QueryId> leakedQueries;

/**
* @param queryInfoSupplier All queries that the coordinator knows about.
* @param queryMemoryReservations The memory reservations of queries in the GENERAL cluster memory pool.
*/
void checkForMemoryLeaks(Supplier<List<BasicQueryInfo>> queryInfoSupplier, Map<QueryId, Long> queryMemoryReservations)
{
requireNonNull(queryInfoSupplier);
requireNonNull(queryMemoryReservations);

Map<QueryId, BasicQueryInfo> queryIdToInfo = Maps.uniqueIndex(queryInfoSupplier.get(), BasicQueryInfo::getQueryId);

Map<QueryId, Long> leakedQueryReservations = queryMemoryReservations.entrySet()
.stream()
.filter(entry -> entry.getValue() > 0)
.filter(entry -> isLeaked(queryIdToInfo, entry.getKey()))
.collect(toImmutableMap(Entry::getKey, Entry::getValue));

if (!leakedQueryReservations.isEmpty()) {
log.debug("Memory leak detected. The following queries are already finished, " +
"but they have memory reservations on some worker node(s): %s", leakedQueryReservations);
}

synchronized (this) {
leakedQueries = ImmutableSet.copyOf(leakedQueryReservations.keySet());
}
}
@GuardedBy("this")
private long leakedBytes;

/**
*
* @param runningQueriesSupplier All running queries on a cluster.
* @param queryIdToInfo All queries that the coordinator knows about, along with their optional query info.
* @param queryMemoryReservations The memory reservations of queries in the GENERAL cluster memory pool.
*/
void checkForClusterMemoryLeaks(Supplier<Optional<List<QueryId>>> runningQueriesSupplier, Map<QueryId, Long> queryMemoryReservations)
void checkForMemoryLeaks(Map<QueryId, Optional<BasicQueryInfo>> queryIdToInfo, Map<QueryId, Long> queryMemoryReservations)
{
requireNonNull(runningQueriesSupplier);
requireNonNull(queryMemoryReservations);

Optional<List<QueryId>> runningQueries = runningQueriesSupplier.get();

Map<QueryId, Long> leakedQueryReservations = queryMemoryReservations.entrySet()
.stream()
.filter(entry -> entry.getValue() > 0)
.filter(entry -> !runningQueries.isPresent() || !runningQueries.get().contains(entry.getKey()))
.filter(entry -> isLeaked(queryIdToInfo, entry.getKey()))
.collect(toImmutableMap(Entry::getKey, Entry::getValue));

long leakedBytesThisTime = leakedQueryReservations.values().stream().reduce(0L, Long::sum);
if (!leakedQueryReservations.isEmpty()) {
log.debug("Memory leak detected. The following queries are already finished, " +
"but they have memory reservations on some worker node(s): %s", leakedQueryReservations);
// Report the total computed during this run. The leakedBytes field still holds the value from the
// previous run at this point (it is only updated in the synchronized block below) and is guarded
// by "this", so it must not be read here.
log.warn("Memory leak of %s detected. The following queries are already finished, " +
        "but they have memory reservations on some worker node(s): %s",
        DataSize.succinctBytes(leakedBytesThisTime), leakedQueryReservations);
}

synchronized (this) {
leakedQueries = ImmutableSet.copyOf(leakedQueryReservations.keySet());
leakedBytes = leakedBytesThisTime;
}
}

private static boolean isLeaked(Map<QueryId, BasicQueryInfo> queryIdToInfo, QueryId queryId)
private static boolean isLeaked(Map<QueryId, Optional<BasicQueryInfo>> queryIdToInfo, QueryId queryId)
{
BasicQueryInfo queryInfo = queryIdToInfo.get(queryId);
Optional<BasicQueryInfo> queryInfo = queryIdToInfo.get(queryId);

// if the query is not even found then it is definitely leaked
if (queryInfo == null) {
return true;
}

DateTime queryEndTime = queryInfo.getQueryStats().getEndTime();

if (queryInfo.getState() == RUNNING || queryEndTime == null) {
return false;
}
Optional<DateTime> queryEndTime = queryInfo.flatMap(qi -> Optional.ofNullable(qi.getState() == RUNNING ? null : qi.getQueryStats().getEndTime()));

return secondsBetween(queryEndTime, now()).getSeconds() >= DEFAULT_LEAK_CLAIM_DELTA_SEC;
return queryEndTime.map(ts -> secondsBetween(ts, now()).getSeconds() >= DEFAULT_LEAK_CLAIM_DELTA_SEC).orElse(false);
}

synchronized boolean wasQueryPossiblyLeaked(QueryId queryId)
Expand All @@ -129,4 +96,9 @@ synchronized int getNumberOfLeakedQueries()
{
return leakedQueries.size();
}

/**
 * Returns the leaked byte total stored by the most recent call to {@code checkForMemoryLeaks}.
 * The read happens while holding this object's monitor, matching the field's {@code @GuardedBy("this")}.
 */
long getLeakedBytes()
{
    synchronized (this) {
        return leakedBytes;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import com.google.common.io.Closer;
import io.airlift.units.DataSize;
Expand Down Expand Up @@ -89,6 +90,7 @@
import static com.facebook.presto.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.MoreCollectors.toOptional;
import static com.google.common.collect.Sets.difference;
Expand All @@ -99,6 +101,7 @@
import static java.util.AbstractMap.SimpleEntry;
import static java.util.Comparator.comparingLong;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static org.weakref.jmx.ObjectNames.generatedNameOf;

public class ClusterMemoryManager
Expand Down Expand Up @@ -234,21 +237,12 @@ public synchronized boolean memoryPoolExists(MemoryPoolId poolId)
return pools.containsKey(poolId);
}

public synchronized void process(Iterable<QueryExecution> runningQueries, Supplier<List<BasicQueryInfo>> allQueryInfoSupplier)
public synchronized void process(Iterable<QueryExecution> runningQueries)
{
if (!enabled) {
return;
}

// TODO revocable memory reservations can also leak and may need to be detected in the future
// We are only concerned about the leaks in general pool.
if (memoryManagerService.isPresent()) {
memoryLeakDetector.checkForClusterMemoryLeaks(pools.get(GENERAL_POOL).getClusterInfo()::getRunningQueries, pools.get(GENERAL_POOL).getQueryMemoryReservations());
}
else {
memoryLeakDetector.checkForMemoryLeaks(allQueryInfoSupplier, pools.get(GENERAL_POOL).getQueryMemoryReservations());
}

boolean outOfMemory = isClusterOutOfMemory();
if (!outOfMemory) {
lastTimeNotOutOfMemory = System.nanoTime();
Expand Down Expand Up @@ -338,6 +332,32 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli
updateNodes(assignmentsRequest);
}

/**
 * Builds the map of queries to compare against and passes it, together with the GENERAL pool's
 * per-query memory reservations, to the memory leak detector. Does nothing when this manager is
 * not enabled.
 *
 * @param allQueryInfoSupplier supplies the locally known queries; it is only invoked when no
 * memory manager service is present
 */
public synchronized void checkForLeaks(Supplier<List<BasicQueryInfo>> allQueryInfoSupplier)
{
    if (!enabled) {
        return;
    }

    Map<QueryId, Optional<BasicQueryInfo>> queryIdToInfo;
    if (!memoryManagerService.isPresent()) {
        // Single-coordinator setup: only the local queries are relevant, so index every locally
        // known query by its id and keep its query info.
        queryIdToInfo = Maps.uniqueIndex(
                allQueryInfoSupplier.get().stream().map(Optional::of).iterator(),
                info -> info.get().getQueryId());
    }
    else {
        // Multi-coordinator codepath: use the globally running queries reported for the GENERAL
        // pool. No query info is available for them, so each id maps to an empty Optional.
        List<QueryId> globallyRunning = getClusterInfo(GENERAL_POOL)
                .getRunningQueries()
                .orElse(ImmutableList.of());
        queryIdToInfo = globallyRunning.stream()
                .collect(toImmutableMap(identity(), queryId -> Optional.empty()));
    }

    Map<QueryId, Long> reservations = pools.get(GENERAL_POOL).getQueryMemoryReservations();
    memoryLeakDetector.checkForMemoryLeaks(queryIdToInfo, reservations);
}

private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
{
List<QueryMemoryInfo> queryMemoryInfoList = Streams.stream(runningQueries)
Expand Down Expand Up @@ -637,6 +657,12 @@ public int getNumberOfLeakedQueries()
return memoryLeakDetector.getNumberOfLeakedQueries();
}

/**
 * Managed attribute exposing the leaked byte count currently held by the memory leak detector.
 */
@Managed
public long getLeakedBytes()
{
    long leakedBytes = memoryLeakDetector.getLeakedBytes();
    return leakedBytes;
}

@Managed
public long getClusterUserMemoryReservation()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,49 +46,27 @@ public void testLeakDetector()
QueryId testQuery = new QueryId("test");
ClusterMemoryLeakDetector leakDetector = new ClusterMemoryLeakDetector();

leakDetector.checkForMemoryLeaks(() -> ImmutableList.of(), ImmutableMap.of());
leakDetector.checkForMemoryLeaks(ImmutableMap.of(), ImmutableMap.of());
assertEquals(leakDetector.getNumberOfLeakedQueries(), 0);

// the leak detector should report no leaked queries as the query is still running
leakDetector.checkForMemoryLeaks(() -> ImmutableList.of(createQueryInfo(testQuery.getId(), RUNNING)), ImmutableMap.of(testQuery, 1L));
assertEquals(leakDetector.getNumberOfLeakedQueries(), 0);

// the leak detector should report exactly one leaked query since the query is finished, and its end time is way in the past
leakDetector.checkForMemoryLeaks(() -> ImmutableList.of(createQueryInfo(testQuery.getId(), FINISHED)), ImmutableMap.of(testQuery, 1L));
assertEquals(leakDetector.getNumberOfLeakedQueries(), 1);

// the leak detector should report no leaked queries as the query doesn't have any memory reservation
leakDetector.checkForMemoryLeaks(() -> ImmutableList.of(createQueryInfo(testQuery.getId(), FINISHED)), ImmutableMap.of(testQuery, 0L));
assertEquals(leakDetector.getNumberOfLeakedQueries(), 0);

// the leak detector should report exactly one leaked query since the coordinator doesn't know of any query
leakDetector.checkForMemoryLeaks(() -> ImmutableList.of(), ImmutableMap.of(testQuery, 1L));
assertEquals(leakDetector.getNumberOfLeakedQueries(), 1);
}

@Test
public void testClusterLeakDetector()
{
QueryId testQuery = new QueryId("test");
ClusterMemoryLeakDetector leakDetector = new ClusterMemoryLeakDetector();

leakDetector.checkForClusterMemoryLeaks(() -> Optional.empty(), ImmutableMap.of());
// the leak detector should be okay with a missing basic query info and treat that as a running query
leakDetector.checkForMemoryLeaks(ImmutableMap.of(testQuery, Optional.empty()), ImmutableMap.of(testQuery, 1L));
assertEquals(leakDetector.getNumberOfLeakedQueries(), 0);

// the leak detector should report no leaked queries as the query is still running
leakDetector.checkForClusterMemoryLeaks(() -> Optional.of(ImmutableList.of(testQuery)), ImmutableMap.of(testQuery, 1L));
leakDetector.checkForMemoryLeaks(ImmutableMap.of(testQuery, Optional.of(createQueryInfo(testQuery.getId(), RUNNING))), ImmutableMap.of(testQuery, 1L));
assertEquals(leakDetector.getNumberOfLeakedQueries(), 0);

// the leak detector should report exactly one leaked query since the query is finished, and its end time is way in the past
leakDetector.checkForClusterMemoryLeaks(() -> Optional.empty(), ImmutableMap.of(testQuery, 1L));
leakDetector.checkForMemoryLeaks(ImmutableMap.of(testQuery, Optional.of(createQueryInfo(testQuery.getId(), FINISHED))), ImmutableMap.of(testQuery, 1L));
assertEquals(leakDetector.getNumberOfLeakedQueries(), 1);

// the leak detector should report no leaked queries as the query doesn't have any memory reservation
leakDetector.checkForClusterMemoryLeaks(() -> Optional.empty(), ImmutableMap.of(testQuery, 0L));
leakDetector.checkForMemoryLeaks(ImmutableMap.of(testQuery, Optional.of(createQueryInfo(testQuery.getId(), FINISHED))), ImmutableMap.of(testQuery, 0L));
assertEquals(leakDetector.getNumberOfLeakedQueries(), 0);

// the leak detector should report exactly one leaked query since the coordinator doesn't know of any query
leakDetector.checkForClusterMemoryLeaks(() -> Optional.empty(), ImmutableMap.of(testQuery, 1L));
leakDetector.checkForMemoryLeaks(ImmutableMap.of(), ImmutableMap.of(testQuery, 1L));
assertEquals(leakDetector.getNumberOfLeakedQueries(), 1);
}

Expand Down

0 comments on commit 017d65c

Please sign in to comment.