Skip to content

Commit

Permalink
Support defining variables and methods and fix some issues (#52)
Browse files Browse the repository at this point in the history
* add mux as http router. add support to path methods and variables. refine some features. fix some issues. add tests.
close #51

Signed-off-by: Lize Cai <lize.cai@sap.com>

* update tests

Signed-off-by: Lize Cai <lize.cai@sap.com>

* fix typo, rename functions, and add test for structured and binary cloud event

Signed-off-by: Lize Cai <lize.cai@sap.com>
  • Loading branch information
lizzzcai authored Jun 28, 2022
1 parent 4e00aaf commit d08c60f
Show file tree
Hide file tree
Showing 26 changed files with 796 additions and 40 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ jobs:
e2e: "test/declarative/sync-http-multiple/e2e.yaml"
- name: Declarative multiple Sync Cloudevent e2e test
e2e: "test/declarative/sync-cloudevent-multiple/e2e.yaml"
- name: Declarative multiple functions with variables e2e test
e2e: "test/declarative/sync-http-variables/e2e.yaml"
steps:
- uses: actions/checkout@v2

Expand Down
23 changes: 23 additions & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
"github.com/gorilla/mux"
"k8s.io/klog/v2"
agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)
Expand Down Expand Up @@ -938,6 +939,28 @@ func ConvertUserDataToBytes(data interface{}) []byte {
}
}

type contextKey int

const (
varsKey contextKey = iota
)

// Vars returns the route variables for the current request, if any.
var (
Vars = mux.Vars
)

func CtxWithVars(ctx context.Context, vars map[string]string) context.Context {
return context.WithValue(ctx, varsKey, vars)
}

func VarsFromCtx(ctx context.Context) map[string]string {
if rv := ctx.Value(varsKey); rv != nil {
return rv.(map[string]string)
}
return nil
}

func IsTracingProviderSkyWalking(ctx RuntimeContext) bool {
if ctx.HasPluginsTracingCfg() && ctx.GetPluginsTracingCfg().IsEnabled() &&
ctx.GetPluginsTracingCfg().ProviderName() == TracingProviderSkywalking {
Expand Down
35 changes: 35 additions & 0 deletions context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"os"
"strings"
"testing"
"net/http"
"reflect"
)

var (
Expand Down Expand Up @@ -292,3 +294,36 @@ func TestParseFunctionContext(t *testing.T) {
t.Fatal("Error set function context env")
}
}


func TestGetVarsFromContext(t *testing.T) {

tests := []struct {
name string
request *http.Request
vars map[string]string
}{
{
name: "single variable",
request: &http.Request{},
vars: map[string]string{"key1": "val1"},
},
{
name: "multi variables",
request: &http.Request{},
vars: map[string]string{"key1": "val1", "key2": "val2"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := tt.request
ctx := r.Context()
ctx = CtxWithVars(ctx, tt.vars)
got := VarsFromCtx(ctx)
if !reflect.DeepEqual(got, tt.vars) {
t.Errorf("VarsFromCtx = %v, want %v", got, tt.vars)
}
})
}

}
63 changes: 48 additions & 15 deletions framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package framework
import (
"context"
"errors"
"fmt"
"net/http"
"os"

Expand All @@ -21,19 +22,21 @@ import (
)

type functionsFrameworkImpl struct {
funcContext ofctx.RuntimeContext
prePlugins []plugin.Plugin
postPlugins []plugin.Plugin
pluginMap map[string]plugin.Plugin
runtime runtime.Interface
registry *registry.Registry
funcContext ofctx.RuntimeContext
funcContextMap map[string]ofctx.RuntimeContext
prePlugins []plugin.Plugin
postPlugins []plugin.Plugin
pluginMap map[string]plugin.Plugin
runtime runtime.Interface
registry *registry.Registry
}

// Framework is the interface for the function conversion.
type Framework interface {
Register(ctx context.Context, fn interface{}) error
RegisterPlugins(customPlugins map[string]plugin.Plugin)
Start(ctx context.Context) error
TryRegisterFunctions(ctx context.Context) error
GetRuntime() runtime.Interface
}

Expand All @@ -50,6 +53,8 @@ func NewFramework() (*functionsFrameworkImpl, error) {
} else {
fwk.funcContext = ctx
}
// for multi functions use cases
fwk.funcContextMap = map[string]ofctx.RuntimeContext{}

// Scan the local directory and register the plugins if exist
// Register the framework default plugins under `plugin` directory
Expand Down Expand Up @@ -100,7 +105,7 @@ func (fwk *functionsFrameworkImpl) Register(ctx context.Context, fn interface{})
return nil
}

func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
func (fwk *functionsFrameworkImpl) TryRegisterFunctions(ctx context.Context) error {

target := os.Getenv("FUNCTION_TARGET")

Expand All @@ -110,14 +115,25 @@ func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
klog.Infof("registering function: %s on path: %s", target, fn.GetPath())
switch fn.GetFunctionType() {
case functions.HTTPType:
fwk.Register(ctx, fn.GetHTTPFunction())
if err := fwk.Register(ctx, fn.GetHTTPFunction()); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
case functions.CloudEventType:
fwk.Register(ctx, fn.GetCloudEventFunction())
if err := fwk.Register(ctx, fn.GetCloudEventFunction()); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
case functions.OpenFunctionType:
fwk.Register(ctx, fn.GetOpenFunctionFunction())
if err := fwk.Register(ctx, fn.GetOpenFunctionFunction()); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
default:
return fmt.Errorf("Unkown function type: %s", fn.GetFunctionType())
}
} else {
klog.Errorf("function not found: %s", target)
return fmt.Errorf("function not found: %s", target)
}
} else {
// if FUNCTION_TARGET is not provided but user uses declarative function, by default all registered functions will be deployed.
Expand All @@ -129,19 +145,26 @@ func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
for _, name := range funcNames {
if rf, ok := fwk.registry.GetRegisteredFunction(name); ok {
klog.Infof("registering function: %s on path: %s", rf.GetName(), rf.GetPath())
// Parse OpenFunction FunctionContext
if ctx, err := ofctx.GetRuntimeContext(); err != nil {
klog.Errorf("failed to parse OpenFunction FunctionContext: %v\n", err)
return err
} else {
fwk.funcContextMap[rf.GetName()] = ctx
}
switch rf.GetFunctionType() {
case functions.HTTPType:
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
if err := fwk.runtime.RegisterHTTPFunction(fwk.funcContextMap[rf.GetName()], fwk.prePlugins, fwk.postPlugins, rf); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
case functions.CloudEventType:
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
if err := fwk.runtime.RegisterCloudEventFunction(ctx, fwk.funcContextMap[rf.GetName()], fwk.prePlugins, fwk.postPlugins, rf); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
case functions.OpenFunctionType:
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContext, fwk.prePlugins, fwk.postPlugins, rf); err != nil {
if err := fwk.runtime.RegisterOpenFunction(fwk.funcContextMap[rf.GetName()], fwk.prePlugins, fwk.postPlugins, rf); err != nil {
klog.Errorf("failed to register function: %v", err)
return err
}
Expand All @@ -150,8 +173,18 @@ func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {
}
}
}
return nil
}

func (fwk *functionsFrameworkImpl) Start(ctx context.Context) error {

err := fwk.TryRegisterFunctions(ctx)
if err != nil {
klog.Error("failed to start registering functions")
return err
}

err := fwk.runtime.Start(ctx)
err = fwk.runtime.Start(ctx)
if err != nil {
klog.Error("failed to start runtime service")
return err
Expand Down
152 changes: 152 additions & 0 deletions framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/assert"

ofctx "github.com/OpenFunction/functions-framework-go/context"
"github.com/OpenFunction/functions-framework-go/functions"
"github.com/OpenFunction/functions-framework-go/runtime/async"
)

Expand Down Expand Up @@ -166,6 +167,157 @@ func TestCloudEventFunction(t *testing.T) {
}
}

func TestMultipleFunctions(t *testing.T) {
env := `{
"name": "function-demo",
"version": "v1.0.0",
"port": "8080",
"runtime": "Knative",
"httpPattern": "/"
}`
var ceDemo = struct {
message map[string]string
headers map[string]string
}{
message: map[string]string{
"msg": "Hello World!",
},
headers: map[string]string{
"Ce-Specversion": "1.0",
"Ce-Type": "cloudevents.openfunction.samples.helloworld",
"Ce-Source": "cloudevents.openfunction.samples/helloworldsource",
"Ce-Id": "536808d3-88be-4077-9d7a-a3f162705f79",
},
}

ctx := context.Background()
fwk, err := createFramework(env)
if err != nil {
t.Fatalf("failed to create framework: %v", err)
}

fwk.RegisterPlugins(nil)

// register multiple functions
functions.HTTP("http", fakeHTTPFunction,
functions.WithFunctionPath("/http"),
functions.WithFunctionMethods("GET"),
)

functions.CloudEvent("ce", fakeCloudEventsFunction,
functions.WithFunctionPath("/ce"),
)

functions.OpenFunction("ofn", fakeBindingsFunction,
functions.WithFunctionPath("/ofn"),
functions.WithFunctionMethods("GET", "POST"),
)

if err := fwk.TryRegisterFunctions(ctx); err != nil {
t.Fatalf("failed to start registering functions: %v", err)
}

if fwk.GetRuntime() == nil {
t.Fatal("failed to create runtime")
}
handler := fwk.GetRuntime().GetHandler()
if handler == nil {
t.Fatal("handler is nil")
}

srv := httptest.NewServer(handler.(http.Handler))
defer srv.Close()

// test http
t.Run("sending http", func(t *testing.T) {
resp, err := http.Get(srv.URL + "/http")
if err != nil {
t.Fatalf("http.Get: %v", err)
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("ioutil.ReadAll: %v", err)
}

if got, want := string(body), "Hello World!"; got != want {
t.Fatalf("TestHTTPFunction: got %v; want %v", got, want)
}
})

// test http to openfunction
t.Run("sending http to openfunction", func(t *testing.T) {
resp, err := http.Get(srv.URL + "/ofn")
if err != nil {
t.Fatalf("http.Get: %v", err)
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("ioutil.ReadAll: %v", err)
}

if got, want := string(body), "hello there"; got != want {
t.Fatalf("TestHTTPFunction: got %v; want %v", got, want)
}
})

// test cloudevent
t.Run("sending cloudevent", func(t *testing.T) {
messageByte, err := json.Marshal(ceDemo.message)
if err != nil {
t.Fatalf("failed to marshal message: %v", err)
}

req, err := http.NewRequest("POST", srv.URL+"/ce", bytes.NewBuffer(messageByte))
if err != nil {
t.Fatalf("error creating HTTP request for test: %v", err)
}
req.Header.Set("Content-Type", "application/json")
for k, v := range ceDemo.headers {
req.Header.Set(k, v)
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
t.Fatalf("failed to do client.Do: %v", err)
}

if resp.StatusCode != http.StatusOK {
t.Fatalf("failed to test cloudevents function: response status = %v, want %v", resp.StatusCode, http.StatusOK)
}
})

// test cloudevent to openfunction
t.Run("sending cloudevent to openfunction", func(t *testing.T) {
messageByte, err := json.Marshal(ceDemo.message)
if err != nil {
t.Fatalf("failed to marshal message: %v", err)
}

req, err := http.NewRequest("POST", srv.URL+"/ofn", bytes.NewBuffer(messageByte))
if err != nil {
t.Fatalf("error creating HTTP request for test: %v", err)
}
req.Header.Set("Content-Type", "application/json")
for k, v := range ceDemo.headers {
req.Header.Set(k, v)
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
t.Fatalf("failed to do client.Do: %v", err)
}

if resp.StatusCode != http.StatusOK {
t.Fatalf("failed to test cloudevents function: response status = %v, want %v", resp.StatusCode, http.StatusOK)
}
})

}

func TestAsyncBindingsFunction(t *testing.T) {
env := `{
"name": "function-demo",
Expand Down
Loading

0 comments on commit d08c60f

Please sign in to comment.