Skip to content

Commit

Permalink
chore: init code
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed May 27, 2022
1 parent e066a2b commit 80a53d1
Show file tree
Hide file tree
Showing 12 changed files with 732 additions and 0 deletions.
15 changes: 15 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
.vscode/
.DS_Store
vendor/
dist/
# delve debug binaries
cmd/**/debug
hack/**/debug
debug.test
*.iml
.coverage
*.out
test/*.cov
site/
/go-diagrams/
*.idea/
50 changes: 50 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Numaflow Golang SDK

This SDK provides the interfaces to implement [Numaflow](https://github.com/numaproj/numaflow) User Defined Functions or Sinks in Golang.

## Implement User Defined Functions

```golang
package main

import (
"context"

funcsdk "github.com/numaproj/numaflow-go/function"
)

func handle(ctx context.Context, key, msg []byte) (funcsdk.Messages, error) {
return funcsdk.MessagesBuilder().Append(funcsdk.MessageToAll(msg)), nil
}

func main() {
funcsdk.Start(context.Background(), handle)
}
```

## Implement User Defined Sinks

```golang
package main

import (
"context"
"fmt"

sinksdk "github.com/numaproj/numaflow-go/sink"
)

func handle(ctx context.Context, msgs []sinksdk.Message) (sinksdk.Responses, error) {
result := sinksdk.ResponsesBuilder()
for _, m := range msgs {
fmt.Println(string(m.Payload))
result = result.Append(sinksdk.ResponseOK(m.ID))
}
return result, nil
}

func main() {
sinksdk.Start(context.Background(), handle)
}

```
15 changes: 15 additions & 0 deletions examples/function/example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package main

import (
"context"

funcsdk "github.com/numaproj/numaflow-go/function"
)

func handle(ctx context.Context, key, msg []byte) (funcsdk.Messages, error) {
return funcsdk.MessagesBuilder().Append(funcsdk.MessageToAll(msg)), nil
}

func main() {
funcsdk.Start(context.Background(), handle)
}
21 changes: 21 additions & 0 deletions examples/sink/example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"context"
"fmt"

sinksdk "github.com/numaproj/numaflow-go/sink"
)

func handle(ctx context.Context, msgs []sinksdk.Message) (sinksdk.Responses, error) {
result := sinksdk.ResponsesBuilder()
for _, m := range msgs {
fmt.Println(string(m.Payload))
result = result.Append(sinksdk.ResponseOK(m.ID))
}
return result, nil
}

func main() {
sinksdk.Start(context.Background(), handle)
}
171 changes: 171 additions & 0 deletions function/start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Package golang provides an interface to write UDF in golang which will be exposed over HTTP. It accepts a handler of the following definition
// func(ctx context.Context, key, msg []byte) (messages Messages, err error)
// which will be invoked for message. If error is returned, the HTTP StatusCode will be set to 500.
package function

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/vmihailenco/msgpack/v5"
)

const (
contentTypeJson = "application/json"
contentTypeMsgPack = "application/msgpack"

envUDFContentType = "NUMAFLOW_UDF_CONTENT_TYPE"

messagekey = "x-numa-message-key"
)

type Handle func(ctx context.Context, key, msg []byte) (Messages, error)

// options for starting the http udf server
type options struct {
drainTimeout time.Duration
}

// Option to apply different options
type Option interface {
apply(*options)
}

type drainTimeout time.Duration

func (f drainTimeout) apply(opts *options) {
opts.drainTimeout = time.Duration(f)
}

// WithDrainTimeout sets a max drain timeout time. It is the maximum time we will wait for the connection to drain out once we have
// initiated the shutdown sequence. Default is 1 minute.
func WithDrainTimeout(f time.Duration) Option {
return drainTimeout(f)
}

// Start starts the HTTP Server after registering the handler at `/messages` endpoint.
func Start(ctx context.Context, handler Handle, opts ...Option) {
options := options{
drainTimeout: time.Minute,
}

for _, o := range opts {
o.apply(&options)
}

ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGTERM)
defer stop()
if err := startWithContext(ctxWithSignal, handler, options); err != nil {
panic(err)
}
}

func udf(ctx context.Context, w http.ResponseWriter, r *http.Request, handler func(ctx context.Context, key, msg []byte) (Messages, error), contentType string) {
messages, err := func() ([]Message, error) {
k := r.Header.Get(messagekey)
in, err := io.ReadAll(r.Body)
_ = r.Body.Close()
if err != nil {
return nil, err
} else {
return handler(ctx, []byte(k), in)
}
}()
if err != nil {
log.Printf("Failed to read and process input message, %s", err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
} else {
if len(messages) == 0 { // Return a DROP message
messages = append(messages, MessageToDrop())
}
b, err := marshalMessages(messages, contentType)
if err != nil {
log.Printf("Marshal message failed, %s", err)
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
} else {
w.Header().Add("Content-Type", contentType)
w.WriteHeader(200)
n, err := w.Write(b)
if err != nil {
log.Printf("Write failed (wrote: %d bytes), %s", n, err)
}
}
}
}

func marshalMessages(messages Messages, contentType string) ([]byte, error) {
switch contentType {
case contentTypeJson:
b, err := json.Marshal(&messages)
if err != nil {
return nil, fmt.Errorf("marshal messages with json failed, %w", err)
}
return b, nil
case contentTypeMsgPack:
b, err := msgpack.Marshal(&messages)
if err != nil {
return nil, fmt.Errorf("marshal messages with msgpack failed, %w", err)
}
return b, nil
default:
return nil, fmt.Errorf("unsupported Content-Type %q", contentType)
}
}

func startWithContext(ctx context.Context, handler func(ctx context.Context, key, msg []byte) (Messages, error), opts options) error {
contentType := os.Getenv(envUDFContentType)
if contentType == "" { // defaults to application/msgpack
contentType = contentTypeMsgPack
}
if contentType != contentTypeJson && contentType != contentTypeMsgPack {
return fmt.Errorf("unsupported Content-Type %q", contentType)
}
http.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(204)
})
http.HandleFunc("/messages", func(w http.ResponseWriter, r *http.Request) {
udf(ctx, w, r, handler, contentType)
})

path := "/var/run/numaflow/udf.sock"
if err := os.Remove(path); !os.IsNotExist(err) && err != nil {
return err
}
udsServer := &http.Server{}
listener, err := net.Listen("unix", path)
if err != nil {
return err
}
defer func() { _ = listener.Close() }()
go func() {
if err := udsServer.Serve(listener); err != nil && err != http.ErrServerClosed {
panic(err)
}
}()
log.Printf("udf server is ready")

// wait for signal
<-ctx.Done()
log.Println("udf server is now shutting down")
defer log.Println("udf server has exited")

// let's not wait indefinitely
stopCtx, cancel := context.WithTimeout(context.Background(), opts.drainTimeout)
defer cancel()
if err := udsServer.Shutdown(stopCtx); err != nil {
return err
}

return nil
}
92 changes: 92 additions & 0 deletions function/start_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package function

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/vmihailenco/msgpack/v5"
)

func TestStart_simpleStop(t *testing.T) {
t.SkipNow()
// start and stop in a second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// start at random port and block till timeout
Start(ctx, dummyTestHandler, WithDrainTimeout(time.Minute))
}

func TestStart_testReady(t *testing.T) {
// 204
req := httptest.NewRequest(http.MethodGet, "/messages", nil)
w := httptest.NewRecorder()
ctx := context.Background()
udf(ctx, w, req, dummyTestHandler, contentTypeMsgPack)
res := w.Result()
defer func() { _ = res.Body.Close() }()
_, err := io.ReadAll(res.Body)
assert.NoError(t, err)
assert.Equal(t, 200, res.StatusCode)

// 200
input := bytes.NewBufferString("hello")
req = httptest.NewRequest(http.MethodPost, "/messages", input)
w = httptest.NewRecorder()
udf(ctx, w, req, dummyTestHandler, contentTypeMsgPack)
res = w.Result()
defer func() { _ = res.Body.Close() }()
assert.Equal(t, 200, res.StatusCode)
var data []byte
data, err = io.ReadAll(res.Body)
assert.NoError(t, err)
messages := []Message{}
err = msgpack.Unmarshal(data, &messages)
assert.NoError(t, err)
assert.Equal(t, bytes.NewBufferString("hello").Bytes(), messages[0].Key)
assert.Equal(t, bytes.NewBufferString("hello").Bytes(), messages[0].Value)

// 200 with nil key
input = bytes.NewBufferString("no_key")
req = httptest.NewRequest(http.MethodPost, "/messages", input)
w = httptest.NewRecorder()
udf(ctx, w, req, func(ctx context.Context, key, msg []byte) (Messages, error) {
return MessagesBuilder().Append(MessageTo("", msg)), nil
}, contentTypeMsgPack)
res = w.Result()
defer func() { _ = res.Body.Close() }()

data, err = io.ReadAll(res.Body)
assert.NoError(t, err)
assert.Equal(t, 200, res.StatusCode)
messages = []Message{}
err = msgpack.Unmarshal(data, &messages)
assert.NoError(t, err)
assert.Equal(t, []byte{}, messages[0].Key)
assert.Equal(t, bytes.NewBufferString("no_key").Bytes(), messages[0].Value)

// 50X
req = httptest.NewRequest(http.MethodPost, "/messages", nil)
w = httptest.NewRecorder()
udf(ctx, w, req, func(ctx context.Context, key, msg []byte) (Messages, error) {
return nil, fmt.Errorf("test error")
}, contentTypeMsgPack)
res = w.Result()
defer func() { _ = res.Body.Close() }()
_, err = io.ReadAll(res.Body)
assert.NoError(t, err)
assert.Equal(t, 500, res.StatusCode)
}

func dummyTestHandler(_ context.Context, key, m []byte) (messages Messages, error error) {
if len(m) == 0 {
return nil, nil
}
return MessagesBuilder().Append(Message{Key: m, Value: m}), nil
}
Loading

0 comments on commit 80a53d1

Please sign in to comment.