Skip to content

Commit

Permalink
upstream: imp quic connector
Browse files Browse the repository at this point in the history
  • Loading branch information
EugeneOne1 committed Jan 29, 2024
1 parent 91810a5 commit 72a08d7
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 135 deletions.
77 changes: 11 additions & 66 deletions upstream/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,10 @@ type dnsOverQUIC struct {
// tlsConf is the configuration of TLS.
tlsConf *tls.Config

// quicConfig is the QUIC configuration that is used for establishing
// connections to the upstream. This configuration includes the TokenStore
// that needs to be stored for the lifetime of dnsOverQUIC since we can
// re-create the connection.
quicConfig *quic.Config

// bytesPool is a *sync.Pool we use to store byte buffers in. These byte
// buffers are used to read responses from the upstream.
bytesPool *sync.Pool

// quicConfigMu protects quicConfig.
quicConfigMu *sync.Mutex

// bytesPoolGuard protects bytesPool.
bytesPoolMu *sync.Mutex

Expand All @@ -93,11 +84,6 @@ func newDoQ(addr *url.URL, opts *Options) (u Upstream, err error) {
uu := &dnsOverQUIC{
getDialer: newDialerInitializer(addr, opts),
addr: addr,
quicConfig: &quic.Config{
KeepAlivePeriod: QUICKeepAlivePeriod,
TokenStore: newQUICTokenStore(),
Tracer: opts.QUICTracer,
},
// #nosec G402 -- TLS certificate verification could be disabled by
// configuration.
tlsConf: &tls.Config{
Expand All @@ -114,11 +100,14 @@ func newDoQ(addr *url.URL, opts *Options) (u Upstream, err error) {
VerifyConnection: opts.VerifyConnection,
NextProtos: compatProtoDQ,
},
quicConfigMu: &sync.Mutex{},
bytesPoolMu: &sync.Mutex{},
timeout: opts.Timeout,
bytesPoolMu: &sync.Mutex{},
timeout: opts.Timeout,
}
uu.connector = newQUICConnector(uu)
uu.connector = newQUICConnector(uu, &quic.Config{
KeepAlivePeriod: QUICKeepAlivePeriod,
TokenStore: newQUICTokenStore(),
Tracer: opts.QUICTracer,
})

u = uu

Expand Down Expand Up @@ -168,7 +157,7 @@ func (p *dnsOverQUIC) Exchange(m *dns.Msg) (resp *dns.Msg, err error) {
// Close implements the [Upstream] interface for *dnsOverQUIC.
func (p *dnsOverQUIC) Close() (err error) {
runtime.SetFinalizer(p, nil)
p.connector.close()
p.connector.closeConn()

return err
}
Expand Down Expand Up @@ -242,25 +231,6 @@ func (p *dnsOverQUIC) getBytesPool() (pool *sync.Pool) {
return p.bytesPool
}

// getQUICConfig returns the QUIC config in a thread-safe manner. Note, that
// this method returns a pointer, it is forbidden to change its properties.
func (p *dnsOverQUIC) getQUICConfig() (c *quic.Config) {
p.quicConfigMu.Lock()
defer p.quicConfigMu.Unlock()

return p.quicConfig
}

// resetQUICConfig re-creates the tokens store as we may need to use a new one
// if we failed to connect.
func (p *dnsOverQUIC) resetQUICConfig() {
p.quicConfigMu.Lock()
defer p.quicConfigMu.Unlock()

p.quicConfig = p.quicConfig.Clone()
p.quicConfig.TokenStore = newQUICTokenStore()
}

// openStream opens a new QUIC stream for the specified connection.
func (p *dnsOverQUIC) openStream(conn quic.Connection) (stream quic.Stream, err error) {
ctx, cancel := p.withDeadline(context.Background())
Expand All @@ -279,10 +249,10 @@ func (p *dnsOverQUIC) openStream(conn quic.Connection) (stream quic.Stream, err
}

// type check
var _ quicConnHandler = (*dnsOverQUIC)(nil)
var _ quicConnOpener = (*dnsOverQUIC)(nil)

// openConnection dials a new QUIC connection.
func (p *dnsOverQUIC) openConnection() (conn quic.Connection, err error) {
func (p *dnsOverQUIC) openConnection(conf *quic.Config) (conn quic.Connection, err error) {
dialContext, err := p.getDialer()
if err != nil {
return nil, fmt.Errorf("bootstrapping %s: %w", p.addr, err)
Expand Down Expand Up @@ -312,39 +282,14 @@ func (p *dnsOverQUIC) openConnection() (conn quic.Connection, err error) {
ctx, cancel := p.withDeadline(context.Background())
defer cancel()

conn, err = quic.DialAddrEarly(ctx, addr, p.tlsConf.Clone(), p.getQUICConfig())
conn, err = quic.DialAddrEarly(ctx, addr, p.tlsConf.Clone(), conf)
if err != nil {
return nil, fmt.Errorf("dialing quic connection to %s: %w", p.addr, err)
}

return conn, nil
}

// closeConnWithError closes the active connection with error to make sure that
// new queries were processed in another connection. We can do that in the case
// of a fatal error.
func (p *dnsOverQUIC) closeConnWithError(conn quic.Connection, err error) {
if conn == nil {
// Do nothing, there's no active conn anyways.
return
}

code := QUICCodeNoError
if err != nil {
code = QUICCodeInternalError
}

if errors.Is(err, quic.Err0RTTRejected) {
// Reset the TokenStore only if 0-RTT was rejected.
p.resetQUICConfig()
}

err = conn.CloseWithError(code, "")
if err != nil {
log.Error("dnsproxy: failed to close the conn: %v", err)
}
}

// readMsg reads the incoming DNS message from the QUIC stream.
func (p *dnsOverQUIC) readMsg(stream quic.Stream) (m *dns.Msg, err error) {
pool := p.getBytesPool()
Expand Down
9 changes: 4 additions & 5 deletions upstream/quic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,14 +406,13 @@ func TestDNSOverQUIC_closingConns(t *testing.T) {
checkUpstream(t, u, upsURL)

uq := testutil.RequireTypeAssert[*dnsOverQUIC](t, u)
hdlr := uq.connector.connHandler
uq.connector.connHandler = &testConnHandler{
OnOpenConnection: func() (quic.Connection, error) {
hdlr := uq.connector.opener
uq.connector.opener = &testConnOpener{
OnOpenConnection: func(conf *quic.Config) (quic.Connection, error) {
beforeExchange.Wait()

return hdlr.openConnection()
return hdlr.openConnection(conf)
},
OnCloseConn: hdlr.closeConnWithError,
}

uq.connector.reset()
Expand Down
115 changes: 75 additions & 40 deletions upstream/quicconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,60 @@ import (
"sync"

"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"github.com/quic-go/quic-go"
)

// quicConnOpener is used to open a QUIC connection.
type quicConnOpener interface {
// openConnection opens a QUIC connection with the specified configuration.
openConnection(conf *quic.Config) (conn quic.Connection, err error)
}

// ready is a semantic alias for a sign of a ready action.
type ready = struct{}

type quicConnHandler interface {
openConnection() (conn quic.Connection, err error)
closeConnWithError(conn quic.Connection, err error)
}

// quicConnector is used to establish a single connection on several demands.
type quicConnector struct {
conn quic.Connection
err error
connHandler quicConnHandler
reopenReady chan ready
resetReady chan ready
mu *sync.RWMutex
// conf is the QUIC configuration that is used for establishing connections
// to the upstream. This configuration includes the TokenStore that needs
// to be stored for re-creatin the connection on 0-RTT rejection.
conf *quic.Config

// mu protects the last establishment result.
mu *sync.RWMutex

// conn is the last established connection. It is nil if the connection is
// not established yet, closed or last connection establishment failed.
conn quic.Connection

// err is the error that occurred during the last connection establishment.
err error

// opener is used to open a new QUIC connection.
opener quicConnOpener

// openReady signs that the connection is ready to be established and either
// the last connection establishment failed or the connection was reset.
openReady chan ready

// resetReady signs that the connection can be closed for future
// establishment.
resetReady chan ready
}

// newQUICConnector creates a new quicConnector.
func newQUICConnector(hdlr quicConnHandler) (qc *quicConnector) {
func newQUICConnector(opener quicConnOpener, conf *quic.Config) (qc *quicConnector) {
qc = &quicConnector{
conn: nil,
err: errors.Error("not initialized"),
connHandler: hdlr,
reopenReady: make(chan ready, 1),
resetReady: make(chan ready, 1),
mu: &sync.RWMutex{},
conf: conf,
mu: &sync.RWMutex{},
conn: nil,
err: errors.Error("not initialized"),
opener: opener,
openReady: make(chan ready, 1),
resetReady: make(chan ready, 1),
}
qc.reopenReady <- ready{}
qc.openReady <- ready{}

return qc
}
Expand All @@ -44,7 +66,8 @@ func newQUICConnector(hdlr quicConnHandler) (qc *quicConnector) {
func (qc *quicConnector) reset() {
select {
case <-qc.resetReady:
qc.reopenReady <- ready{}
qc.closeConn()
qc.openReady <- ready{}
default:
// Already reset.
}
Expand All @@ -55,43 +78,55 @@ func (qc *quicConnector) reset() {
// to get will try to establish the connection again.
func (qc *quicConnector) get() (conn quic.Connection, err error) {
select {
case <-qc.reopenReady:
case <-qc.openReady:
qc.mu.Lock()
defer qc.mu.Unlock()

return qc.reopen()
qc.conn, qc.err = qc.opener.openConnection(qc.conf)
if qc.err != nil {
qc.openReady <- ready{}
} else {
qc.resetReady <- ready{}
}

return qc.conn, qc.err
default:
return qc.current()
}
}

func (qc *quicConnector) reopen() (conn quic.Connection, err error) {
if qc.conn != nil {
qc.connHandler.closeConnWithError(qc.conn, qc.err)
}

qc.conn, qc.err = qc.connHandler.openConnection()
if qc.err != nil {
qc.reopenReady <- ready{}
} else {
qc.resetReady <- ready{}
}

return qc.conn, qc.err
}

// current returns the last established connection and connecting error.
func (qc *quicConnector) current() (conn quic.Connection, err error) {
qc.mu.RLock()
defer qc.mu.RUnlock()

return qc.conn, qc.err
}

func (qc *quicConnector) close() {
// closeConn closes the connection with the specified error.
func (qc *quicConnector) closeConn() {
qc.mu.Lock()
defer qc.mu.Unlock()

if qc.conn != nil {
qc.connHandler.closeConnWithError(qc.conn, qc.err)
if qc.conn == nil {
return
}

code := QUICCodeNoError
if qc.err != nil {
code = QUICCodeInternalError
}

if errors.Is(qc.err, quic.Err0RTTRejected) {
// Reset the TokenStore only if 0-RTT was rejected.
qc.conf = qc.conf.Clone()
qc.conf.TokenStore = newQUICTokenStore()
}

err := qc.conn.CloseWithError(code, "")
if err != nil {
log.Error("dnsproxy: closing quic conn: %v", err)
}

qc.conn = nil
}
Loading

0 comments on commit 72a08d7

Please sign in to comment.