diff --git a/go.mod b/go.mod index d4d477ea..5b83e6af 100644 --- a/go.mod +++ b/go.mod @@ -3,18 +3,20 @@ module github.com/libp2p/go-libp2p-swarm go 1.16 require ( + github.com/golang/mock v1.6.0 github.com/ipfs/go-log/v2 v2.5.0 github.com/libp2p/go-conn-security-multistream v0.3.0 - github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36 + github.com/libp2p/go-libp2p-core v0.14.0 github.com/libp2p/go-libp2p-peerstore v0.6.0 - github.com/libp2p/go-libp2p-quic-transport v0.13.0 - github.com/libp2p/go-libp2p-testing v0.5.0 - github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0 - github.com/libp2p/go-libp2p-yamux v0.5.0 - github.com/libp2p/go-stream-muxer-multistream v0.3.0 - github.com/libp2p/go-tcp-transport v0.4.1-0.20220104085503-4ad75e6f32a5 + github.com/libp2p/go-libp2p-quic-transport v0.15.2 + github.com/libp2p/go-libp2p-testing v0.7.0 + github.com/libp2p/go-libp2p-transport-upgrader v0.7.0 + github.com/libp2p/go-libp2p-yamux v0.8.0 + github.com/libp2p/go-stream-muxer-multistream v0.4.0 + github.com/libp2p/go-tcp-transport v0.5.0 github.com/multiformats/go-multiaddr v0.5.0 github.com/multiformats/go-multiaddr-fmt v0.1.0 + github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/stretchr/testify v1.7.0 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 ) diff --git a/go.sum b/go.sum index 7a2ca70f..a0b0f77f 100644 --- a/go.sum +++ b/go.sum @@ -220,7 +220,6 @@ github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1Y github.com/ipfs/go-log v1.0.4 h1:6nLQdX4W8P9yZZFH7mO+X/PzjN8Laozm/lMJ6esdgzY= github.com/ipfs/go-log v1.0.4/go.mod h1:oDCg2FkjogeFOhqqb+N39l2RpTNPL6F/StPkB3kPgcs= github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= -github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72gynbe/g= github.com/ipfs/go-log/v2 v2.5.0 h1:+MhAooFd9XZNvR0i9FriKW6HB0ql7HNXUuflWtc0dd4= github.com/ipfs/go-log/v2 v2.5.0/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= @@ -264,7 +263,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/libp2p/go-conn-security-multistream v0.3.0 h1:9UCIKlBL1hC9u7nkMXpD1nkc/T53PKMAn3/k9ivBAVc= @@ -273,34 +271,31 @@ github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0= -github.com/libp2p/go-libp2p-core v0.5.1/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y= -github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= -github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36 h1:b/pMmgc5EV+dqSc+MjkX5xPa1nV6EKiOb0L0XT03Lic= -github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE= -github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc= -github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g= +github.com/libp2p/go-libp2p-core v0.14.0 h1:0kYSgiK/D7Eo28GTuRXo5YHsWwAisVpFCqCVPUd/vJs= +github.com/libp2p/go-libp2p-core v0.14.0/go.mod h1:tLasfcVdTXnixsLB0QYaT1syJOhsbrhG7q6pGrHtBg8= +github.com/libp2p/go-libp2p-mplex v0.5.0 h1:vt3k4E4HSND9XH4Z8rUpacPJFSAgLOv6HDvG8W9Ks9E= +github.com/libp2p/go-libp2p-mplex v0.5.0/go.mod h1:eLImPJLkj3iG5t5lq68w3Vm5NAQ5BcKwrrb2VmOYb3M= github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A= github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-quic-transport v0.13.0 h1:MTVojS4AnGD/rng6rF/HXEqwMHL27rHUEf3DaqSdnUw= -github.com/libp2p/go-libp2p-quic-transport v0.13.0/go.mod h1:39/ZWJ1TW/jx1iFkKzzUg00W6tDJh73FC0xYudjr7Hc= +github.com/libp2p/go-libp2p-quic-transport v0.15.2 h1:wHBEceRy+1/8Ec8dAIyr+/P7L2YefIGprPVy5LrMM+k= +github.com/libp2p/go-libp2p-quic-transport v0.15.2/go.mod h1:wv4uGwjcqe8Mhjj7N/Ic0aKjA+/10UnMlSzLO0yRpYQ= github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8/go.mod h1:Qy8sAncLKpwXtS2dSnDOP8ktexIAHKu+J+pnZOFZLTc= -github.com/libp2p/go-libp2p-testing v0.4.0/go.mod h1:Q+PFXYoiYFN5CAEG2w3gLPEzotlKsNSbKQ/lImlOWF0= -github.com/libp2p/go-libp2p-testing v0.5.0 h1:bTjC29TTQ/ODq0ld3+0KLq3irdA5cAH3OMbRi0/QsvE= github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A= +github.com/libp2p/go-libp2p-testing v0.7.0 h1:9bfyhNINizxuLrKsenzGaZalXRXIaAEmx1BP/PzF1gM= +github.com/libp2p/go-libp2p-testing v0.7.0/go.mod h1:OLbdn9DbgdMwv00v+tlp1l3oe2Cl+FAjoWIA2pa0X6E= github.com/libp2p/go-libp2p-tls v0.3.0 h1:8BgvUJiOTcj0Gp6XvEicF0rL5aUtRg/UzEdeZDmDlC8= github.com/libp2p/go-libp2p-tls v0.3.0/go.mod h1:fwF5X6PWGxm6IDRwF3V8AVCCj/hOd5oFlg+wo2FxJDY= -github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0 h1:eD/QJCpcImYOUl6MdBuxMByVaEe5VMm463zJG6oUg9o= -github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0/go.mod h1:ByIyNe8asQhgcyIHetb4f+UgV+hDrA8pQ3L/TgNs+RI= -github.com/libp2p/go-libp2p-yamux v0.5.0 h1:ZzmUhbQE+X7NuYUT2naxN31JyebZfRmpZVhKtRP13ys= -github.com/libp2p/go-libp2p-yamux v0.5.0/go.mod h1:AyR8k5EzyM2QN9Bbdg6X1SkVVuqLwTGf0L4DFq9g6po= +github.com/libp2p/go-libp2p-transport-upgrader v0.7.0 h1:ADnLrL7fC4Vy7HPjk9oGof7nDeTqGXuof85Ar6kin9Q= +github.com/libp2p/go-libp2p-transport-upgrader v0.7.0/go.mod h1:GIR2aTRp1J5yjVlkUoFqMkdobfob6RnAwYg/RZPhrzg= +github.com/libp2p/go-libp2p-yamux v0.8.0 h1:APQYlttIj+Rr5sfa6siojwsi0ZwcIh/exHIUl9hZr6o= +github.com/libp2p/go-libp2p-yamux v0.8.0/go.mod h1:yTkPgN2ib8FHyU1ZcVD7aelzyAqXXwEPbyx+aSKm9h8= github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU= -github.com/libp2p/go-mplex v0.3.0 h1:U1T+vmCYJaEoDJPV1aq31N56hS+lJgb397GsylNSgrU= -github.com/libp2p/go-mplex v0.3.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ= -github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= +github.com/libp2p/go-mplex v0.4.0 h1:Ukkez9/4EOX5rTw4sHefNJp10dksftAA05ZgyjplUbM= +github.com/libp2p/go-mplex v0.4.0/go.mod h1:y26Lx+wNVtMYMaPu300Cbot5LkEZ4tJaNYeHeT9dh6E= github.com/libp2p/go-msgio v0.0.6 h1:lQ7Uc0kS1wb1EfRxO2Eir/RJoHkHn7t6o+EiwsYIKJA= github.com/libp2p/go-msgio v0.0.6/go.mod h1:4ecVB6d9f4BDSL5fqvPiC4A3KivjWn+Venn/1ALLMWA= github.com/libp2p/go-netroute v0.1.3/go.mod h1:jZLDV+1PE8y5XxBySEBgbuVAXbhtuHSdmLPL2n9MKbk= @@ -316,16 +311,16 @@ github.com/libp2p/go-reuseport-transport v0.1.0/go.mod h1:vev0C0uMkzriDY59yFHD9v github.com/libp2p/go-sockaddr v0.0.2/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= github.com/libp2p/go-sockaddr v0.1.0 h1:Y4s3/jNoryVRKEBrkJ576F17CPOaMIzUeCsg7dlTDj0= github.com/libp2p/go-sockaddr v0.1.0/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k= -github.com/libp2p/go-stream-muxer-multistream v0.3.0 h1:TqnSHPJEIqDEO7h1wZZ0p3DXdvDSiLHQidKKUGZtiOY= -github.com/libp2p/go-stream-muxer-multistream v0.3.0/go.mod h1:yDh8abSIzmZtqtOt64gFJUXEryejzNb0lisTt+fAMJA= -github.com/libp2p/go-tcp-transport v0.4.1-0.20220104085503-4ad75e6f32a5 h1:/x3GSszKipn1nlKY0C5at59fBLYyJeObd5gm32DrobM= -github.com/libp2p/go-tcp-transport v0.4.1-0.20220104085503-4ad75e6f32a5/go.mod h1:YPwlF5gW5BnFikKoQBuJeQkPXAn+z2wTzDpJKamkgjY= -github.com/libp2p/go-yamux v1.4.1 h1:P1Fe9vF4th5JOxxgQvfbOHkrGqIZniTLf+ddhZp8YTI= -github.com/libp2p/go-yamux v1.4.1/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE= +github.com/libp2p/go-stream-muxer-multistream v0.4.0 h1:HsM/9OdtqnIzjVXcxTXjmqKrj3gJ8kacaOJwJS1ipaY= +github.com/libp2p/go-stream-muxer-multistream v0.4.0/go.mod h1:nb+dGViZleRP4XcyHuZSVrJCBl55nRBOMmiSL/dyziw= +github.com/libp2p/go-tcp-transport v0.5.0 h1:3ZPW8HAuyRAuFzyabE0hSrCXKKSWzROnZZX7DtcIatY= +github.com/libp2p/go-tcp-transport v0.5.0/go.mod h1:UPPL0DIjQqiWRwVAb+CEQlaAG0rp/mCqJfIhFcLHc4Y= +github.com/libp2p/go-yamux/v3 v3.0.1 h1:lIdxHGVZ+y/EHgCrqGNt4Q+Mk9qu26MbOWH/yRw+Ihk= +github.com/libp2p/go-yamux/v3 v3.0.1/go.mod h1:s2LsDhHbh+RfCsQoICSYt58U2f8ijtPANFD8BmE74Bo= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= -github.com/lucas-clemente/quic-go v0.23.0 h1:5vFnKtZ6nHDFsc/F3uuiF4T3y/AXaQdxjUqiVw26GZE= -github.com/lucas-clemente/quic-go v0.23.0/go.mod h1:paZuzjXCE5mj6sikVLMvqXk8lJV2AsqtJ6bDhjEfxx0= +github.com/lucas-clemente/quic-go v0.24.0 h1:ToR7SIIEdrgOhgVTHvPgdVRJfgVy+N0wQAagH7L4d5g= +github.com/lucas-clemente/quic-go v0.24.0/go.mod h1:paZuzjXCE5mj6sikVLMvqXk8lJV2AsqtJ6bDhjEfxx0= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -597,7 +592,6 @@ go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -615,7 +609,6 @@ go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9E go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= -go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= @@ -631,7 +624,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/swarm.go b/swarm.go index 47617907..9df18403 100644 --- a/swarm.go +++ b/swarm.go @@ -76,6 +76,13 @@ func WithDialTimeoutLocal(t time.Duration) Option { } } +func WithResourceManager(m network.ResourceManager) Option { + return func(s *Swarm) error { + s.rcmgr = m + return nil + } +} + // Swarm is a connection muxer, allowing connections to other peers to // be opened and closed, while still using the same Chan for all // communication. The Chan sends/receives Messages, which note the @@ -88,6 +95,8 @@ type Swarm struct { // down before continuing. refs sync.WaitGroup + rcmgr network.ResourceManager + local peer.ID peers peerstore.Peerstore @@ -156,6 +165,9 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm, return nil, err } } + if s.rcmgr == nil { + s.rcmgr = network.NullResourceManager + } s.dsync = newDialSync(s.dialWorkerLoop) s.limiter = newDialLimiter(s.dialAddr) @@ -586,6 +598,10 @@ func (s *Swarm) String() string { return fmt.Sprintf("", s.LocalPeer()) } +func (s *Swarm) ResourceManager() network.ResourceManager { + return s.rcmgr +} + // Swarm is a Network. var _ network.Network = (*Swarm)(nil) var _ transport.TransportNetwork = (*Swarm)(nil) diff --git a/swarm_conn.go b/swarm_conn.go index 388e9e76..5099edde 100644 --- a/swarm_conn.go +++ b/swarm_conn.go @@ -9,7 +9,6 @@ import ( "time" ic "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/transport" @@ -42,6 +41,8 @@ type Conn struct { stat network.ConnStats } +var _ network.Conn = &Conn{} + func (c *Conn) ID() string { // format: - return fmt.Sprintf("%s-%d", c.RemotePeer().Pretty()[0:10], c.id) @@ -93,6 +94,7 @@ func (c *Conn) removeStream(s *Stream) { c.stat.NumStreams-- delete(c.streams.m, s) c.streams.Unlock() + s.scope.Done() } // listens for new streams. @@ -109,9 +111,14 @@ func (c *Conn) start() { if err != nil { return } + scope, err := c.swarm.ResourceManager().OpenStream(c.RemotePeer(), network.DirInbound) + if err != nil { + ts.Reset() + continue + } c.swarm.refs.Add(1) go func() { - s, err := c.addStream(ts, network.DirInbound) + s, err := c.addStream(ts, network.DirInbound, scope) // Don't defer this. We don't want to block // swarm shutdown on the connection handler. @@ -186,19 +193,23 @@ func (c *Conn) NewStream(ctx context.Context) (network.Stream, error) { } } + scope, err := c.swarm.ResourceManager().OpenStream(c.RemotePeer(), network.DirOutbound) + if err != nil { + return nil, err + } ts, err := c.conn.OpenStream(ctx) - if err != nil { return nil, err } - return c.addStream(ts, network.DirOutbound) + return c.addStream(ts, network.DirOutbound, scope) } -func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, error) { +func (c *Conn) addStream(ts network.MuxedStream, dir network.Direction, scope network.StreamManagementScope) (*Stream, error) { c.streams.Lock() // Are we still online? if c.streams.m == nil { c.streams.Unlock() + scope.Done() ts.Reset() return nil, ErrConnClosed } @@ -207,6 +218,7 @@ func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, er s := &Stream{ stream: ts, conn: c, + scope: scope, stat: network.Stats{ Direction: dir, Opened: time.Now(), @@ -244,3 +256,7 @@ func (c *Conn) GetStreams() []network.Stream { } return streams } + +func (c *Conn) Scope() network.ConnScope { + return c.conn.Scope() +} diff --git a/swarm_stream.go b/swarm_stream.go index b0063b1b..5e5c9653 100644 --- a/swarm_stream.go +++ b/swarm_stream.go @@ -6,7 +6,6 @@ import ( "sync/atomic" "time" - "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/protocol" ) @@ -19,8 +18,9 @@ var _ network.Stream = &Stream{} type Stream struct { id uint64 - stream mux.MuxedStream + stream network.MuxedStream conn *Conn + scope network.StreamManagementScope closeOnce sync.Once @@ -131,8 +131,13 @@ func (s *Stream) Protocol() protocol.ID { // This doesn't actually *do* anything other than record the fact that we're // speaking the given protocol over this stream. It's still up to the user to // negotiate the protocol. This is usually done by the Host. -func (s *Stream) SetProtocol(p protocol.ID) { +func (s *Stream) SetProtocol(p protocol.ID) error { + if err := s.scope.SetProtocol(p); err != nil { + return err + } + s.protocol.Store(p) + return nil } // SetDeadline sets the read and write deadlines for this stream. @@ -154,3 +159,7 @@ func (s *Stream) SetWriteDeadline(t time.Time) error { func (s *Stream) Stat() network.Stats { return s.stat } + +func (s *Stream) Scope() network.StreamScope { + return s.scope +} diff --git a/swarm_test.go b/swarm_test.go index 0c5c341d..714941f5 100644 --- a/swarm_test.go +++ b/swarm_test.go @@ -11,17 +11,22 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p-core/protocol" + + swarm "github.com/libp2p/go-libp2p-swarm" + . "github.com/libp2p/go-libp2p-swarm/testing" + "github.com/libp2p/go-libp2p-core/control" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - swarm "github.com/libp2p/go-libp2p-swarm" - . "github.com/libp2p/go-libp2p-swarm/testing" - logging "github.com/ipfs/go-log/v2" + mocknetwork "github.com/libp2p/go-libp2p-testing/mocks/network" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" + + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" ) @@ -453,3 +458,84 @@ func TestStreamCount(t *testing.T) { str.Close() require.Equal(t, countStreams(), 8) } + +func TestResourceManager(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + rcmgr1 := mocknetwork.NewMockResourceManager(ctrl) + s1 := GenSwarm(t, OptResourceManager(rcmgr1)) + defer s1.Close() + + rcmgr2 := mocknetwork.NewMockResourceManager(ctrl) + s2 := GenSwarm(t, OptResourceManager(rcmgr2)) + defer s2.Close() + connectSwarms(t, context.Background(), []*swarm.Swarm{s1, s2}) + + strChan := make(chan network.Stream) + s2.SetStreamHandler(func(str network.Stream) { strChan <- str }) + + streamScope1 := mocknetwork.NewMockStreamManagementScope(ctrl) + rcmgr1.EXPECT().OpenStream(s2.LocalPeer(), network.DirOutbound).Return(streamScope1, nil) + streamScope2 := mocknetwork.NewMockStreamManagementScope(ctrl) + rcmgr2.EXPECT().OpenStream(s1.LocalPeer(), network.DirInbound).Return(streamScope2, nil) + str, err := s1.NewStream(context.Background(), s2.LocalPeer()) + require.NoError(t, err) + str.Write([]byte("foobar")) + + p := protocol.ID("proto") + streamScope1.EXPECT().SetProtocol(p) + require.NoError(t, str.SetProtocol(p)) + + sstr := <-strChan + streamScope2.EXPECT().Done() + require.NoError(t, sstr.Close()) + streamScope1.EXPECT().Done() + require.NoError(t, str.Close()) +} + +func TestResourceManagerNewStream(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + rcmgr1 := mocknetwork.NewMockResourceManager(ctrl) + s1 := GenSwarm(t, OptResourceManager(rcmgr1)) + defer s1.Close() + + s2 := GenSwarm(t) + defer s2.Close() + + connectSwarms(t, context.Background(), []*swarm.Swarm{s1, s2}) + + rerr := errors.New("denied") + rcmgr1.EXPECT().OpenStream(s2.LocalPeer(), network.DirOutbound).Return(nil, rerr) + _, err := s1.NewStream(context.Background(), s2.LocalPeer()) + require.ErrorIs(t, err, rerr) +} + +func TestResourceManagerAcceptStream(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + rcmgr1 := mocknetwork.NewMockResourceManager(ctrl) + s1 := GenSwarm(t, OptResourceManager(rcmgr1)) + defer s1.Close() + + rcmgr2 := mocknetwork.NewMockResourceManager(ctrl) + s2 := GenSwarm(t, OptResourceManager(rcmgr2)) + defer s2.Close() + s2.SetStreamHandler(func(str network.Stream) { t.Fatal("didn't expect to accept a stream") }) + + connectSwarms(t, context.Background(), []*swarm.Swarm{s1, s2}) + + streamScope := mocknetwork.NewMockStreamManagementScope(ctrl) + rcmgr1.EXPECT().OpenStream(s2.LocalPeer(), network.DirOutbound).Return(streamScope, nil) + streamScope.EXPECT().Done() + rcmgr2.EXPECT().OpenStream(s1.LocalPeer(), network.DirInbound).Return(nil, errors.New("nope")) + str, err := s1.NewStream(context.Background(), s2.LocalPeer()) + require.NoError(t, err) + _, err = str.Write([]byte("foobar")) + require.NoError(t, err) + _, err = str.Read([]byte{0}) + require.EqualError(t, err, "stream reset") +} diff --git a/testing/testing.go b/testing/testing.go index 4af3925f..e1ab9913 100644 --- a/testing/testing.go +++ b/testing/testing.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/sec/insecure" "github.com/libp2p/go-libp2p-core/transport" + "github.com/libp2p/go-tcp-transport" csms "github.com/libp2p/go-conn-security-multistream" "github.com/libp2p/go-libp2p-peerstore/pstoremem" @@ -22,8 +23,6 @@ import ( tptu "github.com/libp2p/go-libp2p-transport-upgrader" yamux "github.com/libp2p/go-libp2p-yamux" msmux "github.com/libp2p/go-stream-muxer-multistream" - "github.com/libp2p/go-tcp-transport" - ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" ) @@ -35,6 +34,7 @@ type config struct { disableQUIC bool dialTimeout time.Duration connectionGater connmgr.ConnectionGater + rcmgr network.ResourceManager sk crypto.PrivKey } @@ -68,6 +68,12 @@ func OptConnGater(cg connmgr.ConnectionGater) Option { } } +func OptResourceManager(rcmgr network.ResourceManager) Option { + return func(_ *testing.T, c *config) { + c.rcmgr = rcmgr + } +} + // OptPeerPrivateKey configures the peer private key which is then used to derive the public key and peer ID. func OptPeerPrivateKey(sk crypto.PrivKey) Option { return func(_ *testing.T, c *config) { @@ -127,6 +133,9 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm { if cfg.connectionGater != nil { swarmOpts = append(swarmOpts, swarm.WithConnectionGater(cfg.connectionGater)) } + if cfg.rcmgr != nil { + swarmOpts = append(swarmOpts, swarm.WithResourceManager(cfg.rcmgr)) + } if cfg.dialTimeout != 0 { swarmOpts = append(swarmOpts, swarm.WithDialTimeout(cfg.dialTimeout)) } @@ -140,7 +149,7 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm { if cfg.disableReuseport { tcpOpts = append(tcpOpts, tcp.DisableReuseport()) } - tcpTransport, err := tcp.NewTCPTransport(upgrader, tcpOpts...) + tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, tcpOpts...) require.NoError(t, err) if err := s.AddTransport(tcpTransport); err != nil { t.Fatal(err) @@ -152,7 +161,7 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm { } } if !cfg.disableQUIC { - quicTransport, err := quic.NewTransport(p.PrivKey, nil, cfg.connectionGater) + quicTransport, err := quic.NewTransport(p.PrivKey, nil, cfg.connectionGater, nil) if err != nil { t.Fatal(err) }