Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: udsink gRPC #16

Merged
merged 23 commits into from
Sep 17, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions examples/sink/example.go

This file was deleted.

8 changes: 5 additions & 3 deletions function/start.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Package golang provides an interface to write UDF in golang which will be exposed over HTTP. It accepts a handler of the following definition
// func(ctx context.Context, key, msg []byte) (messages Messages, err error)
// Package function provides an interface to write UDF in golang which will be exposed over HTTP. It accepts a handler of the following definition
//
// func(ctx context.Context, key, msg []byte) (messages Messages, err error)
//
// which will be invoked for message. If error is returned, the HTTP StatusCode will be set to 500.
package function

Expand Down Expand Up @@ -139,7 +141,7 @@ func startWithContext(ctx context.Context, handler func(ctx context.Context, key
})

path := "/var/run/numaflow/udf.sock"
if err := os.Remove(path); !os.IsNotExist(err) && err != nil {
if err := os.RemoveAll(path); !os.IsNotExist(err) && err != nil {
return err
}
udsServer := &http.Server{}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/numaproj/numaflow-go

go 1.18
go 1.19

require (
github.com/golang/mock v1.6.0
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/proto/sink/v1/mockgen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package v1

//go:generate mockgen -destination sinkmock/sinkmock.go -package sinkmock github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1 UserDefinedSinkClient
78 changes: 78 additions & 0 deletions pkg/apis/proto/sink/v1/sinkmock/sinkmock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions pkg/apis/proto/sink/v1/udsink.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/proto/sink/v1/udsink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ message Datum {
bytes value = 2;
EventTime event_time = 3;
Watermark watermark = 4;
string id=5;
}

/**
Expand Down
10 changes: 0 additions & 10 deletions pkg/datum/datum.go

This file was deleted.

3 changes: 1 addition & 2 deletions pkg/function/examples/evenodd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"context"
"strconv"

"github.com/numaproj/numaflow-go/pkg/datum"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/function/server"
)

func handle(_ context.Context, key string, d datum.Datum) functionsdk.Messages {
func handle(_ context.Context, key string, d functionsdk.Datum) functionsdk.Messages {
msg := d.Value()
_ = d.EventTime() // Event time is available
_ = d.Watermark() // Watermark is available
Expand Down
3 changes: 1 addition & 2 deletions pkg/function/examples/flatmap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"context"
"strings"

"github.com/numaproj/numaflow-go/pkg/datum"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/function/server"
)

func handle(_ context.Context, key string, d datum.Datum) functionsdk.Messages {
func handle(_ context.Context, key string, d functionsdk.Datum) functionsdk.Messages {
msg := d.Value()
_ = d.EventTime() // Event time is available
_ = d.Watermark() // Watermark is available
Expand Down
3 changes: 1 addition & 2 deletions pkg/function/examples/forward_message/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package main
import (
"context"

"github.com/numaproj/numaflow-go/pkg/datum"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/function/server"
)

func mapHandle(_ context.Context, key string, d datum.Datum) functionsdk.Messages {
func mapHandle(_ context.Context, key string, d functionsdk.Datum) functionsdk.Messages {
// directly forward the input to the output
val := d.Value()
eventTime := d.EventTime()
Expand Down
3 changes: 1 addition & 2 deletions pkg/function/examples/sum/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ import (
"fmt"
"strconv"

"github.com/numaproj/numaflow-go/pkg/datum"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/function/server"
)

func reduceHandle(_ context.Context, key string, reduceCh <-chan datum.Datum, md functionsdk.Metadata) functionsdk.Messages {
func reduceHandle(_ context.Context, key string, reduceCh <-chan functionsdk.Datum, md functionsdk.Metadata) functionsdk.Messages {
// sum up values for the same key
intervalWindow := md.IntervalWindow()
_ = intervalWindow
Expand Down
32 changes: 19 additions & 13 deletions pkg/function/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@ import (
"time"

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
"github.com/numaproj/numaflow-go/pkg/datum"
"google.golang.org/protobuf/types/known/emptypb"
)

// Client contains methods to call a gRPC client.
type Client interface {
CloseConn(ctx context.Context) error
IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
MapFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error)
ReduceFn(ctx context.Context, datumStreamCh <-chan *functionpb.Datum) ([]*functionpb.Datum, error)
// Datum contains methods to get the payload information.
type Datum interface {
Value() []byte
EventTime() time.Time
Watermark() time.Time
}

// Metadata contains methods to get the metadata for the reduce operation.
Expand All @@ -28,29 +26,37 @@ type IntervalWindow interface {
EndTime() time.Time
}

// Client contains methods to call a gRPC client.
type Client interface {
CloseConn(ctx context.Context) error
IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
MapFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error)
ReduceFn(ctx context.Context, datumStreamCh <-chan *functionpb.Datum) ([]*functionpb.Datum, error)
}

// MapHandler is the interface of map function implementation.
type MapHandler interface {
// HandleDo is the function to process each coming message
HandleDo(ctx context.Context, key string, datum datum.Datum) Messages
HandleDo(ctx context.Context, key string, datum Datum) Messages
}

// ReduceHandler is the interface of reduce function implementation.
type ReduceHandler interface {
HandleDo(ctx context.Context, key string, reduceCh <-chan datum.Datum, md Metadata) Messages
HandleDo(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages
}

// MapFunc is utility type used to convert a HandleDo function to a MapHandler.
type MapFunc func(ctx context.Context, key string, datum datum.Datum) Messages
type MapFunc func(ctx context.Context, key string, datum Datum) Messages

// HandleDo implements the function of map function.
func (mf MapFunc) HandleDo(ctx context.Context, key string, datum datum.Datum) Messages {
func (mf MapFunc) HandleDo(ctx context.Context, key string, datum Datum) Messages {
return mf(ctx, key, datum)
}

// ReduceFunc is utility type used to convert a HandleDo function to a ReduceHandler.
type ReduceFunc func(ctx context.Context, key string, reduceCh <-chan datum.Datum, md Metadata) Messages
type ReduceFunc func(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages

// HandleDo implements the function of reduce function.
func (rf ReduceFunc) HandleDo(ctx context.Context, key string, reduceCh <-chan datum.Datum, md Metadata) Messages {
func (rf ReduceFunc) HandleDo(ctx context.Context, key string, reduceCh <-chan Datum, md Metadata) Messages {
return rf(ctx, key, reduceCh, md)
}
9 changes: 4 additions & 5 deletions pkg/function/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
"github.com/numaproj/numaflow-go/pkg/datum"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/function/client"
"github.com/stretchr/testify/assert"
Expand All @@ -21,7 +20,7 @@ func Test_server_map(t *testing.T) {
file, err := os.CreateTemp("/tmp", "numaflow-test.sock")
assert.NoError(t, err)
defer func() {
err = os.Remove(file.Name())
err = os.RemoveAll(file.Name())
assert.NoError(t, err)
}()

Expand All @@ -36,7 +35,7 @@ func Test_server_map(t *testing.T) {
{
name: "server_map",
fields: fields{
mapHandler: functionsdk.MapFunc(func(ctx context.Context, key string, d datum.Datum) functionsdk.Messages {
mapHandler: functionsdk.MapFunc(func(ctx context.Context, key string, d functionsdk.Datum) functionsdk.Messages {
msg := d.Value()
return functionsdk.MessagesBuilder().Append(functionsdk.MessageTo(key+"_test", msg))
}),
Expand Down Expand Up @@ -83,7 +82,7 @@ func Test_server_reduce(t *testing.T) {
file, err := os.CreateTemp("/tmp", "numaflow-test.sock")
assert.NoError(t, err)
defer func() {
err = os.Remove(file.Name())
err = os.RemoveAll(file.Name())
assert.NoError(t, err)
}()

Expand All @@ -100,7 +99,7 @@ func Test_server_reduce(t *testing.T) {
{
name: "server_reduce",
fields: fields{
reduceHandler: functionsdk.ReduceFunc(func(ctx context.Context, key string, reduceCh <-chan datum.Datum, md functionsdk.Metadata) functionsdk.Messages {
reduceHandler: functionsdk.ReduceFunc(func(ctx context.Context, key string, reduceCh <-chan functionsdk.Datum, md functionsdk.Metadata) functionsdk.Messages {
// sum up values for the same key

// in this test case, md is nil
Expand Down
3 changes: 1 addition & 2 deletions pkg/function/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
"github.com/numaproj/numaflow-go/pkg/datum"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"
)
Expand Down Expand Up @@ -84,7 +83,7 @@ func (fs *Service) ReduceFn(stream functionpb.UserDefinedFunction_ReduceFnServer
var (
ctx = stream.Context()
key string
reduceCh = make(chan datum.Datum)
reduceCh = make(chan Datum)
md Metadata
)

Expand Down
3 changes: 1 addition & 2 deletions pkg/function/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
"github.com/numaproj/numaflow-go/pkg/datum"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -32,7 +31,7 @@ func TestService_MapFn(t *testing.T) {
{
name: "map_fn_forward_msg",
fields: fields{
Mapper: MapFunc(func(ctx context.Context, key string, datum datum.Datum) Messages {
Mapper: MapFunc(func(ctx context.Context, key string, datum Datum) Messages {
msg := datum.Value()
return MessagesBuilder().Append(MessageTo(key+"_test", msg))
}),
Expand Down
Loading