Skip to content

Commit

Permalink
Fix and enable flaky test TestQueryManager.testQueryCountMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ClarenceThreepwood authored and tdcmeehan committed Mar 27, 2024
1 parent 62dca4a commit 43a53de
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

class H2TestUtil
public class H2TestUtil
{
private static final String CONFIGURATION_MANAGER_TYPE = "h2";
public static final String TEST_ENVIRONMENT = "test_environment";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.TestingSessionContext;
import com.facebook.presto.plugin.blackhole.BlackHolePlugin;
import com.facebook.presto.resourceGroups.FileResourceGroupConfigurationManagerFactory;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.tests.tpch.TpchQueryRunnerBuilder;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -40,6 +40,7 @@
import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.facebook.presto.execution.TestQueryRunnerUtil.createQuery;
import static com.facebook.presto.execution.TestQueryRunnerUtil.waitForQueryState;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getSimpleQueryRunner;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_CPU_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_OUTPUT_POSITIONS_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_OUTPUT_SIZE_LIMIT;
Expand All @@ -57,12 +58,16 @@
public class TestQueryManager
{
private DistributedQueryRunner queryRunner;
private static final String LONG_LASTING_QUERY = "SELECT COUNT(*) FROM blackhole.default.dummy";

@BeforeClass
public void setUp()
throws Exception
{
queryRunner = TpchQueryRunnerBuilder.builder().build();
queryRunner = getSimpleQueryRunner();
queryRunner.installPlugin(new BlackHolePlugin());
queryRunner.createCatalog("blackhole", "blackhole");
queryRunner.execute("CREATE TABLE blackhole.default.dummy (col BIGINT) WITH (split_count = 1, rows_per_page = 1, pages_per_split = 1, page_processing_delay = '10m')");
TestingPrestoServer server = queryRunner.getCoordinator();
server.getResourceGroupManager().get().addConfigurationManagerFactory(new FileResourceGroupConfigurationManagerFactory());
server.getResourceGroupManager().get()
Expand Down Expand Up @@ -94,7 +99,7 @@ public void testFailQuery()
"slug",
0,
new TestingSessionContext(TEST_SESSION),
"SELECT * FROM lineitem")
LONG_LASTING_QUERY)
.get();

// wait until query starts running
Expand Down Expand Up @@ -139,7 +144,7 @@ public void testFailQueryPrerun()
"slug",
0,
new TestingSessionContext(TEST_SESSION),
"SELECT * FROM lineitem")
LONG_LASTING_QUERY)
.get();

//Wait for query to be in queued state
Expand Down Expand Up @@ -176,7 +181,7 @@ void createQueries(DispatchManager dispatchManager, int queryCount)
"slug",
0,
new TestingSessionContext(TEST_SESSION),
"SELECT * FROM lineitem")
LONG_LASTING_QUERY)
.get();
}
}
Expand All @@ -199,7 +204,7 @@ public void testQueryCpuLimit()
public void testQueryScanExceeded()
throws Exception
{
try (DistributedQueryRunner queryRunner = TpchQueryRunnerBuilder.builder().setSingleExtraProperty("query.max-scan-raw-input-bytes", "0B").build()) {
try (DistributedQueryRunner queryRunner = builder().setSingleExtraProperty("query.max-scan-raw-input-bytes", "0B").build()) {
QueryId queryId = createQuery(queryRunner, TEST_SESSION, "SELECT COUNT(*) FROM lineitem");
waitForQueryState(queryRunner, queryId, FAILED);
QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
Expand All @@ -213,7 +218,7 @@ public void testQueryScanExceeded()
public void testQueryOutputPositionsExceeded()
throws Exception
{
try (DistributedQueryRunner queryRunner = TpchQueryRunnerBuilder.builder().setSingleExtraProperty("query.max-output-positions", "10").build()) {
try (DistributedQueryRunner queryRunner = builder().setSingleExtraProperty("query.max-output-positions", "10").build()) {
QueryId queryId = createQuery(queryRunner, TEST_SESSION, "SELECT * FROM lineitem");
waitForQueryState(queryRunner, queryId, FAILED);
QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
Expand All @@ -227,7 +232,7 @@ public void testQueryOutputPositionsExceeded()
public void testQueryOutputSizeExceeded()
throws Exception
{
try (DistributedQueryRunner queryRunner = TpchQueryRunnerBuilder.builder().setSingleExtraProperty("query.max-output-size", "1B").build()) {
try (DistributedQueryRunner queryRunner = builder().setSingleExtraProperty("query.max-output-size", "1B").build()) {
QueryId queryId = createQuery(queryRunner, TEST_SESSION, "SELECT COUNT(*) FROM lineitem");
waitForQueryState(queryRunner, queryId, FAILED);
QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
Expand All @@ -237,28 +242,29 @@ public void testQueryOutputSizeExceeded()
}
}

// Flaky test: https://github.com/prestodb/presto/issues/20447
@Test(enabled = false)
@Test
public void testQueryCountMetrics()
throws Exception
{
DispatchManager dispatchManager = queryRunner.getCoordinator().getDispatchManager();
// Create a total of 10 queries to test concurrency limit and
// Create a total of 6 queries to test concurrency limit and
// ensure that some queries are queued as concurrency limit is 3
createQueries(dispatchManager, 10);
createQueries(dispatchManager, 6);

List<BasicQueryInfo> queries = dispatchManager.getQueries();
long queuedQueryCount = dispatchManager.getStats().getQueuedQueries();
long runningQueryCount = dispatchManager.getStats().getRunningQueries();
while (dispatchManager.getStats().getRunningQueries() != 3
|| dispatchManager.getStats().getQueuedQueries() != 3) {
Thread.sleep(1000);
}

assertEquals(queuedQueryCount,
List<BasicQueryInfo> queries = dispatchManager.getQueries();
assertEquals(dispatchManager.getStats().getQueuedQueries(),
queries.stream().filter(basicQueryInfo -> basicQueryInfo.getState() == QUEUED).count());
assertEquals(runningQueryCount,
assertEquals(dispatchManager.getStats().getRunningQueries(),
queries.stream().filter(basicQueryInfo -> basicQueryInfo.getState() == RUNNING).count());

Stopwatch stopwatch = Stopwatch.createStarted();

long oldQueuedQueryCount = queuedQueryCount;
long oldQueuedQueryCount = dispatchManager.getStats().getQueuedQueries();

// Assert that number of queued queries are decreasing with time and
// number of running queries are always <= 3 (max concurrency limit)
Expand All @@ -269,7 +275,7 @@ public void testQueryCountMetrics()

oldQueuedQueryCount = dispatchManager.getStats().getQueuedQueries();

Thread.sleep(100);
Thread.sleep(1000);
}
}
}

0 comments on commit 43a53de

Please sign in to comment.