From 2744f0ee04fc17189ecf644258b29f36b7c6e7fa Mon Sep 17 00:00:00 2001 From: Swapnil Tailor Date: Tue, 29 Nov 2022 12:54:54 -0800 Subject: [PATCH] Fix Disagg Coordinator task limit enforcement For Disagg Coodinator, each coordinator is doing it's own task limit enforcement and ended up runnig lot more tasks on the cluster than configured. With this change, fixing the behavior so each coordinator get global running task count and then enforce the limit and kill queries which using tasks larger that configured limit. --- .../presto/dispatcher/DispatchManager.java | 6 +- .../presto/execution/QueryTracker.java | 10 ++- .../presto/execution/SqlQueryManager.java | 13 +++- .../ClusterQueryTrackerService.java | 70 +++++++++++++++++++ .../ResourceManagerClient.java | 3 + .../ResourceManagerClusterStateProvider.java | 14 ++++ .../ResourceManagerConfig.java | 13 ++++ .../ResourceManagerServer.java | 6 ++ .../presto/server/ServerMainModule.java | 3 + .../TestQueryTrackerHighTaskCountKill.java | 37 +++++++++- ...stResourceManagerClusterStateProvider.java | 28 ++++++++ .../TestResourceManagerConfig.java | 7 +- .../TestingClusterQueryTrackerService.java | 37 ++++++++++ .../TestingResourceManagerClient.java | 14 +++- 14 files changed, 252 insertions(+), 9 deletions(-) create mode 100644 presto-main/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java create mode 100644 presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java diff --git a/presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java b/presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java index b72b4f8fba20d..529fe1e2cae30 100644 --- a/presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java +++ b/presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java @@ -24,6 +24,7 @@ import com.facebook.presto.execution.resourceGroups.ResourceGroupManager; import com.facebook.presto.execution.warnings.WarningCollectorFactory; import com.facebook.presto.metadata.SessionPropertyManager; +import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; import com.facebook.presto.resourcemanager.ClusterStatusSender; import com.facebook.presto.security.AccessControl; import com.facebook.presto.server.BasicQueryInfo; @@ -127,7 +128,8 @@ public DispatchManager( QueryManagerConfig queryManagerConfig, DispatchExecutor dispatchExecutor, ClusterStatusSender clusterStatusSender, - SecurityConfig securityConfig) + SecurityConfig securityConfig, + Optional clusterQueryTrackerService) { this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator is null"); this.analyzerProvider = requireNonNull(analyzerProvider, "analyzerClient is null"); @@ -147,7 +149,7 @@ public DispatchManager( this.clusterStatusSender = requireNonNull(clusterStatusSender, "clusterStatusSender is null"); - this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor()); + this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor(), clusterQueryTrackerService); this.securityConfig = requireNonNull(securityConfig, "securityConfig is null"); } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java index 99e15c9bc3c1e..e37d5c684ef61 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java @@ -16,6 +16,7 @@ import com.facebook.airlift.log.Logger; import com.facebook.presto.Session; import com.facebook.presto.execution.QueryTracker.TrackedQuery; +import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits; @@ -81,7 +82,9 @@ public class QueryTracker @GuardedBy("this") private ScheduledFuture backgroundTask; - public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorService queryManagementExecutor) + private final Optional clusterQueryTrackerService; + + public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorService queryManagementExecutor, Optional clusterQueryTrackerService) { requireNonNull(queryManagerConfig, "queryManagerConfig is null"); this.minQueryExpireAge = queryManagerConfig.getMinQueryExpireAge(); @@ -91,6 +94,7 @@ public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorServ this.maxQueryRunningTaskCount = queryManagerConfig.getMaxQueryRunningTaskCount(); this.queryManagementExecutor = requireNonNull(queryManagementExecutor, "queryManagementExecutor is null"); + this.clusterQueryTrackerService = clusterQueryTrackerService; } public synchronized void start() @@ -280,6 +284,10 @@ void enforceTaskLimits() } } + if (clusterQueryTrackerService.isPresent()) { + totalRunningTaskCount = clusterQueryTrackerService.get().getRunningTaskCount(); + } + runningTaskCount.set(totalRunningTaskCount); int runningTaskCountAfterKills = totalRunningTaskCount; diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java index 3ffa23ea1deca..33a41186901c3 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java @@ -26,6 +26,7 @@ import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.execution.warnings.WarningCollectorFactory; import com.facebook.presto.memory.ClusterMemoryManager; +import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; import com.facebook.presto.server.BasicQueryInfo; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.QueryId; @@ -49,6 +50,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -97,7 +99,14 @@ public class SqlQueryManager private final HistoryBasedPlanStatisticsTracker historyBasedPlanStatisticsTracker; @Inject - public SqlQueryManager(ClusterMemoryManager memoryManager, QueryMonitor queryMonitor, EmbedVersion embedVersion, QueryManagerConfig queryManagerConfig, WarningCollectorFactory warningCollectorFactory, HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager) + public SqlQueryManager( + ClusterMemoryManager memoryManager, + QueryMonitor queryMonitor, + EmbedVersion embedVersion, + QueryManagerConfig queryManagerConfig, + WarningCollectorFactory warningCollectorFactory, + HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager, + Optional clusterQueryTrackerService) { this.memoryManager = requireNonNull(memoryManager, "memoryManager is null"); this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null"); @@ -111,7 +120,7 @@ public SqlQueryManager(ClusterMemoryManager memoryManager, QueryMonitor queryMon this.queryManagementExecutor = Executors.newScheduledThreadPool(queryManagerConfig.getQueryManagerExecutorPoolSize(), threadsNamed("query-management-%s")); this.queryManagementExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) queryManagementExecutor); - this.queryTracker = new QueryTracker<>(queryManagerConfig, queryManagementExecutor); + this.queryTracker = new QueryTracker<>(queryManagerConfig, queryManagementExecutor, clusterQueryTrackerService); requireNonNull(historyBasedPlanStatisticsManager, "historyBasedPlanStatisticsManager is null"); this.historyBasedPlanStatisticsTracker = historyBasedPlanStatisticsManager.getHistoryBasedPlanStatisticsTracker(); } diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java new file mode 100644 index 0000000000000..69650289b06dd --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.resourcemanager; + +import com.facebook.drift.client.DriftClient; +import com.facebook.presto.util.PeriodicTaskExecutor; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Objects.requireNonNull; + +public class ClusterQueryTrackerService +{ + private final DriftClient resourceManagerClient; + private final ScheduledExecutorService executorService; + private final long runningTaskCountFetchIntervalMillis; + private AtomicInteger runningTaskCount; + private final PeriodicTaskExecutor runningTaskCountUpdater; + + @Inject + public ClusterQueryTrackerService( + @ForResourceManager DriftClient resourceManagerClient, + @ForResourceManager ScheduledExecutorService executorService, + ResourceManagerConfig resourceManagerConfig) + { + this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerClient is null"); + this.executorService = requireNonNull(executorService, "executorService is null"); + this.runningTaskCountFetchIntervalMillis = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getRunningTaskCountFetchInterval().toMillis(); + this.runningTaskCount = new AtomicInteger(0); + this.runningTaskCountUpdater = new PeriodicTaskExecutor(runningTaskCountFetchIntervalMillis, executorService, () -> updateRunningTaskCount()); + } + + @PostConstruct + public void init() + { + runningTaskCountUpdater.start(); + } + + @PreDestroy + public void stop() + { + runningTaskCountUpdater.stop(); + } + + public int getRunningTaskCount() + { + return runningTaskCount.get(); + } + + private void updateRunningTaskCount() + { + this.runningTaskCount.set(resourceManagerClient.get().getRunningTaskCount()); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java index 7d8092ec0794c..e0ada85b8187a 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java @@ -42,4 +42,7 @@ List getResourceGroupInfo(String excludingNode) @ThriftMethod void resourceGroupRuntimeHeartbeat(String node, List resourceGroupRuntimeInfo); + + @ThriftMethod + int getRunningTaskCount(); } diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java index dedaab25868b5..39def5f4bb33e 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java @@ -21,6 +21,7 @@ import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.metadata.SessionPropertyManager; import com.facebook.presto.server.BasicQueryInfo; +import com.facebook.presto.server.BasicQueryStats; import com.facebook.presto.server.NodeStatus; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; @@ -49,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; import static com.facebook.presto.SystemSessionProperties.resourceOvercommit; @@ -261,6 +263,18 @@ public List getClusterQueries() .collect(toImmutableList()); } + public int getRunningTaskCount() + { + int runningTaskCount = nodeQueryStates.values().stream() + .map(CoordinatorQueriesState::getActiveQueries) + .flatMap(Collection::stream) + .map(Query::getBasicQueryInfo) + .filter(q -> q.getState() == RUNNING) + .map(BasicQueryInfo::getQueryStats) + .collect(Collectors.summingInt(BasicQueryStats::getRunningTasks)); + return runningTaskCount; + } + public Map getClusterMemoryPoolInfo() { return clusterMemoryPoolInfosSupplier.get(); diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java index 5264617e0f4ae..299afb5e0faab 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java @@ -44,6 +44,8 @@ public class ResourceManagerConfig private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS); private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS); + private Duration runningTaskCountFetchInterval = new Duration(1, SECONDS); + @MinDuration("1ms") public Duration getQueryExpirationTimeout() { @@ -251,4 +253,15 @@ public ResourceManagerConfig setResourceGroupServiceCacheRefreshInterval(Duratio this.resourceGroupServiceCacheRefreshInterval = resourceGroupServiceCacheRefreshInterval; return this; } + + public Duration getRunningTaskCountFetchInterval() + { + return runningTaskCountFetchInterval; + } + @Config("resource-manager.running-task-count-fetch-interval") + public ResourceManagerConfig setRunningTaskCountFetchInterval(Duration runningTaskCountFetchInterval) + { + this.runningTaskCountFetchInterval = runningTaskCountFetchInterval; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java index 5f8a29c3e3435..b79f1a2110521 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java @@ -87,4 +87,10 @@ public void resourceGroupRuntimeHeartbeat(String node, List clusterStateProvider.registerResourceGroupRuntimeHeartbeat(node, resourceGroupRuntimeInfos)); } + + @ThriftMethod + public ListenableFuture getRunningTaskCount() + { + return executor.submit(clusterStateProvider::getRunningTaskCount); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index bd32714b71b1d..d23bf31d4d8aa 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -121,6 +121,7 @@ import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.operator.index.IndexJoinLookupStats; import com.facebook.presto.resourcemanager.ClusterMemoryManagerService; +import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; import com.facebook.presto.resourcemanager.ClusterStatusSender; import com.facebook.presto.resourcemanager.ForResourceManager; import com.facebook.presto.resourcemanager.NoopResourceGroupService; @@ -397,6 +398,7 @@ else if (serverConfig.isCoordinator()) { addressSelectorBinder.bind(AddressSelector.class).annotatedWith(annotation).to(RandomCatalogServerAddressSelector.class)); newOptionalBinder(binder, ClusterMemoryManagerService.class); + newOptionalBinder(binder, ClusterQueryTrackerService.class); install(installModuleIf( ServerConfig.class, ServerConfig::isResourceManagerEnabled, @@ -409,6 +411,7 @@ public void configure(Binder moduleBinder) moduleBinder.bind(ClusterStatusSender.class).to(ResourceManagerClusterStatusSender.class).in(Scopes.SINGLETON); if (serverConfig.isCoordinator()) { moduleBinder.bind(ClusterMemoryManagerService.class).in(Scopes.SINGLETON); + moduleBinder.bind(ClusterQueryTrackerService.class).in(Scopes.SINGLETON); moduleBinder.bind(ResourceGroupService.class).to(ResourceManagerResourceGroupService.class).in(Scopes.SINGLETON); } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java index 77fe8115adb99..60f003d53e6d0 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java @@ -13,9 +13,15 @@ */ package com.facebook.presto.execution; +import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; +import com.facebook.presto.resourcemanager.ResourceManagerClient; +import com.facebook.presto.resourcemanager.ResourceManagerConfig; +import com.facebook.presto.resourcemanager.TestingClusterQueryTrackerService; +import com.facebook.presto.resourcemanager.TestingResourceManagerClient; import com.facebook.presto.spi.PrestoException; import org.testng.annotations.Test; +import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; @@ -34,7 +40,7 @@ public void testMultipleQueriesKilledDueToTaskCount() .setMaxTotalRunningTaskCountToKillQuery(200); ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor(); try { - QueryTracker queryTracker = new QueryTracker<>(config, scheduledExecutorService); + QueryTracker queryTracker = new QueryTracker<>(config, scheduledExecutorService, Optional.empty()); MockQueryExecution smallQuery1 = MockQueryExecution.withRunningTaskCount(50); MockQueryExecution largeQueryButNotKilled = MockQueryExecution.withRunningTaskCount(101); MockQueryExecution largeQueryToBeKilled1 = MockQueryExecution.withRunningTaskCount(200); @@ -64,4 +70,33 @@ public void testMultipleQueriesKilledDueToTaskCount() scheduledExecutorService.shutdownNow(); } } + + @Test + public void testLargeQueryKilledDueToTaskCount_withClusterQueryTracker() + { + QueryManagerConfig config = new QueryManagerConfig() + .setMaxQueryRunningTaskCount(100) + .setMaxTotalRunningTaskCountToKillQuery(200); + ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor(); + ResourceManagerClient resourceManagerClient = new TestingResourceManagerClient(); + ClusterQueryTrackerService clusterQueryTrackerService = new TestingClusterQueryTrackerService((addressSelectionContext, headers) -> resourceManagerClient, newSingleThreadScheduledExecutor(), new ResourceManagerConfig(), 201); + try { + QueryTracker queryTracker = new QueryTracker<>(config, scheduledExecutorService, Optional.of(clusterQueryTrackerService)); + MockQueryExecution smallQuery = MockQueryExecution.withRunningTaskCount(50); + MockQueryExecution largeQueryToBeKilled = MockQueryExecution.withRunningTaskCount(101); + + queryTracker.addQuery(smallQuery); + queryTracker.addQuery(largeQueryToBeKilled); + + queryTracker.enforceTaskLimits(); + + assertFalse(smallQuery.getFailureReason().isPresent(), "small query should not be killed"); + Throwable failureReason = largeQueryToBeKilled.getFailureReason().get(); + assertTrue(failureReason instanceof PrestoException); + assertEquals(((PrestoException) failureReason).getErrorCode(), QUERY_HAS_TOO_MANY_STAGES.toErrorCode()); + } + finally { + scheduledExecutorService.shutdownNow(); + } + } } diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java index 30200a3aadaa8..7b6933a9a1418 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java @@ -540,6 +540,34 @@ public void testShuttingDownCoordinatorHeartbeat() assertQueryInfos(provider.getClusterQueries(), 4, 3); } + @Test + public void testRunningTaskCount() + { + InMemoryNodeManager nodeManager = new InMemoryNodeManager(); + nodeManager.addShuttingDownNode(new InternalNode("node1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true)); + + ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, new SessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("5s"), Duration.valueOf("0s"), true, newSingleThreadScheduledExecutor()); + + assertEquals(provider.getRunningTaskCount(), 0); + + long query1Sequence = 0; + long query2Sequence = 0; + long query3Sequence = 0; + long query4Sequence = 0; + + provider.registerQueryHeartbeat("node1", createQueryInfo("1", QUEUED), query1Sequence++); + assertEquals(provider.getRunningTaskCount(), 0); + + provider.registerQueryHeartbeat("node1", createQueryInfo("2", RUNNING), query2Sequence++); + assertEquals(provider.getRunningTaskCount(), 11); + + provider.registerQueryHeartbeat("node1", createQueryInfo("3", FINISHED), query3Sequence++); + assertEquals(provider.getRunningTaskCount(), 11); + + provider.registerQueryHeartbeat("node1", createQueryInfo("4", FAILED), query4Sequence++); + assertEquals(provider.getRunningTaskCount(), 11); + } + void assertWorkerMemoryInfo(ResourceManagerClusterStateProvider provider, int count) { Map workerMemoryInfo = provider.getWorkerMemoryInfo(); diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java index 60aad6b0ff240..b4d8c9ca34691 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java @@ -47,7 +47,8 @@ public void testDefaults() .setResourceGroupServiceCacheEnabled(false) .setResourceGroupServiceCacheExpireInterval(new Duration(10, SECONDS)) .setResourceGroupServiceCacheRefreshInterval(new Duration(1, SECONDS)) - .setResourceGroupRuntimeHeartbeatInterval(new Duration(1, SECONDS))); + .setResourceGroupRuntimeHeartbeatInterval(new Duration(1, SECONDS)) + .setRunningTaskCountFetchInterval(new Duration(1, SECONDS))); } @Test @@ -70,6 +71,7 @@ public void testExplicitPropertyMappings() .put("resource-manager.resource-group-service-cache-expire-interval", "1m") .put("resource-manager.resource-group-service-cache-refresh-interval", "10m") .put("resource-manager.resource-group-runtimeinfo-heartbeat-interval", "6m") + .put("resource-manager.running-task-count-fetch-interval", "1m") .build(); ResourceManagerConfig expected = new ResourceManagerConfig() @@ -88,7 +90,8 @@ public void testExplicitPropertyMappings() .setResourceGroupServiceCacheEnabled(true) .setResourceGroupServiceCacheExpireInterval(new Duration(1, MINUTES)) .setResourceGroupServiceCacheRefreshInterval(new Duration(10, MINUTES)) - .setResourceGroupRuntimeHeartbeatInterval(new Duration(6, MINUTES)); + .setResourceGroupRuntimeHeartbeatInterval(new Duration(6, MINUTES)) + .setRunningTaskCountFetchInterval(new Duration(1, MINUTES)); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java new file mode 100644 index 0000000000000..a58bf8b29bd9f --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.resourcemanager; + +import com.facebook.drift.client.DriftClient; + +import java.util.concurrent.ScheduledExecutorService; + +public class TestingClusterQueryTrackerService + extends ClusterQueryTrackerService +{ + DriftClient resourceManagerClient; + int runningTaskCount; + + public TestingClusterQueryTrackerService(DriftClient resourceManagerClient, ScheduledExecutorService executorService, ResourceManagerConfig resourceManagerConfig, int runningTaskCount) + { + super(resourceManagerClient, executorService, resourceManagerConfig); + this.runningTaskCount = runningTaskCount; + } + + @Override + public int getRunningTaskCount() + { + return runningTaskCount; + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java index 0ebe6f6c2b32f..282a499a6dbbd 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java @@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -class TestingResourceManagerClient +public class TestingResourceManagerClient implements ResourceManagerClient { private final AtomicInteger queryHeartbeats = new AtomicInteger(); @@ -36,6 +36,8 @@ class TestingResourceManagerClient private volatile List resourceGroupRuntimeInfos = ImmutableList.of(); + private int runningTaskCount; + @Override public void queryHeartbeat(String internalNode, BasicQueryInfo basicQueryInfo, long sequenceId) { @@ -55,6 +57,11 @@ public void setResourceGroupRuntimeInfos(List resource this.resourceGroupRuntimeInfos = ImmutableList.copyOf(resourceGroupRuntimeInfos); } + public void setRunningTaskCount(int runningTaskCount) + { + this.runningTaskCount = runningTaskCount; + } + @Override public void nodeHeartbeat(NodeStatus nodeStatus) { @@ -92,4 +99,9 @@ public int getResourceGroupRuntimeHeartbeats() { return resourceGroupRuntimeHeartbeats.get(); } + + public int getRunningTaskCount() + { + return runningTaskCount; + } }