Skip to content

Commit

Permalink
Table Function Initial Setup.
Browse files Browse the repository at this point in the history
Signed-off-by: vamsi-amazon <reddyvam@amazon.com>
  • Loading branch information
vamsimanohar committed Oct 19, 2022
1 parent b30d156 commit f5db21e
Show file tree
Hide file tree
Showing 37 changed files with 1,437 additions and 58 deletions.
29 changes: 27 additions & 2 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.opensearch.sql.analysis;

import static org.opensearch.sql.analysis.model.CatalogName.DEFAULT_CATALOG_NAME;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
Expand Down Expand Up @@ -57,6 +56,7 @@
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.catalog.CatalogService;
Expand All @@ -70,6 +70,9 @@
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.aggregation.Aggregator;
import org.opensearch.sql.expression.aggregation.NamedAggregator;
import org.opensearch.sql.expression.function.BuiltinFunctionRepository;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.TableFunctionImplementation;
import org.opensearch.sql.expression.parse.ParseExpression;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalAggregation;
Expand Down Expand Up @@ -103,16 +106,20 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>

private final CatalogService catalogService;

private final BuiltinFunctionRepository repository;

/**
* Constructor.
*/
public Analyzer(
ExpressionAnalyzer expressionAnalyzer,
CatalogService catalogService) {
CatalogService catalogService,
BuiltinFunctionRepository repository) {
this.expressionAnalyzer = expressionAnalyzer;
this.catalogService = catalogService;
this.selectExpressionAnalyzer = new SelectExpressionAnalyzer(expressionAnalyzer);
this.namedExpressionAnalyzer = new NamedExpressionAnalyzer(expressionAnalyzer);
this.repository = repository;
}

public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
Expand Down Expand Up @@ -153,6 +160,24 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
return subquery;
}

@Override
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
QualifiedName qualifiedName = node.getFunctionName();
CatalogSchemaIdentifierName catalogSchemaIdentifierName
= new CatalogSchemaIdentifierName(qualifiedName.getParts(), catalogService.getCatalogs());

FunctionName functionName = FunctionName.of(catalogSchemaIdentifierName.getIdentifierName());
List<Expression> arguments = node.getArguments().stream()
.map(unresolvedExpression -> this.expressionAnalyzer.analyze(unresolvedExpression, context))
.collect(Collectors.toList());
TableFunctionImplementation tableFunctionImplementation
= (TableFunctionImplementation) repository.compile(
catalogSchemaIdentifierName.getCatalogName(), functionName, arguments);
return new LogicalRelation(catalogSchemaIdentifierName.getIdentifierName(),
tableFunctionImplementation.applyArguments());
}


@Override
public LogicalPlan visitLimit(Limit node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.sql.ast.tree.RelationSubquery;
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Values;

/**
Expand Down Expand Up @@ -93,6 +94,10 @@ public T visitRelationSubquery(RelationSubquery node, C context) {
return visitChildren(node, context);
}

public T visitTableFunction(TableFunction node, C context) {
return visitChildren(node, context);
}

public T visitFilter(Filter node, C context) {
return visitChildren(node, context);
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;

Expand Down Expand Up @@ -87,6 +88,10 @@ public UnresolvedPlan relation(String tableName, String alias) {
return new Relation(qualifiedName(tableName), alias);
}

public UnresolvedPlan tableFunction(List<String> functionName, UnresolvedExpression... args) {
return new TableFunction(new QualifiedName(functionName), Arrays.asList(args));
}

public static UnresolvedPlan project(UnresolvedPlan input, UnresolvedExpression... projectList) {
return new Project(Arrays.asList(projectList)).attach(input);
}
Expand Down
52 changes: 52 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/TableFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Let;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

/**
* ASTNode for Table Function.
*/
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class TableFunction extends UnresolvedPlan {

private final UnresolvedExpression functionName;

@Getter
private final List<UnresolvedExpression> arguments;

public QualifiedName getFunctionName() {
return (QualifiedName) functionName;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of();
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitTableFunction(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
return null;
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/org/opensearch/sql/expression/DSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public static NamedAggregator named(String name, Aggregator aggregator) {
return new NamedAggregator(name, aggregator);
}

public NamedArgumentExpression namedArgument(String argName, Expression value) {
public static NamedArgumentExpression namedArgument(String argName, Expression value) {
return new NamedArgumentExpression(argName, value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -23,53 +30,113 @@

/**
* Builtin Function Repository.
* Repository registers catalog specific functions under catalog specific namespace and
* universal functions under default namespace. Catalog Specific Namespace carries their own
* namespace.
*
*/
@RequiredArgsConstructor
public class BuiltinFunctionRepository {
private final Map<FunctionName, FunctionResolver> functionResolverMap;

public static final String DEFAULT_NAMESPACE = "default";

private final Map<String, Map<FunctionName, FunctionResolver>> namespaceFunctionResolverMap;


/**
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository.
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository
* under default namespace.
*
* @param resolver {@link DefaultFunctionResolver} to be registered
*/
public void register(FunctionResolver resolver) {
functionResolverMap.put(resolver.getFunctionName(), resolver);
register(DEFAULT_NAMESPACE, resolver);
}

/**
* Compile FunctionExpression.
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository with
* specified namespace.
*
* @param resolver {@link DefaultFunctionResolver} to be registered
*/
public void register(String namespace, FunctionResolver resolver) {
Map<FunctionName, FunctionResolver> functionResolverMap;
if (!namespaceFunctionResolverMap.containsKey(namespace)) {
functionResolverMap = new HashMap<>();
namespaceFunctionResolverMap.put(namespace, functionResolverMap);
}
namespaceFunctionResolverMap.get(namespace).put(resolver.getFunctionName(), resolver);
}


/**
* Compile FunctionExpression under default namespace.
*
*/
public FunctionImplementation compile(FunctionName functionName, List<Expression> expressions) {
FunctionBuilder resolvedFunctionBuilder = resolve(new FunctionSignature(functionName,
expressions.stream().map(expression -> expression.type()).collect(Collectors.toList())));
return compile(DEFAULT_NAMESPACE, functionName, expressions);
}


/**
* Compile FunctionExpression within given namespace.
* Checks for default namespace first and then tries to compile from given namespace.
*/
public FunctionImplementation compile(String namespace, FunctionName functionName,
List<Expression> expressions) {
List<String> namespaceList = new ArrayList<>(List.of(DEFAULT_NAMESPACE));
if (!namespace.equals(DEFAULT_NAMESPACE)) {
namespaceList.add(namespace);
}
FunctionBuilder resolvedFunctionBuilder = resolve(namespaceList,
new FunctionSignature(functionName, expressions
.stream().map(expression -> expression.type()).collect(Collectors.toList())));
return resolvedFunctionBuilder.apply(expressions);
}

/**
* Resolve the {@link FunctionBuilder} in Builtin Function Repository.
* Resolve the {@link FunctionBuilder} in
* repository under a list of namespaces.
* Returns the First FunctionBuilder found.
* So list of namespaces is also the priority of namespaces.
*
* @param functionSignature {@link FunctionSignature} functionsignature.
*
* @param functionSignature {@link FunctionSignature}
* @return Original function builder if it's a cast function or all arguments have expected types.
* Otherwise wrap its arguments by cast function as needed.
* @return Original function builder if it's a cast function or all arguments have expected types
* or other wise wrap its arguments by cast function as needed.
*/
public FunctionBuilder resolve(FunctionSignature functionSignature) {
public FunctionBuilder resolve(List<String> namespaces, FunctionSignature functionSignature) {
FunctionName functionName = functionSignature.getFunctionName();
if (functionResolverMap.containsKey(functionName)) {
Pair<FunctionSignature, FunctionBuilder> resolvedSignature =
functionResolverMap.get(functionName).resolve(functionSignature);

List<ExprType> sourceTypes = functionSignature.getParamTypeList();
List<ExprType> targetTypes = resolvedSignature.getKey().getParamTypeList();
FunctionBuilder funcBuilder = resolvedSignature.getValue();
if (isCastFunction(functionName) || sourceTypes.equals(targetTypes)) {
return funcBuilder;
FunctionBuilder result = null;
for (String namespace : namespaces) {
if (namespaceFunctionResolverMap.containsKey(namespace)
&& namespaceFunctionResolverMap.get(namespace).containsKey(functionName)) {
result = getFunctionBuilder(functionSignature, functionName,
namespaceFunctionResolverMap.get(namespace));
break;
}
return castArguments(sourceTypes, targetTypes, funcBuilder);
} else {
}
if (result == null) {
throw new ExpressionEvaluationException(
String.format("unsupported function name: %s", functionName.getFunctionName()));
} else {
return result;
}
}

private FunctionBuilder getFunctionBuilder(FunctionSignature functionSignature,
FunctionName functionName,
Map<FunctionName, FunctionResolver> functionResolverMap) {
Pair<FunctionSignature, FunctionBuilder> resolvedSignature =
functionResolverMap.get(functionName).resolve(functionSignature);

List<ExprType> sourceTypes = functionSignature.getParamTypeList();
List<ExprType> targetTypes = resolvedSignature.getKey().getParamTypeList();
FunctionBuilder funcBuilder = resolvedSignature.getValue();
if (isCastFunction(functionName) || sourceTypes.equals(targetTypes)) {
return funcBuilder;
}
return castArguments(sourceTypes, targetTypes, funcBuilder);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.expression.function;

import org.opensearch.sql.storage.Table;

/**
* Interface for table function which returns Table when executed.
*/
public interface TableFunctionImplementation extends FunctionImplementation {

Table applyArguments();

}
14 changes: 14 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/StorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

package org.opensearch.sql.storage;

import java.util.Collection;
import java.util.Collections;
import org.opensearch.sql.expression.function.FunctionResolver;

/**
* Storage engine for different storage to provide data access API implementation.
*/
Expand All @@ -15,4 +19,14 @@ public interface StorageEngine {
* Get {@link Table} from storage engine.
*/
Table getTable(String name);

/**
* Get list of catalog related functions.
*
* @return FunctionResolvers of catalog functions.
*/
default Collection<FunctionResolver> getFunctions() {
return Collections.emptyList();
}

}
Loading

0 comments on commit f5db21e

Please sign in to comment.