Skip to content

Commit

Permalink
Merge pull request #369 from lesismal/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
lesismal authored Dec 5, 2023
2 parents 0d7a6db + 0097556 commit 926fb9b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 14 deletions.
2 changes: 1 addition & 1 deletion nbhttp/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func (p *Parser) Read(data []byte) error {
return nil
}

var c byte
var start = 0
var offset = len(p.cache)
if offset > 0 {
Expand All @@ -161,6 +160,7 @@ UPGRADER:
return err
}

var c byte
for i := offset; i < len(data); i++ {
if p.Reader != nil {
goto UPGRADER
Expand Down
2 changes: 1 addition & 1 deletion nbhttp/websocket/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (c *Conn) nextFrame() (opcode MessageType, body []byte, ok, fin, res1, res2
func (c *Conn) Read(p *nbhttp.Parser, data []byte) error {
oldLen := len(c.buffer)
readLimit := c.Engine.ReadLimit
if readLimit > 0 && ((oldLen+len(data) > readLimit) || ((oldLen + len(c.message) + len(data)) > readLimit)) {
if readLimit > 0 && (oldLen+len(data) > readLimit) {
return nbhttp.ErrTooLong
}

Expand Down
65 changes: 53 additions & 12 deletions nbhttp/websocket/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,16 @@ var (
// DefaultBlockingModAsyncWrite .
DefaultBlockingModAsyncWrite = true

// DefaultBlockingModHandleRead .
DefaultBlockingModHandleRead = true

// DefaultBlockingModTransferConnToPoller .
DefaultBlockingModTransferConnToPoller = false

// DefaultBlockingModSendQueueInitSize .
DefaultBlockingModSendQueueInitSize = 4

// DefaultBlockingModSendQueueMaxSize .
DefaultBlockingModSendQueueMaxSize = 0

DefaultBlockingModAsyncCloseDelay = time.Second / 10
Expand Down Expand Up @@ -82,6 +90,33 @@ type Upgrader struct {
// false: write buffer to the conn directely.
BlockingModAsyncWrite bool

// BlockingModHandleRead represents whether start a goroutine to handle reading automatically during `Upgrade``:
// true: use dynamic goroutine to handle writing.
// false: write buffer to the conn directely.
//
//
// Notice:
// If we start a goroutine to handle read during `Upgrade`, we may receive a new websocket message
// before we have left the http.Handler for the `Websocket Handshake`.
// Then if we have the logic of `websocket.Conn.SetSession` in the http.Handler, it's possible that when we receive
// and are handling a websocket message and call `websocket.Conn.Session()`, we get nil.
//
// To fix this nil session problem, can use `websocket.Conn.SessionWithLock()`.
//
// For other concurrent problems(including the nil session problem), we can:
// 1st: set this `BlockingModHandleRead = false`
// 2nd: `go wsConn.HandleRead(YourBufSize)` after `Upgrade` and finished initialization.
// Then the websocket message wouldn't come before the http.Handler for `Websocket Handshake` has done.
BlockingModHandleRead bool

// BlockingModTrasferConnToPoller represents whether try to transfer a blocking connection to nonblocking and add to `Engine``.
// true: try to transfer.
// false: don't try to transfer.
//
// Notice:
// Only `net.TCPConn` and `llib's blocking tls.Conn` can be transferred to nonblocking.
BlockingModTrasferConnToPoller bool

// BlockingModSendQueueInitSize represents the init size of a Conn's send queue,
// only takes effect when `BlockingModAsyncWrite` is true.
BlockingModSendQueueInitSize int
Expand All @@ -99,10 +134,12 @@ func NewUpgrader() *Upgrader {
compressionLevel: defaultCompressionLevel,
BlockingModAsyncCloseDelay: DefaultBlockingModAsyncCloseDelay,
},
BlockingModReadBufferSize: DefaultBlockingReadBufferSize,
BlockingModAsyncWrite: DefaultBlockingModAsyncWrite,
BlockingModSendQueueInitSize: DefaultBlockingModSendQueueInitSize,
BlockingModSendQueueMaxSize: DefaultBlockingModSendQueueMaxSize,
BlockingModReadBufferSize: DefaultBlockingReadBufferSize,
BlockingModAsyncWrite: DefaultBlockingModAsyncWrite,
BlockingModHandleRead: DefaultBlockingModHandleRead,
BlockingModTrasferConnToPoller: DefaultBlockingModTransferConnToPoller,
BlockingModSendQueueInitSize: DefaultBlockingModSendQueueInitSize,
BlockingModSendQueueMaxSize: DefaultBlockingModSendQueueMaxSize,
}
u.pingMessageHandler = func(c *Conn, data string) {
err := c.WriteMessage(PongMessage, []byte(data))
Expand Down Expand Up @@ -219,7 +256,7 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
var nbc *nbio.Conn
var engine = u.Engine
var parser *nbhttp.Parser
var transferConn bool
var transferConn = u.BlockingModTrasferConnToPoller
if len(args) > 0 {
var b bool
b, ok = args[0].(bool)
Expand Down Expand Up @@ -270,12 +307,14 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
nbhttpConn.Trasfered = true
}
vt.ResetRawInput()
parser = &nbhttp.Parser{Execute: nbc.Execute}
parser = nbhttp.NewParser(nil, false, engine.ReadLimit, nbc.Execute)
if engine.EpollMod == nbio.EPOLLET && engine.EPOLLONESHOT == nbio.EPOLLONESHOT {
parser.Execute = nbhttp.SyncExecutor
}
wsc = NewConn(u, vt, subprotocol, compress, false)
nbc.SetSession(wsc)
parser.Reader = wsc
parser.Engine = engine
nbc.SetSession(parser)
nbc.OnData(func(c *nbio.Conn, data []byte) {
defer func() {
if err := recover(); err != nil {
Expand All @@ -299,7 +338,7 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
return
}
if nread > 0 {
errRead = wsc.Read(parser, buffer[:nread])
errRead = parser.Read(buffer[:nread])
if err != nil {
logging.Debug("websocket Conn Read failed: %v", errRead)
c.CloseWithError(errRead)
Expand Down Expand Up @@ -344,12 +383,14 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
if nbhttpConn != nil {
nbhttpConn.Trasfered = true
}
parser = &nbhttp.Parser{Execute: nbc.Execute}
parser = nbhttp.NewParser(nil, false, engine.ReadLimit, nbc.Execute)
if engine.EpollMod == nbio.EPOLLET && engine.EPOLLONESHOT == nbio.EPOLLONESHOT {
parser.Execute = nbhttp.SyncExecutor
}
wsc = NewConn(u, nbc, subprotocol, compress, false)
nbc.SetSession(wsc)
parser.Reader = wsc
parser.Engine = engine
nbc.SetSession(parser)
nbc.OnData(func(c *nbio.Conn, data []byte) {
defer func() {
if err := recover(); err != nil {
Expand All @@ -360,7 +401,7 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
}
}()

errRead := wsc.Read(parser, data)
errRead := parser.Read(data)
if errRead != nil {
logging.Debug("websocket Conn Read failed: %v", errRead)
c.CloseWithError(errRead)
Expand Down Expand Up @@ -398,7 +439,7 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
}
wsc.isReadingByParser = isReadingByParser
if wsc.isBlockingMod && (!wsc.isReadingByParser) {
var handleRead = true
var handleRead = u.BlockingModHandleRead
if len(args) > 1 {
var b bool
b, ok = args[1].(bool)
Expand Down

0 comments on commit 926fb9b

Please sign in to comment.