fix: don't create threads per request (#6665)
* fix: creating threads per request

* make HARouting singleton

* working on fixing tests

* use enabled ksql client in test

* Trigger Build

* fix tests?
vpapavas authored Dec 5, 2020
1 parent 989e52b commit 132d50d
Showing 18 changed files with 152 additions and 101 deletions.
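What changed, in short: HARouting used to be created per pull query, and each instance built (and shut down) its own fixed-size thread pool; it is now a long-lived singleton that owns one named pool, and the per-request state (physical plan, statement, routing options, output schema, query id) is passed into handlePullQuery instead. A minimal before/after sketch, assuming only the constructor and method signatures visible in the diff below; the fields the two methods read (ksqlConfig, physicalPlan, statement, and so on) stand for whatever the caller already has in scope:

// Before: a new HARouting, and with it a new thread pool, for every pull query.
PullQueryResult executeBefore() throws InterruptedException {
  try (HARouting routing = new HARouting(
      ksqlConfig, physicalPlan, routingFilterFactory, routingOptions, statement,
      serviceContext, physicalPlan.getOutputSchema(), physicalPlan.getQueryId(),
      pullQueryMetrics)) {
    return routing.handlePullQuery();
  }
}

// After: one shared HARouting owns the pool; each request passes only its own
// plan, statement, and routing options.
PullQueryResult executeAfter(final HARouting sharedRouting) throws InterruptedException {
  return sharedRouting.handlePullQuery(
      physicalPlan, statement, routingOptions,
      physicalPlan.getOutputSchema(), physicalPlan.getQueryId());
}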

@@ -26,6 +26,7 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
@@ -152,6 +153,7 @@ TransientQueryMetadata executeQuery(
PullQueryResult executePullQuery(
ServiceContext serviceContext,
ConfiguredStatement<Query> statement,
HARouting routing,
RoutingFilterFactory routingFilterFactory,
RoutingOptions routingOptions,
Optional<PullQueryExecutorMetrics> pullQueryMetrics

@@ -140,6 +140,7 @@ ExecuteResult execute(final KsqlPlan plan) {
*/
PullQueryResult executePullQuery(
final ConfiguredStatement<Query> statement,
final HARouting routing,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
@@ -164,12 +165,9 @@ PullQueryResult executePullQuery(
ksqlConfig,
analysis,
statement);

try (HARouting routing = new HARouting(
ksqlConfig, physicalPlan, routingFilterFactory, routingOptions, statement, serviceContext,
physicalPlan.getOutputSchema(), physicalPlan.getQueryId(), pullQueryMetrics)) {
return routing.handlePullQuery();
}
return routing.handlePullQuery(
physicalPlan, statement, routingOptions, physicalPlan.getOutputSchema(),
physicalPlan.getQueryId());
} catch (final Exception e) {
pullQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1));
throw new KsqlStatementException(

@@ -35,6 +35,7 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
@@ -266,6 +267,7 @@ public TransientQueryMetadata executeQuery(
public PullQueryResult executePullQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final HARouting routing,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
@@ -278,6 +280,7 @@ public PullQueryResult executePullQuery(
)
.executePullQuery(
statement,
routing,
routingFilterFactory,
routingOptions,
pullQueryMetrics

@@ -27,6 +27,7 @@
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
@@ -162,6 +163,7 @@ public TransientQueryMetadata executeQuery(
public PullQueryResult executePullQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final HARouting routing,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
@@ -172,6 +174,7 @@ public PullQueryResult executePullQuery(
statement.getSessionConfig()
).executePullQuery(
statement,
routing,
routingFilterFactory,
routingOptions,
pullQueryMetrics

@@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode;
@@ -56,61 +57,54 @@
public final class HARouting implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(HARouting.class);

private final ExecutorService executorService;
private final PullPhysicalPlan pullPhysicalPlan;
private final RoutingFilterFactory routingFilterFactory;
private final RoutingOptions routingOptions;
private final ConfiguredStatement<Query> statement;
private final ServiceContext serviceContext;
private final LogicalSchema outputSchema;
private final QueryId queryId;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RouteQuery routeQuery;

public HARouting(
final KsqlConfig ksqlConfig,
final PullPhysicalPlan pullPhysicalPlan,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final ConfiguredStatement<Query> statement,
final ServiceContext serviceContext,
final LogicalSchema outputSchema,
final QueryId queryId,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final KsqlConfig ksqlConfig
) {
this(ksqlConfig, pullPhysicalPlan, routingFilterFactory, routingOptions, statement,
serviceContext, outputSchema, queryId, pullQueryMetrics, HARouting::executeOrRouteQuery);
this(routingFilterFactory, serviceContext, pullQueryMetrics, ksqlConfig,
HARouting::executeOrRouteQuery);
}


@VisibleForTesting
HARouting(
final KsqlConfig ksqlConfig,
final PullPhysicalPlan pullPhysicalPlan,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final ConfiguredStatement<Query> statement,
final ServiceContext serviceContext,
final LogicalSchema outputSchema,
final QueryId queryId,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final KsqlConfig ksqlConfig,
final RouteQuery routeQuery
) {
this.pullPhysicalPlan = Objects.requireNonNull(pullPhysicalPlan, "pullPhysicalPlan");
this.routingFilterFactory =
Objects.requireNonNull(routingFilterFactory, "routingFilterFactory");
this.routingOptions = Objects.requireNonNull(routingOptions, "routingOptions");
this.statement = Objects.requireNonNull(statement, "statement");
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.outputSchema = Objects.requireNonNull(outputSchema, "outputSchema");
this.queryId = Objects.requireNonNull(queryId, "queryId");
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics");
this.executorService = Executors.newFixedThreadPool(
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG)
);
this.routeQuery = Objects.requireNonNull(routeQuery, "routeQuery");
ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG),
new ThreadFactoryBuilder().setNameFormat("pull-query-executor-%d").build());
this.pullQueryMetrics = Objects.requireNonNull(pullQueryMetrics, "pullQueryMetrics");
this.routeQuery = Objects.requireNonNull(routeQuery);
}

public PullQueryResult handlePullQuery() throws InterruptedException {
@Override
public void close() {
executorService.shutdown();
}

public PullQueryResult handlePullQuery(
final PullPhysicalPlan pullPhysicalPlan,
final ConfiguredStatement<Query> statement,
final RoutingOptions routingOptions,
final LogicalSchema outputSchema,
final QueryId queryId
) throws InterruptedException {
final List<KsqlPartitionLocation> locations = pullPhysicalPlan.getMaterialization().locator()
.locate(
pullPhysicalPlan.getKeys(),
@@ -196,11 +190,6 @@ public PullQueryResult handlePullQuery() throws InterruptedException {
}
}

@Override
public void close() {
executorService.shutdown();
}

/**
* Groups all of the partition locations by the round-th entry in their prioritized list of host
* nodes.
@@ -262,7 +251,6 @@ static PullQueryResult executeOrRouteQuery(
pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordLocalRequests(1));
rows = pullPhysicalPlan.execute(locations);

} else {
LOG.debug("Query {} routed to host {} at timestamp {}.",
statement.getStatementText(), node.location(), System.currentTimeMillis());
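The Javadoc kept above ("Groups all of the partition locations by the round-th entry in their prioritized list of host nodes") is the grouping step that the retry loop in handlePullQuery relies on. A hypothetical sketch of such a helper, using the KsqlNode and KsqlPartitionLocation types imported above plus java.util collections; the method name and its handling of exhausted locations are assumptions, not code from this commit:

// Sketch only: for retry round N, each partition location is grouped under the
// N-th host in its prioritized node list. Locations with no host left for this
// round are simply skipped here for brevity.
static Map<KsqlNode, List<KsqlPartitionLocation>> groupByRoundHost(
    final int round, final List<KsqlPartitionLocation> locations) {
  final Map<KsqlNode, List<KsqlPartitionLocation>> byHost = new LinkedHashMap<>();
  for (final KsqlPartitionLocation location : locations) {
    final List<KsqlNode> nodes = location.getNodes();
    if (round < nodes.size()) {
      byHost.computeIfAbsent(nodes.get(round), node -> new ArrayList<>()).add(location);
    }
  }
  return byHost;
}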

@@ -82,15 +82,15 @@ public class HARoutingTest {
@Mock
private RoutingFilterFactory routingFilterFactory;
@Mock
private KsqlConfig ksqlConfig;
@Mock
private PullPhysicalPlan pullPhysicalPlan;
@Mock
private Materialization materialization;
@Mock
private Locator locator;
@Mock
private RouteQuery routeQuery;
@Mock
private KsqlConfig ksqlConfig;

private HARouting haRouting;

@@ -101,12 +101,9 @@ public void setUp() {
when(location2.getNodes()).thenReturn(ImmutableList.of(node2, node1));
when(location3.getNodes()).thenReturn(ImmutableList.of(node1, node2));
when(location4.getNodes()).thenReturn(ImmutableList.of(node2, node1));
when(ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG))
.thenReturn(1);

when(ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG)).thenReturn(1);
haRouting = new HARouting(
ksqlConfig, pullPhysicalPlan, routingFilterFactory, routingOptions, statement,
serviceContext, logicalSchema, queryId, Optional.empty(), routeQuery);
routingFilterFactory, serviceContext, Optional.empty(), ksqlConfig, routeQuery);

}

@@ -145,7 +142,7 @@ public void shouldCallRouteQuery_success() throws InterruptedException {
});

// When:
PullQueryResult result = haRouting.handlePullQuery();
PullQueryResult result = haRouting.handlePullQuery(pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId);

// Then:
verify(routeQuery).routeQuery(eq(node1), any(), any(), any(), any(), any(), any(), any(), any());
@@ -195,7 +192,7 @@ public Object answer(InvocationOnMock invocation) {
});

// When:
PullQueryResult result = haRouting.handlePullQuery();
PullQueryResult result = haRouting.handlePullQuery(pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId);

// Then:
verify(routeQuery).routeQuery(eq(node1), any(), any(), any(), any(), any(), any(), any(), any());
@@ -247,7 +244,7 @@ public Object answer(InvocationOnMock invocation) {
// When:
final Exception e = assertThrows(
MaterializationException.class,
() -> haRouting.handlePullQuery()
() -> haRouting.handlePullQuery(pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId)
);

// Then:
@@ -280,7 +277,7 @@ public void shouldCallRouteQuery_allFiltered() {
// When:
final Exception e = assertThrows(
MaterializationException.class,
() -> haRouting.handlePullQuery()
() -> haRouting.handlePullQuery(pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId)
);

// Then:

@@ -136,6 +136,8 @@ public void tearDown() {
.excludeTerminated()
// There is a pool of ksql worker threads that grows over time, but is capped.
.nameMatches(name -> !name.startsWith("ksql-workers"))
// There is a pool of HARouting worker threads that grows over time, but is capped at 100
.nameMatches(name -> !name.startsWith("pull-query-executor"))
.build()));
} else {
STARTING_THREADS.get().assertSameThreads();

@@ -30,6 +30,7 @@
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.rest.entity.TableRows;
@@ -59,19 +60,22 @@ public class QueryEndpoint {
private final RoutingFilterFactory routingFilterFactory;
private final Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final RateLimiter rateLimiter;
private final HARouting routing;

public QueryEndpoint(
final KsqlEngine ksqlEngine,
final KsqlConfig ksqlConfig,
final RoutingFilterFactory routingFilterFactory,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final RateLimiter rateLimiter
final RateLimiter rateLimiter,
final HARouting routing
) {
this.ksqlEngine = ksqlEngine;
this.ksqlConfig = ksqlConfig;
this.routingFilterFactory = routingFilterFactory;
this.pullQueryMetrics = pullQueryMetrics;
this.rateLimiter = rateLimiter;
this.routing = routing;
}

public QueryPublisher createQueryPublisher(
@@ -131,6 +135,7 @@ private QueryPublisher createPullQueryPublisher(
final PullQueryResult result = ksqlEngine.executePullQuery(
serviceContext,
statement,
routing,
routingFilterFactory,
routingOptions,
pullQueryMetrics
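The remaining changed files (not shown on this page) are the ones expected to build the single HARouting at server startup and hand it to QueryEndpoint. A hedged wiring sketch, following only the constructor signatures visible above; the surrounding variables are hypothetical:

// One HARouting per server: its "pull-query-executor-*" pool is created once.
final HARouting routing = new HARouting(
    routingFilterFactory, serviceContext, pullQueryMetrics, ksqlConfig);

// The endpoint reuses the shared instance for every pull query it serves.
final QueryEndpoint queryEndpoint = new QueryEndpoint(
    ksqlEngine, ksqlConfig, routingFilterFactory, pullQueryMetrics, rateLimiter, routing);

// On shutdown, close the singleton exactly once to stop the shared pool.
routing.close();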