diff --git a/protocol/kafka_confluent/v2/message.go b/protocol/kafka_confluent/v2/message.go index 164879a11..43df8d4ff 100644 --- a/protocol/kafka_confluent/v2/message.go +++ b/protocol/kafka_confluent/v2/message.go @@ -11,14 +11,15 @@ import ( "strconv" "strings" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/format" "github.com/cloudevents/sdk-go/v2/binding/spec" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) const ( - prefix = "ce-" + prefix = "ce_" contentTypeKey = "content-type" ) diff --git a/protocol/kafka_confluent/v2/message_test.go b/protocol/kafka_confluent/v2/message_test.go index e7f599b63..9676fe7ac 100644 --- a/protocol/kafka_confluent/v2/message_test.go +++ b/protocol/kafka_confluent/v2/message_test.go @@ -42,14 +42,14 @@ var ( TopicPartition: topicPartition, Value: []byte("hello world!"), Headers: mapToKafkaHeaders(map[string]string{ - "ce-type": testEvent.Type(), - "ce-source": testEvent.Source(), - "ce-id": testEvent.ID(), - "ce-time": test.Timestamp.String(), - "ce-specversion": "1.0", - "ce-dataschema": test.Schema.String(), - "ce-datacontenttype": "text/json", - "ce-subject": "receiverTopic", + "ce_type": testEvent.Type(), + "ce_source": testEvent.Source(), + "ce_id": testEvent.ID(), + "ce_time": test.Timestamp.String(), + "ce_specversion": "1.0", + "ce_dataschema": test.Schema.String(), + "ce_datacontenttype": "text/json", + "ce_subject": "receiverTopic", "exta": "someext", }), } @@ -89,14 +89,14 @@ func TestNewMessage(t *testing.T) { TopicPartition: topicPartition, Value: nil, Headers: mapToKafkaHeaders(map[string]string{ - "ce-type": testEvent.Type(), - "ce-source": testEvent.Source(), - "ce-id": testEvent.ID(), - "ce-time": test.Timestamp.String(), - "ce-specversion": "1.0", - "ce-dataschema": test.Schema.String(), - "ce-datacontenttype": "text/json", - "ce-subject": "receiverTopic", + "ce_type": testEvent.Type(), + "ce_source": 
testEvent.Source(), + "ce_id": testEvent.ID(), + "ce_time": test.Timestamp.String(), + "ce_specversion": "1.0", + "ce_dataschema": test.Schema.String(), + "ce_datacontenttype": "text/json", + "ce_subject": "receiverTopic", }), }, expectedEncoding: binding.EncodingBinary, diff --git a/sql/v2/README.md b/sql/v2/README.md index f45641d97..948f48f41 100644 --- a/sql/v2/README.md +++ b/sql/v2/README.md @@ -18,6 +18,54 @@ expression, err := cesqlparser.Parse("subject = 'Hello world'") res, err := expression.Evaluate(event) ``` +Add a user-defined function: +```go +import ( + cesql "github.com/cloudevents/sdk-go/sql/v2" + cefn "github.com/cloudevents/sdk-go/sql/v2/function" + cesqlparser "github.com/cloudevents/sdk-go/sql/v2/parser" + ceruntime "github.com/cloudevents/sdk-go/sql/v2/runtime" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +// Create a test event +event := cloudevents.NewEvent() +event.SetID("aaaa-bbbb-dddd") +event.SetSource("https://my-source") +event.SetType("dev.tekton.event") + +// Create and add a new user-defined function +var HasPrefixFunction cesql.Function = cefn.NewFunction( + "HASPREFIX", + []cesql.Type{cesql.StringType, cesql.StringType}, + nil, + func(event cloudevents.Event, i []interface{}) (interface{}, error) { + str := i[0].(string) + prefix := i[1].(string) + + return strings.HasPrefix(str, prefix), nil + }, +) + +err := ceruntime.AddFunction(HasPrefixFunction) + +// Parse the expression +expression, err := cesqlparser.Parse("HASPREFIX(type, 'dev.tekton.event')") + if err != nil { + fmt.Println("parser err: ", err) + os.Exit(1) + } + +// Evaluate the expression with the test event +res, err := expression.Evaluate(event) + +if res.(bool) { + fmt.Println("Event type has the prefix") +} else { + fmt.Println("Event type doesn't have the prefix") +} +``` + ## Development guide To regenerate the parser, make sure you have [ANTLR4 installed](https://github.com/antlr/antlr4/blob/master/doc/getting-started.md) and then run: diff --git 
// matchString reports whether text matches the SQL LIKE pattern.
//
// Pattern syntax:
//   - '%' matches any run of zero or more characters.
//   - '_' matches exactly one character (a whole UTF-8 encoded rune, not a
//     single byte).
//   - `\%` and `\_` match a literal '%' and '_' respectively.
//   - a backslash that is not followed by '%' or '_' is a literal backslash.
//   - every other byte matches itself.
//
// The match is anchored at both ends: the whole text must be consumed by the
// whole pattern. The implementation is the classic single-wildcard
// backtracking scan and does not allocate.
func matchString(text, pattern string) bool {
	textIdx, patternIdx := 0, 0

	// Backtracking state for the most recent '%':
	// resumePatternIdx is the pattern position right after it (-1 while no
	// wildcard has been seen) and resumeTextIdx is the text position up to
	// which that wildcard has swallowed input so far.
	resumePatternIdx := -1
	resumeTextIdx := 0

	for textIdx < len(text) {
		if patternIdx < len(pattern) {
			switch pattern[patternIdx] {
			case '%':
				// An unescaped '%' is always a wildcard, even when the text
				// happens to contain a '%' at this position. Start by letting
				// it match nothing; it is widened on demand below.
				resumePatternIdx = patternIdx + 1
				resumeTextIdx = textIdx
				patternIdx++
				continue
			case '_':
				// Consume one full character so multi-byte runes count once.
				textIdx += leadingRuneLen(text[textIdx:])
				patternIdx++
				continue
			default:
				// Literal byte. An escape sequence stands for the escaped
				// byte and spans two pattern bytes; it must never degrade to
				// "literal backslash followed by a wildcard".
				want, width := pattern[patternIdx], 1
				if want == '\\' && patternIdx+1 < len(pattern) {
					if next := pattern[patternIdx+1]; next == '%' || next == '_' {
						want, width = next, 2
					}
				}
				if text[textIdx] == want {
					textIdx++
					patternIdx += width
					continue
				}
			}
		}

		// Mismatch, or pattern exhausted with text left over: let the last
		// wildcard swallow one more character and retry from there.
		if resumePatternIdx == -1 {
			return false
		}
		resumeTextIdx += leadingRuneLen(text[resumeTextIdx:])
		textIdx = resumeTextIdx
		patternIdx = resumePatternIdx
	}

	// Text is exhausted; whatever is left of the pattern may only be
	// wildcards, since those are the only tokens that match nothing.
	for ; patternIdx < len(pattern); patternIdx++ {
		if pattern[patternIdx] != '%' {
			return false
		}
	}
	return true
}

// leadingRuneLen returns the number of bytes occupied by the first character
// of the non-empty string s. Invalid UTF-8 bytes count as one byte each, which
// is how ranging over a string advances.
func leadingRuneLen(s string) int {
	for i := range s {
		if i > 0 {
			// i is the start of the second rune, i.e. the width of the first.
			return i
		}
	}
	return len(s)
}
(interface{}, error) + type function struct { name string fixedArgs []cesql.Type variadicArgs *cesql.Type - fn func(cloudevents.Event, []interface{}) (interface{}, error) + fn FuncType } func (f function) Name() string { @@ -39,3 +41,15 @@ func (f function) ArgType(index int) *cesql.Type { func (f function) Run(event cloudevents.Event, arguments []interface{}) (interface{}, error) { return f.fn(event, arguments) } + +func NewFunction(name string, + fixedargs []cesql.Type, + variadicArgs *cesql.Type, + fn FuncType) cesql.Function { + return function{ + name: name, + fixedArgs: fixedargs, + variadicArgs: variadicArgs, + fn: fn, + } +} diff --git a/sql/v2/go.mod b/sql/v2/go.mod index 86d00e196..631a5b536 100644 --- a/sql/v2/go.mod +++ b/sql/v2/go.mod @@ -6,6 +6,7 @@ require ( github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 github.com/cloudevents/sdk-go/v2 v2.5.0 github.com/stretchr/testify v1.8.0 + gopkg.in/yaml.v2 v2.4.0 sigs.k8s.io/yaml v1.3.0 ) @@ -20,7 +21,6 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/sql/v2/runtime/functions_resolver.go b/sql/v2/runtime/functions_resolver.go index b80136842..5ab964fb7 100644 --- a/sql/v2/runtime/functions_resolver.go +++ b/sql/v2/runtime/functions_resolver.go @@ -58,6 +58,11 @@ func (table functionTable) AddFunction(function cesql.Function) error { } } +// Adds user defined function +func AddFunction(fn cesql.Function) error { + return globalFunctionTable.AddFunction(fn) +} + func (table functionTable) ResolveFunction(name string, args int) cesql.Function { item := table[strings.ToUpper(name)] if item == nil { diff --git a/sql/v2/runtime/test/tck/user_defined_functions.yaml b/sql/v2/runtime/test/tck/user_defined_functions.yaml new file mode 100644 index 000000000..c2a3a922e --- /dev/null +++ b/sql/v2/runtime/test/tck/user_defined_functions.yaml @@ -0,0 +1,27 
@@ +name: User defined functions +tests: + - name: HASPREFIX (1) + expression: "HASPREFIX('abcdef', 'ab')" + result: true + - name: HASPREFIX (2) + expression: "HASPREFIX('abcdef', 'abcdef')" + result: true + - name: HASPREFIX (3) + expression: "HASPREFIX('abcdef', '')" + result: true + - name: HASPREFIX (4) + expression: "HASPREFIX('abcdef', 'gh')" + result: false + - name: HASPREFIX (5) + expression: "HASPREFIX('abcdef', 'abcdefg')" + result: false + + - name: KONKAT (1) + expression: "KONKAT('a', 'b', 'c')" + result: abc + - name: KONKAT (2) + expression: "KONKAT()" + result: "" + - name: KONKAT (3) + expression: "KONKAT('a')" + result: "a" \ No newline at end of file diff --git a/sql/v2/runtime/test/user_defined_functions_test.go b/sql/v2/runtime/test/user_defined_functions_test.go new file mode 100644 index 000000000..944ba98dd --- /dev/null +++ b/sql/v2/runtime/test/user_defined_functions_test.go @@ -0,0 +1,209 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package runtime_test + +import ( + "io" + "os" + "path" + "runtime" + "strings" + "testing" + + cesql "github.com/cloudevents/sdk-go/sql/v2" + "github.com/cloudevents/sdk-go/sql/v2/function" + "github.com/cloudevents/sdk-go/sql/v2/parser" + ceruntime "github.com/cloudevents/sdk-go/sql/v2/runtime" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/test" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +var TCKFileNames = []string{ + "user_defined_functions", +} + +var TCKUserDefinedFunctions = []cesql.Function{ + function.NewFunction( + "HASPREFIX", + []cesql.Type{cesql.StringType, cesql.StringType}, + nil, + func(event cloudevents.Event, i []interface{}) (interface{}, error) { + str := i[0].(string) + prefix := i[1].(string) + + return strings.HasPrefix(str, prefix), nil + }, + ), + function.NewFunction( + "KONKAT", + 
[]cesql.Type{}, + cesql.TypePtr(cesql.StringType), + func(event cloudevents.Event, i []interface{}) (interface{}, error) { + var sb strings.Builder + for _, v := range i { + sb.WriteString(v.(string)) + } + return sb.String(), nil + }, + ), +} + +type ErrorType string + +const ( + ParseError ErrorType = "parse" + MathError ErrorType = "math" + CastError ErrorType = "cast" + MissingAttributeError ErrorType = "missingAttribute" + MissingFunctionError ErrorType = "missingFunction" + FunctionEvaluationError ErrorType = "functionEvaluation" +) + +type TckFile struct { + Name string `json:"name"` + Tests []TckTestCase `json:"tests"` +} + +type TckTestCase struct { + Name string `json:"name"` + Expression string `json:"expression"` + + Result interface{} `json:"result"` + Error ErrorType `json:"error"` + + Event *cloudevents.Event `json:"event"` + EventOverrides map[string]interface{} `json:"eventOverrides"` +} + +func (tc TckTestCase) InputEvent(t *testing.T) cloudevents.Event { + var inputEvent cloudevents.Event + if tc.Event != nil { + inputEvent = *tc.Event + } else { + inputEvent = test.FullEvent() + } + + // Make sure the event is v1 + inputEvent.SetSpecVersion(event.CloudEventsVersionV1) + + for k, v := range tc.EventOverrides { + require.NoError(t, spec.V1.SetAttribute(inputEvent.Context, k, v)) + } + + return inputEvent +} + +func (tc TckTestCase) ExpectedResult() interface{} { + switch tc.Result.(type) { + case int: + return int32(tc.Result.(int)) + case float64: + return int32(tc.Result.(float64)) + case bool: + return tc.Result.(bool) + } + return tc.Result +} + +func TestFunctionTableAddFunction(t *testing.T) { + + type args struct { + functions []cesql.Function + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "Add user functions to global table", + + args: args{ + functions: TCKUserDefinedFunctions, + }, + wantErr: false, + }, + { + name: "Fail add user functions to global table", + args: args{ + functions: 
TCKUserDefinedFunctions, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for _, fn := range tt.args.functions { + if err := ceruntime.AddFunction(fn); (err != nil) != tt.wantErr { + t.Errorf("AddFunction() error = %v, wantErr %v", err, tt.wantErr) + } + } + }) + } +} + +func TestUserFunctions(t *testing.T) { + tckFiles := make([]TckFile, 0, len(TCKFileNames)) + + _, basePath, _, _ := runtime.Caller(0) + basePath, _ = path.Split(basePath) + + for _, testFile := range TCKFileNames { + testFilePath := path.Join(basePath, "tck", testFile+".yaml") + + t.Logf("Loading file %s", testFilePath) + file, err := os.Open(testFilePath) + require.NoError(t, err) + + fileBytes, err := io.ReadAll(file) + require.NoError(t, err) + + tckFileModel := TckFile{} + require.NoError(t, yaml.Unmarshal(fileBytes, &tckFileModel)) + + tckFiles = append(tckFiles, tckFileModel) + } + + for i, file := range tckFiles { + i := i + t.Run(file.Name, func(t *testing.T) { + for j, testCase := range tckFiles[i].Tests { + j := j + testCase := testCase + t.Run(testCase.Name, func(t *testing.T) { + t.Parallel() + testCase := tckFiles[i].Tests[j] + + t.Logf("Test expression: '%s'", testCase.Expression) + + if testCase.Error == ParseError { + _, err := parser.Parse(testCase.Expression) + require.NotNil(t, err) + return + } + + expr, err := parser.Parse(testCase.Expression) + require.NoError(t, err) + require.NotNil(t, expr) + + inputEvent := testCase.InputEvent(t) + result, err := expr.Evaluate(inputEvent) + + if testCase.Error != "" { + require.NotNil(t, err) + } else { + require.NoError(t, err) + require.Equal(t, testCase.ExpectedResult(), result) + } + }) + } + }) + } +} diff --git a/sql/v2/test/tck_test.go b/sql/v2/test/tck_test.go index f215c8db4..d22555517 100644 --- a/sql/v2/test/tck_test.go +++ b/sql/v2/test/tck_test.go @@ -6,6 +6,7 @@ package test import ( + "fmt" "io" "os" "path" @@ -70,7 +71,7 @@ type TckTestCase struct { EventOverrides 
map[string]interface{} `json:"eventOverrides"` } -func (tc TckTestCase) InputEvent(t *testing.T) cloudevents.Event { +func (tc TckTestCase) InputEvent(tb testing.TB) cloudevents.Event { var inputEvent cloudevents.Event if tc.Event != nil { inputEvent = *tc.Event @@ -82,7 +83,7 @@ func (tc TckTestCase) InputEvent(t *testing.T) cloudevents.Event { inputEvent.SetSpecVersion(event.CloudEventsVersionV1) for k, v := range tc.EventOverrides { - require.NoError(t, spec.V1.SetAttribute(inputEvent.Context, k, v)) + require.NoError(tb, spec.V1.SetAttribute(inputEvent.Context, k, v)) } return inputEvent @@ -159,3 +160,60 @@ func TestTCK(t *testing.T) { }) } } + +func BenchmarkTCK(b *testing.B) { + tckFiles := make([]TckFile, 0, len(TCKFileNames)) + + _, basePath, _, _ := runtime.Caller(0) + basePath, _ = path.Split(basePath) + + for _, testFile := range TCKFileNames { + testFilePath := path.Join(basePath, "tck", testFile+".yaml") + + b.Logf("Loading file %s", testFilePath) + + file, err := os.Open(testFilePath) + require.NoError(b, err) + + fileBytes, err := io.ReadAll(file) + require.NoError(b, err) + + tckFileModel := TckFile{} + require.NoError(b, yaml.Unmarshal(fileBytes, &tckFileModel)) + + tckFiles = append(tckFiles, tckFileModel) + } + + for i, file := range tckFiles { + i := i + b.Run(file.Name, func(b *testing.B) { + for j, testCase := range tckFiles[i].Tests { + j := j + testCase := testCase + b.Run(fmt.Sprintf("%v parse", testCase.Name), func(b *testing.B) { + testCase := tckFiles[i].Tests[j] + for k := 0; k < b.N; k++ { + _, _ = parser.Parse(testCase.Expression) + } + }) + + if testCase.Error == ParseError { + return + } + + b.Run(fmt.Sprintf("%v evaluate", testCase.Name), func(b *testing.B) { + testCase := tckFiles[i].Tests[j] + + expr, _ := parser.Parse(testCase.Expression) + + inputEvent := testCase.InputEvent(b) + + for k := 0; k < b.N; k++ { + _, _ = expr.Evaluate(inputEvent) + } + + }) + } + }) + } +}