From 2b60a61f1e4cf3a5ecded0bd7e77ea168289e6de Mon Sep 17 00:00:00 2001 From: Damien Neil Date: Tue, 3 Oct 2023 13:53:44 -0700 Subject: [PATCH] quic: fix several bugs in flow control accounting Connection-level flow control sets a bound on the total maximum stream offset of all data sent, not the total amount of bytes sent in STREAM frames. For example, if we send the bytes [0,10) for a stream, and then retransmit the same bytes due to packet loss, that consumes 10 bytes of connection-level flow, not 20. We were incorrectly tracking total bytes sent. Fix this. We were blocking retransmission of data in lost STREAM frames on availability of connection-level flow control. We now place a stream with retransmitted data on queueMeta (non-flow-controlled data), since we have already accounted for the flow control window consumption of the data. We were incorrectly marking a stream as being able to send an empty STREAM frame with a FIN bit, when the stream was actually blocked on stream-level flow control. Fix this. For golang/go#58547 Change-Id: Ib2ace94183750078a19d945256507060ea786735 Reviewed-on: https://go-review.googlesource.com/c/net/+/532716 LUCI-TryBot-Result: Go LUCI Reviewed-by: Jonathan Amsterdam --- internal/quic/conn_flow_test.go | 34 +++++++++++++++++++++++++++++ internal/quic/stream.go | 23 +++++++++++++++----- internal/quic/stream_test.go | 38 +++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 5 deletions(-) diff --git a/internal/quic/conn_flow_test.go b/internal/quic/conn_flow_test.go index d5ee74ebd..03e0757a6 100644 --- a/internal/quic/conn_flow_test.go +++ b/internal/quic/conn_flow_test.go @@ -394,3 +394,37 @@ func TestConnOutflowMetaAndData(t *testing.T) { data: data, }) } + +func TestConnOutflowResentData(t *testing.T) { + tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream, + permissiveTransportParameters, + func(p *transportParameters) { + p.initialMaxData = 10 + }) + tc.ignoreFrame(frameTypeAck) + + data := makeTestData(15) + s.Write(data[:8]) + tc.wantFrame("data is under MAX_DATA limit, all sent", + packetType1RTT, debugFrameStream{ + id: s.id, + data: data[:8], + }) + + // Lose the last STREAM packet. + const pto = false + tc.triggerLossOrPTO(packetType1RTT, false) + tc.wantFrame("lost STREAM data is retransmitted", + packetType1RTT, debugFrameStream{ + id: s.id, + data: data[:8], + }) + + s.Write(data[8:]) + tc.wantFrame("new data is sent up to the MAX_DATA limit", + packetType1RTT, debugFrameStream{ + id: s.id, + off: 8, + data: data[8:10], + }) +} diff --git a/internal/quic/stream.go b/internal/quic/stream.go index 9310811c1..89036b19b 100644 --- a/internal/quic/stream.go +++ b/internal/quic/stream.go @@ -39,6 +39,7 @@ type Stream struct { outgate gate out pipe // buffered data to send outwin int64 // maximum MAX_STREAM_DATA received from the peer + outmaxsent int64 // maximum data offset we've sent to the peer outmaxbuf int64 // maximum amount of data we will buffer outunsent rangeset[int64] // ranges buffered but not yet sent outacked rangeset[int64] // ranges sent and acknowledged @@ -494,8 +495,12 @@ func (s *Stream) outUnlockNoQueue() streamState { case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED state = streamOutSendMeta case len(s.outunsent) > 0: // STREAM frame with data - state = streamOutSendData - case s.outclosed.shouldSend(): // STREAM frame with FIN bit, all data already sent + if s.outunsent.min() < s.outmaxsent { + state = streamOutSendMeta // resent data, will not consume flow control + } else { + state = streamOutSendData // new data, requires flow control + } + case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit state = streamOutSendMeta case s.outopened.shouldSend(): // STREAM frame with no data state = streamOutSendMeta @@ -725,7 +730,11 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b for { // STREAM off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto) - size = min(size, s.conn.streams.outflow.avail()) + if end := off + size; end > s.outmaxsent { + // This will require connection-level flow control to send. + end = min(end, s.outmaxsent+s.conn.streams.outflow.avail()) + size = end - off + } fin := s.outclosed.isSet() && off+size == s.out.end shouldSend := size > 0 || // have data to send s.outopened.shouldSendPTO(pto) || // should open the stream @@ -738,8 +747,12 @@ func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto b return false } s.out.copy(off, b) - s.conn.streams.outflow.consume(int64(len(b))) - s.outunsent.sub(off, off+int64(len(b))) + end := off + int64(len(b)) + if end > s.outmaxsent { + s.conn.streams.outflow.consume(end - s.outmaxsent) + s.outmaxsent = end + } + s.outunsent.sub(off, end) s.frameOpensStream(pnum) if fin { s.outclosed.setSent(pnum) diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go index 86eebc698..7c1377fae 100644 --- a/internal/quic/stream_test.go +++ b/internal/quic/stream_test.go @@ -1094,6 +1094,44 @@ func TestStreamCloseUnblocked(t *testing.T) { } } +func TestStreamCloseWriteWhenBlockedByStreamFlowControl(t *testing.T) { + ctx := canceledContext() + tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters, + func(p *transportParameters) { + //p.initialMaxData = 0 + p.initialMaxStreamDataUni = 0 + }) + tc.ignoreFrame(frameTypeStreamDataBlocked) + if _, err := s.WriteContext(ctx, []byte{0, 1}); err != nil { + t.Fatalf("s.Write = %v", err) + } + s.CloseWrite() + tc.wantIdle("stream write is blocked by flow control") + + tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{ + id: s.id, + max: 1, + }) + tc.wantFrame("send data up to flow control limit", + packetType1RTT, debugFrameStream{ + id: s.id, + data: []byte{0}, + }) + tc.wantIdle("stream write is again blocked by flow control") + + tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{ + id: s.id, + max: 2, + }) + tc.wantFrame("send remaining data and FIN", + packetType1RTT, debugFrameStream{ + id: s.id, + off: 1, + data: []byte{1}, + fin: true, + }) +} + func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) { testStreamTypes(t, "", func(t *testing.T, styp streamType) { ctx := canceledContext()