Skip to content

Commit

Permalink
Fix Disagg Coordinator task limit enforcement
Browse files Browse the repository at this point in the history
For Disagg Coordinator, each coordinator was doing its own task limit enforcement and ended up
running a lot more tasks on the cluster than configured. This change fixes the behavior so that
each coordinator gets the global running task count, enforces the limit against it, and kills
queries that use more tasks than the configured limit.
  • Loading branch information
swapsmagic authored and tdcmeehan committed Dec 8, 2022
1 parent 8ba44be commit 2744f0e
Show file tree
Hide file tree
Showing 14 changed files with 252 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.execution.warnings.WarningCollectorFactory;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.resourcemanager.ClusterQueryTrackerService;
import com.facebook.presto.resourcemanager.ClusterStatusSender;
import com.facebook.presto.security.AccessControl;
import com.facebook.presto.server.BasicQueryInfo;
Expand Down Expand Up @@ -127,7 +128,8 @@ public DispatchManager(
QueryManagerConfig queryManagerConfig,
DispatchExecutor dispatchExecutor,
ClusterStatusSender clusterStatusSender,
SecurityConfig securityConfig)
SecurityConfig securityConfig,
Optional<ClusterQueryTrackerService> clusterQueryTrackerService)
{
this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator is null");
this.analyzerProvider = requireNonNull(analyzerProvider, "analyzerClient is null");
Expand All @@ -147,7 +149,7 @@ public DispatchManager(

this.clusterStatusSender = requireNonNull(clusterStatusSender, "clusterStatusSender is null");

this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor());
this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor(), clusterQueryTrackerService);

this.securityConfig = requireNonNull(securityConfig, "securityConfig is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.execution.QueryTracker.TrackedQuery;
import com.facebook.presto.resourcemanager.ClusterQueryTrackerService;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
Expand Down Expand Up @@ -81,7 +82,9 @@ public class QueryTracker<T extends TrackedQuery>
@GuardedBy("this")
private ScheduledFuture<?> backgroundTask;

public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorService queryManagementExecutor)
private final Optional<ClusterQueryTrackerService> clusterQueryTrackerService;

public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorService queryManagementExecutor, Optional<ClusterQueryTrackerService> clusterQueryTrackerService)
{
requireNonNull(queryManagerConfig, "queryManagerConfig is null");
this.minQueryExpireAge = queryManagerConfig.getMinQueryExpireAge();
Expand All @@ -91,6 +94,7 @@ public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorServ
this.maxQueryRunningTaskCount = queryManagerConfig.getMaxQueryRunningTaskCount();

this.queryManagementExecutor = requireNonNull(queryManagementExecutor, "queryManagementExecutor is null");
this.clusterQueryTrackerService = clusterQueryTrackerService;
}

public synchronized void start()
Expand Down Expand Up @@ -280,6 +284,10 @@ void enforceTaskLimits()
}
}

if (clusterQueryTrackerService.isPresent()) {
totalRunningTaskCount = clusterQueryTrackerService.get().getRunningTaskCount();
}

runningTaskCount.set(totalRunningTaskCount);
int runningTaskCountAfterKills = totalRunningTaskCount;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.warnings.WarningCollectorFactory;
import com.facebook.presto.memory.ClusterMemoryManager;
import com.facebook.presto.resourcemanager.ClusterQueryTrackerService;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
Expand All @@ -49,6 +50,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -97,7 +99,14 @@ public class SqlQueryManager
private final HistoryBasedPlanStatisticsTracker historyBasedPlanStatisticsTracker;

@Inject
public SqlQueryManager(ClusterMemoryManager memoryManager, QueryMonitor queryMonitor, EmbedVersion embedVersion, QueryManagerConfig queryManagerConfig, WarningCollectorFactory warningCollectorFactory, HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager)
public SqlQueryManager(
ClusterMemoryManager memoryManager,
QueryMonitor queryMonitor,
EmbedVersion embedVersion,
QueryManagerConfig queryManagerConfig,
WarningCollectorFactory warningCollectorFactory,
HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager,
Optional<ClusterQueryTrackerService> clusterQueryTrackerService)
{
this.memoryManager = requireNonNull(memoryManager, "memoryManager is null");
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
Expand All @@ -111,7 +120,7 @@ public SqlQueryManager(ClusterMemoryManager memoryManager, QueryMonitor queryMon
this.queryManagementExecutor = Executors.newScheduledThreadPool(queryManagerConfig.getQueryManagerExecutorPoolSize(), threadsNamed("query-management-%s"));
this.queryManagementExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) queryManagementExecutor);

this.queryTracker = new QueryTracker<>(queryManagerConfig, queryManagementExecutor);
this.queryTracker = new QueryTracker<>(queryManagerConfig, queryManagementExecutor, clusterQueryTrackerService);
requireNonNull(historyBasedPlanStatisticsManager, "historyBasedPlanStatisticsManager is null");
this.historyBasedPlanStatisticsTracker = historyBasedPlanStatisticsManager.getHistoryBasedPlanStatisticsTracker();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.resourcemanager;

import com.facebook.drift.client.DriftClient;
import com.facebook.presto.util.PeriodicTaskExecutor;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Objects.requireNonNull;

/**
 * Holds a locally cached copy of the running task count reported by the resource manager.
 *
 * <p>A {@link PeriodicTaskExecutor} is created with the interval from
 * {@link ResourceManagerConfig#getRunningTaskCountFetchInterval()}; the task it is given asks the
 * resource manager client for its running task count and stores the answer.
 * {@link #getRunningTaskCount()} only reads the stored value and performs no remote call itself,
 * so it returns 0 until the first refresh has stored a value.
 */
public class ClusterQueryTrackerService
{
    private final DriftClient<ResourceManagerClient> resourceManagerClient;
    private final ScheduledExecutorService executorService;
    private final long runningTaskCountFetchIntervalMillis;
    // final: the reference is assigned once in the constructor and never replaced (only the
    // contained value changes), and final fields are safely published to other threads.
    private final AtomicInteger runningTaskCount;
    private final PeriodicTaskExecutor runningTaskCountUpdater;

    /**
     * @param resourceManagerClient client used to query the resource manager; must not be null
     * @param executorService executor handed to the periodic updater; must not be null
     * @param resourceManagerConfig source of the fetch interval; must not be null
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public ClusterQueryTrackerService(
            @ForResourceManager DriftClient<ResourceManagerClient> resourceManagerClient,
            @ForResourceManager ScheduledExecutorService executorService,
            ResourceManagerConfig resourceManagerConfig)
    {
        this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerClient is null");
        this.executorService = requireNonNull(executorService, "executorService is null");
        this.runningTaskCountFetchIntervalMillis = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getRunningTaskCountFetchInterval().toMillis();
        this.runningTaskCount = new AtomicInteger(0);
        this.runningTaskCountUpdater = new PeriodicTaskExecutor(runningTaskCountFetchIntervalMillis, executorService, this::updateRunningTaskCount);
    }

    /**
     * Starts the periodic updater once the object has been constructed.
     */
    @PostConstruct
    public void init()
    {
        runningTaskCountUpdater.start();
    }

    /**
     * Stops the periodic updater before the object is destroyed.
     */
    @PreDestroy
    public void stop()
    {
        runningTaskCountUpdater.stop();
    }

    /**
     * @return the most recently stored running task count (0 if nothing has been stored yet)
     */
    public int getRunningTaskCount()
    {
        return runningTaskCount.get();
    }

    // Asks the resource manager for its current running task count and caches the answer.
    private void updateRunningTaskCount()
    {
        this.runningTaskCount.set(resourceManagerClient.get().getRunningTaskCount());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ List<ResourceGroupRuntimeInfo> getResourceGroupInfo(String excludingNode)

@ThriftMethod
void resourceGroupRuntimeHeartbeat(String node, List<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfo);

/**
 * Thrift call that returns a running task count from the remote side of this client.
 * NOTE(review): presumably the cluster-wide count computed by the resource manager; confirm
 * against the server implementation.
 */
@ThriftMethod
int getRunningTaskCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.BasicQueryStats;
import com.facebook.presto.server.NodeStatus;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.SystemSessionProperties.resourceOvercommit;
Expand Down Expand Up @@ -261,6 +263,18 @@ public List<BasicQueryInfo> getClusterQueries()
.collect(toImmutableList());
}

/**
 * Adds up the running task counts of all active queries, over every entry in
 * {@code nodeQueryStates}, whose state is {@code RUNNING}.
 *
 * @return the sum of {@code getRunningTasks()} over those queries; 0 when there are none
 */
public int getRunningTaskCount()
{
    return nodeQueryStates.values().stream()
            .flatMap(coordinatorState -> coordinatorState.getActiveQueries().stream())
            .map(Query::getBasicQueryInfo)
            .filter(info -> info.getState() == RUNNING)
            .mapToInt(info -> info.getQueryStats().getRunningTasks())
            .sum();
}

public Map<MemoryPoolId, ClusterMemoryPoolInfo> getClusterMemoryPoolInfo()
{
return clusterMemoryPoolInfosSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class ResourceManagerConfig
private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS);
private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS);

private Duration runningTaskCountFetchInterval = new Duration(1, SECONDS);

@MinDuration("1ms")
public Duration getQueryExpirationTimeout()
{
Expand Down Expand Up @@ -251,4 +253,15 @@ public ResourceManagerConfig setResourceGroupServiceCacheRefreshInterval(Duratio
this.resourceGroupServiceCacheRefreshInterval = resourceGroupServiceCacheRefreshInterval;
return this;
}

/**
 * Returns the configured interval for fetching the running task count.
 * The field backing this getter is initialized to 1 second.
 */
public Duration getRunningTaskCountFetchInterval()
{
return runningTaskCountFetchInterval;
}
/**
 * Sets the interval for fetching the running task count; bound to the
 * {@code resource-manager.running-task-count-fetch-interval} configuration property.
 *
 * @param runningTaskCountFetchInterval the new interval
 * @return this config object, to allow chained setter calls
 */
@Config("resource-manager.running-task-count-fetch-interval")
public ResourceManagerConfig setRunningTaskCountFetchInterval(Duration runningTaskCountFetchInterval)
{
this.runningTaskCountFetchInterval = runningTaskCountFetchInterval;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,10 @@ public void resourceGroupRuntimeHeartbeat(String node, List<ResourceGroupRuntime
{
executor.execute(() -> clusterStateProvider.registerResourceGroupRuntimeHeartbeat(node, resourceGroupRuntimeInfos));
}

/**
 * Thrift endpoint reporting the running task count known to the cluster state provider.
 * The lookup is submitted to {@code executor}; the returned future carries its result.
 */
@ThriftMethod
public ListenableFuture<Integer> getRunningTaskCount()
{
    return executor.submit(() -> clusterStateProvider.getRunningTaskCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import com.facebook.presto.operator.TaskMemoryReservationSummary;
import com.facebook.presto.operator.index.IndexJoinLookupStats;
import com.facebook.presto.resourcemanager.ClusterMemoryManagerService;
import com.facebook.presto.resourcemanager.ClusterQueryTrackerService;
import com.facebook.presto.resourcemanager.ClusterStatusSender;
import com.facebook.presto.resourcemanager.ForResourceManager;
import com.facebook.presto.resourcemanager.NoopResourceGroupService;
Expand Down Expand Up @@ -397,6 +398,7 @@ else if (serverConfig.isCoordinator()) {
addressSelectorBinder.bind(AddressSelector.class).annotatedWith(annotation).to(RandomCatalogServerAddressSelector.class));

newOptionalBinder(binder, ClusterMemoryManagerService.class);
newOptionalBinder(binder, ClusterQueryTrackerService.class);
install(installModuleIf(
ServerConfig.class,
ServerConfig::isResourceManagerEnabled,
Expand All @@ -409,6 +411,7 @@ public void configure(Binder moduleBinder)
moduleBinder.bind(ClusterStatusSender.class).to(ResourceManagerClusterStatusSender.class).in(Scopes.SINGLETON);
if (serverConfig.isCoordinator()) {
moduleBinder.bind(ClusterMemoryManagerService.class).in(Scopes.SINGLETON);
moduleBinder.bind(ClusterQueryTrackerService.class).in(Scopes.SINGLETON);
moduleBinder.bind(ResourceGroupService.class).to(ResourceManagerResourceGroupService.class).in(Scopes.SINGLETON);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@
*/
package com.facebook.presto.execution;

import com.facebook.presto.resourcemanager.ClusterQueryTrackerService;
import com.facebook.presto.resourcemanager.ResourceManagerClient;
import com.facebook.presto.resourcemanager.ResourceManagerConfig;
import com.facebook.presto.resourcemanager.TestingClusterQueryTrackerService;
import com.facebook.presto.resourcemanager.TestingResourceManagerClient;
import com.facebook.presto.spi.PrestoException;
import org.testng.annotations.Test;

import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;

import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
Expand All @@ -34,7 +40,7 @@ public void testMultipleQueriesKilledDueToTaskCount()
.setMaxTotalRunningTaskCountToKillQuery(200);
ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor();
try {
QueryTracker<MockQueryExecution> queryTracker = new QueryTracker<>(config, scheduledExecutorService);
QueryTracker<MockQueryExecution> queryTracker = new QueryTracker<>(config, scheduledExecutorService, Optional.empty());
MockQueryExecution smallQuery1 = MockQueryExecution.withRunningTaskCount(50);
MockQueryExecution largeQueryButNotKilled = MockQueryExecution.withRunningTaskCount(101);
MockQueryExecution largeQueryToBeKilled1 = MockQueryExecution.withRunningTaskCount(200);
Expand Down Expand Up @@ -64,4 +70,33 @@ public void testMultipleQueriesKilledDueToTaskCount()
scheduledExecutorService.shutdownNow();
}
}

/**
 * Checks task limit enforcement when a cluster query tracker is supplied: the tracker is built
 * to report 201 running tasks while the two local queries hold 50 and 101. After
 * {@code enforceTaskLimits()} the 101-task query must have failed with
 * {@code QUERY_HAS_TOO_MANY_STAGES} and the 50-task query must be untouched.
 */
@Test
public void testLargeQueryKilledDueToTaskCount_withClusterQueryTracker()
{
    QueryManagerConfig config = new QueryManagerConfig()
            .setMaxQueryRunningTaskCount(100)
            .setMaxTotalRunningTaskCountToKillQuery(200);
    ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor();
    // Held in a local so it can be shut down in the finally block; creating it inline
    // in the constructor call left its thread running after the test.
    ScheduledExecutorService clusterTrackerExecutor = newSingleThreadScheduledExecutor();
    try {
        ResourceManagerClient resourceManagerClient = new TestingResourceManagerClient();
        ClusterQueryTrackerService clusterQueryTrackerService = new TestingClusterQueryTrackerService((addressSelectionContext, headers) -> resourceManagerClient, clusterTrackerExecutor, new ResourceManagerConfig(), 201);

        QueryTracker<MockQueryExecution> queryTracker = new QueryTracker<>(config, scheduledExecutorService, Optional.of(clusterQueryTrackerService));
        MockQueryExecution smallQuery = MockQueryExecution.withRunningTaskCount(50);
        MockQueryExecution largeQueryToBeKilled = MockQueryExecution.withRunningTaskCount(101);

        queryTracker.addQuery(smallQuery);
        queryTracker.addQuery(largeQueryToBeKilled);

        queryTracker.enforceTaskLimits();

        assertFalse(smallQuery.getFailureReason().isPresent(), "small query should not be killed");
        // Fail with a readable message rather than NoSuchElementException if nothing was killed.
        assertTrue(largeQueryToBeKilled.getFailureReason().isPresent(), "large query should be killed");
        Throwable failureReason = largeQueryToBeKilled.getFailureReason().get();
        assertTrue(failureReason instanceof PrestoException);
        assertEquals(((PrestoException) failureReason).getErrorCode(), QUERY_HAS_TOO_MANY_STAGES.toErrorCode());
    }
    finally {
        scheduledExecutorService.shutdownNow();
        clusterTrackerExecutor.shutdownNow();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,34 @@ public void testShuttingDownCoordinatorHeartbeat()
assertQueryInfos(provider.getClusterQueries(), 4, 3);
}

/**
 * Registers one heartbeat each for a QUEUED, RUNNING, FINISHED and FAILED query and checks
 * that only the RUNNING one changes the provider's running task count.
 */
@Test
public void testRunningTaskCount()
{
    InMemoryNodeManager nodeManager = new InMemoryNodeManager();
    nodeManager.addShuttingDownNode(new InternalNode("node1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));

    ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, new SessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("5s"), Duration.valueOf("0s"), true, newSingleThreadScheduledExecutor());

    // Nothing registered yet.
    assertEquals(provider.getRunningTaskCount(), 0);

    // Every query sends exactly one heartbeat, so each uses sequence number 0.
    long firstSequence = 0;

    provider.registerQueryHeartbeat("node1", createQueryInfo("1", QUEUED), firstSequence);
    assertEquals(provider.getRunningTaskCount(), 0);

    // NOTE(review): 11 is presumably the running task count that createQueryInfo gives each query - confirm.
    provider.registerQueryHeartbeat("node1", createQueryInfo("2", RUNNING), firstSequence);
    assertEquals(provider.getRunningTaskCount(), 11);

    provider.registerQueryHeartbeat("node1", createQueryInfo("3", FINISHED), firstSequence);
    assertEquals(provider.getRunningTaskCount(), 11);

    provider.registerQueryHeartbeat("node1", createQueryInfo("4", FAILED), firstSequence);
    assertEquals(provider.getRunningTaskCount(), 11);
}

void assertWorkerMemoryInfo(ResourceManagerClusterStateProvider provider, int count)
{
Map<String, MemoryInfo> workerMemoryInfo = provider.getWorkerMemoryInfo();
Expand Down
Loading

0 comments on commit 2744f0e

Please sign in to comment.