Skip to content

Commit

Permalink
Merge branch '2.x' into integ-add-legacy-syntax-for-matchphrase-function
Browse files Browse the repository at this point in the history
  • Loading branch information
GabeFernandez310 authored Dec 1, 2022
2 parents 6b23d10 + e2bf254 commit 3a96e0b
Show file tree
Hide file tree
Showing 61 changed files with 1,829 additions and 246 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ gen
/artifacts/
/.pid.lock
/.prom.pid.lock

.java-version
4 changes: 4 additions & 0 deletions DEVELOPER_GUIDE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ The plugin codebase is in standard layout of Gradle project::
├── core
├── doctest
├── opensearch
├── filesystem
├── prometheus
├── integ-test
├── legacy
├── plugin
Expand All @@ -159,6 +161,8 @@ Here are sub-folders (Gradle modules) for plugin source code:
- ``ppl``: PPL language processor.
- ``core``: core query engine.
- ``opensearch``: OpenSearch storage engine.
- ``prometheus``: Prometheus storage engine.
- ``filesystem``: File System storage engine (in development).
- ``protocol``: request/response protocol formatter.
- ``common``: common util code.
- ``integ-test``: integration and comparison test.
Expand Down
7 changes: 1 addition & 6 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,12 @@ plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
id 'java-test-fixtures'
}

repositories {
mavenCentral()
}
//
//configurations.all {
// resolutionStrategy.dependencySubstitution {
// substitute module('com.google.guava:guava:26.0-jre') with module('com.google.guava:guava:29.0-jre')
// }
//}

dependencies {
api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
@RequiredArgsConstructor
public class Query extends Statement {

private final UnresolvedPlan plan;
protected final UnresolvedPlan plan;

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@
package org.opensearch.sql.datasource.model;

public enum DataSourceType {
PROMETHEUS,OPENSEARCH
PROMETHEUS,
OPENSEARCH,
FILESYSTEM
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor;

import java.util.Optional;
import lombok.Getter;
import org.opensearch.sql.storage.split.Split;

/**
* Execution context hold planning related information.
*/
public class ExecutionContext {
@Getter
private final Optional<Split> split;

public ExecutionContext(Split split) {
this.split = Optional.of(split);
}

private ExecutionContext(Optional<Split> split) {
this.split = split;
}

public static ExecutionContext emptyExecutionContext() {
return new ExecutionContext(Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@ public interface ExecutionEngine {

/**
* Execute physical plan and call back response listener.
* Todo. deprecated this interface after finalize {@link ExecutionContext}.
*
* @param plan executable physical plan
* @param listener response listener
*/
void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener);

/**
* Execute physical plan with {@link ExecutionContext} and call back response listener.
*/
void execute(PhysicalPlan plan, ExecutionContext context,
ResponseListener<QueryResponse> listener);

/**
* Explain physical plan and call back response listener. The reason why this has to
* be part of execution engine interface is that the physical plan probably needs to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,13 @@ public interface QueryManager {
* @return {@link QueryId}.
*/
QueryId submit(AbstractPlan queryPlan);

/**
* Cancel submitted {@link AbstractPlan} by {@link QueryId}.
*
* @return true indicate successful.
*/
default boolean cancel(QueryId queryId) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ public void executePlan(LogicalPlan plan,
PlanContext planContext,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
executionEngine.execute(plan(plan), listener);
planContext
.getSplit()
.ifPresentOrElse(
split -> executionEngine.execute(plan(plan), new ExecutionContext(split), listener),
() -> executionEngine.execute(
plan(plan), ExecutionContext.emptyExecutionContext(), listener));
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ public class QueryPlan extends AbstractPlan {
/**
* The query plan ast.
*/
private final UnresolvedPlan plan;
protected final UnresolvedPlan plan;

/**
* Query service.
*/
private final QueryService queryService;
protected final QueryService queryService;

private final ResponseListener<ExecutionEngine.QueryResponse> listener;
protected final ResponseListener<ExecutionEngine.QueryResponse> listener;

/** constructor. */
public QueryPlan(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.execution;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.streaming.DefaultMetadataLog;
import org.opensearch.sql.executor.streaming.MicroBatchStreamingExecution;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.planner.logical.LogicalRelation;

/**
* Streaming Query Plan.
*/
public class StreamingQueryPlan extends QueryPlan {

private static final Logger log = LogManager.getLogger(StreamingQueryPlan.class);

private final ExecutionStrategy executionStrategy;

private MicroBatchStreamingExecution streamingExecution;

/**
* constructor.
*/
public StreamingQueryPlan(QueryId queryId,
UnresolvedPlan plan,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener,
ExecutionStrategy executionStrategy) {
super(queryId, plan, queryService, listener);

this.executionStrategy = executionStrategy;
}

@Override
public void execute() {
try {
LogicalPlan logicalPlan = queryService.analyze(plan);
StreamingSource streamingSource = buildStreamingSource(logicalPlan);
streamingExecution =
new MicroBatchStreamingExecution(
streamingSource,
logicalPlan,
queryService,
new DefaultMetadataLog<>(),
new DefaultMetadataLog<>());
executionStrategy.execute(streamingExecution::execute);
} catch (UnsupportedOperationException | IllegalArgumentException e) {
listener.onFailure(e);
} catch (InterruptedException e) {
log.error(e);
// todo, update async task status.
}
}

interface ExecutionStrategy {
/**
* execute task.
*/
void execute(Runnable task) throws InterruptedException;
}

/**
* execute task with fixed interval.
* if task run time < interval, trigger next task on next interval.
* if task run time >= interval, trigger next task immediately.
*/
@RequiredArgsConstructor
public static class IntervalTriggerExecution implements ExecutionStrategy {

private final long intervalInSeconds;

@Override
public void execute(Runnable runnable) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
try {
Instant start = Instant.now();
runnable.run();
Instant end = Instant.now();
long took = Duration.between(start, end).toSeconds();
TimeUnit.SECONDS.sleep(intervalInSeconds > took ? intervalInSeconds - took : 0);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

private StreamingSource buildStreamingSource(LogicalPlan logicalPlan) {
return logicalPlan.accept(new StreamingSourceBuilder(), null);
}

static class StreamingSourceBuilder extends LogicalPlanNodeVisitor<StreamingSource, Void> {
@Override
public StreamingSource visitNode(LogicalPlan plan, Void context) {
List<LogicalPlan> children = plan.getChild();
if (children.isEmpty()) {
String errorMsg =
String.format(
"Could find relation plan, %s does not have child node.",
plan.getClass().getSimpleName());
log.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
return children.get(0).accept(this, context);
}

@Override
public StreamingSource visitRelation(LogicalRelation plan, Void context) {
try {
return plan.getTable().asStreamingSource();
} catch (UnsupportedOperationException e) {
String errorMsg =
String.format(
"table %s could not been used as streaming source.", plan.getRelationName());
log.error(errorMsg);
throw new UnsupportedOperationException(errorMsg);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void onResponse(ExecutionEngine.QueryResponse response) {

@Override
public void onFailure(Exception e) {
log.error("streaming processing failed. source = {}", source);
log.error("streaming processing failed. source = {} {}", source, e);
}
});
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/opensearch/sql/expression/DSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ public static FunctionExpression sqrt(Expression... expressions) {
return compile(BuiltinFunctionName.SQRT, expressions);
}

public static FunctionExpression cbrt(Expression... expressions) {
return compile(BuiltinFunctionName.CBRT, expressions);
}

public static FunctionExpression truncate(Expression... expressions) {
return compile(BuiltinFunctionName.TRUNCATE, expressions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum BuiltinFunctionName {
ROUND(FunctionName.of("round")),
SIGN(FunctionName.of("sign")),
SQRT(FunctionName.of("sqrt")),
CBRT(FunctionName.of("cbrt")),
TRUNCATE(FunctionName.of("truncate")),

ACOS(FunctionName.of("acos")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public class OpenSearchFunctions {
*/
public void register(BuiltinFunctionRepository repository) {
repository.register(match_bool_prefix());
repository.register(match());
repository.register(match(BuiltinFunctionName.MATCH));
repository.register(match(BuiltinFunctionName.MATCHQUERY));
repository.register(match(BuiltinFunctionName.MATCH_QUERY));
repository.register(multi_match());
repository.register(simple_query_string());
repository.register(query());
Expand All @@ -45,8 +47,8 @@ private static FunctionResolver match_bool_prefix() {
return new RelevanceFunctionResolver(name, STRING);
}

private static FunctionResolver match() {
FunctionName funcName = BuiltinFunctionName.MATCH.getName();
private static FunctionResolver match(BuiltinFunctionName match) {
FunctionName funcName = match.getName();
return new RelevanceFunctionResolver(funcName, STRING);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class MathematicalFunction {
*/
public static void register(BuiltinFunctionRepository repository) {
repository.register(abs());
repository.register(cbrt());
repository.register(ceil());
repository.register(ceiling());
repository.register(conv());
Expand Down Expand Up @@ -471,6 +472,20 @@ private static DefaultFunctionResolver sqrt() {
DOUBLE, type)).collect(Collectors.toList()));
}

/**
* Definition of cbrt(x) function.
* Calculate the cube root of a number x
* The supported signature is
* INTEGER/LONG/FLOAT/DOUBLE -> DOUBLE
*/
private static DefaultFunctionResolver cbrt() {
return FunctionDSL.define(BuiltinFunctionName.CBRT.getName(),
ExprCoreType.numberTypes().stream()
.map(type -> FunctionDSL.impl(FunctionDSL.nullMissingHandling(
v -> new ExprDoubleValue(Math.cbrt(v.doubleValue()))),
DOUBLE, type)).collect(Collectors.toList()));
}

/**
* Definition of truncate(x, d) function.
* Returns the number x, truncated to d decimal places
Expand Down
Loading

0 comments on commit 3a96e0b

Please sign in to comment.