Skip to content

Commit

Permalink
fix err on conn close
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Aug 29, 2024
1 parent f56b1c3 commit 9190b78
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 11 deletions.
2 changes: 1 addition & 1 deletion const.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ const (
// It's not an implementation choice, the value defined in the specification.
initialStreamWindow = 256 * 1024
maxStreamWindow = 16 * 1024 * 1024
goAwayWaitTime = 5 * time.Second
goAwayWaitTime = 50 * time.Millisecond
)

const (
Expand Down
2 changes: 1 addition & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (s *Session) close(shutdownErr error, sendGoAway bool, errCode uint32) erro
s.streamLock.Lock()
defer s.streamLock.Unlock()
for id, stream := range s.streams {
stream.forceClose()
stream.forceClose(fmt.Errorf("%w: connection closed: %w", ErrStreamReset, s.shutdownErr))
delete(s.streams, id)
stream.memorySpan.Done()
}
Expand Down
25 changes: 16 additions & 9 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type Stream struct {

state streamState
writeState, readState halfStreamState
writeErr, readErr error
stateLock sync.Mutex
resetErr *StreamError

recvBuf segmentedBuffer

Expand Down Expand Up @@ -90,7 +90,7 @@ func (s *Stream) Read(b []byte) (n int, err error) {
START:
s.stateLock.Lock()
state := s.readState
resetErr := s.resetErr
resetErr := s.readErr
s.stateLock.Unlock()

switch state {
Expand Down Expand Up @@ -149,7 +149,7 @@ func (s *Stream) write(b []byte) (n int, err error) {
START:
s.stateLock.Lock()
state := s.writeState
resetErr := s.resetErr
resetErr := s.writeErr
s.stateLock.Unlock()

switch state {
Expand Down Expand Up @@ -283,12 +283,13 @@ func (s *Stream) ResetWithError(errCode uint32) error {
// If we've already sent/received an EOF, no need to reset that side.
if s.writeState == halfOpen {
s.writeState = halfReset
s.writeErr = &StreamError{Remote: false, ErrorCode: errCode}
}
if s.readState == halfOpen {
s.readState = halfReset
s.readErr = &StreamError{Remote: false, ErrorCode: errCode}
}
s.state = streamFinished
s.resetErr = &StreamError{Remote: false, ErrorCode: errCode}
s.notifyWaiting()
s.stateLock.Unlock()
if sendReset {
Expand Down Expand Up @@ -344,6 +345,7 @@ func (s *Stream) CloseRead() error {
panic("invalid state")
}
s.readState = halfReset
s.readErr = ErrStreamReset
cleanup = s.writeState != halfOpen
if cleanup {
s.state = streamFinished
Expand All @@ -365,13 +367,15 @@ func (s *Stream) Close() error {
}

// forceClose is used for when the session is exiting
func (s *Stream) forceClose() {
func (s *Stream) forceClose(err error) {
s.stateLock.Lock()
if s.readState == halfOpen {
s.readState = halfReset
s.readErr = err
}
if s.writeState == halfOpen {
s.writeState = halfReset
s.writeErr = err
}
s.state = streamFinished
s.notifyWaiting()
Expand Down Expand Up @@ -426,17 +430,20 @@ func (s *Stream) processFlags(flags uint16, hdr header) {
}
if flags&flagRST == flagRST {
s.stateLock.Lock()
var resetErr error = ErrStreamReset
// Length in a window update frame with RST flag encodes an error code.
if hdr.MsgType() == typeWindowUpdate {
resetErr = &StreamError{Remote: true, ErrorCode: hdr.Length()}
}
if s.readState == halfOpen {
s.readState = halfReset
s.readErr = resetErr
}
if s.writeState == halfOpen {
s.writeState = halfReset
s.writeErr = resetErr
}
s.state = streamFinished
// Length in a window update frame with RST flag encodes an error code.
if hdr.MsgType() == typeWindowUpdate && s.resetErr == nil {
s.resetErr = &StreamError{Remote: true, ErrorCode: hdr.Length()}
}
s.stateLock.Unlock()
closeStream = true
s.notifyWaiting()
Expand Down

0 comments on commit 9190b78

Please sign in to comment.