Skip to content

Commit

Permalink
Add prometheus connector initial code
Browse files Browse the repository at this point in the history
Signed-off-by: vamsi-amazon <reddyvam@amazon.com>
  • Loading branch information
vamsimanohar committed Sep 30, 2022
1 parent 6607f0e commit c3bb272
Show file tree
Hide file tree
Showing 27 changed files with 606 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,18 @@ public static String format(final String format, Object... args) {
private static boolean isQuoted(String text, String mark) {
return !Strings.isNullOrEmpty(text) && text.startsWith(mark) && text.endsWith(mark);
}

/**
* Unquote query which has $ as mark.
* @param rawQuery querystring that possibly enclosed by '$'.
* @return An unquoted string whose outer pair of '$' quotes have been
* removed
*/
public static String getQueryText(String rawQuery) {
if (isQuoted(rawQuery, "$")) {
return rawQuery.substring(1, rawQuery.length() - 1);
} else {
return rawQuery;
}
}
}
32 changes: 32 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.Let;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.Map;
Expand All @@ -53,6 +54,7 @@
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.catalog.CatalogService;
Expand Down Expand Up @@ -81,6 +83,7 @@
import org.opensearch.sql.planner.logical.LogicalRemove;
import org.opensearch.sql.planner.logical.LogicalRename;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalTableFunction;
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.utils.ParseUtils;
Expand Down Expand Up @@ -158,6 +161,17 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
return subquery;
}

@Override
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
Function function = (Function) node.getTableFunction();
String catalogName
= getCatalogNameFromQualifiedTableFunctionName(function.getFuncName());
Expression tableFunction = expressionAnalyzer.analyze(function, context);
return new LogicalTableFunction(tableFunction,
catalogService.getStorageEngine(catalogName).getTable(null));
}


@Override
public LogicalPlan visitLimit(Limit node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
Expand Down Expand Up @@ -478,4 +492,22 @@ private SortOption analyzeSortOption(List<Argument> fieldArgs) {
return asc ? SortOption.DEFAULT_ASC : SortOption.DEFAULT_DESC;
}

private String getCatalogNameFromQualifiedTableFunctionName(String qualifiedFunctionName) {
int firstDotIndex = qualifiedFunctionName.indexOf('.');
if (firstDotIndex > 0 && catalogService.getCatalogs()
.contains(qualifiedFunctionName.substring(0, firstDotIndex))) {
return qualifiedFunctionName.substring(0, firstDotIndex);
} else {
if (firstDotIndex > 0) {
throw new SemanticCheckException(
String.format("Catalog: %s not found",
qualifiedFunctionName.substring(0, firstDotIndex)));
} else {
throw new SemanticCheckException(
String.format("Catalog not specified along with table function: %s",
qualifiedFunctionName));
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public Expression visitAggregateFunction(AggregateFunction node, AnalysisContext
if (builtinFunctionName.isPresent()) {
Expression arg = node.getField().accept(this, context);
Aggregator aggregator = (Aggregator) repository.compile(
builtinFunctionName.get().getName(), Collections.singletonList(arg));
builtinFunctionName.get().getName(), Collections.singletonList(arg));
aggregator.distinct(node.getDistinct());
if (node.condition() != null) {
aggregator.condition(analyze(node.condition(), context));
Expand Down Expand Up @@ -185,11 +185,16 @@ public Expression visitConstantFunction(ConstantFunction node, AnalysisContext c

@Override
public Expression visitFunction(Function node, AnalysisContext context) {
FunctionName functionName = FunctionName.of(node.getFuncName());
List<Expression> arguments =
node.getFuncArgs().stream()
.map(unresolvedExpression -> analyze(unresolvedExpression, context))
.collect(Collectors.toList());
FunctionName functionName;
if (node.getIsTableFunction()) {
functionName =
FunctionName.of(node.getFuncName().substring(node.getFuncName().indexOf(".") + 1));
} else {
functionName = FunctionName.of(node.getFuncName());
}
List<Expression> arguments = node.getFuncArgs().stream()
.map(unresolvedExpression -> analyze(unresolvedExpression, context))
.collect(Collectors.toList());
return (Expression) repository.compile(functionName, arguments);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.sql.ast.tree.RelationSubquery;
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Values;

/**
Expand Down Expand Up @@ -93,6 +94,10 @@ public T visitRelationSubquery(RelationSubquery node, C context) {
return visitChildren(node, context);
}

public T visitTableFunction(TableFunction node, C context) {
return visitChildren(node, context);
}

public T visitFilter(Filter node, C context) {
return visitChildren(node, context);
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;

Expand Down Expand Up @@ -94,6 +95,10 @@ public static UnresolvedPlan projectWithArg(
return new Project(Arrays.asList(projectList), argList).attach(input);
}

public UnresolvedPlan tableFunction(String funcName, UnresolvedExpression... funcArgs) {
return new TableFunction(new Function(funcName, List.of(funcArgs), true));
}

public static UnresolvedPlan agg(
UnresolvedPlan input,
List<UnresolvedExpression> aggList,
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/expression/Function.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,22 @@
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class Function extends UnresolvedExpression {

private final String funcName;
private final List<UnresolvedExpression> funcArgs;
private final Boolean isTableFunction;

/**
* Constructor support with only funcName and funcArgs.
*
* @param funcName FunctionName.
* @param funcArgs FunctionArguments.
*/
public Function(String funcName, List<UnresolvedExpression> funcArgs) {
this.funcName = funcName;
this.funcArgs = funcArgs;
this.isTableFunction = false;
}

@Override
public List<UnresolvedExpression> getChild() {
Expand Down
47 changes: 47 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/TableFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Let;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

/**
* ASTNode for Table Function.
*/
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class TableFunction extends UnresolvedPlan {

private final UnresolvedExpression tableFunction;

public UnresolvedExpression getTableFunction() {
return tableFunction;
}


@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of();
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitTableFunction(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,21 @@
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Getter;
import lombok.Setter;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
property = "connector",
defaultImpl = AbstractAuthenticationData.class,
visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = PrometheusCatalogMetadata.class, name = "prometheus")
})
@Getter
@Setter
public class CatalogMetadata {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.catalog.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
public class PrometheusCatalogMetadata extends CatalogMetadata {

private Long defaultTimeRange;

}
4 changes: 4 additions & 0 deletions core/src/main/java/org/opensearch/sql/expression/DSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,10 @@ public FunctionExpression current_date(Expression... args) {
return compile(BuiltinFunctionName.CURRENT_DATE, args);
}

public FunctionExpression query_range_function(Expression... args) {
return compile(BuiltinFunctionName.QUERY_RANGE, args);
}

private FunctionExpression compile(BuiltinFunctionName bfn, Expression... args) {
return (FunctionExpression) repository.compile(bfn.getName(), Arrays.asList(args.clone()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.sql.expression.datetime.IntervalClause;
import org.opensearch.sql.expression.function.BuiltinFunctionRepository;
import org.opensearch.sql.expression.function.OpenSearchFunctions;
import org.opensearch.sql.expression.function.PrometheusFunctions;
import org.opensearch.sql.expression.operator.arthmetic.ArithmeticFunction;
import org.opensearch.sql.expression.operator.arthmetic.MathematicalFunction;
import org.opensearch.sql.expression.operator.convert.TypeCastOperator;
Expand Down Expand Up @@ -46,6 +47,7 @@ public BuiltinFunctionRepository functionRepository() {
TextFunction.register(builtinFunctionRepository);
TypeCastOperator.register(builtinFunctionRepository);
OpenSearchFunctions.register(builtinFunctionRepository);
PrometheusFunctions.register(builtinFunctionRepository);
return builtinFunctionRepository;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,12 @@ public enum BuiltinFunctionName {
QUERY(FunctionName.of("query")),
MATCH_QUERY(FunctionName.of("match_query")),
MATCHQUERY(FunctionName.of("matchquery")),
MULTI_MATCH(FunctionName.of("multi_match"));
MULTI_MATCH(FunctionName.of("multi_match")),

/**
* Prometheus Table Function.
*/
QUERY_RANGE(FunctionName.of("query_range"));

private final FunctionName name;

Expand Down
Loading

0 comments on commit c3bb272

Please sign in to comment.