Skip to content

Commit

Permalink
Merge branch '2.x' into hp/feature/maximus/streamingplan
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo committed Nov 19, 2022
2 parents 085adff + e280866 commit d1d7a25
Show file tree
Hide file tree
Showing 163 changed files with 3,422 additions and 3,262 deletions.
27 changes: 27 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,40 @@ buildscript {
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
version_tokens = opensearch_version.tokenize('-')
opensearch_build = version_tokens[0] + '.0'
prometheus_binary_version = "2.37.2"
if (buildVersionQualifier) {
opensearch_build += "-${buildVersionQualifier}"
}
if (isSnapshot) {
// 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT (opensearch_build)
opensearch_build += "-SNAPSHOT"
}
getArchType = {
if (System.getProperty("os.arch").startsWith("x") || System.getProperty("os.arch").startsWith("amd")) {
return "amd64"
}
else {
return "arm64"
}
}
getOSFamilyType = {
def os = org.gradle.internal.os.OperatingSystem.current();
if (os.isMacOsX()) {
return "darwin"
}
else if(os.isLinux()){
return "linux"
}
else if(os.isWindows()) {
return "windows"
}
else {
return os.getFamilyName().toString()
}
}
getPrometheusBinaryLocation = { ->
return "https://github.com/prometheus/prometheus/releases/download/v${prometheus_binary_version}/prometheus-${prometheus_binary_version}."+ getOSFamilyType() + "-" + getArchType() + ".tar.gz"
}
}

repositories {
Expand Down
4 changes: 2 additions & 2 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jacocoTestReport {
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it,
exclude: ['**/ast/**', '**/catalog/model/**'])
exclude: ['**/ast/**', '**/datasource/model/**'])
}))
}
}
Expand All @@ -80,7 +80,7 @@ jacocoTestCoverageVerification {
excludes = [
'org.opensearch.sql.utils.MLCommonsConstants',
'org.opensearch.sql.utils.Constants',
'org.opensearch.sql.catalog.model.*'
'org.opensearch.sql.datasource.model.*'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

@Getter
@RequiredArgsConstructor
public class CatalogSchemaName {
public class DataSourceSchemaName {

private final String catalogName;
private final String dataSourceName;

private final String schemaName;

Expand Down
67 changes: 31 additions & 36 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,11 @@
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
import static org.opensearch.sql.data.type.ExprCoreType.STRUCT;
import static org.opensearch.sql.utils.MLCommonsConstants.ACTION;
import static org.opensearch.sql.utils.MLCommonsConstants.MODELID;
import static org.opensearch.sql.utils.MLCommonsConstants.PREDICT;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALOUS;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALY_GRADE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_TIMESTAMP;
import static org.opensearch.sql.utils.MLCommonsConstants.STATUS;
import static org.opensearch.sql.utils.MLCommonsConstants.TASKID;
import static org.opensearch.sql.utils.MLCommonsConstants.TIME_FIELD;
import static org.opensearch.sql.utils.MLCommonsConstants.TRAIN;
import static org.opensearch.sql.utils.MLCommonsConstants.TRAINANDPREDICT;
import static org.opensearch.sql.utils.SystemIndexUtils.CATALOGS_TABLE_NAME;
import static org.opensearch.sql.utils.SystemIndexUtils.DATASOURCES_TABLE_NAME;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
Expand All @@ -37,7 +29,7 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand Down Expand Up @@ -69,10 +61,10 @@
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.catalog.model.Catalog;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.expression.Expression;
Expand Down Expand Up @@ -101,7 +93,7 @@
import org.opensearch.sql.planner.logical.LogicalRename;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.physical.catalog.CatalogTable;
import org.opensearch.sql.planner.physical.datasource.DataSourceTable;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.utils.ParseUtils;

Expand All @@ -117,7 +109,7 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>

private final NamedExpressionAnalyzer namedExpressionAnalyzer;

private final CatalogService catalogService;
private final DataSourceService dataSourceService;

private final BuiltinFunctionRepository repository;

Expand All @@ -126,10 +118,10 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>
*/
public Analyzer(
ExpressionAnalyzer expressionAnalyzer,
CatalogService catalogService,
DataSourceService dataSourceService,
BuiltinFunctionRepository repository) {
this.expressionAnalyzer = expressionAnalyzer;
this.catalogService = catalogService;
this.dataSourceService = dataSourceService;
this.selectExpressionAnalyzer = new SelectExpressionAnalyzer(expressionAnalyzer);
this.namedExpressionAnalyzer = new NamedExpressionAnalyzer(expressionAnalyzer);
this.repository = repository;
Expand All @@ -142,25 +134,27 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
@Override
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
QualifiedName qualifiedName = node.getTableQualifiedName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
Set<String> allowedDataSourceNames = dataSourceService.getDataSources()
.stream()
.map(Catalog::getName)
.map(DataSource::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
String tableName = catalogSchemaIdentifierNameResolver.getIdentifierName();
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(),
allowedDataSourceNames);
String tableName = dataSourceSchemaIdentifierNameResolver.getIdentifierName();
context.push();
TypeEnvironment curEnv = context.peek();
Table table;
if (CATALOGS_TABLE_NAME.equals(tableName)) {
table = new CatalogTable(catalogService);
if (DATASOURCES_TABLE_NAME.equals(tableName)) {
table = new DataSourceTable(dataSourceService);
} else {
table = catalogService
.getCatalog(catalogSchemaIdentifierNameResolver.getCatalogName())
table = dataSourceService
.getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName())
.getStorageEngine()
.getTable(new CatalogSchemaName(catalogSchemaIdentifierNameResolver.getCatalogName(),
catalogSchemaIdentifierNameResolver.getSchemaName()),
catalogSchemaIdentifierNameResolver.getIdentifierName());
.getTable(new DataSourceSchemaName(
dataSourceSchemaIdentifierNameResolver.getDataSourceName(),
dataSourceSchemaIdentifierNameResolver.getSchemaName()),
dataSourceSchemaIdentifierNameResolver.getIdentifierName());
}
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));

Expand Down Expand Up @@ -188,28 +182,29 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
@Override
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
QualifiedName qualifiedName = node.getFunctionName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
Set<String> allowedDataSourceNames = dataSourceService.getDataSources()
.stream()
.map(Catalog::getName)
.map(DataSource::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(),
allowedDataSourceNames);

FunctionName functionName
= FunctionName.of(catalogSchemaIdentifierNameResolver.getIdentifierName());
= FunctionName.of(dataSourceSchemaIdentifierNameResolver.getIdentifierName());
List<Expression> arguments = node.getArguments().stream()
.map(unresolvedExpression -> this.expressionAnalyzer.analyze(unresolvedExpression, context))
.collect(Collectors.toList());
TableFunctionImplementation tableFunctionImplementation
= (TableFunctionImplementation) repository.compile(
catalogSchemaIdentifierNameResolver.getCatalogName(), functionName, arguments);
dataSourceSchemaIdentifierNameResolver.getDataSourceName(), functionName, arguments);
context.push();
TypeEnvironment curEnv = context.peek();
Table table = tableFunctionImplementation.applyArguments();
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
curEnv.define(new Symbol(Namespace.INDEX_NAME,
catalogSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
return new LogicalRelation(catalogSchemaIdentifierNameResolver.getIdentifierName(),
dataSourceSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
return new LogicalRelation(dataSourceSchemaIdentifierNameResolver.getIdentifierName(),
tableFunctionImplementation.applyArguments());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,53 @@
import java.util.List;
import java.util.Set;

public class CatalogSchemaIdentifierNameResolver {
public class DataSourceSchemaIdentifierNameResolver {

public static final String DEFAULT_CATALOG_NAME = "@opensearch";
public static final String DEFAULT_DATASOURCE_NAME = "@opensearch";
public static final String DEFAULT_SCHEMA_NAME = "default";
public static final String INFORMATION_SCHEMA_NAME = "information_schema";

private String catalogName = DEFAULT_CATALOG_NAME;
private String dataSourceName = DEFAULT_DATASOURCE_NAME;
private String schemaName = DEFAULT_SCHEMA_NAME;
private String identifierName;

private static final String DOT = ".";

/**
* Data model for capturing catalog, schema and identifier from
* Data model for capturing dataSourceName, schema and identifier from
* fully qualifiedName. In the current state, it is used to capture
* CatalogSchemaTable name and CatalogSchemaFunction in case of table
* DataSourceSchemaTable name and DataSourceSchemaFunction in case of table
* functions.
*
* @param parts parts of qualifiedName.
* @param allowedCatalogs allowedCatalogs.
* @param allowedDataSources allowedDataSources.
*/
public CatalogSchemaIdentifierNameResolver(List<String> parts, Set<String> allowedCatalogs) {
List<String> remainingParts = captureSchemaName(captureCatalogName(parts, allowedCatalogs));
public DataSourceSchemaIdentifierNameResolver(List<String> parts,
Set<String> allowedDataSources) {
List<String> remainingParts
= captureSchemaName(captureDataSourceName(parts, allowedDataSources));
identifierName = String.join(DOT, remainingParts);
}

public String getIdentifierName() {
return identifierName;
}

public String getCatalogName() {
return catalogName;
public String getDataSourceName() {
return dataSourceName;
}

public String getSchemaName() {
return schemaName;
}


// Capture catalog name and return remaining parts(schema name and table name)
// Capture datasource name and return remaining parts(schema name and table name)
// from the fully qualified name.
private List<String> captureCatalogName(List<String> parts, Set<String> allowedCatalogs) {
if (parts.size() > 1 && allowedCatalogs.contains(parts.get(0))
|| DEFAULT_CATALOG_NAME.equals(parts.get(0))) {
catalogName = parts.get(0);
private List<String> captureDataSourceName(List<String> parts, Set<String> allowedDataSources) {
if (parts.size() > 1 && allowedDataSources.contains(parts.get(0))
|| DEFAULT_DATASOURCE_NAME.equals(parts.get(0))) {
dataSourceName = parts.get(0);
return parts.subList(1, parts.size());
} else {
return parts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.AllFields;
import org.opensearch.sql.ast.expression.And;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Case;
import org.opensearch.sql.ast.expression.Cast;
import org.opensearch.sql.ast.expression.Compare;
Expand Down Expand Up @@ -74,7 +73,6 @@
public class ExpressionAnalyzer extends AbstractNodeVisitor<Expression, AnalysisContext> {
@Getter
private final BuiltinFunctionRepository repository;
private final DSL dsl;

@Override
public Expression visitCast(Cast node, AnalysisContext context) {
Expand All @@ -86,7 +84,6 @@ public Expression visitCast(Cast node, AnalysisContext context) {
public ExpressionAnalyzer(
BuiltinFunctionRepository repository) {
this.repository = repository;
this.dsl = new DSL(repository);
}

public Expression analyze(UnresolvedExpression unresolved, AnalysisContext context) {
Expand All @@ -103,7 +100,7 @@ public Expression visitEqualTo(EqualTo node, AnalysisContext context) {
Expression left = node.getLeft().accept(this, context);
Expression right = node.getRight().accept(this, context);

return dsl.equal(left, right);
return DSL.equal(left, right);
}

@Override
Expand All @@ -116,36 +113,36 @@ public Expression visitLiteral(Literal node, AnalysisContext context) {
public Expression visitInterval(Interval node, AnalysisContext context) {
Expression value = node.getValue().accept(this, context);
Expression unit = DSL.literal(node.getUnit().name());
return dsl.interval(value, unit);
return DSL.interval(value, unit);
}

@Override
public Expression visitAnd(And node, AnalysisContext context) {
Expression left = node.getLeft().accept(this, context);
Expression right = node.getRight().accept(this, context);

return dsl.and(left, right);
return DSL.and(left, right);
}

@Override
public Expression visitOr(Or node, AnalysisContext context) {
Expression left = node.getLeft().accept(this, context);
Expression right = node.getRight().accept(this, context);

return dsl.or(left, right);
return DSL.or(left, right);
}

@Override
public Expression visitXor(Xor node, AnalysisContext context) {
Expression left = node.getLeft().accept(this, context);
Expression right = node.getRight().accept(this, context);

return dsl.xor(left, right);
return DSL.xor(left, right);
}

@Override
public Expression visitNot(Not node, AnalysisContext context) {
return dsl.not(node.getExpression().accept(this, context));
return DSL.not(node.getExpression().accept(this, context));
}

@Override
Expand Down Expand Up @@ -226,7 +223,7 @@ private Expression visitIn(
if (valueList.size() == 1) {
return visitCompare(new Compare("=", field, valueList.get(0)), context);
} else if (valueList.size() > 1) {
return dsl.or(
return DSL.or(
visitCompare(new Compare("=", field, valueList.get(0)), context),
visitIn(field, valueList.subList(1, valueList.size()), context));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public String getAlias() {

/**
* Get Qualified name preservs parts of the user given identifiers.
* This can later be utilized to determine Catalog,Schema and Table Name during
* This can later be utilized to determine DataSource,Schema and Table Name during
* Analyzer stage. So Passing QualifiedName directly to Analyzer Stage.
*
* @return TableQualifiedName.
Expand Down
Loading

0 comments on commit d1d7a25

Please sign in to comment.