From 72e0f685ffdf4d7bc4d785c9fe3606da7bcf95d3 Mon Sep 17 00:00:00 2001 From: Ehsan Noureddin Moosa Date: Fri, 24 May 2024 02:15:37 +0300 Subject: [PATCH] [fastws] fix reading very big messages --- kit/go.mod | 6 +- kit/go.sum | 12 ++-- std/gateways/fasthttp/go.mod | 2 +- std/gateways/fasthttp/go.sum | 4 +- std/gateways/fastws/conn.go | 28 ++++++++- std/gateways/fastws/gateway.go | 64 ++++++++++---------- std/gateways/fastws/go.mod | 2 +- testenv/fastws_test.go | 103 +++++++++++++++++++++++++++++++++ testenv/services/echo.go | 34 +++++++++++ 9 files changed, 208 insertions(+), 47 deletions(-) create mode 100644 testenv/fastws_test.go create mode 100644 testenv/services/echo.go diff --git a/kit/go.mod b/kit/go.mod index 865b5cfe..3f4b5ec4 100644 --- a/kit/go.mod +++ b/kit/go.mod @@ -3,13 +3,13 @@ module github.com/clubpay/ronykit/kit go 1.19 require ( - github.com/fasthttp/websocket v1.5.8 + github.com/fasthttp/websocket v1.5.9 github.com/goccy/go-json v0.10.3 github.com/goccy/go-reflect v1.2.0 github.com/jedib0t/go-pretty/v6 v6.5.9 github.com/onsi/ginkgo/v2 v2.18.0 github.com/onsi/gomega v1.33.1 - github.com/valyala/fasthttp v1.53.0 + github.com/valyala/fasthttp v1.54.0 golang.org/x/net v0.25.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -20,7 +20,7 @@ require ( github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect - github.com/klauspost/compress v1.17.7 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/rivo/uniseg v0.4.3 // indirect diff --git a/kit/go.sum b/kit/go.sum index 551b4efc..8cceb374 100644 --- a/kit/go.sum +++ b/kit/go.sum @@ -2,8 +2,8 @@ github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1 github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/fasthttp/websocket v1.5.8 h1:k5DpirKkftIF/w1R8ZzjSgARJrs54Je9YJK37DL/Ah8= -github.com/fasthttp/websocket v1.5.8/go.mod h1:d08g8WaT6nnyvg9uMm8K9zMYyDjfKyj3170AtPRuVU0= +github.com/fasthttp/websocket v1.5.9 h1:9deGuzYcCRKjk940kNwSN6Hd14hk4zYwropm4UsUIUQ= +github.com/fasthttp/websocket v1.5.9/go.mod h1:NLzHBFur260OMuZHohOfYQwMTpR7sfSpUnuqKxMpgKA= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= @@ -18,8 +18,8 @@ github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQN github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/jedib0t/go-pretty/v6 v6.5.9 h1:ACteMBRrrmm1gMsXe9PSTOClQ63IXDUt03H5U+UV8OU= github.com/jedib0t/go-pretty/v6 v6.5.9/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E= -github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= -github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -47,8 +47,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4zc= -github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= +github.com/valyala/fasthttp v1.54.0 h1:cCL+ZZR3z3HPLMVfEYVUMtJqVaui0+gu7Lx63unHwS0= +github.com/valyala/fasthttp v1.54.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= diff --git a/std/gateways/fasthttp/go.mod b/std/gateways/fasthttp/go.mod index a71958e7..c98537a9 100644 --- a/std/gateways/fasthttp/go.mod +++ b/std/gateways/fasthttp/go.mod @@ -7,7 +7,7 @@ require ( github.com/fasthttp/router v1.5.0 github.com/fasthttp/websocket v1.5.8 github.com/goccy/go-reflect v1.2.0 - github.com/valyala/fasthttp v1.53.0 + github.com/valyala/fasthttp v1.54.0 ) require ( diff --git a/std/gateways/fasthttp/go.sum b/std/gateways/fasthttp/go.sum index 2f5bfe42..a67f6a1f 100644 --- a/std/gateways/fasthttp/go.sum +++ b/std/gateways/fasthttp/go.sum @@ -37,8 +37,8 @@ github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511/go.mod h1:sM7Mt7uEo github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4zc= -github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= +github.com/valyala/fasthttp v1.54.0 h1:cCL+ZZR3z3HPLMVfEYVUMtJqVaui0+gu7Lx63unHwS0= +github.com/valyala/fasthttp v1.54.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= diff --git a/std/gateways/fastws/conn.go b/std/gateways/fastws/conn.go index 51bbf80f..ebe18b0b 100644 --- a/std/gateways/fastws/conn.go +++ b/std/gateways/fastws/conn.go @@ -1,6 +1,8 @@ package fastws import ( + "io" + "github.com/clubpay/ronykit/kit" "github.com/clubpay/ronykit/kit/errors" "github.com/clubpay/ronykit/kit/utils" @@ -19,6 +21,7 @@ type wsConn struct { w *wsutil.Writer handshakeDone bool rpcOutFactory kit.OutgoingRPCFactory + msgs []wsutil.Message } var _ kit.Conn = (*wsConn)(nil) @@ -29,13 +32,36 @@ func newWebsocketConn( ) *wsConn { wsc := &wsConn{ w: wsutil.NewWriter(c, ws.StateServerSide, ws.OpText), - r: wsutil.NewReader(c, ws.StateServerSide), id: id, kv: map[string]string{}, c: c, rpcOutFactory: rpcOutFactory, } + wsc.r = &wsutil.Reader{ + Source: c, + State: ws.StateServerSide, + CheckUTF8: true, + OnIntermediate: func(hdr ws.Header, src io.Reader) error { + if hdr.OpCode.IsControl() { + return wsutil.ControlHandler{ + Src: wsc.r, + Dst: wsc.c, + State: wsc.r.State, + DisableSrcCiphering: true, + }.Handle(hdr) + } + + bts, err := io.ReadAll(src) + if err != nil { + return err + } + wsc.msgs = append(wsc.msgs, wsutil.Message{OpCode: hdr.OpCode, Payload: bts}) + + return nil + }, + } + return wsc } diff --git a/std/gateways/fastws/gateway.go b/std/gateways/fastws/gateway.go index 6d205743..2a614009 100644 --- a/std/gateways/fastws/gateway.go +++ b/std/gateways/fastws/gateway.go @@ -111,48 +111,46 @@ func (gw *gateway) OnTraffic(c gnet.Conn) gnet.Action { hdr ws.Header ) - for { - hdr, err = wsc.r.NextFrame() - if err != nil { - if builtinErr.Is(err, io.EOF) { - return gnet.None - } - - return gnet.Close + hdr, err = wsc.r.NextFrame() + if err != nil { + if builtinErr.Is(err, io.EOF) { + return gnet.None } - if hdr.OpCode.IsControl() { - wsc.r.OnIntermediate = func(header ws.Header, _ io.Reader) error { - return wsutil.ControlHandler{ - Src: wsc.r, - Dst: wsc.c, - State: wsc.r.State, - DisableSrcCiphering: true, - }.Handle(header) - } - if err := wsc.r.OnIntermediate(hdr, wsc.r); err != nil { - return gnet.Close - } - continue - } - if hdr.OpCode&(ws.OpText|ws.OpBinary) == 0 { - if err := wsc.r.Discard(); err != nil { - return gnet.Close - } + return gnet.Close + } + var p []byte + if hdr.Fin { + // No more frames will be read. Use fixed sized buffer to read payload. + p = make([]byte, hdr.Length) + // It is not possible to receive io.EOF here because Reader does not + // return EOF if frame payload was successfully fetched. + _, err = io.ReadFull(wsc.r, p) + } else { + // Frame is fragmented, thus use io.ReadAll behavior. + var buff bytes.Buffer + _, err = buff.ReadFrom(wsc.r) + p = buff.Bytes() + } + if err != nil { + return gnet.Close + } + + wsc.msgs = append(wsc.msgs, wsutil.Message{OpCode: hdr.OpCode, Payload: p}) + + for _, msg := range wsc.msgs { + if msg.OpCode&(ws.OpText|ws.OpBinary) == 0 { continue } - break - } + payloadBuffer := buf.GetLen(len(msg.Payload)) + payloadBuffer.CopyFrom(msg.Payload) - payloadBuffer := buf.GetLen(int(hdr.Length)) - n, err := wsc.r.Read(*payloadBuffer.Bytes()) - if err != nil && !builtinErr.Is(err, io.EOF) { - return gnet.None + go gw.reactFunc(wsc, payloadBuffer, len(msg.Payload)) } - go gw.reactFunc(wsc, payloadBuffer, n) + wsc.msgs = wsc.msgs[:0] return gnet.None } diff --git a/std/gateways/fastws/go.mod b/std/gateways/fastws/go.mod index 7ae9dc30..94596a06 100644 --- a/std/gateways/fastws/go.mod +++ b/std/gateways/fastws/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/clubpay/ronykit/kit v0.14.3 github.com/gobwas/ws v1.4.0 - github.com/panjf2000/gnet/v2 v2.5.0 + github.com/panjf2000/gnet/v2 v2.5.2 ) require ( diff --git a/testenv/fastws_test.go b/testenv/fastws_test.go new file mode 100644 index 00000000..7103b3a1 --- /dev/null +++ b/testenv/fastws_test.go @@ -0,0 +1,103 @@ +package testenv + +import ( + "context" + "fmt" + "testing" + "time" + + "ronykit/testenv/services" + + "github.com/clubpay/ronykit/kit/utils" + + "github.com/clubpay/ronykit/std/gateways/fastws" + + "github.com/clubpay/ronykit/kit" + "github.com/clubpay/ronykit/kit/stub" + . "github.com/smartystreets/goconvey/convey" + "go.uber.org/fx" +) + +func TestFastWS(t *testing.T) { + Convey("Kit with FastWS", t, func(c C) { + testCases := map[string]func(t *testing.T, opt fx.Option) func(c C){ + "Edger Server With Huge Websocket Payload": fastwsWithHugePayload, + } + for title, fn := range testCases { + Convey(title, + fn( + t, invokeEdgeServerWithFastWS(8082, services.EchoService), + ), + ) + } + }) +} + +func invokeEdgeServerWithFastWS(port int, desc ...kit.ServiceDescriptor) fx.Option { + return fx.Invoke( + func(lc fx.Lifecycle) { + edge := kit.NewServer( + kit.WithLogger(&stdLogger{}), + kit.WithErrorHandler( + func(ctx *kit.Context, err error) { + fmt.Println("EdgeError: ", err) + }, + ), + kit.WithGateway( + fastws.MustNew( + fastws.WithPredicateKey("cmd"), + fastws.Listen(fmt.Sprintf("tcp4://0.0.0.0:%d", port)), + ), + ), + kit.WithServiceDesc(desc...), + ) + + lc.Append( + fx.Hook{ + OnStart: func(ctx context.Context) error { + edge.Start(ctx) + + return nil + }, + OnStop: func(ctx context.Context) error { + edge.Shutdown(ctx) + + return nil + }, + }, + ) + }, + ) +} + +func fastwsWithHugePayload(t *testing.T, opt fx.Option) func(c C) { + ctx := context.Background() + + return func(c C) { + Prepare( + t, c, + fx.Options( + opt, + ), + ) + + time.Sleep(time.Second * 5) + + wsCtx := stub.New("localhost:8082"). + Websocket( + stub.WithPredicateKey("cmd"), + ) + c.So(wsCtx.Connect(ctx, "/"), ShouldBeNil) + + req := &services.EchoRequest{Input: utils.RandomID(12000)} + res := &services.EchoResponse{} + err := wsCtx.BinaryMessage( + ctx, "echo", req, res, + func(ctx context.Context, msg kit.Message, hdr stub.Header, err error) { + c.So(err, ShouldBeNil) + c.So(msg.(*services.EchoResponse).Output, ShouldEqual, req.Input) //nolint:forcetypeassert + }, + ) + c.So(err, ShouldBeNil) + } +} diff --git a/testenv/services/echo.go b/testenv/services/echo.go new file mode 100644 index 00000000..3aea5d42 --- /dev/null +++ b/testenv/services/echo.go @@ -0,0 +1,34 @@ +package services + +import ( + "time" + + "github.com/clubpay/ronykit/kit" + "github.com/clubpay/ronykit/kit/desc" + "github.com/clubpay/ronykit/std/gateways/fastws" +) + +type EchoRequest struct { + Input string `json:"input"` +} + +type EchoResponse struct { + Output string `json:"output"` +} + +var EchoService kit.ServiceDescriptor = desc.NewService("EchoService"). + AddContract( + desc.NewContract(). + SetInput(&EchoRequest{}). + SetOutput(&EchoResponse{}). + Selector(fastws.RPC("echo")). + SetHandler( + contextMW(10*time.Second), + func(ctx *kit.Context) { + req, _ := ctx.In().GetMsg().(*EchoRequest) + + ctx.Out(). + SetMsg(&EchoResponse{Output: req.Input}). + Send() + }), + )