Skip to content

Commit

Permalink
[ISSUE #5108] Abstracting and transforming EventMeshFunction, and imp…
Browse files Browse the repository at this point in the history
…lementing FunctionRuntime. (#5109)

* feat: Unified function module

* feat: update something

* feat: update FunctionRuntime

* feat: update FunctionRuntime
  • Loading branch information
cnzakii authored Oct 28, 2024
1 parent fe3d56b commit 98fbf62
Show file tree
Hide file tree
Showing 42 changed files with 901 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,3 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/


dependencies {
implementation project(":eventmesh-common")
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,3 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/


dependencies {
implementation project(":eventmesh-common")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.function.api;

import java.util.ArrayList;
import java.util.List;

/**
* AbstractEventMeshFunctionChain is an abstract class that implements the {@link EventMeshFunction} interface and provides a framework
* for chaining multiple {@link EventMeshFunction} instances that operate on inputs of type {@code T} and produce outputs of type
* {@code R}. This class can be extended to create specific function chains with customized behavior for different
* data types.
*
* <p>The primary purpose of this class is to allow the sequential execution of functions, where the output of one
* function is passed as the input to the next function in the chain. The chain can be dynamically modified by adding
* functions either at the beginning or the end of the chain.</p>
*
* @param <T> the type of the input to the function
* @param <R> the type of the result of the function
*/
public abstract class AbstractEventMeshFunctionChain<T, R> implements EventMeshFunction<T, R> {

protected final List<EventMeshFunction<T, R>> functions;

/**
* Default constructor that initializes an empty function chain.
*/
public AbstractEventMeshFunctionChain() {
this.functions = new ArrayList<>();
}

/**
* Constructor that initializes the function chain with a given list of functions. The functions will be executed
* in the order they are provided when the {@link #apply(Object)} method is called.
*
* @param functions the initial list of functions to be added to the chain
*/
public AbstractEventMeshFunctionChain(List<EventMeshFunction<T, R>> functions) {
this.functions = functions;
}

/**
* Adds a {@link EventMeshFunction} to the beginning of the chain. The function will be executed first when the
* {@link #apply(Object)} method is called.
*
* @param function the function to be added to the beginning of the chain
*/
public void addFirst(EventMeshFunction<T, R> function) {
this.functions.add(0, function);
}

/**
* Adds a {@link EventMeshFunction} to the end of the chain. The function will be executed in sequence after all previously
* added functions when the {@link #apply(Object)} method is called.
*
* @param function the function to be added to the end of the chain
*/
public void addLast(EventMeshFunction<T, R> function) {
this.functions.add(function);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.function.api;

/**
* EventMesh Interface for a function that accepts one argument and produces a result. This is a functional interface whose functional method is
* {@link #apply(Object)}.
*
* <p>This interface is similar to {@link java.util.function.Function},
* but it is specifically designed for use within the EventMesh. It allows defining custom functions to process data or events in the EventMesh. The
* main use case is to encapsulate operations that can be passed around and applied to data or event messages in the EventMesh processing
* pipeline.</p>
*
* @param <T> the type of the input to the function
* @param <R> the type of the result of the function
*/
public interface EventMeshFunction<T, R> {

/**
* Applies this function to the given argument within the context of the EventMesh module. This method encapsulates the logic for processing the
* input data and producing a result, which can be used in the EventMesh event processing pipeline.
*
* @param t the function argument, representing the input data or event to be processed
* @return the function result, representing the processed output
*/
R apply(T t);

}
21 changes: 21 additions & 0 deletions eventmesh-function/eventmesh-function-filter/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
implementation project(":eventmesh-common")
implementation project(":eventmesh-function:eventmesh-function-api")
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter;
package org.apache.eventmesh.function.filter;

import org.apache.eventmesh.filter.condition.Condition;
import org.apache.eventmesh.function.filter.condition.Condition;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter.condition;
package org.apache.eventmesh.function.filter.condition;

import java.util.ArrayList;
import java.util.Iterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter.condition;
package org.apache.eventmesh.function.filter.condition;

import com.fasterxml.jackson.databind.JsonNode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter.condition;
package org.apache.eventmesh.function.filter.condition;

import com.fasterxml.jackson.databind.JsonNode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter.condition;
package org.apache.eventmesh.function.filter.condition;

import com.fasterxml.jackson.databind.JsonNode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter.condition;
package org.apache.eventmesh.function.filter.condition;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter.condition;
package org.apache.eventmesh.function.filter.condition;

import com.fasterxml.jackson.databind.JsonNode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter.condition;
package org.apache.eventmesh.function.filter.condition;

import com.fasterxml.jackson.databind.JsonNode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter.condition;
package org.apache.eventmesh.function.filter.condition;

import com.fasterxml.jackson.databind.JsonNode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter.pattern;
package org.apache.eventmesh.function.filter.pattern;

import org.apache.eventmesh.common.utils.JsonPathUtils;
import org.apache.eventmesh.filter.PatternEntry;
import org.apache.eventmesh.function.api.EventMeshFunction;
import org.apache.eventmesh.function.filter.PatternEntry;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -29,12 +30,11 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.jayway.jsonpath.PathNotFoundException;

public class Pattern {

private List<PatternEntry> requiredFieldList = new ArrayList<>();
private List<PatternEntry> dataList = new ArrayList<>();
public class Pattern implements EventMeshFunction<String, String> {

private String content;
private final List<PatternEntry> requiredFieldList = new ArrayList<>();
private final List<PatternEntry> dataList = new ArrayList<>();

public void addRequiredFieldList(PatternEntry patternEntry) {
this.requiredFieldList.add(patternEntry);
Expand All @@ -45,19 +45,22 @@ public void addDataList(PatternEntry patternEntry) {
}

public boolean filter(String content) {
this.content = content;
// this.jsonNode = JacksonUtils.STRING_TO_JSONNODE(content);
return matchRequiredFieldList(content, requiredFieldList) && matchRequiredFieldList(content, dataList);
}

return matchRequiredFieldList(requiredFieldList) && matchRequiredFieldList(dataList);
@Override
public String apply(String content) {
// filter content
return filter(content) ? content : null;
}

private boolean matchRequiredFieldList(List<PatternEntry> dataList) {
private boolean matchRequiredFieldList(String content, List<PatternEntry> dataList) {

for (final PatternEntry patternEntry : dataList) {
JsonNode jsonElement = null;
try {
// content:filter
String matchRes = JsonPathUtils.matchJsonPathValue(this.content, patternEntry.getPatternPath());
String matchRes = JsonPathUtils.matchJsonPathValue(content, patternEntry.getPatternPath());

if (StringUtils.isNoneBlank(matchRes)) {
jsonElement = JsonPathUtils.parseStrict(matchRes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
* limitations under the License.
*/

package org.apache.eventmesh.filter.patternbuild;
package org.apache.eventmesh.function.filter.patternbuild;

import org.apache.eventmesh.common.exception.JsonException;
import org.apache.eventmesh.filter.PatternEntry;
import org.apache.eventmesh.filter.condition.Condition;
import org.apache.eventmesh.filter.condition.ConditionsBuilder;
import org.apache.eventmesh.filter.pattern.Pattern;
import org.apache.eventmesh.function.filter.PatternEntry;
import org.apache.eventmesh.function.filter.condition.Condition;
import org.apache.eventmesh.function.filter.condition.ConditionsBuilder;
import org.apache.eventmesh.function.filter.pattern.Pattern;

import java.util.ArrayDeque;
import java.util.Iterator;
Expand All @@ -38,19 +38,33 @@ public class PatternBuilder {

private static final ObjectMapper mapper = new ObjectMapper();

public static Pattern build(String jsonStr) {

Pattern pattern = new Pattern();
JsonNode jsonNode = null;
public static Pattern build(String jsonStr) {
try {
jsonNode = mapper.readTree(jsonStr);
JsonNode jsonNode = mapper.readTree(jsonStr);
if (jsonNode.isEmpty() || !jsonNode.isObject()) {
return null;
}
return build(jsonNode);
} catch (Exception e) {
throw new JsonException("INVALID_JSON_STRING", e);
}
}

if (jsonNode.isEmpty() || !jsonNode.isObject()) {
return null;
public static Pattern build(Map<String, Object> conditionMap) {
try {
JsonNode jsonNode = mapper.valueToTree(conditionMap);
if (jsonNode.isEmpty() || !jsonNode.isObject()) {
return null;
}
return build(jsonNode);
} catch (Exception e) {
throw new JsonException("INVALID_MAP", e);
}
}

public static Pattern build(JsonNode jsonNode) {
Pattern pattern = new Pattern();

// iter all json data
Iterator<Entry<String, JsonNode>> iterator = jsonNode.fields();
Expand Down
Loading

0 comments on commit 98fbf62

Please sign in to comment.