Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Replicator FD leak: make sure client side keeps reading until it gets…
Browse files Browse the repository at this point in the history
… an error (#266)

* Replicator FD leak: make sure client side keeps reading until it gets an error

* move stream.done() to write pump
  • Loading branch information
datoug authored and kobeyang committed Aug 7, 2017
1 parent 8b0c659 commit 091cba4
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions services/replicator/outconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func (conn *outConnection) close() {

func (conn *outConnection) writeCreditsStream() {
defer conn.stream.Done()

if err := conn.sendCredits(initialCreditSize); err != nil {
conn.logger.Error(`error writing initial credits`)

Expand Down Expand Up @@ -152,6 +151,9 @@ func (conn *outConnection) readMsgStream() {
var sealMsgRead bool
var numMsgsRead int32

// Note we must continue read until we hit an error before returning from this function
// Because the websocket client only tear down the underlying connection when it gets a read error
readloop:
for {
rmc, err := conn.stream.Read()
if err != nil {
Expand All @@ -169,7 +171,7 @@ func (conn *outConnection) readMsgStream() {
"seqNum": msg.Message.GetSequenceNumber(),
}).Error("regular message read after seal message")
go conn.close()
return
continue readloop
}

// Sequence number check to make sure we get monotonically increasing sequence number.
Expand All @@ -181,7 +183,7 @@ func (conn *outConnection) readMsgStream() {
"expectedSeqNum": expectedSeqNum,
}).Error("sequence number out of order")
go conn.close()
return
continue readloop
}

// update the lastSeqNum to this value
Expand All @@ -198,7 +200,7 @@ func (conn *outConnection) readMsgStream() {
atomic.StoreInt64(&conn.lastMsgReplicatedTime, time.Now().UnixNano())
case <-conn.closeChannel:
conn.logger.Info(`writing msg to the channel failed because of shutdown`)
return
continue readloop
}

case store.ReadMessageContentType_SEALED:
Expand All @@ -215,15 +217,16 @@ func (conn *outConnection) readMsgStream() {
atomic.StoreInt64(&conn.lastMsgReplicatedTime, time.Now().UnixNano())
case <-conn.closeChannel:
conn.logger.Info(`writing msg to the channel failed because of shutdown`)
return
}

return
continue readloop

case store.ReadMessageContentType_ERROR:
msgErr := rmc.GetError()
conn.logger.WithField(`Message`, msgErr.GetMessage()).Error(`received error from reading msg`)
go conn.close()
return
continue readloop

default:
conn.logger.WithField(`Type`, rmc.GetType()).Error(`received ReadMessageContent with unrecognized type`)
}
Expand Down

0 comments on commit 091cba4

Please sign in to comment.