Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
use the Resource Manager
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Dec 31, 2021
1 parent 47b90e0 commit 3bdf846
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func run(raddr string, p string) error {
return err
}

t, err := libp2pquic.NewTransport(priv, nil, nil)
t, err := libp2pquic.NewTransport(priv, nil, nil, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func run(port string) error {
return err
}

t, err := libp2pquic.NewTransport(priv, nil, nil)
t, err := libp2pquic.NewTransport(priv, nil, nil, nil)
if err != nil {
return err
}
Expand Down
20 changes: 18 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (

ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
tpt "github.com/libp2p/go-libp2p-core/transport"

quic "github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr"
)

type conn struct {
sess quic.Session
transport tpt.Transport
rcmgr network.ResourceManager

localPeer peer.ID
privKey ic.PrivKey
Expand All @@ -38,14 +40,28 @@ func (c *conn) IsClosed() bool {

// OpenStream creates a new stream.
func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) {
scope, err := c.rcmgr.OpenStream(c.remotePeerID, network.DirOutbound)
if err != nil {
return nil, err
}
qstr, err := c.sess.OpenStreamSync(ctx)
if err != nil {
scope.Done()
return nil, err
}
return &stream{Stream: qstr}, err
}

// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (mux.MuxedStream, error) {
qstr, err := c.sess.AcceptStream(context.Background())
return &stream{Stream: qstr}, err
if err != nil {
return nil, err
}
if _, err := c.rcmgr.OpenStream(c.remotePeerID, network.DirInbound); err != nil {
return nil, err
}
return &stream{Stream: qstr}, nil
}

// LocalPeer returns our peer ID
Expand Down
38 changes: 19 additions & 19 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ var _ = Describe("Connection", func() {
})

It("handshakes on IPv4", func() {
serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer serverTransport.(io.Closer).Close()
ln := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")
defer ln.Close()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer clientTransport.(io.Closer).Close()
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expand All @@ -95,13 +95,13 @@ var _ = Describe("Connection", func() {
})

It("handshakes on IPv6", func() {
serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer serverTransport.(io.Closer).Close()
ln := runServer(serverTransport, "/ip6/::1/udp/0/quic")
defer ln.Close()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer clientTransport.(io.Closer).Close()
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expand All @@ -121,13 +121,13 @@ var _ = Describe("Connection", func() {
})

It("opens and accepts streams", func() {
serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer serverTransport.(io.Closer).Close()
ln := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")
defer ln.Close()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer clientTransport.(io.Closer).Close()
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expand All @@ -152,12 +152,12 @@ var _ = Describe("Connection", func() {
It("fails if the peer ID doesn't match", func() {
thirdPartyID, _ := createPeer()

serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer serverTransport.(io.Closer).Close()
ln := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
// dial, but expect the wrong peer ID
_, err = clientTransport.Dial(context.Background(), ln.Multiaddr(), thirdPartyID)
Expand All @@ -179,7 +179,7 @@ var _ = Describe("Connection", func() {
It("gates accepted connections", func() {
cg := NewMockConnectionGater(mockCtrl)
cg.EXPECT().InterceptAccept(gomock.Any())
serverTransport, err := NewTransport(serverKey, nil, cg)
serverTransport, err := NewTransport(serverKey, nil, cg, nil)
Expect(err).ToNot(HaveOccurred())
defer serverTransport.(io.Closer).Close()
ln := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -193,7 +193,7 @@ var _ = Describe("Connection", func() {
Expect(err).ToNot(HaveOccurred())
}()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer clientTransport.(io.Closer).Close()
// make sure that connection attempts fails
Expand All @@ -214,7 +214,7 @@ var _ = Describe("Connection", func() {
})

It("gates secured connections", func() {
serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer serverTransport.(io.Closer).Close()
ln := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -223,7 +223,7 @@ var _ = Describe("Connection", func() {
cg := NewMockConnectionGater(mockCtrl)
cg.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any())

clientTransport, err := NewTransport(clientKey, nil, cg)
clientTransport, err := NewTransport(clientKey, nil, cg, nil)
Expect(err).ToNot(HaveOccurred())
defer clientTransport.(io.Closer).Close()

Expand All @@ -243,12 +243,12 @@ var _ = Describe("Connection", func() {
It("dials to two servers at the same time", func() {
serverID2, serverKey2 := createPeer()

serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer serverTransport.(io.Closer).Close()
ln1 := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")
defer ln1.Close()
serverTransport2, err := NewTransport(serverKey2, nil, nil)
serverTransport2, err := NewTransport(serverKey2, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer serverTransport2.(io.Closer).Close()
ln2 := runServer(serverTransport2, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -275,7 +275,7 @@ var _ = Describe("Connection", func() {
}
}()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer clientTransport.(io.Closer).Close()
c1, err := clientTransport.Dial(context.Background(), ln1.Multiaddr(), serverID)
Expand Down Expand Up @@ -305,7 +305,7 @@ var _ = Describe("Connection", func() {
})

It("sends stateless resets", func() {
serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer serverTransport.(io.Closer).Close()
ln := runServer(serverTransport, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -322,7 +322,7 @@ var _ = Describe("Connection", func() {
defer proxy.Close()

// establish a connection
clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer clientTransport.(io.Closer).Close()
proxyAddr, err := toQuicMultiaddr(proxy.LocalAddr())
Expand Down Expand Up @@ -365,7 +365,7 @@ var _ = Describe("Connection", func() {
})

It("hole punches", func() {
t1, err := NewTransport(serverKey, nil, nil)
t1, err := NewTransport(serverKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer t1.(io.Closer).Close()
laddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -381,7 +381,7 @@ var _ = Describe("Connection", func() {
}
}()

t2, err := NewTransport(clientKey, nil, nil)
t2, err := NewTransport(clientKey, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
defer t2.(io.Closer).Close()
ln2, err := t2.Listen(laddr)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/google/gopacket v1.1.17
github.com/ipfs/go-log/v2 v2.4.0
github.com/klauspost/compress v1.11.7
github.com/libp2p/go-libp2p-core v0.10.0
github.com/libp2p/go-libp2p-core v0.13.1-0.20211231090304-48c94b6fddec
github.com/libp2p/go-libp2p-tls v0.3.0
github.com/libp2p/go-netroute v0.1.3
github.com/lucas-clemente/quic-go v0.24.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-core v0.10.0 h1:jFy7v5Muq58GTeYkPhGzIH8Qq4BFfziqc0ixPd/pP9k=
github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.13.1-0.20211231090304-48c94b6fddec h1:3TxVyBAlNFAJ1stkc/uoGwFqbXlvBg7bQH05MHMU2dQ=
github.com/libp2p/go-libp2p-core v0.13.1-0.20211231090304-48c94b6fddec/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-tls v0.3.0 h1:8BgvUJiOTcj0Gp6XvEicF0rL5aUtRg/UzEdeZDmDlC8=
github.com/libp2p/go-libp2p-tls v0.3.0/go.mod h1:fwF5X6PWGxm6IDRwF3V8AVCCj/hOd5oFlg+wo2FxJDY=
github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU=
Expand Down
4 changes: 2 additions & 2 deletions integrationtests/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func readKeys(hostKeyFile, peerKeyFile string) (crypto.PrivKey, crypto.PubKey, e
}

func runServer(hostKey crypto.PrivKey, peerKey crypto.PubKey, addr ma.Multiaddr, test string) error {
tr, err := libp2pquic.NewTransport(hostKey, nil, nil)
tr, err := libp2pquic.NewTransport(hostKey, nil, nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func runServer(hostKey crypto.PrivKey, peerKey crypto.PubKey, addr ma.Multiaddr,
}

func runClient(hostKey crypto.PrivKey, serverKey crypto.PubKey, addr ma.Multiaddr, test string) error {
tr, err := libp2pquic.NewTransport(hostKey, nil, nil)
tr, err := libp2pquic.NewTransport(hostKey, nil, nil, nil)
if err != nil {
return err
}
Expand Down
18 changes: 15 additions & 3 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"net"

ic "github.com/libp2p/go-libp2p-core/crypto"
n "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
tpt "github.com/libp2p/go-libp2p-core/transport"

Expand All @@ -23,14 +23,18 @@ type listener struct {
quicListener quic.Listener
conn *reuseConn
transport *transport
rcmgr network.ResourceManager
privKey ic.PrivKey
localPeer peer.ID
localMultiaddr ma.Multiaddr
}

var _ tpt.Listener = &listener{}

func newListener(rconn *reuseConn, t *transport, localPeer peer.ID, key ic.PrivKey, identity *p2ptls.Identity) (tpt.Listener, error) {
func newListener(rconn *reuseConn, t *transport, localPeer peer.ID, key ic.PrivKey, identity *p2ptls.Identity, rcmgr network.ResourceManager) (tpt.Listener, error) {
if rcmgr == nil {
panic("nil rcmgr")
}
var tlsConf tls.Config
tlsConf.GetConfigForClient = func(_ *tls.ClientHelloInfo) (*tls.Config, error) {
// return a tls.Config that verifies the peer's certificate chain.
Expand All @@ -52,6 +56,7 @@ func newListener(rconn *reuseConn, t *transport, localPeer peer.ID, key ic.PrivK
conn: rconn,
quicListener: ln,
transport: t,
rcmgr: rcmgr,
privKey: key,
localPeer: localPeer,
localMultiaddr: localMultiaddr,
Expand All @@ -70,7 +75,13 @@ func (l *listener) Accept() (tpt.CapableConn, error) {
sess.CloseWithError(0, err.Error())
continue
}
if l.transport.gater != nil && !(l.transport.gater.InterceptAccept(conn) && l.transport.gater.InterceptSecured(n.DirInbound, conn.remotePeerID, conn)) {
connScope, err := l.rcmgr.OpenConnection(network.DirInbound, false)
if err != nil {
sess.CloseWithError(0, err.Error())
continue
}
if l.transport.gater != nil && !(l.transport.gater.InterceptAccept(conn) && l.transport.gater.InterceptSecured(network.DirInbound, conn.remotePeerID, conn)) {
connScope.Done()
sess.CloseWithError(errorCodeConnectionGating, "connection gated")
continue
}
Expand Down Expand Up @@ -116,6 +127,7 @@ func (l *listener) setupConn(sess quic.Session) (*conn, error) {
return &conn{
sess: sess,
transport: l.transport,
rcmgr: l.rcmgr,
localPeer: l.localPeer,
localMultiaddr: l.localMultiaddr,
privKey: l.privKey,
Expand Down
2 changes: 1 addition & 1 deletion listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var _ = Describe("Listener", func() {
Expect(err).ToNot(HaveOccurred())
key, err := ic.UnmarshalRsaPrivateKey(x509.MarshalPKCS1PrivateKey(rsaKey))
Expect(err).ToNot(HaveOccurred())
t, err = NewTransport(key, nil, nil)
t, err = NewTransport(key, nil, nil, nil)
Expect(err).ToNot(HaveOccurred())
})

Expand Down
Loading

0 comments on commit 3bdf846

Please sign in to comment.