Skip to content

Commit

Permalink
internal/http3: refactor in prep for sharing transport/server code
Browse files Browse the repository at this point in the history
Pull out various elements of the HTTP/3 client that can be
reused in the server. Move tests which can apply to client or server
connections into conn_test.go.

For golang/go#70914

Change-Id: I72b5eab55ba27df980ab2079120613f175b05927
Reviewed-on: https://go-review.googlesource.com/c/net/+/646616
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Auto-Submit: Damien Neil <dneil@google.com>
  • Loading branch information
neild authored and gopherbot committed Feb 5, 2025
1 parent ebd23f8 commit b914489
Show file tree
Hide file tree
Showing 6 changed files with 358 additions and 308 deletions.
116 changes: 116 additions & 0 deletions internal/http3/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.24

package http3

import (
"context"
"io"
"sync"

"golang.org/x/net/quic"
)

type streamHandler interface {
handleControlStream(*stream) error
handlePushStream(*stream) error
handleEncoderStream(*stream) error
handleDecoderStream(*stream) error
handleRequestStream(*stream)
abort(error)
}

type genericConn struct {
mu sync.Mutex

// The peer may create exactly one control, encoder, and decoder stream.
// streamsCreated is a bitset of streams created so far.
// Bits are 1 << streamType.
streamsCreated uint8
}

func (c *genericConn) acceptStreams(qconn *quic.Conn, h streamHandler) {
for {
// Use context.Background: This blocks until a stream is accepted
// or the connection closes.
st, err := qconn.AcceptStream(context.Background())
if err != nil {
return // connection closed
}
if st.IsReadOnly() {
go c.handleUnidirectionalStream(newStream(st), h)
} else {
go h.handleRequestStream(newStream(st))
}
}
}

func (c *genericConn) handleUnidirectionalStream(st *stream, h streamHandler) {
// Unidirectional stream header: One varint with the stream type.
v, err := st.readVarint()
if err != nil {
h.abort(&connectionError{
code: errH3StreamCreationError,
message: "error reading unidirectional stream header",
})
return
}
stype := streamType(v)
if err := c.checkStreamCreation(stype); err != nil {
h.abort(err)
return
}
switch stype {
case streamTypeControl:
err = h.handleControlStream(st)
case streamTypePush:
err = h.handlePushStream(st)
case streamTypeEncoder:
err = h.handleEncoderStream(st)
case streamTypeDecoder:
err = h.handleDecoderStream(st)
default:
// "Recipients of unknown stream types MUST either abort reading
// of the stream or discard incoming data without further processing."
// https://www.rfc-editor.org/rfc/rfc9114.html#section-6.2-7
//
// We should send the H3_STREAM_CREATION_ERROR error code,
// but the quic package currently doesn't allow setting error codes
// for STOP_SENDING frames.
// TODO: Should CloseRead take an error code?
st.stream.CloseRead()
err = nil
}
if err == io.EOF {
err = &connectionError{
code: errH3ClosedCriticalStream,
message: streamType(stype).String() + " stream closed",
}
}
if err != nil {
h.abort(err)
}
}

func (c *genericConn) checkStreamCreation(stype streamType) error {
switch stype {
case streamTypeControl, streamTypeEncoder, streamTypeDecoder:
// The peer may create exactly one control, encoder, and decoder stream.
default:
return nil
}
c.mu.Lock()
defer c.mu.Unlock()
bit := uint8(1) << stype
if c.streamsCreated&bit != 0 {
return &connectionError{
code: errH3StreamCreationError,
message: "multiple " + stype.String() + " streams created",
}
}
c.streamsCreated |= bit
return nil
}
149 changes: 149 additions & 0 deletions internal/http3/conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.24 && goexperiment.synctest

package http3

import (
"testing"
"testing/synctest"
)

// Tests which apply to both client and server connections.

func TestConnCreatesControlStream(t *testing.T) {
runConnTest(t, func(t testing.TB, tc *testQUICConn) {
controlStream := tc.wantStream(streamTypeControl)
controlStream.wantFrameHeader(
"server sends SETTINGS frame on control stream",
frameTypeSettings)
controlStream.discardFrame()
})
}

func TestConnUnknownUnidirectionalStream(t *testing.T) {
// "Recipients of unknown stream types MUST either abort reading of the stream
// or discard incoming data without further processing."
// https://www.rfc-editor.org/rfc/rfc9114.html#section-6.2-7
runConnTest(t, func(t testing.TB, tc *testQUICConn) {
st := tc.newStream(0x21) // reserved stream type

// The endpoint should send a STOP_SENDING for this stream,
// but it should not close the connection.
synctest.Wait()
if _, err := st.Write([]byte("hello")); err == nil {
t.Fatalf("write to send-only stream with an unknown type succeeded; want error")
}
tc.wantNotClosed("after receiving unknown unidirectional stream type")
})
}

func TestConnUnknownSettings(t *testing.T) {
// "An implementation MUST ignore any [settings] parameter with
// an identifier it does not understand."
// https://www.rfc-editor.org/rfc/rfc9114.html#section-7.2.4-9
runConnTest(t, func(t testing.TB, tc *testQUICConn) {
controlStream := tc.newStream(streamTypeControl)
controlStream.writeSettings(0x1f+0x21, 0) // reserved settings type
controlStream.Flush()
tc.wantNotClosed("after receiving unknown settings")
})
}

func TestConnInvalidSettings(t *testing.T) {
// "These reserved settings MUST NOT be sent, and their receipt MUST
// be treated as a connection error of type H3_SETTINGS_ERROR."
// https://www.rfc-editor.org/rfc/rfc9114.html#section-7.2.4.1-5
runConnTest(t, func(t testing.TB, tc *testQUICConn) {
controlStream := tc.newStream(streamTypeControl)
controlStream.writeSettings(0x02, 0) // HTTP/2 SETTINGS_ENABLE_PUSH
controlStream.Flush()
tc.wantClosed("invalid setting", errH3SettingsError)
})
}

func TestConnDuplicateStream(t *testing.T) {
for _, stype := range []streamType{
streamTypeControl,
streamTypeEncoder,
streamTypeDecoder,
} {
t.Run(stype.String(), func(t *testing.T) {
runConnTest(t, func(t testing.TB, tc *testQUICConn) {
_ = tc.newStream(stype)
tc.wantNotClosed("after creating one " + stype.String() + " stream")

// Opening a second control, encoder, or decoder stream
// is a protocol violation.
_ = tc.newStream(stype)
tc.wantClosed("duplicate stream", errH3StreamCreationError)
})
})
}
}

func TestConnUnknownFrames(t *testing.T) {
for _, stype := range []streamType{
streamTypeControl,
} {
t.Run(stype.String(), func(t *testing.T) {
runConnTest(t, func(t testing.TB, tc *testQUICConn) {
st := tc.newStream(stype)

if stype == streamTypeControl {
// First frame on the control stream must be settings.
st.writeVarint(int64(frameTypeSettings))
st.writeVarint(0) // size
}

data := "frame content"
st.writeVarint(0x1f + 0x21) // reserved frame type
st.writeVarint(int64(len(data))) // size
st.Write([]byte(data))
st.Flush()

tc.wantNotClosed("after writing unknown frame")
})
})
}
}

func TestConnInvalidFrames(t *testing.T) {
runConnTest(t, func(t testing.TB, tc *testQUICConn) {
control := tc.newStream(streamTypeControl)

// SETTINGS frame.
control.writeVarint(int64(frameTypeSettings))
control.writeVarint(0) // size

// DATA frame (invalid on the control stream).
control.writeVarint(int64(frameTypeData))
control.writeVarint(0) // size
control.Flush()
tc.wantClosed("after writing DATA frame to control stream", errH3FrameUnexpected)
})
}

func TestConnPeerCreatesBadUnidirectionalStream(t *testing.T) {
runConnTest(t, func(t testing.TB, tc *testQUICConn) {
// Create and close a stream without sending the unidirectional stream header.
qs, err := tc.qconn.NewSendOnlyStream(canceledCtx)
if err != nil {
t.Fatal(err)
}
st := newTestQUICStream(tc.t, newStream(qs))
st.stream.stream.Close()

tc.wantClosed("after peer creates and closes uni stream", errH3StreamCreationError)
})
}

func runConnTest(t *testing.T, f func(testing.TB, *testQUICConn)) {
t.Helper()
runSynctestSubtest(t, "client", func(t testing.TB) {
tc := newTestClientConn(t)
f(t, tc.testQUICConn)
})
}
42 changes: 42 additions & 0 deletions internal/http3/quic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.24

package http3

import (
"crypto/tls"

"golang.org/x/net/quic"
)

func initConfig(config *quic.Config) *quic.Config {
if config == nil {
config = &quic.Config{}
}

// maybeCloneTLSConfig clones the user-provided tls.Config (but only once)
// prior to us modifying it.
needCloneTLSConfig := true
maybeCloneTLSConfig := func() *tls.Config {
if needCloneTLSConfig {
config.TLSConfig = config.TLSConfig.Clone()
needCloneTLSConfig = false
}
return config.TLSConfig
}

if config.TLSConfig == nil {
config.TLSConfig = &tls.Config{}
needCloneTLSConfig = false
}
if config.TLSConfig.MinVersion == 0 {
maybeCloneTLSConfig().MinVersion = tls.VersionTLS13
}
if config.TLSConfig.NextProtos == nil {
maybeCloneTLSConfig().NextProtos = []string{"h3"}
}
return config
}
31 changes: 15 additions & 16 deletions internal/http3/quic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,8 @@ func newQUICEndpointPair(t testing.TB) (e1, e2 *quic.Endpoint) {
TLSConfig: testTLSConfig,
}
tn := &testNet{}
pc1 := tn.newPacketConn()
e1, err := quic.NewEndpoint(pc1, config)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
e1.Close(t.Context())
})
pc2 := tn.newPacketConn()
e2, err = quic.NewEndpoint(pc2, config)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
e2.Close(t.Context())
})
e1 = tn.newQUICEndpoint(t, config)
e2 = tn.newQUICEndpoint(t, config)
return e1, e2
}

Expand Down Expand Up @@ -121,6 +107,19 @@ func (tn *testNet) newPacketConn() *testPacketConn {
return tc
}

func (tn *testNet) newQUICEndpoint(t testing.TB, config *quic.Config) *quic.Endpoint {
t.Helper()
pc := tn.newPacketConn()
e, err := quic.NewEndpoint(pc, config)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
e.Close(t.Context())
})
return e
}

// connForAddr returns the conn with the given source address.
func (tn *testNet) connForAddr(srcAddr netip.AddrPort) *testPacketConn {
tn.mu.Lock()
Expand Down
Loading

0 comments on commit b914489

Please sign in to comment.