Skip to content

Commit

Permalink
modernizer for presto-pinot-toolkit
Browse files Browse the repository at this point in the history
  • Loading branch information
imjalpreet authored and ZacBlanco committed Dec 3, 2024
1 parent 190e738 commit 81cfece
Show file tree
Hide file tree
Showing 14 changed files with 34 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ public String doHttpActionWithHeaders(
if (rpcService.isPresent()) {
requestBuilder
.setHeader(pinotConfig.getCallerHeaderParam(), pinotConfig.getCallerHeaderValue())
.setHeader(pinotConfig.getServiceHeaderParam(), rpcService.get());
.setHeader(pinotConfig.getServiceHeaderParam(), rpcService.orElseThrow());
}
if (requestBody.isPresent()) {
requestBuilder.setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(requestBody.get(), StandardCharsets.UTF_8));
requestBuilder.setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(requestBody.orElseThrow(), StandardCharsets.UTF_8));
}
pinotConfig.getExtraHttpHeaders().forEach(requestBuilder::setHeader);
Request request = requestBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public String getMessage()
{
String message = super.getMessage();
if (pinotQuery.isPresent()) {
message += " with pinot query \"" + pinotQuery.get() + "\"";
message += " with pinot query \"" + pinotQuery.orElseThrow() + "\"";
}
return message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public ConnectorPageSource createPageSource(
return new PinotBrokerPageSource(
pinotConfig,
session,
pinotSplit.getBrokerPinotQuery().get(),
pinotSplit.getBrokerPinotQuery().orElseThrow(),
handles,
pinotSplit.getExpectedColumnHandles(),
clusterInfoFetcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ private Optional<PlanNode> tryCreatingNewScanNode(PlanNode plan, TableScanNode t
return Optional.empty();
}
PinotTableHandle pinotTableHandle = getPinotTableHandle(tableScanNode).orElseThrow(() -> new PinotException(PINOT_UNCLASSIFIED_ERROR, Optional.empty(), "Expected to find a pinot table handle"));
PinotQueryGeneratorContext context = pinotQuery.get().getContext();
PinotQueryGeneratorContext context = pinotQuery.orElseThrow().getContext();
TableHandle oldTableHandle = tableScanNode.getTable();
LinkedHashMap<VariableReferenceExpression, PinotColumnHandle> assignments = context.getAssignments();
boolean forBroker = pinotQuery.get().getGeneratedPinotQuery().forBroker();
boolean forBroker = pinotQuery.orElseThrow().getGeneratedPinotQuery().forBroker();
TableHandle newTableHandle = new TableHandle(
oldTableHandle.getConnectorId(),
new PinotTableHandle(
Expand All @@ -173,7 +173,7 @@ private Optional<PlanNode> tryCreatingNewScanNode(PlanNode plan, TableScanNode t
pinotTableHandle.getTableName(),
Optional.of(forBroker),
Optional.of(ImmutableList.copyOf(assignments.values())),
Optional.of(pinotQuery.get().getGeneratedPinotQuery())),
Optional.of(pinotQuery.orElseThrow().getGeneratedPinotQuery())),
oldTableHandle.getTransaction(),
oldTableHandle.getLayout());
return Optional.of(
Expand Down Expand Up @@ -226,7 +226,7 @@ public PlanNode visitFilter(FilterNode node, TableScanNode context)

filtersSplitUp.put(pushableFilter, null);
if (nonPushableFilter.isPresent()) {
FilterNode nonPushableFilterNode = nonPushableFilter.get();
FilterNode nonPushableFilterNode = nonPushableFilter.orElseThrow();
filtersSplitUp.put(nonPushableFilterNode, null);
nodeToRecurseInto = nonPushableFilterNode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public static List<AggregationColumnNode> computeAggregationNodes(AggregationNod
if (aggregation.getMask().isPresent()) {
// This block handles the case when a distinct aggregation is present in addition to another aggregation function.
// E.g. `SELECT count(distinct COL_A), sum(COL_B) FROM myTable` to Pinot as `SELECT distinctCount(COL_A), sum(COL_B) FROM myTable`
if (aggregation.getCall().getDisplayName().equalsIgnoreCase(COUNT_FUNCTION_NAME) && aggregation.getMask().get().getName().contains(DISTINCT_MASK)) {
if (aggregation.getCall().getDisplayName().equalsIgnoreCase(COUNT_FUNCTION_NAME) && aggregation.getMask().orElseThrow().getName().contains(DISTINCT_MASK)) {
nodeBuilder.add(new AggregationFunctionColumnNode(outputColumn, new CallExpression(aggregation.getCall().getSourceLocation(), PINOT_DISTINCT_COUNT_FUNCTION_NAME, aggregation.getCall().getFunctionHandle(), aggregation.getCall().getType(), aggregation.getCall().getArguments())));
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public List<String> getSegments()
public Optional<String> getGrpcHost()
{
if (segmentHost.isPresent()) {
String[] hostSplits = segmentHost.get().split("_");
String[] hostSplits = segmentHost.orElseThrow().split("_");
return (hostSplits.length > 1) ? Optional.of(hostSplits[hostSplits.length - 2]) : segmentHost;
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private String getSegmentPinotQuery(GeneratedPinotQuery basePinotQuery, String s
{
String pinotQuery = basePinotQuery.getQuery().replace(TABLE_NAME_SUFFIX_TEMPLATE, suffix);
if (timePredicate.isPresent()) {
String tp = timePredicate.get();
String tp = timePredicate.orElseThrow();
pinotQuery = pinotQuery.replace(TIME_BOUNDARY_FILTER_TEMPLATE, basePinotQuery.isHaveFilter() ? " AND " + tp : " WHERE " + tp);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public PinotExpression visitCall(
FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(functionHandle);
Optional<OperatorType> operatorTypeOptional = functionMetadata.getOperatorType();
if (operatorTypeOptional.isPresent()) {
OperatorType operatorType = operatorTypeOptional.get();
OperatorType operatorType = operatorTypeOptional.orElseThrow();
if (operatorType.isArithmeticOperator()) {
return handleArithmeticExpression(call, operatorType, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private Optional<PinotExpression> handleDateOrTimestampBinaryExpression(String o
Optional<String> left = handleTimeValueCast(context, arguments.get(1), arguments.get(0));
Optional<String> right = handleTimeValueCast(context, arguments.get(0), arguments.get(1));
if (left.isPresent() && right.isPresent()) {
return Optional.of(derived(format("(%s %s %s)", left.get(), operator, right.get())));
return Optional.of(derived(format("(%s %s %s)", left.orElseThrow(), operator, right.orElseThrow())));
}
return Optional.empty();
}
Expand Down Expand Up @@ -399,7 +399,7 @@ public PinotExpression visitCall(CallExpression call, Function<VariableReference
FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(call.getFunctionHandle());
Optional<OperatorType> operatorType = functionMetadata.getOperatorType();
if (standardFunctionResolution.isComparisonFunction(functionHandle) && operatorType.isPresent()) {
return handleLogicalBinary(operatorType.get().getOperator(), call, context);
return handleLogicalBinary(operatorType.orElseThrow().getOperator(), call, context);
}
if ("contains".equals(functionMetadata.getName().getObjectName())) {
return handleContains(call, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,10 @@ public PinotExpression visitCall(CallExpression call, Map<VariableReferenceExpre
FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(call.getFunctionHandle());
Optional<OperatorType> operatorType = functionMetadata.getOperatorType();
if (standardFunctionResolution.isComparisonFunction(functionHandle) && operatorType.isPresent()) {
return handleLogicalBinary(call, operatorType.get().getOperator(), context);
return handleLogicalBinary(call, operatorType.orElseThrow().getOperator(), context);
}
if (standardFunctionResolution.isArithmeticFunction(functionHandle) && operatorType.isPresent()) {
return handleArithmeticExpression(call, operatorType.get(), context);
return handleArithmeticExpression(call, operatorType.orElseThrow(), context);
}
if (standardFunctionResolution.isNegateFunction(functionHandle)) {
return derived('-' + call.getArguments().get(0).accept(this, context).getDefinition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private String generatePinotQueryHelper(boolean forBroker, String expressions, S
{
String query = "SELECT " + expressions + " FROM " + tableName + (forBroker ? "" : TABLE_NAME_SUFFIX_TEMPLATE);
if (filter.isPresent()) {
String filterString = filter.get();
String filterString = filter.orElseThrow();
// this is hack!!!. Ideally we want to clone the scan pipeline and create/update the filter in the scan pipeline to contain this filter and
// at the same time add the time column to scan so that the query generator doesn't fail when it looks up the time column in scan output columns
query += format(" WHERE %s%s", filterString, forBroker ? "" : TIME_BOUNDARY_FILTER_TEMPLATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private void testSegmentSplitsHelperNoFilter(PinotTableHandle table, int segment
SessionHolder sessionHolder = new SessionHolder(pinotConfig);
PlanBuilder planBuilder = createPlanBuilder(sessionHolder);
PlanNode plan = tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch);
PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, sessionHolder.getConnectorSession()).get();
PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, sessionHolder.getConnectorSession()).orElseThrow();
List<PinotColumnHandle> expectedHandles = ImmutableList.copyOf(pinotQueryGeneratorResult.getContext().getAssignments().values());
PinotQueryGenerator.GeneratedPinotQuery generatedSql = pinotQueryGeneratorResult.getGeneratedPinotQuery();
PinotTableHandle pinotTableHandle = new PinotTableHandle(table.getConnectorId(), table.getSchemaName(), table.getTableName(), Optional.of(false), Optional.of(expectedHandles), Optional.of(generatedSql));
Expand All @@ -81,7 +81,7 @@ private void testSegmentSplitsHelperWithFilter(PinotTableHandle table, int segme
SessionHolder sessionHolder = new SessionHolder(pinotConfig);
PlanBuilder planBuilder = createPlanBuilder(sessionHolder);
PlanNode plan = filter(planBuilder, tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch), getRowExpression("city = 'Boston'", sessionHolder));
PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, sessionHolder.getConnectorSession()).get();
PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, sessionHolder.getConnectorSession()).orElseThrow();
List<PinotColumnHandle> expectedHandles = ImmutableList.copyOf(pinotQueryGeneratorResult.getContext().getAssignments().values());
PinotQueryGenerator.GeneratedPinotQuery generatedSql = pinotQueryGeneratorResult.getGeneratedPinotQuery();
PinotTableHandle pinotTableHandle = new PinotTableHandle(table.getConnectorId(), table.getSchemaName(), table.getTableName(), Optional.of(false), Optional.of(expectedHandles), Optional.of(generatedSql));
Expand All @@ -103,12 +103,12 @@ private void testSegmentLimitLarge(PinotTableHandle table, int sessionLimitLarge
ConnectorSession session = createSessionWithLimitLarge(sessionLimitLarge, pinotConfig);
PlanBuilder planBuilder = createPlanBuilder(sessionHolder);
PlanNode plan = tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch);
PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, session).get();
PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, session).orElseThrow();
String[] limits = pinotQueryGeneratorResult.getGeneratedPinotQuery().getQuery().split("LIMIT ");
assertEquals(Integer.parseInt(limits[1]), sessionLimitLarge);

plan = tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch);
pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, sessionHolder.getConnectorSession()).get();
pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(plan, sessionHolder.getConnectorSession()).orElseThrow();
limits = pinotQueryGeneratorResult.getGeneratedPinotQuery().getQuery().split("LIMIT ");
assertEquals(Integer.parseInt(limits[1]), configLimitLarge);
}
Expand All @@ -134,7 +134,7 @@ private void testBrokerTopNLarge(PinotTableHandle table, int sessionTopNLarge, i
.addAggregation(planBuilder.variable("sum_fare"), getRowExpression("sum(fare)", sessionHolder))
.addAggregation(planBuilder.variable("count_regionid"), getRowExpression("count(regionid)", sessionHolder)));

PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(aggregationNode, session).get();
PinotQueryGenerator.PinotQueryGeneratorResult pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(aggregationNode, session).orElseThrow();
String[] limits = pinotQueryGeneratorResult.getGeneratedPinotQuery().getQuery().split(LIMIT_KEYWORD_SPLITTER);
assertEquals(Integer.parseInt(limits[1]), sessionTopNLarge);

Expand All @@ -144,18 +144,18 @@ private void testBrokerTopNLarge(PinotTableHandle table, int sessionTopNLarge, i
.singleGroupingSet(variable("city"), variable("regionid"))
.addAggregation(planBuilder.variable("sum_fare"), getRowExpression("sum(fare)", sessionHolder))
.addAggregation(planBuilder.variable("count_regionid"), getRowExpression("count(regionid)", sessionHolder)));
pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(aggregationNode, sessionHolder.getConnectorSession()).get();
pinotQueryGeneratorResult = new PinotQueryGenerator(pinotConfig, functionAndTypeManager, functionAndTypeManager, standardFunctionResolution).generate(aggregationNode, sessionHolder.getConnectorSession()).orElseThrow();
limits = pinotQueryGeneratorResult.getGeneratedPinotQuery().getQuery().split(LIMIT_KEYWORD_SPLITTER);
assertEquals(Integer.parseInt(limits[1]), configTopNLarge);
}

@Test
public void testGetTimeBoundaryForTable()
{
assertEquals(pinotConnection.getTimeBoundary("hybrid").getOfflineTimePredicate().get(), "secondsSinceEpoch < '4562345'");
assertEquals(pinotConnection.getTimeBoundary("hybrid").getOnlineTimePredicate().get(), "secondsSinceEpoch >= '4562345'");
assertEquals(pinotConnection.getTimeBoundary("hybridTableWithTsTimeColumn").getOfflineTimePredicate().get(), "ts < '2022-05-29 23:56:53.312'");
assertEquals(pinotConnection.getTimeBoundary("hybridTableWithTsTimeColumn").getOnlineTimePredicate().get(), "ts >= '2022-05-29 23:56:53.312'");
assertEquals(pinotConnection.getTimeBoundary("hybrid").getOfflineTimePredicate().orElseThrow(), "secondsSinceEpoch < '4562345'");
assertEquals(pinotConnection.getTimeBoundary("hybrid").getOnlineTimePredicate().orElseThrow(), "secondsSinceEpoch >= '4562345'");
assertEquals(pinotConnection.getTimeBoundary("hybridTableWithTsTimeColumn").getOfflineTimePredicate().orElseThrow(), "ts < '2022-05-29 23:56:53.312'");
assertEquals(pinotConnection.getTimeBoundary("hybridTableWithTsTimeColumn").getOnlineTimePredicate().orElseThrow(), "ts >= '2022-05-29 23:56:53.312'");
assertFalse(pinotConnection.getTimeBoundary("unknown").getOfflineTimePredicate().isPresent());
assertFalse(pinotConnection.getTimeBoundary("unknown").getOfflineTimePredicate().isPresent());
}
Expand Down Expand Up @@ -209,12 +209,12 @@ private void assertSegmentSplitWellFormed(PinotSplit split, boolean expectFilter
assertTrue(split.getSegmentPinotQuery().isPresent());
assertTrue(split.getSegmentHost().isPresent());
assertTrue(split.getGrpcHost().isPresent());
assertTrue(split.getGrpcHost().get().length() > 0);
assertEquals(split.getGrpcHost().get(), split.getSegmentHost().get());
assertTrue(split.getGrpcHost().orElseThrow().length() > 0);
assertEquals(split.getGrpcHost().orElseThrow(), split.getSegmentHost().orElseThrow());
assertTrue(split.getGrpcPort().isPresent());
assertEquals(split.getGrpcPort().get().intValue(), MockPinotClusterInfoFetcher.DEFAULT_GRPC_PORT);
assertEquals(split.getGrpcPort().orElseThrow().intValue(), MockPinotClusterInfoFetcher.DEFAULT_GRPC_PORT);
assertFalse(split.getSegments().isEmpty());
String sql = split.getSegmentPinotQuery().get();
String sql = split.getSegmentPinotQuery().orElseThrow();
assertFalse(sql.contains("__")); // templates should be fully resolved
List<String> splitOnWhere = Splitter.on(" WHERE ").splitToList(sql);
// There should be exactly one WHERE clause and it should partition the sql into two
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ private static boolean checkPinotQueryMatches(Optional<String> regex, Optional<S
return true;
}
if (sql.isPresent() && regex.isPresent()) {
String toMatch = sql.get();
Pattern compiled = Pattern.compile(regex.get(), Pattern.CASE_INSENSITIVE);
String toMatch = sql.orElseThrow();
Pattern compiled = Pattern.compile(regex.orElseThrow(), Pattern.CASE_INSENSITIVE);
return compiled.matcher(toMatch).matches();
}
return false;
Expand Down
Loading

0 comments on commit 81cfece

Please sign in to comment.