Skip to content

Commit

Permalink
[fastws] fix reading very big messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsannm committed May 23, 2024
1 parent 6f9aafd commit 72e0f68
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 47 deletions.
6 changes: 3 additions & 3 deletions kit/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ module github.com/clubpay/ronykit/kit
go 1.19

require (
github.com/fasthttp/websocket v1.5.8
github.com/fasthttp/websocket v1.5.9
github.com/goccy/go-json v0.10.3
github.com/goccy/go-reflect v1.2.0
github.com/jedib0t/go-pretty/v6 v6.5.9
github.com/onsi/ginkgo/v2 v2.18.0
github.com/onsi/gomega v1.33.1
github.com/valyala/fasthttp v1.53.0
github.com/valyala/fasthttp v1.54.0
golang.org/x/net v0.25.0
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -20,7 +20,7 @@ require (
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/rivo/uniseg v0.4.3 // indirect
Expand Down
12 changes: 6 additions & 6 deletions kit/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/fasthttp/websocket v1.5.8 h1:k5DpirKkftIF/w1R8ZzjSgARJrs54Je9YJK37DL/Ah8=
github.com/fasthttp/websocket v1.5.8/go.mod h1:d08g8WaT6nnyvg9uMm8K9zMYyDjfKyj3170AtPRuVU0=
github.com/fasthttp/websocket v1.5.9 h1:9deGuzYcCRKjk940kNwSN6Hd14hk4zYwropm4UsUIUQ=
github.com/fasthttp/websocket v1.5.9/go.mod h1:NLzHBFur260OMuZHohOfYQwMTpR7sfSpUnuqKxMpgKA=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
Expand All @@ -18,8 +18,8 @@ github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQN
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/jedib0t/go-pretty/v6 v6.5.9 h1:ACteMBRrrmm1gMsXe9PSTOClQ63IXDUt03H5U+UV8OU=
github.com/jedib0t/go-pretty/v6 v6.5.9/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
Expand Down Expand Up @@ -47,8 +47,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4zc=
github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM=
github.com/valyala/fasthttp v1.54.0 h1:cCL+ZZR3z3HPLMVfEYVUMtJqVaui0+gu7Lx63unHwS0=
github.com/valyala/fasthttp v1.54.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
Expand Down
2 changes: 1 addition & 1 deletion std/gateways/fasthttp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/fasthttp/router v1.5.0
github.com/fasthttp/websocket v1.5.8
github.com/goccy/go-reflect v1.2.0
github.com/valyala/fasthttp v1.53.0
github.com/valyala/fasthttp v1.54.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions std/gateways/fasthttp/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511/go.mod h1:sM7Mt7uEo
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4zc=
github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM=
github.com/valyala/fasthttp v1.54.0 h1:cCL+ZZR3z3HPLMVfEYVUMtJqVaui0+gu7Lx63unHwS0=
github.com/valyala/fasthttp v1.54.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
Expand Down
28 changes: 27 additions & 1 deletion std/gateways/fastws/conn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package fastws

import (
"io"

"github.com/clubpay/ronykit/kit"
"github.com/clubpay/ronykit/kit/errors"
"github.com/clubpay/ronykit/kit/utils"
Expand All @@ -19,6 +21,7 @@ type wsConn struct {
w *wsutil.Writer
handshakeDone bool
rpcOutFactory kit.OutgoingRPCFactory
msgs []wsutil.Message
}

var _ kit.Conn = (*wsConn)(nil)
Expand All @@ -29,13 +32,36 @@ func newWebsocketConn(
) *wsConn {
wsc := &wsConn{
w: wsutil.NewWriter(c, ws.StateServerSide, ws.OpText),
r: wsutil.NewReader(c, ws.StateServerSide),
id: id,
kv: map[string]string{},
c: c,
rpcOutFactory: rpcOutFactory,
}

wsc.r = &wsutil.Reader{
Source: c,
State: ws.StateServerSide,
CheckUTF8: true,
OnIntermediate: func(hdr ws.Header, src io.Reader) error {
if hdr.OpCode.IsControl() {
return wsutil.ControlHandler{
Src: wsc.r,
Dst: wsc.c,
State: wsc.r.State,
DisableSrcCiphering: true,
}.Handle(hdr)
}

bts, err := io.ReadAll(src)
if err != nil {
return err
}
wsc.msgs = append(wsc.msgs, wsutil.Message{OpCode: hdr.OpCode, Payload: bts})

return nil
},
}

return wsc
}

Expand Down
64 changes: 31 additions & 33 deletions std/gateways/fastws/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,48 +111,46 @@ func (gw *gateway) OnTraffic(c gnet.Conn) gnet.Action {
hdr ws.Header
)

for {
hdr, err = wsc.r.NextFrame()
if err != nil {
if builtinErr.Is(err, io.EOF) {
return gnet.None
}

return gnet.Close
hdr, err = wsc.r.NextFrame()
if err != nil {
if builtinErr.Is(err, io.EOF) {
return gnet.None
}
if hdr.OpCode.IsControl() {
wsc.r.OnIntermediate = func(header ws.Header, _ io.Reader) error {
return wsutil.ControlHandler{
Src: wsc.r,
Dst: wsc.c,
State: wsc.r.State,
DisableSrcCiphering: true,
}.Handle(header)
}
if err := wsc.r.OnIntermediate(hdr, wsc.r); err != nil {
return gnet.Close
}

continue
}
if hdr.OpCode&(ws.OpText|ws.OpBinary) == 0 {
if err := wsc.r.Discard(); err != nil {
return gnet.Close
}
return gnet.Close
}

var p []byte
if hdr.Fin {
// No more frames will be read. Use fixed sized buffer to read payload.
p = make([]byte, hdr.Length)
// It is not possible to receive io.EOF here because Reader does not
// return EOF if frame payload was successfully fetched.
_, err = io.ReadFull(wsc.r, p)
} else {
// Frame is fragmented, thus use io.ReadAll behavior.
var buff bytes.Buffer
_, err = buff.ReadFrom(wsc.r)
p = buff.Bytes()
}
if err != nil {
return gnet.Close
}

wsc.msgs = append(wsc.msgs, wsutil.Message{OpCode: hdr.OpCode, Payload: p})

for _, msg := range wsc.msgs {
if msg.OpCode&(ws.OpText|ws.OpBinary) == 0 {
continue
}

break
}
payloadBuffer := buf.GetLen(len(msg.Payload))
payloadBuffer.CopyFrom(msg.Payload)

payloadBuffer := buf.GetLen(int(hdr.Length))
n, err := wsc.r.Read(*payloadBuffer.Bytes())
if err != nil && !builtinErr.Is(err, io.EOF) {
return gnet.None
go gw.reactFunc(wsc, payloadBuffer, len(msg.Payload))
}

go gw.reactFunc(wsc, payloadBuffer, n)
wsc.msgs = wsc.msgs[:0]

return gnet.None
}
Expand Down
2 changes: 1 addition & 1 deletion std/gateways/fastws/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/clubpay/ronykit/kit v0.14.3
github.com/gobwas/ws v1.4.0
github.com/panjf2000/gnet/v2 v2.5.0
github.com/panjf2000/gnet/v2 v2.5.2
)

require (
Expand Down
103 changes: 103 additions & 0 deletions testenv/fastws_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package testenv

import (
"context"
"fmt"
"testing"
"time"

"ronykit/testenv/services"

"github.com/clubpay/ronykit/kit/utils"

"github.com/clubpay/ronykit/std/gateways/fastws"

"github.com/clubpay/ronykit/kit"
"github.com/clubpay/ronykit/kit/stub"
. "github.com/smartystreets/goconvey/convey"
"go.uber.org/fx"
)

func TestFastWS(t *testing.T) {
Convey("Kit with FastWS", t, func(c C) {
testCases := map[string]func(t *testing.T, opt fx.Option) func(c C){
"Edger Server With Huge Websocket Payload": fastwsWithHugePayload,
}
for title, fn := range testCases {
Convey(title,
fn(
t, invokeEdgeServerWithFastWS(8082, services.EchoService),
),
)
}
})
}

func invokeEdgeServerWithFastWS(port int, desc ...kit.ServiceDescriptor) fx.Option {
return fx.Invoke(
func(lc fx.Lifecycle) {
edge := kit.NewServer(
kit.WithLogger(&stdLogger{}),
kit.WithErrorHandler(
func(ctx *kit.Context, err error) {
fmt.Println("EdgeError: ", err)
},
),
kit.WithGateway(
fastws.MustNew(
fastws.WithPredicateKey("cmd"),
fastws.Listen(fmt.Sprintf("tcp4://0.0.0.0:%d", port)),
),
),
kit.WithServiceDesc(desc...),
)

lc.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
edge.Start(ctx)

return nil
},
OnStop: func(ctx context.Context) error {
edge.Shutdown(ctx)

return nil
},
},
)
},
)
}

func fastwsWithHugePayload(t *testing.T, opt fx.Option) func(c C) {
ctx := context.Background()

return func(c C) {
Prepare(
t, c,
fx.Options(
opt,
),
)

time.Sleep(time.Second * 5)

wsCtx := stub.New("localhost:8082").
Websocket(
stub.WithPredicateKey("cmd"),
)
c.So(wsCtx.Connect(ctx, "/"), ShouldBeNil)

req := &services.EchoRequest{Input: utils.RandomID(12000)}
res := &services.EchoResponse{}
err := wsCtx.BinaryMessage(
ctx, "echo", req, res,
func(ctx context.Context, msg kit.Message, hdr stub.Header, err error) {
c.So(err, ShouldBeNil)
c.So(msg.(*services.EchoResponse).Output, ShouldEqual, req.Input) //nolint:forcetypeassert
},
)
c.So(err, ShouldBeNil)
}
}
34 changes: 34 additions & 0 deletions testenv/services/echo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package services

import (
"time"

"github.com/clubpay/ronykit/kit"
"github.com/clubpay/ronykit/kit/desc"
"github.com/clubpay/ronykit/std/gateways/fastws"
)

type EchoRequest struct {
Input string `json:"input"`
}

type EchoResponse struct {
Output string `json:"output"`
}

var EchoService kit.ServiceDescriptor = desc.NewService("EchoService").
AddContract(
desc.NewContract().
SetInput(&EchoRequest{}).
SetOutput(&EchoResponse{}).
Selector(fastws.RPC("echo")).
SetHandler(
contextMW(10*time.Second),
func(ctx *kit.Context) {
req, _ := ctx.In().GetMsg().(*EchoRequest)

ctx.Out().
SetMsg(&EchoResponse{Output: req.Input}).
Send()
}),
)

0 comments on commit 72e0f68

Please sign in to comment.