Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
add support for the resource manager
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jan 1, 2022
1 parent 0531f9f commit 03c310e
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 22 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ require (
github.com/ipfs/go-log/v2 v2.4.0
github.com/libp2p/go-addr-util v0.1.0
github.com/libp2p/go-conn-security-multistream v0.3.0
github.com/libp2p/go-libp2p-core v0.13.0
github.com/libp2p/go-libp2p-core v0.13.1-0.20211231121257-5fcc7b607478
github.com/libp2p/go-libp2p-peerstore v0.6.0
github.com/libp2p/go-libp2p-quic-transport v0.13.0
github.com/libp2p/go-libp2p-quic-transport v0.15.1-0.20220101100156-1e9d06560d6a
github.com/libp2p/go-libp2p-testing v0.5.0
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220101094616-076e334df9ce
github.com/libp2p/go-libp2p-yamux v0.5.0
github.com/libp2p/go-stream-muxer-multistream v0.3.0
github.com/libp2p/go-tcp-transport v0.4.0
github.com/libp2p/go-tcp-transport v0.4.1-0.20220101094718-c6ace780f863
github.com/multiformats/go-multiaddr v0.4.0
github.com/multiformats/go-multiaddr-fmt v0.1.0
github.com/stretchr/testify v1.7.0
Expand Down
22 changes: 10 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -279,25 +279,24 @@ github.com/libp2p/go-libp2p-core v0.5.1/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.13.0 h1:IFG/s8dN6JN2OTrXX9eq2wNU/Zlz2KLdwZUp5FplgXI=
github.com/libp2p/go-libp2p-core v0.13.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.13.1-0.20211231121257-5fcc7b607478 h1:V4qhaW1u7hJ54vditELylooLRTTcc7wmirh0yPgcxaA=
github.com/libp2p/go-libp2p-core v0.13.1-0.20211231121257-5fcc7b607478/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc=
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A=
github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-quic-transport v0.13.0 h1:MTVojS4AnGD/rng6rF/HXEqwMHL27rHUEf3DaqSdnUw=
github.com/libp2p/go-libp2p-quic-transport v0.13.0/go.mod h1:39/ZWJ1TW/jx1iFkKzzUg00W6tDJh73FC0xYudjr7Hc=
github.com/libp2p/go-libp2p-quic-transport v0.15.1-0.20220101100156-1e9d06560d6a h1:HITWbMM1PmUJqYs2DK3iT09SIz7Bp85CC3yEkZ/ApQs=
github.com/libp2p/go-libp2p-quic-transport v0.15.1-0.20220101100156-1e9d06560d6a/go.mod h1:PnciuaUDq/F8OHLiyNGeof9c6PIJQbEHxEbhK/ceNMw=
github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8/go.mod h1:Qy8sAncLKpwXtS2dSnDOP8ktexIAHKu+J+pnZOFZLTc=
github.com/libp2p/go-libp2p-testing v0.4.0/go.mod h1:Q+PFXYoiYFN5CAEG2w3gLPEzotlKsNSbKQ/lImlOWF0=
github.com/libp2p/go-libp2p-testing v0.5.0 h1:bTjC29TTQ/ODq0ld3+0KLq3irdA5cAH3OMbRi0/QsvE=
github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A=
github.com/libp2p/go-libp2p-tls v0.3.0 h1:8BgvUJiOTcj0Gp6XvEicF0rL5aUtRg/UzEdeZDmDlC8=
github.com/libp2p/go-libp2p-tls v0.3.0/go.mod h1:fwF5X6PWGxm6IDRwF3V8AVCCj/hOd5oFlg+wo2FxJDY=
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0/go.mod h1:Rc+XODlB3yce7dvFV4q/RmyJGsFcCZRkeZMu/Zdg0mo=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0 h1:GfMCU+2aGGEm1zW3UcOz6wYSn8tXQalFfVfcww99i5A=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0/go.mod h1:1e07y1ZSZdHo9HPbuU8IztM1Cj+DR5twgycb4pnRzRo=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220101094616-076e334df9ce h1:c/1P03KzVcucwj8CaREviCNpYHLYfyuKQzFLT80BnZ4=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220101094616-076e334df9ce/go.mod h1:gRl3evtqJSAjlPvzYGVp3zBPWd0/ni/DUhB3wA67Krg=
github.com/libp2p/go-libp2p-yamux v0.5.0 h1:ZzmUhbQE+X7NuYUT2naxN31JyebZfRmpZVhKtRP13ys=
github.com/libp2p/go-libp2p-yamux v0.5.0/go.mod h1:AyR8k5EzyM2QN9Bbdg6X1SkVVuqLwTGf0L4DFq9g6po=
github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU=
Expand All @@ -321,14 +320,14 @@ github.com/libp2p/go-sockaddr v0.1.0 h1:Y4s3/jNoryVRKEBrkJ576F17CPOaMIzUeCsg7dlT
github.com/libp2p/go-sockaddr v0.1.0/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k=
github.com/libp2p/go-stream-muxer-multistream v0.3.0 h1:TqnSHPJEIqDEO7h1wZZ0p3DXdvDSiLHQidKKUGZtiOY=
github.com/libp2p/go-stream-muxer-multistream v0.3.0/go.mod h1:yDh8abSIzmZtqtOt64gFJUXEryejzNb0lisTt+fAMJA=
github.com/libp2p/go-tcp-transport v0.4.0 h1:VDyg4j6en3OuXf90gfDQh5Sy9KowO9udnd0OU8PP6zg=
github.com/libp2p/go-tcp-transport v0.4.0/go.mod h1:0y52Rwrn4076xdJYu/51/qJIdxz+EWDAOG2S45sV3VI=
github.com/libp2p/go-tcp-transport v0.4.1-0.20220101094718-c6ace780f863 h1:otblp7EBZUfzrz5TyD8kRSehvT5AWiD18lfnFkvpreE=
github.com/libp2p/go-tcp-transport v0.4.1-0.20220101094718-c6ace780f863/go.mod h1:ftGTKY2jtyDjT6jdjHYxLQO7tPnnPrr3Z28qeSvcLiU=
github.com/libp2p/go-yamux v1.4.1 h1:P1Fe9vF4th5JOxxgQvfbOHkrGqIZniTLf+ddhZp8YTI=
github.com/libp2p/go-yamux v1.4.1/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lucas-clemente/quic-go v0.23.0 h1:5vFnKtZ6nHDFsc/F3uuiF4T3y/AXaQdxjUqiVw26GZE=
github.com/lucas-clemente/quic-go v0.23.0/go.mod h1:paZuzjXCE5mj6sikVLMvqXk8lJV2AsqtJ6bDhjEfxx0=
github.com/lucas-clemente/quic-go v0.24.0 h1:ToR7SIIEdrgOhgVTHvPgdVRJfgVy+N0wQAagH7L4d5g=
github.com/lucas-clemente/quic-go v0.24.0/go.mod h1:paZuzjXCE5mj6sikVLMvqXk8lJV2AsqtJ6bDhjEfxx0=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down Expand Up @@ -747,7 +746,6 @@ golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand Down
16 changes: 16 additions & 0 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ func WithDialTimeoutLocal(t time.Duration) Option {
}
}

func WithResourceManager(m network.ResourceManager) Option {
return func(s *Swarm) error {
s.rcmgr = m
return nil
}
}

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
Expand All @@ -88,6 +95,8 @@ type Swarm struct {
// down before continuing.
refs sync.WaitGroup

rcmgr network.ResourceManager

local peer.ID
peers peerstore.Peerstore

Expand Down Expand Up @@ -156,6 +165,9 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm,
return nil, err
}
}
if s.rcmgr == nil {
s.rcmgr = network.NullResourceManager
}

s.dsync = newDialSync(s.dialWorkerLoop)
s.limiter = newDialLimiter(s.dialAddr)
Expand Down Expand Up @@ -586,6 +598,10 @@ func (s *Swarm) String() string {
return fmt.Sprintf("<Swarm %s>", s.LocalPeer())
}

func (s *Swarm) ResourceManager() network.ResourceManager {
return s.rcmgr
}

// Swarm is a Network.
var _ network.Network = (*Swarm)(nil)
var _ transport.TransportNetwork = (*Swarm)(nil)
24 changes: 20 additions & 4 deletions swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Conn struct {
stat network.ConnStats
}

var _ network.Conn = &Conn{}

func (c *Conn) ID() string {
// format: <first 10 chars of peer id>-<global conn ordinal>
return fmt.Sprintf("%s-%d", c.RemotePeer().Pretty()[0:10], c.id)
Expand Down Expand Up @@ -109,9 +111,14 @@ func (c *Conn) start() {
if err != nil {
return
}
scope, err := c.swarm.ResourceManager().OpenStream(c.RemotePeer(), network.DirInbound)
if err != nil {
ts.Reset()
continue
}
c.swarm.refs.Add(1)
go func() {
s, err := c.addStream(ts, network.DirInbound)
s, err := c.addStream(ts, network.DirInbound, scope)

// Don't defer this. We don't want to block
// swarm shutdown on the connection handler.
Expand Down Expand Up @@ -186,19 +193,23 @@ func (c *Conn) NewStream(ctx context.Context) (network.Stream, error) {
}
}

scope, err := c.swarm.ResourceManager().OpenStream(c.RemotePeer(), network.DirOutbound)
if err != nil {
return nil, err
}
ts, err := c.conn.OpenStream(ctx)

if err != nil {
return nil, err
}
return c.addStream(ts, network.DirOutbound)
return c.addStream(ts, network.DirOutbound, scope)
}

func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, error) {
func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction, scope network.StreamManagementScope) (*Stream, error) {
c.streams.Lock()
// Are we still online?
if c.streams.m == nil {
c.streams.Unlock()
scope.Done()
ts.Reset()
return nil, ErrConnClosed
}
Expand All @@ -207,6 +218,7 @@ func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, er
s := &Stream{
stream: ts,
conn: c,
scope: scope,
stat: network.Stats{
Direction: dir,
Opened: time.Now(),
Expand Down Expand Up @@ -244,3 +256,7 @@ func (c *Conn) GetStreams() []network.Stream {
}
return streams
}

func (c *Conn) Scope() network.ConnectionScope {
return c.conn.Scope()
}
5 changes: 5 additions & 0 deletions swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Stream struct {

stream mux.MuxedStream
conn *Conn
scope network.StreamScope

closeOnce sync.Once

Expand Down Expand Up @@ -154,3 +155,7 @@ func (s *Stream) SetWriteDeadline(t time.Time) error {
func (s *Stream) Stat() network.Stats {
return s.stat
}

func (s *Stream) Scope() network.StreamScope {
return s.scope
}
4 changes: 2 additions & 2 deletions testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
if cfg.disableReuseport {
tcpOpts = append(tcpOpts, tcp.DisableReuseport())
}
tcpTransport, err := tcp.NewTCPTransport(upgrader, tcpOpts...)
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, tcpOpts...)
require.NoError(t, err)
if err := s.AddTransport(tcpTransport); err != nil {
t.Fatal(err)
Expand All @@ -154,7 +154,7 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
}
}
if !cfg.disableQUIC {
quicTransport, err := quic.NewTransport(p.PrivKey, nil, cfg.connectionGater)
quicTransport, err := quic.NewTransport(p.PrivKey, nil, cfg.connectionGater, nil)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 03c310e

Please sign in to comment.