From 04cf73a1677bbf39c7fee2a0ce23a2429cd49cf0 Mon Sep 17 00:00:00 2001 From: Ali Najafizadeh Date: Sat, 18 May 2024 15:20:44 -0400 Subject: [PATCH] Change Request Reply signature --- examples/request-reply-single/main.go | 10 +++++++++- examples/request-reply/reply/main.go | 10 +++++++++- request_reply.go | 12 +++--------- request_reply_test.go | 9 ++++++++- 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/examples/request-reply-single/main.go b/examples/request-reply-single/main.go index f66e2fd..35395a5 100644 --- a/examples/request-reply-single/main.go +++ b/examples/request-reply-single/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "ella.to/bus" @@ -36,8 +37,15 @@ func main() { funcName := "func.div" - bus.Reply(ctx, c, funcName, func(ctx context.Context, req *Req) (*Resp, error) { + bus.Reply(ctx, c, funcName, func(ctx context.Context, in json.RawMessage) (any, error) { fmt.Println("Got a request") + + req := &Req{} + err := json.Unmarshal(in, req) + if err != nil { + return nil, err + } + if req.B == 0 { return nil, fmt.Errorf("division by zero") } diff --git a/examples/request-reply/reply/main.go b/examples/request-reply/reply/main.go index 1d288e2..0a0e836 100644 --- a/examples/request-reply/reply/main.go +++ b/examples/request-reply/reply/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "ella.to/bus" @@ -19,8 +20,15 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - bus.Reply(ctx, c, "func.div", func(ctx context.Context, req *data.Req) (*data.Resp, error) { + bus.Reply(ctx, c, "func.div", func(ctx context.Context, in json.RawMessage) (any, error) { fmt.Println("Got a request") + + req := &data.Req{} + err := json.Unmarshal(in, req) + if err != nil { + return nil, err + } + if req.B == 0 { return nil, fmt.Errorf("division by zero") } diff --git a/request_reply.go b/request_reply.go index e2b0a95..11dda93 100644 --- a/request_reply.go +++ b/request_reply.go @@ -9,7 +9,7 @@ import ( ) type RequestFunc func(ctx context.Context, req any, resp any) error -type ReplyFunc[Req, Resp any] func(ctx context.Context, req Req) (Resp, error) +type ReplyFunc func(ctx context.Context, req json.RawMessage) (any, error) func Request(stream Stream, subject string) RequestFunc { return func(ctx context.Context, req any, resp any) (err error) { @@ -74,7 +74,7 @@ func Request(stream Stream, subject string) RequestFunc { } } -func Reply[Req, Resp any](ctx context.Context, stream Stream, subject string, fn ReplyFunc[*Req, *Resp]) { +func Reply(ctx context.Context, stream Stream, subject string, fn ReplyFunc) { queueName := fmt.Sprintf("queue.%s", subject) msgs := stream.Get( ctx, @@ -97,18 +97,12 @@ func Reply[Req, Resp any](ctx context.Context, stream Stream, subject string, fn event := msg.Events[0] - var req Req - err = json.Unmarshal(event.Data, &req) - if err != nil { - return - } - var replyMsg struct { Type string `json:"type"` Payload any `json:"payload"` } - resp, err := fn(ctx, &req) + resp, err := fn(ctx, event.Data) if err != nil { replyMsg.Type = "error" replyMsg.Payload = err.Error() diff --git a/request_reply_test.go b/request_reply_test.go index 8973106..826c3a2 100644 --- a/request_reply_test.go +++ b/request_reply_test.go @@ -2,6 +2,7 @@ package bus_test import ( "context" + "encoding/json" "fmt" "testing" @@ -21,7 +22,13 @@ func TestRequestReply(t *testing.T) { Result int } - bus.Reply(context.TODO(), client, "func.div", func(ctx context.Context, req *Req) (resp *Resp, err error) { + bus.Reply(context.TODO(), client, "func.div", func(ctx context.Context, in json.RawMessage) (resp any, err error) { + req := &Req{} + err = json.Unmarshal(in, req) + if err != nil { + return nil, err + } + if req.B == 0 { return nil, fmt.Errorf("division by zero") }