Skip to content

Commit

Permalink
Change Request Reply signature
Browse files Browse the repository at this point in the history
  • Loading branch information
alinz committed May 18, 2024
1 parent 1fe8d73 commit 04cf73a
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 12 deletions.
10 changes: 9 additions & 1 deletion examples/request-reply-single/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"fmt"

"ella.to/bus"
Expand Down Expand Up @@ -36,8 +37,15 @@ func main() {

funcName := "func.div"

bus.Reply(ctx, c, funcName, func(ctx context.Context, req *Req) (*Resp, error) {
bus.Reply(ctx, c, funcName, func(ctx context.Context, in json.RawMessage) (any, error) {
fmt.Println("Got a request")

req := &Req{}
err := json.Unmarshal(in, req)
if err != nil {
return nil, err
}

if req.B == 0 {
return nil, fmt.Errorf("division by zero")
}
Expand Down
10 changes: 9 additions & 1 deletion examples/request-reply/reply/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"fmt"

"ella.to/bus"
Expand All @@ -19,8 +20,15 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bus.Reply(ctx, c, "func.div", func(ctx context.Context, req *data.Req) (*data.Resp, error) {
bus.Reply(ctx, c, "func.div", func(ctx context.Context, in json.RawMessage) (any, error) {
fmt.Println("Got a request")

req := &data.Req{}
err := json.Unmarshal(in, req)
if err != nil {
return nil, err
}

if req.B == 0 {
return nil, fmt.Errorf("division by zero")
}
Expand Down
12 changes: 3 additions & 9 deletions request_reply.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type RequestFunc func(ctx context.Context, req any, resp any) error
type ReplyFunc[Req, Resp any] func(ctx context.Context, req Req) (Resp, error)
type ReplyFunc func(ctx context.Context, req json.RawMessage) (any, error)

func Request(stream Stream, subject string) RequestFunc {
return func(ctx context.Context, req any, resp any) (err error) {
Expand Down Expand Up @@ -74,7 +74,7 @@ func Request(stream Stream, subject string) RequestFunc {
}
}

func Reply[Req, Resp any](ctx context.Context, stream Stream, subject string, fn ReplyFunc[*Req, *Resp]) {
func Reply(ctx context.Context, stream Stream, subject string, fn ReplyFunc) {
queueName := fmt.Sprintf("queue.%s", subject)
msgs := stream.Get(
ctx,
Expand All @@ -97,18 +97,12 @@ func Reply[Req, Resp any](ctx context.Context, stream Stream, subject string, fn

event := msg.Events[0]

var req Req
err = json.Unmarshal(event.Data, &req)
if err != nil {
return
}

var replyMsg struct {
Type string `json:"type"`
Payload any `json:"payload"`
}

resp, err := fn(ctx, &req)
resp, err := fn(ctx, event.Data)
if err != nil {
replyMsg.Type = "error"
replyMsg.Payload = err.Error()
Expand Down
9 changes: 8 additions & 1 deletion request_reply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bus_test

import (
"context"
"encoding/json"
"fmt"
"testing"

Expand All @@ -21,7 +22,13 @@ func TestRequestReply(t *testing.T) {
Result int
}

bus.Reply(context.TODO(), client, "func.div", func(ctx context.Context, req *Req) (resp *Resp, err error) {
bus.Reply(context.TODO(), client, "func.div", func(ctx context.Context, in json.RawMessage) (resp any, err error) {
req := &Req{}
err = json.Unmarshal(in, req)
if err != nil {
return nil, err
}

if req.B == 0 {
return nil, fmt.Errorf("division by zero")
}
Expand Down

0 comments on commit 04cf73a

Please sign in to comment.