diff --git a/eventmesh-filter/build.gradle b/eventmesh-function/build.gradle similarity index 92% rename from eventmesh-filter/build.gradle rename to eventmesh-function/build.gradle index ba88591b41..2944f98194 100644 --- a/eventmesh-filter/build.gradle +++ b/eventmesh-function/build.gradle @@ -14,8 +14,3 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - -dependencies { - implementation project(":eventmesh-common") -} diff --git a/eventmesh-transformer/build.gradle b/eventmesh-function/eventmesh-function-api/build.gradle similarity index 92% rename from eventmesh-transformer/build.gradle rename to eventmesh-function/eventmesh-function-api/build.gradle index ba88591b41..2944f98194 100644 --- a/eventmesh-transformer/build.gradle +++ b/eventmesh-function/eventmesh-function-api/build.gradle @@ -14,8 +14,3 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - -dependencies { - implementation project(":eventmesh-common") -} diff --git a/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractEventMeshFunctionChain.java b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractEventMeshFunctionChain.java new file mode 100644 index 0000000000..8cbb0f9381 --- /dev/null +++ b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/AbstractEventMeshFunctionChain.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.function.api; + +import java.util.ArrayList; +import java.util.List; + +/** + * AbstractEventMeshFunctionChain is an abstract class that implements the {@link EventMeshFunction} interface and provides a framework + * for chaining multiple {@link EventMeshFunction} instances that operate on inputs of type {@code T} and produce outputs of type + * {@code R}. This class can be extended to create specific function chains with customized behavior for different + * data types. + * + *

The primary purpose of this class is to allow the sequential execution of functions, where the output of one + * function is passed as the input to the next function in the chain. The chain can be dynamically modified by adding + * functions either at the beginning or the end of the chain.

+ * + * @param the type of the input to the function + * @param the type of the result of the function + */ +public abstract class AbstractEventMeshFunctionChain implements EventMeshFunction { + + protected final List> functions; + + /** + * Default constructor that initializes an empty function chain. + */ + public AbstractEventMeshFunctionChain() { + this.functions = new ArrayList<>(); + } + + /** + * Constructor that initializes the function chain with a given list of functions. The functions will be executed + * in the order they are provided when the {@link #apply(Object)} method is called. + * + * @param functions the initial list of functions to be added to the chain + */ + public AbstractEventMeshFunctionChain(List> functions) { + this.functions = functions; + } + + /** + * Adds a {@link EventMeshFunction} to the beginning of the chain. The function will be executed first when the + * {@link #apply(Object)} method is called. + * + * @param function the function to be added to the beginning of the chain + */ + public void addFirst(EventMeshFunction function) { + this.functions.add(0, function); + } + + /** + * Adds a {@link EventMeshFunction} to the end of the chain. The function will be executed in sequence after all previously + * added functions when the {@link #apply(Object)} method is called. + * + * @param function the function to be added to the end of the chain + */ + public void addLast(EventMeshFunction function) { + this.functions.add(function); + } +} \ No newline at end of file diff --git a/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/EventMeshFunction.java b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/EventMeshFunction.java new file mode 100644 index 0000000000..973f097ae0 --- /dev/null +++ b/eventmesh-function/eventmesh-function-api/src/main/java/org/apache/eventmesh/function/api/EventMeshFunction.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.function.api; + +/** + * EventMesh Interface for a function that accepts one argument and produces a result. This is a functional interface whose functional method is + * {@link #apply(Object)}. + * + *

This interface is similar to {@link java.util.function.Function}, + * but it is specifically designed for use within the EventMesh. It allows defining custom functions to process data or events in the EventMesh. The + * main use case is to encapsulate operations that can be passed around and applied to data or event messages in the EventMesh processing + * pipeline.

+ * + * @param the type of the input to the function + * @param the type of the result of the function + */ +public interface EventMeshFunction { + + /** + * Applies this function to the given argument within the context of the EventMesh module. This method encapsulates the logic for processing the + * input data and producing a result, which can be used in the EventMesh event processing pipeline. + * + * @param t the function argument, representing the input data or event to be processed + * @return the function result, representing the processed output + */ + R apply(T t); + +} \ No newline at end of file diff --git a/eventmesh-function/eventmesh-function-filter/build.gradle b/eventmesh-function/eventmesh-function-filter/build.gradle new file mode 100644 index 0000000000..21e28d7baf --- /dev/null +++ b/eventmesh-function/eventmesh-function-filter/build.gradle @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +dependencies { + implementation project(":eventmesh-common") + implementation project(":eventmesh-function:eventmesh-function-api") +} \ No newline at end of file diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/PatternEntry.java similarity index 94% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/PatternEntry.java index 5a2493a371..acc2d5f073 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/PatternEntry.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.eventmesh.filter; +package org.apache.eventmesh.function.filter; -import org.apache.eventmesh.filter.condition.Condition; +import org.apache.eventmesh.function.filter.condition.Condition; import java.util.ArrayList; import java.util.List; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/AnythingButCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/AnythingButCondition.java similarity index 97% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/AnythingButCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/AnythingButCondition.java index 2d58136a70..d4f209225e 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/AnythingButCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/AnythingButCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import java.util.ArrayList; import java.util.Iterator; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/Condition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/Condition.java similarity index 94% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/Condition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/Condition.java index fbb4276c7b..9890d5e0d3 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/Condition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/Condition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ConditionsBuilder.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ConditionsBuilder.java similarity index 97% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ConditionsBuilder.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ConditionsBuilder.java index 4e207663aa..961be85e5b 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ConditionsBuilder.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ConditionsBuilder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ExistsCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ExistsCondition.java similarity index 95% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ExistsCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ExistsCondition.java index 53c15bb297..c085ba6585 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/ExistsCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/ExistsCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/NumericCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/NumericCondition.java similarity index 97% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/NumericCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/NumericCondition.java index 5eb5374c7c..40eb16a75e 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/NumericCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/NumericCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import java.util.ArrayList; import java.util.List; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/PrefixCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/PrefixCondition.java similarity index 95% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/PrefixCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/PrefixCondition.java index 633ed1fb02..ff5d0313ce 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/PrefixCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/PrefixCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SpecifiedCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SpecifiedCondition.java similarity index 95% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SpecifiedCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SpecifiedCondition.java index f9cc3fb5db..9eefb6b641 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SpecifiedCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SpecifiedCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SuffixCondition.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SuffixCondition.java similarity index 95% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SuffixCondition.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SuffixCondition.java index 805df0ee17..090df24834 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/condition/SuffixCondition.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/condition/SuffixCondition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.condition; +package org.apache.eventmesh.function.filter.condition; import com.fasterxml.jackson.databind.JsonNode; diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/pattern/Pattern.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/pattern/Pattern.java similarity index 75% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/pattern/Pattern.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/pattern/Pattern.java index 8abb306b84..955d9f59ef 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/pattern/Pattern.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/pattern/Pattern.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.pattern; +package org.apache.eventmesh.function.filter.pattern; import org.apache.eventmesh.common.utils.JsonPathUtils; -import org.apache.eventmesh.filter.PatternEntry; +import org.apache.eventmesh.function.api.EventMeshFunction; +import org.apache.eventmesh.function.filter.PatternEntry; import org.apache.commons.lang3.StringUtils; @@ -29,12 +30,11 @@ import com.fasterxml.jackson.databind.JsonNode; import com.jayway.jsonpath.PathNotFoundException; -public class Pattern { - private List requiredFieldList = new ArrayList<>(); - private List dataList = new ArrayList<>(); +public class Pattern implements EventMeshFunction { - private String content; + private final List requiredFieldList = new ArrayList<>(); + private final List dataList = new ArrayList<>(); public void addRequiredFieldList(PatternEntry patternEntry) { this.requiredFieldList.add(patternEntry); @@ -45,19 +45,22 @@ public void addDataList(PatternEntry patternEntry) { } public boolean filter(String content) { - this.content = content; - // this.jsonNode = JacksonUtils.STRING_TO_JSONNODE(content); + return matchRequiredFieldList(content, requiredFieldList) && matchRequiredFieldList(content, dataList); + } - return matchRequiredFieldList(requiredFieldList) && matchRequiredFieldList(dataList); + @Override + public String apply(String content) { + // filter content + return filter(content) ? content : null; } - private boolean matchRequiredFieldList(List dataList) { + private boolean matchRequiredFieldList(String content, List dataList) { for (final PatternEntry patternEntry : dataList) { JsonNode jsonElement = null; try { // content:filter - String matchRes = JsonPathUtils.matchJsonPathValue(this.content, patternEntry.getPatternPath()); + String matchRes = JsonPathUtils.matchJsonPathValue(content, patternEntry.getPatternPath()); if (StringUtils.isNoneBlank(matchRes)) { jsonElement = JsonPathUtils.parseStrict(matchRes); diff --git a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java similarity index 85% rename from eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java rename to eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java index 5f9a71d262..60193a4efa 100644 --- a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java +++ b/eventmesh-function/eventmesh-function-filter/src/main/java/org/apache/eventmesh/function/filter/patternbuild/PatternBuilder.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.eventmesh.filter.patternbuild; +package org.apache.eventmesh.function.filter.patternbuild; import org.apache.eventmesh.common.exception.JsonException; -import org.apache.eventmesh.filter.PatternEntry; -import org.apache.eventmesh.filter.condition.Condition; -import org.apache.eventmesh.filter.condition.ConditionsBuilder; -import org.apache.eventmesh.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.PatternEntry; +import org.apache.eventmesh.function.filter.condition.Condition; +import org.apache.eventmesh.function.filter.condition.ConditionsBuilder; +import org.apache.eventmesh.function.filter.pattern.Pattern; import java.util.ArrayDeque; import java.util.Iterator; @@ -38,19 +38,33 @@ public class PatternBuilder { private static final ObjectMapper mapper = new ObjectMapper(); - public static Pattern build(String jsonStr) { - Pattern pattern = new Pattern(); - JsonNode jsonNode = null; + public static Pattern build(String jsonStr) { try { - jsonNode = mapper.readTree(jsonStr); + JsonNode jsonNode = mapper.readTree(jsonStr); + if (jsonNode.isEmpty() || !jsonNode.isObject()) { + return null; + } + return build(jsonNode); } catch (Exception e) { throw new JsonException("INVALID_JSON_STRING", e); } + } - if (jsonNode.isEmpty() || !jsonNode.isObject()) { - return null; + public static Pattern build(Map conditionMap) { + try { + JsonNode jsonNode = mapper.valueToTree(conditionMap); + if (jsonNode.isEmpty() || !jsonNode.isObject()) { + return null; + } + return build(jsonNode); + } catch (Exception e) { + throw new JsonException("INVALID_MAP", e); } + } + + public static Pattern build(JsonNode jsonNode) { + Pattern pattern = new Pattern(); // iter all json data Iterator> iterator = jsonNode.fields(); diff --git a/eventmesh-filter/src/test/java/org/apache/eventmesh/filter/PatternTest.java b/eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function/filter/PatternTest.java similarity index 82% rename from eventmesh-filter/src/test/java/org/apache/eventmesh/filter/PatternTest.java rename to eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function/filter/PatternTest.java index 207992b0c1..bc0aeff4ea 100644 --- a/eventmesh-filter/src/test/java/org/apache/eventmesh/filter/PatternTest.java +++ b/eventmesh-function/eventmesh-function-filter/src/test/java/org/apache/eventmesh/function/filter/PatternTest.java @@ -15,10 +15,15 @@ * limitations under the License. */ -package org.apache.eventmesh.filter; +package org.apache.eventmesh.function.filter; -import org.apache.eventmesh.filter.pattern.Pattern; -import org.apache.eventmesh.filter.patternbuild.PatternBuilder; +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.patternbuild.PatternBuilder; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -144,4 +149,20 @@ public void testAnythingButFilter() { Assertions.assertEquals(false, res); } + @Test + public void testPrefixFilterMap() { + // Create the inner Map representing {prefix=eventmesh.} + Map innerMap = new HashMap<>(); + innerMap.put("prefix", "eventmesh."); + // Create a List representing [{prefix=eventmesh.}] + List> sourceList = Collections.singletonList(innerMap); + // Create the condition representing {source=[{prefix=eventmesh.}]} + Map condition = new HashMap<>(); + condition.put("source", sourceList); + + Pattern pattern = PatternBuilder.build(condition); + Boolean res = pattern.filter(event); + Assertions.assertEquals(true, res); + } + } diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Transformer.java b/eventmesh-function/eventmesh-function-transformer/build.gradle similarity index 70% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Transformer.java rename to eventmesh-function/eventmesh-function-transformer/build.gradle index 8239dfcb6e..6939bbd483 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Transformer.java +++ b/eventmesh-function/eventmesh-function-transformer/build.gradle @@ -15,18 +15,8 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; - -import com.fasterxml.jackson.core.JsonProcessingException; - -/** - * EventMesh transformer interface, specified transformer implementation includes: - * 1. Constant - * 2. Original - * 3. Template - */ -public interface Transformer { - - String transform(String json) throws JsonProcessingException; +dependencies { + implementation project(":eventmesh-common") + implementation project(":eventmesh-function:eventmesh-function-api") } diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/ConstantTransformer.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/ConstantTransformer.java similarity index 95% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/ConstantTransformer.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/ConstantTransformer.java index dd7c20aace..ae77f149f7 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/ConstantTransformer.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/ConstantTransformer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; public class ConstantTransformer implements Transformer { diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/JsonPathParser.java similarity index 85% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/JsonPathParser.java index a0ebde12d2..c578310dc4 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/JsonPathParser.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; import org.apache.eventmesh.common.utils.JsonPathUtils; @@ -35,6 +35,19 @@ public List getVariablesList() { return variablesList; } + /** + * parser input jsonpath map into variable list + * + * @param jsonPathMap jsonpath map + */ + public JsonPathParser(Map jsonPathMap) { + for (Map.Entry entry : jsonPathMap.entrySet()) { + String name = entry.getKey(); + String value = entry.getValue(); + variablesList.add(new Variable(name, value)); + } + } + /** * parser input jsonpath string into variable list * diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/OriginalTransformer.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/OriginalTransformer.java similarity index 94% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/OriginalTransformer.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/OriginalTransformer.java index 61aa059d59..59ce0350eb 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/OriginalTransformer.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/OriginalTransformer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; class OriginalTransformer implements Transformer { @@ -23,4 +23,5 @@ class OriginalTransformer implements Transformer { public String transform(String json) { return json; } + } diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Template.java similarity index 96% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Template.java index 19c3b5cec3..29d975c371 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Template.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; import org.apache.commons.text.StringSubstitutor; diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TemplateTransformer.java similarity index 96% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TemplateTransformer.java index bc9907ff48..69cee68269 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TemplateTransformer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; import java.util.List; diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformException.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformException.java similarity index 95% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformException.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformException.java index 1b11a29d80..aeb827fc88 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformException.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; /** * Transform exception diff --git a/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java new file mode 100644 index 0000000000..be0e815808 --- /dev/null +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Transformer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.function.transformer; + +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.function.api.EventMeshFunction; + +import com.fasterxml.jackson.core.JsonProcessingException; + +/** + * EventMesh transformer interface, specified transformer implementation includes: + * 1. Constant + * 2. Original + * 3. Template + */ +public interface Transformer extends EventMeshFunction { + + String transform(String json) throws JsonProcessingException; + + @Override + default String apply(String content) { + try { + return transform(content); + } catch (JsonProcessingException e) { + throw new EventMeshException("Failed to transform content", e); + } + } + +} diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerBuilder.java similarity index 69% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerBuilder.java index e7277af73c..916f1ef7bc 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerBuilder.java @@ -15,7 +15,9 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; + +import java.util.Map; public class TransformerBuilder { @@ -32,9 +34,23 @@ public static Transformer buildTransformer(TransformerParam transformerParam) { } } - public static Transformer buildTemplateTransFormer(String jsonContent, String template) { - JsonPathParser jsonPathParser = new JsonPathParser(jsonContent); + /** + * build template transformer + * @param jsonContent json content, support string and map, other type will throw IllegalArgumentException + * @param template template string + * @return transformer + */ + @SuppressWarnings("unchecked") + public static Transformer buildTemplateTransFormer(Object jsonContent, String template) { Template templateEntry = new Template(template); + JsonPathParser jsonPathParser; + if (jsonContent instanceof String) { + jsonPathParser = new JsonPathParser((String) jsonContent); + } else if (jsonContent instanceof Map) { + jsonPathParser = new JsonPathParser((Map) jsonContent); + } else { + throw new TransformException("invalid json content"); + } return new TemplateTransformer(jsonPathParser, templateEntry); } diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerParam.java similarity index 97% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerParam.java index d747d7be4c..915111e01d 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerParam.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; public class TransformerParam { diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerType.java similarity index 97% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerType.java index 2dc7809478..969c49ce80 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/TransformerType.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; import java.util.Objects; diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Variable.java similarity index 96% rename from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java rename to eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Variable.java index c9259d335c..aee80e1454 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java +++ b/eventmesh-function/eventmesh-function-transformer/src/main/java/org/apache/eventmesh/function/transformer/Variable.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; public class Variable { diff --git a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java b/eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function/transformer/TransformTest.java similarity index 88% rename from eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java rename to eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function/transformer/TransformTest.java index a55cde0baf..f9a444e8f9 100644 --- a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java +++ b/eventmesh-function/eventmesh-function-transformer/src/test/java/org/apache/eventmesh/function/transformer/TransformTest.java @@ -15,7 +15,10 @@ * limitations under the License. */ -package org.apache.eventmesh.transformer; +package org.apache.eventmesh.function.transformer; + +import java.util.Collections; +import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -139,4 +142,19 @@ public void testTemplateTransFormerWithConstant() throws JsonProcessingException output); } + @Test + public void testTemplateTransFormerWithStringValueMap() throws JsonProcessingException { + Map content = Collections.singletonMap("data-name", "$.data.name"); + + String template = "Transformers test:data name is ${data-name}"; + Transformer transform = TransformerBuilder.buildTemplateTransFormer(content, template); + String output = transform.transform(EVENT); + Assertions.assertEquals("Transformers test:data name is test-transformer", output); + + Transformer transformer1 = TransformerBuilder.buildTemplateTransFormer(content, template); + String output1 = transformer1.transform(EVENT); + Assertions.assertEquals("Transformers test:data name is test-transformer", output1); + + } + } diff --git a/eventmesh-runtime-v2/build.gradle b/eventmesh-runtime-v2/build.gradle index 04b460ade3..74b9759b10 100644 --- a/eventmesh-runtime-v2/build.gradle +++ b/eventmesh-runtime-v2/build.gradle @@ -36,6 +36,9 @@ dependencies { implementation project(":eventmesh-common") implementation project(":eventmesh-connectors:eventmesh-connector-canal") implementation project(":eventmesh-connectors:eventmesh-connector-http") + implementation project(":eventmesh-function:eventmesh-function-api") + implementation project(":eventmesh-function:eventmesh-function-filter") + implementation project(":eventmesh-function:eventmesh-function-transformer") implementation project(":eventmesh-meta:eventmesh-meta-api") implementation project(":eventmesh-meta:eventmesh-meta-nacos") implementation project(":eventmesh-registry:eventmesh-registry-api") diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java index 66ba0a0c3d..4a68001909 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntime.java @@ -17,22 +17,487 @@ package org.apache.eventmesh.runtime.function; +import org.apache.eventmesh.common.ThreadPoolFactory; +import org.apache.eventmesh.common.config.ConfigService; +import org.apache.eventmesh.common.config.connector.SinkConfig; +import org.apache.eventmesh.common.config.connector.SourceConfig; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceBlockingStub; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.job.JobType; +import org.apache.eventmesh.common.remote.request.FetchJobRequest; +import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; +import org.apache.eventmesh.common.remote.request.ReportJobRequest; +import org.apache.eventmesh.common.remote.response.FetchJobResponse; +import org.apache.eventmesh.common.utils.IPUtils; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.function.api.AbstractEventMeshFunctionChain; +import org.apache.eventmesh.function.api.EventMeshFunction; +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.patternbuild.PatternBuilder; +import org.apache.eventmesh.function.transformer.Transformer; +import org.apache.eventmesh.function.transformer.TransformerBuilder; +import org.apache.eventmesh.function.transformer.TransformerType; +import org.apache.eventmesh.openconnect.api.ConnectorCreateService; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext; +import org.apache.eventmesh.openconnect.api.factory.ConnectorPluginFactory; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.api.source.Source; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.util.ConfigUtil; import org.apache.eventmesh.runtime.Runtime; +import org.apache.eventmesh.runtime.RuntimeInstanceConfig; +import org.apache.commons.lang3.StringUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; + +import com.google.protobuf.Any; +import com.google.protobuf.UnsafeByteOperations; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class FunctionRuntime implements Runtime { + private final RuntimeInstanceConfig runtimeInstanceConfig; + + private ManagedChannel channel; + + private AdminServiceStub adminServiceStub; + + private AdminServiceBlockingStub adminServiceBlockingStub; + + StreamObserver responseObserver; + + StreamObserver requestObserver; + + private final LinkedBlockingQueue queue; + + private FunctionRuntimeConfig functionRuntimeConfig; + + private AbstractEventMeshFunctionChain functionChain; + + private Sink sinkConnector; + + private Source sourceConnector; + + private final ExecutorService sourceService = ThreadPoolFactory.createSingleExecutor("eventMesh-sourceService"); + + private final ExecutorService sinkService = ThreadPoolFactory.createSingleExecutor("eventMesh-sinkService"); + + private final ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(); + + private volatile boolean isRunning = false; + + private volatile boolean isFailed = false; + + private String adminServerAddr; + + + public FunctionRuntime(RuntimeInstanceConfig runtimeInstanceConfig) { + this.runtimeInstanceConfig = runtimeInstanceConfig; + this.queue = new LinkedBlockingQueue<>(1000); + } + + @Override public void init() throws Exception { + // load function runtime config from local file + this.functionRuntimeConfig = ConfigService.getInstance().buildConfigInstance(FunctionRuntimeConfig.class); + + // init admin service + initAdminService(); + + // get remote config from admin service and update local config + getAndUpdateRemoteConfig(); + + // init connector service + initConnectorService(); + + // report status to admin server + reportJobRequest(functionRuntimeConfig.getJobID(), JobState.INIT); + } + + private void initAdminService() { + adminServerAddr = getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr()); + // create gRPC channel + channel = ManagedChannelBuilder.forTarget(adminServerAddr).usePlaintext().build(); + + adminServiceStub = AdminServiceGrpc.newStub(channel).withWaitForReady(); + + adminServiceBlockingStub = AdminServiceGrpc.newBlockingStub(channel).withWaitForReady(); + + responseObserver = new StreamObserver() { + @Override + public void onNext(Payload response) { + log.info("runtime receive message: {} ", response); + } + + @Override + public void onError(Throwable t) { + log.error("runtime receive error message: {}", t.getMessage()); + } + + @Override + public void onCompleted() { + log.info("runtime finished receive message and completed"); + } + }; + + requestObserver = adminServiceStub.invokeBiStream(responseObserver); + } + + private String getRandomAdminServerAddr(String adminServerAddrList) { + String[] addresses = adminServerAddrList.split(";"); + if (addresses.length == 0) { + throw new IllegalArgumentException("Admin server address list is empty"); + } + Random random = new Random(); + int randomIndex = random.nextInt(addresses.length); + return addresses[randomIndex]; + } + + private void getAndUpdateRemoteConfig() { + String jobId = functionRuntimeConfig.getJobID(); + FetchJobRequest jobRequest = new FetchJobRequest(); + jobRequest.setJobID(jobId); + + Metadata metadata = Metadata.newBuilder().setType(FetchJobRequest.class.getSimpleName()).build(); + + Payload request = Payload.newBuilder().setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(jobRequest)))).build()) + .build(); + Payload response = adminServiceBlockingStub.invoke(request); + FetchJobResponse jobResponse = null; + if (response.getMetadata().getType().equals(FetchJobResponse.class.getSimpleName())) { + jobResponse = JsonUtils.parseObject(response.getBody().getValue().toStringUtf8(), FetchJobResponse.class); + } + + if (jobResponse == null || jobResponse.getErrorCode() != ErrorCode.SUCCESS) { + if (jobResponse != null) { + log.error("Failed to get remote config from admin server. ErrorCode: {}, Response: {}", + jobResponse.getErrorCode(), jobResponse); + } else { + log.error("Failed to get remote config from admin server. "); + } + isFailed = true; + try { + stop(); + } catch (Exception e) { + log.error("Failed to stop after exception", e); + } + throw new RuntimeException("Failed to get remote config from admin server."); + } + + // update local config + // source + functionRuntimeConfig.setSourceConnectorType(jobResponse.getTransportType().getSrc().getName()); + functionRuntimeConfig.setSourceConnectorDesc(jobResponse.getConnectorConfig().getSourceConnectorDesc()); + functionRuntimeConfig.setSourceConnectorConfig(jobResponse.getConnectorConfig().getSourceConnectorConfig()); + + // sink + functionRuntimeConfig.setSinkConnectorType(jobResponse.getTransportType().getDst().getName()); + functionRuntimeConfig.setSinkConnectorDesc(jobResponse.getConnectorConfig().getSinkConnectorDesc()); + functionRuntimeConfig.setSinkConnectorConfig(jobResponse.getConnectorConfig().getSinkConnectorConfig()); + + // TODO: update functionConfigs } + + private void initConnectorService() throws Exception { + final JobType jobType = (JobType) functionRuntimeConfig.getRuntimeConfig().get("jobType"); + + // create sink connector + ConnectorCreateService sinkConnectorCreateService = + ConnectorPluginFactory.createConnector(functionRuntimeConfig.getSinkConnectorType() + "-Sink"); + this.sinkConnector = (Sink) sinkConnectorCreateService.create(); + + // parse sink config and init sink connector + SinkConfig sinkConfig = (SinkConfig) ConfigUtil.parse(functionRuntimeConfig.getSinkConnectorConfig(), sinkConnector.configClass()); + SinkConnectorContext sinkConnectorContext = new SinkConnectorContext(); + sinkConnectorContext.setSinkConfig(sinkConfig); + sinkConnectorContext.setRuntimeConfig(functionRuntimeConfig.getRuntimeConfig()); + sinkConnectorContext.setJobType(jobType); + sinkConnector.init(sinkConnectorContext); + + // create source connector + ConnectorCreateService sourceConnectorCreateService = + ConnectorPluginFactory.createConnector(functionRuntimeConfig.getSourceConnectorType() + "-Source"); + this.sourceConnector = (Source) sourceConnectorCreateService.create(); + + // parse source config and init source connector + SourceConfig sourceConfig = (SourceConfig) ConfigUtil.parse(functionRuntimeConfig.getSourceConnectorConfig(), sourceConnector.configClass()); + SourceConnectorContext sourceConnectorContext = new SourceConnectorContext(); + sourceConnectorContext.setSourceConfig(sourceConfig); + sourceConnectorContext.setRuntimeConfig(functionRuntimeConfig.getRuntimeConfig()); + sourceConnectorContext.setJobType(jobType); + + sourceConnector.init(sourceConnectorContext); + } + + private void reportJobRequest(String jobId, JobState jobState) { + ReportJobRequest reportJobRequest = new ReportJobRequest(); + reportJobRequest.setJobID(jobId); + reportJobRequest.setState(jobState); + Metadata metadata = Metadata.newBuilder() + .setType(ReportJobRequest.class.getSimpleName()) + .build(); + Payload payload = Payload.newBuilder() + .setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest)))) + .build()) + .build(); + requestObserver.onNext(payload); + } + + @Override public void start() throws Exception { + this.isRunning = true; + + // build function chain + this.functionChain = buildFunctionChain(functionRuntimeConfig.getFunctionConfigs()); + // start heart beat + this.heartBeatExecutor.scheduleAtFixedRate(() -> { + + ReportHeartBeatRequest heartBeat = new ReportHeartBeatRequest(); + heartBeat.setAddress(IPUtils.getLocalAddress()); + heartBeat.setReportedTimeStamp(String.valueOf(System.currentTimeMillis())); + heartBeat.setJobID(functionRuntimeConfig.getJobID()); + + Metadata metadata = Metadata.newBuilder().setType(ReportHeartBeatRequest.class.getSimpleName()).build(); + + Payload request = Payload.newBuilder().setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(heartBeat)))).build()) + .build(); + + requestObserver.onNext(request); + }, 5, 5, TimeUnit.SECONDS); + + // start sink service + this.sinkService.execute(() -> { + try { + startSinkConnector(); + } catch (Exception e) { + isFailed = true; + log.error("Sink Connector [{}] failed to start.", sinkConnector.name(), e); + try { + this.stop(); + } catch (Exception ex) { + log.error("Failed to stop after exception", ex); + } + throw new RuntimeException(e); + } + }); + + // start source service + this.sourceService.execute(() -> { + try { + startSourceConnector(); + } catch (Exception e) { + isFailed = true; + log.error("Source Connector [{}] failed to start.", sourceConnector.name(), e); + try { + this.stop(); + } catch (Exception ex) { + log.error("Failed to stop after exception", ex); + } + throw new RuntimeException(e); + } + }); + + reportJobRequest(functionRuntimeConfig.getJobID(), JobState.RUNNING); } + private StringEventMeshFunctionChain buildFunctionChain(List> functionConfigs) { + StringEventMeshFunctionChain functionChain = new StringEventMeshFunctionChain(); + + // build function chain + for (Map functionConfig : functionConfigs) { + String functionType = String.valueOf(functionConfig.getOrDefault("functionType", "")); + if (StringUtils.isEmpty(functionType)) { + throw new IllegalArgumentException("'functionType' is required for function"); + } + + // build function based on functionType + EventMeshFunction function; + switch (functionType) { + case "filter": + function = buildFilter(functionConfig); + break; + case "transformer": + function = buildTransformer(functionConfig); + break; + default: + throw new IllegalArgumentException( + "Invalid functionType: '" + functionType + "'. Supported functionType: 'filter', 'transformer'"); + } + + // add function to functionChain + functionChain.addLast(function); + } + + return functionChain; + } + + + @SuppressWarnings("unchecked") + private Pattern buildFilter(Map functionConfig) { + // get condition from attributes + Object condition = functionConfig.get("condition"); + if (condition == null) { + throw new IllegalArgumentException("'condition' is required for filter function"); + } + if (condition instanceof String) { + return PatternBuilder.build(String.valueOf(condition)); + } else if (condition instanceof Map) { + return PatternBuilder.build((Map) condition); + } else { + throw new IllegalArgumentException("Invalid condition"); + } + } + + private Transformer buildTransformer(Map functionConfig) { + // get transformerType from attributes + String transformerTypeStr = String.valueOf(functionConfig.getOrDefault("transformerType", "")).toLowerCase(); + TransformerType transformerType = TransformerType.getItem(transformerTypeStr); + if (transformerType == null) { + throw new IllegalArgumentException( + "Invalid transformerType: '" + transformerTypeStr + + "'. Supported transformerType: 'constant', 'template', 'original' (case insensitive)"); + } + + // build transformer + Transformer transformer = null; + + switch (transformerType) { + case CONSTANT: + // check value + String content = String.valueOf(functionConfig.getOrDefault("content", "")); + if (StringUtils.isEmpty(content)) { + throw new IllegalArgumentException("'content' is required for constant transformer"); + } + transformer = TransformerBuilder.buildConstantTransformer(content); + break; + case TEMPLATE: + // check value and template + Object valueMap = functionConfig.get("valueMap"); + String template = String.valueOf(functionConfig.getOrDefault("template", "")); + if (valueMap == null || StringUtils.isEmpty(template)) { + throw new IllegalArgumentException("'valueMap' and 'template' are required for template transformer"); + } + transformer = TransformerBuilder.buildTemplateTransFormer(valueMap, template); + break; + case ORIGINAL: + // ORIGINAL transformer does not need any parameter + break; + default: + throw new IllegalArgumentException( + "Invalid transformerType: '" + transformerType + "', supported transformerType: 'CONSTANT', 'TEMPLATE', 'ORIGINAL'"); + } + + return transformer; + } + + + private void startSinkConnector() throws Exception { + // start sink connector + this.sinkConnector.start(); + + // try to get data from queue and send it. + while (this.isRunning) { + ConnectRecord connectRecord = null; + try { + connectRecord = queue.poll(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Failed to poll data from queue.", e); + Thread.currentThread().interrupt(); + } + + // send data if not null + if (connectRecord != null) { + sinkConnector.put(Collections.singletonList(connectRecord)); + } + } + } + + private void startSourceConnector() throws Exception { + // start source connector + this.sourceConnector.start(); + + // try to get data from source connector and handle it. + while (this.isRunning) { + List connectorRecordList = sourceConnector.poll(); + + // handle data + if (connectorRecordList != null && !connectorRecordList.isEmpty()) { + for (ConnectRecord connectRecord : connectorRecordList) { + if (connectRecord == null || connectRecord.getData() == null) { + // If data is null, just put it into queue. + this.queue.put(connectRecord); + } else { + // Apply function chain to data + String data = functionChain.apply((String) connectRecord.getData()); + if (data != null) { + if (log.isDebugEnabled()) { + log.debug("Function chain applied. Original data: {}, Transformed data: {}", connectRecord.getData(), data); + } + connectRecord.setData(data); + this.queue.put(connectRecord); + } else if (log.isDebugEnabled()) { + log.debug("Data filtered out by function chain. Original data: {}", connectRecord.getData()); + } + } + } + } + } + } + + @Override public void stop() throws Exception { + log.info("FunctionRuntime is stopping..."); + + isRunning = false; + + if (isFailed) { + reportJobRequest(functionRuntimeConfig.getJobID(), JobState.FAIL); + } else { + reportJobRequest(functionRuntimeConfig.getJobID(), JobState.COMPLETE); + } + + sinkConnector.stop(); + sourceConnector.stop(); + sinkService.shutdown(); + sourceService.shutdown(); + heartBeatExecutor.shutdown(); + + requestObserver.onCompleted(); + if (channel != null && !channel.isShutdown()) { + channel.shutdown(); + } + log.info("FunctionRuntime stopped."); } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeConfig.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeConfig.java index 40aec65e99..4d57c83e82 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeConfig.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeConfig.java @@ -17,5 +17,40 @@ package org.apache.eventmesh.runtime.function; +import org.apache.eventmesh.common.config.Config; + +import java.util.List; +import java.util.Map; + + +import lombok.Data; + +@Data +@Config(path = "classPath://function.yaml") public class FunctionRuntimeConfig { + + private String functionRuntimeInstanceId; + + private String taskID; + + private String jobID; + + private String region; + + private Map runtimeConfig; + + private String sourceConnectorType; + + private String sourceConnectorDesc; + + private Map sourceConnectorConfig; + + private String sinkConnectorType; + + private String sinkConnectorDesc; + + private Map sinkConnectorConfig; + + private List> functionConfigs; + } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeFactory.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeFactory.java index 3ba91986cb..40346e272f 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeFactory.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/FunctionRuntimeFactory.java @@ -30,7 +30,7 @@ public void init() throws Exception { @Override public Runtime createRuntime(RuntimeInstanceConfig runtimeInstanceConfig) { - return null; + return new FunctionRuntime(runtimeInstanceConfig); } @Override diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringEventMeshFunctionChain.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringEventMeshFunctionChain.java new file mode 100644 index 0000000000..0035999ecb --- /dev/null +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/function/StringEventMeshFunctionChain.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.function; + +import org.apache.eventmesh.function.api.AbstractEventMeshFunctionChain; +import org.apache.eventmesh.function.api.EventMeshFunction; + +/** + * ConnectRecord Function Chain. + */ +public class StringEventMeshFunctionChain extends AbstractEventMeshFunctionChain { + + @Override + public String apply(String content) { + for (EventMeshFunction function : functions) { + if (content == null) { + break; + } + content = function.apply(content); + } + return content; + } +} diff --git a/eventmesh-runtime-v2/src/main/resources/function.yaml b/eventmesh-runtime-v2/src/main/resources/function.yaml new file mode 100644 index 0000000000..eae2b063ec --- /dev/null +++ b/eventmesh-runtime-v2/src/main/resources/function.yaml @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +taskID: c6233632-ab9a-4aba-904f-9d22fba6aa74 +jobID: 8190fe5b-1f9b-4815-8983-2467e76edbf0 +region: region1 + diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index 95924faad4..b016e18bfe 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -36,8 +36,10 @@ dependencies { implementation "commons-validator:commons-validator" implementation project(":eventmesh-common") - implementation project(":eventmesh-filter") implementation project(":eventmesh-spi") + implementation project(":eventmesh-function:eventmesh-function-api") + implementation project(":eventmesh-function:eventmesh-function-filter") + implementation project(":eventmesh-function:eventmesh-function-transformer") implementation project(":eventmesh-storage-plugin:eventmesh-storage-api") implementation project(":eventmesh-storage-plugin:eventmesh-storage-standalone") implementation project(":eventmesh-storage-plugin:eventmesh-storage-rocketmq") @@ -45,7 +47,6 @@ dependencies { implementation project(":eventmesh-security-plugin:eventmesh-security-acl") implementation project(":eventmesh-security-plugin:eventmesh-security-auth-http-basic") implementation project(":eventmesh-security-plugin:eventmesh-security-auth-token") - implementation project(":eventmesh-transformer") implementation project(":eventmesh-meta:eventmesh-meta-api") implementation project(":eventmesh-meta:eventmesh-meta-nacos") implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java index bf6eb9dadc..14677dc690 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java @@ -19,8 +19,8 @@ import org.apache.eventmesh.api.meta.MetaServiceListener; import org.apache.eventmesh.common.utils.JsonUtils; -import org.apache.eventmesh.filter.pattern.Pattern; -import org.apache.eventmesh.filter.patternbuild.PatternBuilder; +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.patternbuild.PatternBuilder; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager; import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java index 551bcb2799..1d2f8ca30c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java @@ -19,14 +19,14 @@ import org.apache.eventmesh.api.meta.MetaServiceListener; import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.function.transformer.Transformer; +import org.apache.eventmesh.function.transformer.TransformerBuilder; +import org.apache.eventmesh.function.transformer.TransformerParam; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager; import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager; import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer; import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager; import org.apache.eventmesh.runtime.meta.MetaStorage; -import org.apache.eventmesh.transformer.Transformer; -import org.apache.eventmesh.transformer.TransformerBuilder; -import org.apache.eventmesh.transformer.TransformerParam; import org.apache.commons.lang3.StringUtils; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java index b30238a28c..0e41d827ab 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java @@ -31,7 +31,8 @@ import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.RandomStringUtils; -import org.apache.eventmesh.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.function.transformer.Transformer; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; import org.apache.eventmesh.runtime.acl.Acl; @@ -44,7 +45,6 @@ import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.RemotingHelper; import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants; -import org.apache.eventmesh.transformer.Transformer; import org.apache.commons.lang3.StringUtils; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index be95971536..69506ede8a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -30,14 +30,14 @@ import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.common.utils.RandomStringUtils; -import org.apache.eventmesh.filter.pattern.Pattern; +import org.apache.eventmesh.function.filter.pattern.Pattern; +import org.apache.eventmesh.function.transformer.Transformer; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.ProtocolPluginFactory; import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext; import org.apache.eventmesh.runtime.util.EventMeshUtil; import org.apache.eventmesh.runtime.util.WebhookUtil; -import org.apache.eventmesh.transformer.Transformer; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; diff --git a/settings.gradle b/settings.gradle index a22363c809..c49a5c4242 100644 --- a/settings.gradle +++ b/settings.gradle @@ -47,8 +47,6 @@ include 'eventmesh-common' include 'eventmesh-starter' include 'eventmesh-examples' include 'eventmesh-spi' -include 'eventmesh-filter' -include 'eventmesh-transformer' include 'eventmesh-openconnect:eventmesh-openconnect-java' include 'eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api' @@ -133,3 +131,8 @@ include 'eventmesh-registry' include 'eventmesh-registry:eventmesh-registry-api' include 'eventmesh-registry:eventmesh-registry-nacos' +include 'eventmesh-function' +include 'eventmesh-function:eventmesh-function-api' +include 'eventmesh-function:eventmesh-function-filter' +include 'eventmesh-function:eventmesh-function-transformer' +