From 46d005a9d3aeae62263bc6ea7d576bcd69e60102 Mon Sep 17 00:00:00 2001 From: Sukun Date: Wed, 26 Apr 2023 17:12:51 +0530 Subject: [PATCH 1/9] quic: prioritise listen connections for reuse --- p2p/transport/quicreuse/reuse.go | 54 +++++++++++++++++++++------ p2p/transport/quicreuse/reuse_test.go | 20 ++++++++++ 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 684a7e0b10..c305e36d5c 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -75,13 +75,15 @@ type reuse struct { routes routing.Router unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn // global contains connections that are listening on 0.0.0.0 / :: - global map[int]*reuseConn + global map[int]*reuseConn + globalDial map[int]*reuseConn } func newReuse() *reuse { r := &reuse{ unicast: make(map[string]map[int]*reuseConn), global: make(map[int]*reuseConn), + globalDial: make(map[int]*reuseConn), closeChan: make(chan struct{}), gcStopChan: make(chan struct{}), } @@ -95,6 +97,9 @@ func (r *reuse) gc() { for _, conn := range r.global { conn.Close() } + for _, conn := range r.globalDial { + conn.Close() + } for _, conns := range r.unicast { for _, conn := range conns { conn.Close() @@ -119,6 +124,12 @@ func (r *reuse) gc() { delete(r.global, key) } } + for key, conn := range r.globalDial { + if conn.ShouldGarbageCollect(now) { + conn.Close() + delete(r.globalDial, key) + } + } for ukey, conns := range r.unicast { for key, conn := range conns { if conn.ShouldGarbageCollect(now) { @@ -189,6 +200,11 @@ func (r *reuse) dialLocked(network string, source *net.IP) (*reuseConn, error) { return conn, nil } + // Use a connection we've previously dialed from + for _, conn := range r.globalDial { + return conn, nil + } + // We don't have a connection that we can use for dialing. // Dial a new connection from a random port. var addr *net.UDPAddr @@ -203,29 +219,43 @@ func (r *reuse) dialLocked(network string, source *net.IP) (*reuseConn, error) { return nil, err } rconn := newReuseConn(conn) - r.global[conn.LocalAddr().(*net.UDPAddr).Port] = rconn + r.globalDial[conn.LocalAddr().(*net.UDPAddr).Port] = rconn return rconn, nil } func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { - conn, err := net.ListenUDP(network, laddr) - if err != nil { - return nil, err + r.mutex.Lock() + defer r.mutex.Unlock() + + var rconn *reuseConn + var localAddr *net.UDPAddr + + // reuse the connection if we've dialed out of this port already + if laddr.IP.IsUnspecified() { + if _, ok := r.globalDial[laddr.Port]; ok { + rconn = r.globalDial[laddr.Port] + localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr) + } + } + if rconn == nil { + conn, err := net.ListenUDP(network, laddr) + if err != nil { + return nil, err + } + localAddr = conn.LocalAddr().(*net.UDPAddr) + rconn = newReuseConn(conn) } - localAddr := conn.LocalAddr().(*net.UDPAddr) - rconn := newReuseConn(conn) rconn.IncreaseCount() - r.mutex.Lock() - defer r.mutex.Unlock() - // Deal with listen on a global address if localAddr.IP.IsUnspecified() { // The kernel already checked that the laddr is not already listen // so we need not check here (when we create ListenUDP). r.global[localAddr.Port] = rconn - return rconn, err + // delete the entry from dial map in case we are reusing this connection + delete(r.globalDial, localAddr.Port) + return rconn, nil } // Deal with listen on a unicast address @@ -239,7 +269,7 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { // The kernel already checked that the laddr is not already listen // so we need not check here (when we create ListenUDP). r.unicast[localAddr.IP.String()][localAddr.Port] = rconn - return rconn, err + return rconn, nil } func (r *reuse) Close() error { diff --git a/p2p/transport/quicreuse/reuse_test.go b/p2p/transport/quicreuse/reuse_test.go index 36a109c80d..74761c1526 100644 --- a/p2p/transport/quicreuse/reuse_test.go +++ b/p2p/transport/quicreuse/reuse_test.go @@ -26,6 +26,11 @@ func closeAllConns(reuse *reuse) { conn.DecreaseCount() } } + for _, conn := range reuse.globalDial { + for conn.GetCount() > 0 { + conn.DecreaseCount() + } + } for _, conns := range reuse.unicast { for _, conn := range conns { for conn.GetCount() > 0 { @@ -110,6 +115,21 @@ func TestReuseConnectionWhenDialing(t *testing.T) { require.Equal(t, conn.GetCount(), 2) } +func TestReuseConnectionWhenListening(t *testing.T) { + reuse := newReuse() + cleanup(t, reuse) + + raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") + require.NoError(t, err) + conn, err := reuse.Dial("udp4", raddr) + require.NoError(t, err) + laddr := &net.UDPAddr{IP: net.IPv4zero, Port: conn.UDPConn.LocalAddr().(*net.UDPAddr).Port} + lconn, err := reuse.Listen("udp4", laddr) + require.NoError(t, err) + require.Equal(t, lconn.GetCount(), 2) + require.Equal(t, conn.GetCount(), 2) +} + func TestReuseListenOnSpecificInterface(t *testing.T) { if platformHasRoutingTables() { t.Skip("this test only works on platforms that support routing tables") From 8efdc6a48cfd71cd34fa30b643f65a42691fbdfb Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 27 Apr 2023 17:15:58 +0530 Subject: [PATCH 2/9] add test for checking reuse in listen after dial --- p2p/transport/quicreuse/reuse_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/p2p/transport/quicreuse/reuse_test.go b/p2p/transport/quicreuse/reuse_test.go index 74761c1526..badd731698 100644 --- a/p2p/transport/quicreuse/reuse_test.go +++ b/p2p/transport/quicreuse/reuse_test.go @@ -130,6 +130,30 @@ func TestReuseConnectionWhenListening(t *testing.T) { require.Equal(t, conn.GetCount(), 2) } +func TestReuseConnectionWhenDialBeforeListen(t *testing.T) { + reuse := newReuse() + cleanup(t, reuse) + + // dial any address + raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") + require.NoError(t, err) + _, err = reuse.Dial("udp4", raddr) + require.NoError(t, err) + + // open a listener + laddr := &net.UDPAddr{IP: net.IPv4zero, Port: 10000} + lconn, err := reuse.Listen("udp4", laddr) + require.NoError(t, err) + + // new dials should go via the listener connection + raddr, err = net.ResolveUDPAddr("udp4", "1.1.1.1:1235") + require.NoError(t, err) + conn, err := reuse.Dial("udp4", raddr) + require.NoError(t, err) + require.Equal(t, conn, lconn) + require.Equal(t, conn.GetCount(), 2) +} + func TestReuseListenOnSpecificInterface(t *testing.T) { if platformHasRoutingTables() { t.Skip("this test only works on platforms that support routing tables") From ec470e2eda9db55022b5cfc4131df0eec3ed31ac Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 27 Apr 2023 17:54:08 +0530 Subject: [PATCH 3/9] rename globalDial to fallback --- p2p/transport/quicreuse/reuse.go | 37 +++++++++++++++------------ p2p/transport/quicreuse/reuse_test.go | 2 +- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index c305e36d5c..c36c8596cb 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -75,17 +75,20 @@ type reuse struct { routes routing.Router unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn // global contains connections that are listening on 0.0.0.0 / :: - global map[int]*reuseConn - globalDial map[int]*reuseConn + global map[int]*reuseConn + // globalFallback contains connections that we've dialed out from. connections + // are reused from this map if no connection is available in the global + // map. These connections are listening on 0.0.0.0 / :: + globalFallback map[int]*reuseConn } func newReuse() *reuse { r := &reuse{ - unicast: make(map[string]map[int]*reuseConn), - global: make(map[int]*reuseConn), - globalDial: make(map[int]*reuseConn), - closeChan: make(chan struct{}), - gcStopChan: make(chan struct{}), + unicast: make(map[string]map[int]*reuseConn), + global: make(map[int]*reuseConn), + globalFallback: make(map[int]*reuseConn), + closeChan: make(chan struct{}), + gcStopChan: make(chan struct{}), } go r.gc() return r @@ -97,7 +100,7 @@ func (r *reuse) gc() { for _, conn := range r.global { conn.Close() } - for _, conn := range r.globalDial { + for _, conn := range r.globalFallback { conn.Close() } for _, conns := range r.unicast { @@ -124,10 +127,10 @@ func (r *reuse) gc() { delete(r.global, key) } } - for key, conn := range r.globalDial { + for key, conn := range r.globalFallback { if conn.ShouldGarbageCollect(now) { conn.Close() - delete(r.globalDial, key) + delete(r.globalFallback, key) } } for ukey, conns := range r.unicast { @@ -201,7 +204,7 @@ func (r *reuse) dialLocked(network string, source *net.IP) (*reuseConn, error) { } // Use a connection we've previously dialed from - for _, conn := range r.globalDial { + for _, conn := range r.globalFallback { return conn, nil } @@ -219,7 +222,7 @@ func (r *reuse) dialLocked(network string, source *net.IP) (*reuseConn, error) { return nil, err } rconn := newReuseConn(conn) - r.globalDial[conn.LocalAddr().(*net.UDPAddr).Port] = rconn + r.globalFallback[conn.LocalAddr().(*net.UDPAddr).Port] = rconn return rconn, nil } @@ -230,10 +233,10 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { var rconn *reuseConn var localAddr *net.UDPAddr - // reuse the connection if we've dialed out of this port already + // reuse the fallback connection if we've dialed out from this port already if laddr.IP.IsUnspecified() { - if _, ok := r.globalDial[laddr.Port]; ok { - rconn = r.globalDial[laddr.Port] + if _, ok := r.globalFallback[laddr.Port]; ok { + rconn = r.globalFallback[laddr.Port] localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr) } } @@ -253,8 +256,8 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { // The kernel already checked that the laddr is not already listen // so we need not check here (when we create ListenUDP). r.global[localAddr.Port] = rconn - // delete the entry from dial map in case we are reusing this connection - delete(r.globalDial, localAddr.Port) + // delete the entry from fallback map in case we are reusing this connection + delete(r.globalFallback, localAddr.Port) return rconn, nil } diff --git a/p2p/transport/quicreuse/reuse_test.go b/p2p/transport/quicreuse/reuse_test.go index badd731698..b3506a70e4 100644 --- a/p2p/transport/quicreuse/reuse_test.go +++ b/p2p/transport/quicreuse/reuse_test.go @@ -26,7 +26,7 @@ func closeAllConns(reuse *reuse) { conn.DecreaseCount() } } - for _, conn := range reuse.globalDial { + for _, conn := range reuse.globalFallback { for conn.GetCount() > 0 { conn.DecreaseCount() } From 53854877cf036199ef0868b164f06f7aea2058bf Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 9 May 2023 14:37:16 +0530 Subject: [PATCH 4/9] improve naming. fix gc test --- p2p/transport/quicreuse/connmgr_test.go | 2 +- p2p/transport/quicreuse/reuse.go | 47 ++++++++++++------------- p2p/transport/quicreuse/reuse_test.go | 15 +++++--- 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/p2p/transport/quicreuse/connmgr_test.go b/p2p/transport/quicreuse/connmgr_test.go index 16677c50fe..b5399e8318 100644 --- a/p2p/transport/quicreuse/connmgr_test.go +++ b/p2p/transport/quicreuse/connmgr_test.go @@ -25,7 +25,7 @@ func checkClosed(t *testing.T, cm *ConnManager) { continue } r.mutex.Lock() - for _, conn := range r.global { + for _, conn := range r.globalListeners { require.Zero(t, conn.GetCount()) } for _, conns := range r.unicast { diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index c36c8596cb..a9b765f000 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -74,21 +74,21 @@ type reuse struct { routes routing.Router unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn - // global contains connections that are listening on 0.0.0.0 / :: - global map[int]*reuseConn - // globalFallback contains connections that we've dialed out from. connections - // are reused from this map if no connection is available in the global + // globalListeners contains connections that are listening on 0.0.0.0 / :: + globalListeners map[int]*reuseConn + // globalDialers contains connections that we've dialed out from. connections + // are reused from this map if no connection is available in the globalListeners // map. These connections are listening on 0.0.0.0 / :: - globalFallback map[int]*reuseConn + globalDialers map[int]*reuseConn } func newReuse() *reuse { r := &reuse{ - unicast: make(map[string]map[int]*reuseConn), - global: make(map[int]*reuseConn), - globalFallback: make(map[int]*reuseConn), - closeChan: make(chan struct{}), - gcStopChan: make(chan struct{}), + unicast: make(map[string]map[int]*reuseConn), + globalListeners: make(map[int]*reuseConn), + globalDialers: make(map[int]*reuseConn), + closeChan: make(chan struct{}), + gcStopChan: make(chan struct{}), } go r.gc() return r @@ -97,10 +97,10 @@ func newReuse() *reuse { func (r *reuse) gc() { defer func() { r.mutex.Lock() - for _, conn := range r.global { + for _, conn := range r.globalListeners { conn.Close() } - for _, conn := range r.globalFallback { + for _, conn := range r.globalDialers { conn.Close() } for _, conns := range r.unicast { @@ -121,16 +121,16 @@ func (r *reuse) gc() { case <-ticker.C: now := time.Now() r.mutex.Lock() - for key, conn := range r.global { + for key, conn := range r.globalListeners { if conn.ShouldGarbageCollect(now) { conn.Close() - delete(r.global, key) + delete(r.globalListeners, key) } } - for key, conn := range r.globalFallback { + for key, conn := range r.globalDialers { if conn.ShouldGarbageCollect(now) { conn.Close() - delete(r.globalFallback, key) + delete(r.globalDialers, key) } } for ukey, conns := range r.unicast { @@ -199,12 +199,12 @@ func (r *reuse) dialLocked(network string, source *net.IP) (*reuseConn, error) { // Use a connection listening on 0.0.0.0 (or ::). // Again, we don't care about the port number. - for _, conn := range r.global { + for _, conn := range r.globalListeners { return conn, nil } // Use a connection we've previously dialed from - for _, conn := range r.globalFallback { + for _, conn := range r.globalDialers { return conn, nil } @@ -222,7 +222,7 @@ func (r *reuse) dialLocked(network string, source *net.IP) (*reuseConn, error) { return nil, err } rconn := newReuseConn(conn) - r.globalFallback[conn.LocalAddr().(*net.UDPAddr).Port] = rconn + r.globalDialers[conn.LocalAddr().(*net.UDPAddr).Port] = rconn return rconn, nil } @@ -235,9 +235,10 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { // reuse the fallback connection if we've dialed out from this port already if laddr.IP.IsUnspecified() { - if _, ok := r.globalFallback[laddr.Port]; ok { - rconn = r.globalFallback[laddr.Port] + if _, ok := r.globalDialers[laddr.Port]; ok { + rconn = r.globalDialers[laddr.Port] localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr) + delete(r.globalDialers, localAddr.Port) } } if rconn == nil { @@ -255,9 +256,7 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { if localAddr.IP.IsUnspecified() { // The kernel already checked that the laddr is not already listen // so we need not check here (when we create ListenUDP). - r.global[localAddr.Port] = rconn - // delete the entry from fallback map in case we are reusing this connection - delete(r.globalFallback, localAddr.Port) + r.globalListeners[localAddr.Port] = rconn return rconn, nil } diff --git a/p2p/transport/quicreuse/reuse_test.go b/p2p/transport/quicreuse/reuse_test.go index b3506a70e4..836ccae485 100644 --- a/p2p/transport/quicreuse/reuse_test.go +++ b/p2p/transport/quicreuse/reuse_test.go @@ -21,12 +21,12 @@ func (c *reuseConn) GetCount() int { func closeAllConns(reuse *reuse) { reuse.mutex.Lock() - for _, conn := range reuse.global { + for _, conn := range reuse.globalListeners { for conn.GetCount() > 0 { conn.DecreaseCount() } } - for _, conn := range reuse.globalFallback { + for _, conn := range reuse.globalDialers { for conn.GetCount() > 0 { conn.DecreaseCount() } @@ -201,9 +201,15 @@ func TestReuseGarbageCollect(t *testing.T) { numGlobals := func() int { reuse.mutex.Lock() defer reuse.mutex.Unlock() - return len(reuse.global) + return len(reuse.globalListeners) + len(reuse.globalDialers) } + raddr, err := net.ResolveUDPAddr("udp4", "1.2.3.4:1234") + require.NoError(t, err) + dconn, err := reuse.Dial("udp4", raddr) + require.NoError(t, err) + require.Equal(t, dconn.GetCount(), 1) + addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") require.NoError(t, err) lconn, err := reuse.Listen("udp4", addr) @@ -212,13 +218,14 @@ func TestReuseGarbageCollect(t *testing.T) { closeTime := time.Now() lconn.DecreaseCount() + dconn.DecreaseCount() for { num := numGlobals() if closeTime.Add(maxUnusedDuration).Before(time.Now()) { break } - require.Equal(t, num, 1) + require.Equal(t, num, 2) time.Sleep(2 * time.Millisecond) } require.Eventually(t, func() bool { return numGlobals() == 0 }, 4*garbageCollectInterval, 10*time.Millisecond) From 223d971d0caa0cd0ce0cd980067a04512b031ecf Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 9 May 2023 16:00:10 +0530 Subject: [PATCH 5/9] reuse dial connections on listen requests for port 0 --- p2p/transport/quicreuse/reuse.go | 16 ++++++++++++---- p2p/transport/quicreuse/reuse_test.go | 13 ++++++++++--- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index a9b765f000..b7532929ab 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -76,9 +76,9 @@ type reuse struct { unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn // globalListeners contains connections that are listening on 0.0.0.0 / :: globalListeners map[int]*reuseConn - // globalDialers contains connections that we've dialed out from. connections - // are reused from this map if no connection is available in the globalListeners - // map. These connections are listening on 0.0.0.0 / :: + // globalDialers contains connections that we've dialed out from. These connections are listening on 0.0.0.0 / :: + // On Dial, connections are reused from this map if no connection is available in the globalListeners + // On Listen, connections are reused from this map if the requested port is 0 globalDialers map[int]*reuseConn } @@ -235,7 +235,15 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { // reuse the fallback connection if we've dialed out from this port already if laddr.IP.IsUnspecified() { - if _, ok := r.globalDialers[laddr.Port]; ok { + if laddr.Port == 0 { + // use any connection we have dialed out of + for _, conn := range r.globalDialers { + rconn = conn + localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr) + delete(r.globalDialers, localAddr.Port) + break + } + } else if _, ok := r.globalDialers[laddr.Port]; ok { rconn = r.globalDialers[laddr.Port] localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr) delete(r.globalDialers, localAddr.Port) diff --git a/p2p/transport/quicreuse/reuse_test.go b/p2p/transport/quicreuse/reuse_test.go index 836ccae485..0cd62f0d51 100644 --- a/p2p/transport/quicreuse/reuse_test.go +++ b/p2p/transport/quicreuse/reuse_test.go @@ -137,11 +137,11 @@ func TestReuseConnectionWhenDialBeforeListen(t *testing.T) { // dial any address raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") require.NoError(t, err) - _, err = reuse.Dial("udp4", raddr) + rconn, err := reuse.Dial("udp4", raddr) require.NoError(t, err) // open a listener - laddr := &net.UDPAddr{IP: net.IPv4zero, Port: 10000} + laddr := &net.UDPAddr{IP: net.IPv4zero, Port: 1234} lconn, err := reuse.Listen("udp4", laddr) require.NoError(t, err) @@ -152,6 +152,13 @@ func TestReuseConnectionWhenDialBeforeListen(t *testing.T) { require.NoError(t, err) require.Equal(t, conn, lconn) require.Equal(t, conn.GetCount(), 2) + + // a listener on an unspecified port should reuse the dialer + laddr2 := &net.UDPAddr{IP: net.IPv4zero, Port: 0} + lconn2, err := reuse.Listen("udp4", laddr2) + require.NoError(t, err) + require.Equal(t, lconn2, rconn) + require.Equal(t, lconn2.GetCount(), 2) } func TestReuseListenOnSpecificInterface(t *testing.T) { @@ -210,7 +217,7 @@ func TestReuseGarbageCollect(t *testing.T) { require.NoError(t, err) require.Equal(t, dconn.GetCount(), 1) - addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") + addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:1234") require.NoError(t, err) lconn, err := reuse.Listen("udp4", addr) require.NoError(t, err) From 64b308512e7def9408ad9824c6a5bd3388d82c0a Mon Sep 17 00:00:00 2001 From: Sukun Date: Tue, 9 May 2023 16:18:46 +0530 Subject: [PATCH 6/9] Update p2p/transport/quicreuse/reuse.go Co-authored-by: Marten Seemann --- p2p/transport/quicreuse/reuse.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index b7532929ab..c5168c6084 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -78,7 +78,7 @@ type reuse struct { globalListeners map[int]*reuseConn // globalDialers contains connections that we've dialed out from. These connections are listening on 0.0.0.0 / :: // On Dial, connections are reused from this map if no connection is available in the globalListeners - // On Listen, connections are reused from this map if the requested port is 0 + // On Listen, connections are reused from this map if the requested port is 0, and then moved to globalListeners globalDialers map[int]*reuseConn } From 1ae17120d806caeab7ecfd185ae00756f84e551f Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 9 May 2023 16:34:39 +0530 Subject: [PATCH 7/9] fix comment --- p2p/transport/quicreuse/reuse.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index c5168c6084..957e2f7635 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -233,19 +233,25 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { var rconn *reuseConn var localAddr *net.UDPAddr - // reuse the fallback connection if we've dialed out from this port already + // Check if we can reuse a connection we have already dialed out from. + // We reuse a connection from globalDialers when the requested port is 0 or the requested + // port is already in the globalDialers. + // If we are reusing a connection from globalDialers, we move the globalDialers entry to + // globalListeners if laddr.IP.IsUnspecified() { if laddr.Port == 0 { - // use any connection we have dialed out of + // the requested port is 0, we can reuse any connection for _, conn := range r.globalDialers { rconn = conn localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr) + // this entry will be added to the globalListeners map below delete(r.globalDialers, localAddr.Port) break } } else if _, ok := r.globalDialers[laddr.Port]; ok { rconn = r.globalDialers[laddr.Port] localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr) + // this entry will be added to the globalListeners map below delete(r.globalDialers, localAddr.Port) } } From a6a9da1fd40412562715e19f5964dfc2eb35392e Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 9 May 2023 16:41:29 +0530 Subject: [PATCH 8/9] cleanup reuse logic --- p2p/transport/quicreuse/reuse.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 957e2f7635..3023d31253 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -244,26 +244,29 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { for _, conn := range r.globalDialers { rconn = conn localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr) - // this entry will be added to the globalListeners map below delete(r.globalDialers, localAddr.Port) break } } else if _, ok := r.globalDialers[laddr.Port]; ok { rconn = r.globalDialers[laddr.Port] localAddr = rconn.UDPConn.LocalAddr().(*net.UDPAddr) - // this entry will be added to the globalListeners map below delete(r.globalDialers, localAddr.Port) } - } - if rconn == nil { - conn, err := net.ListenUDP(network, laddr) - if err != nil { - return nil, err + // found a match + if rconn != nil { + rconn.IncreaseCount() + r.globalListeners[localAddr.Port] = rconn + return rconn, nil } - localAddr = conn.LocalAddr().(*net.UDPAddr) - rconn = newReuseConn(conn) } + conn, err := net.ListenUDP(network, laddr) + if err != nil { + return nil, err + } + localAddr = conn.LocalAddr().(*net.UDPAddr) + rconn = newReuseConn(conn) + rconn.IncreaseCount() // Deal with listen on a global address From 4a245c42b2cbf12433e6fc1fa3cc0b0ee084fc3f Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 9 May 2023 16:43:23 +0530 Subject: [PATCH 9/9] fix scoping --- p2p/transport/quicreuse/reuse.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/p2p/transport/quicreuse/reuse.go b/p2p/transport/quicreuse/reuse.go index 3023d31253..cc90038efe 100644 --- a/p2p/transport/quicreuse/reuse.go +++ b/p2p/transport/quicreuse/reuse.go @@ -230,15 +230,15 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { r.mutex.Lock() defer r.mutex.Unlock() - var rconn *reuseConn - var localAddr *net.UDPAddr - // Check if we can reuse a connection we have already dialed out from. // We reuse a connection from globalDialers when the requested port is 0 or the requested // port is already in the globalDialers. // If we are reusing a connection from globalDialers, we move the globalDialers entry to // globalListeners if laddr.IP.IsUnspecified() { + var rconn *reuseConn + var localAddr *net.UDPAddr + if laddr.Port == 0 { // the requested port is 0, we can reuse any connection for _, conn := range r.globalDialers { @@ -264,8 +264,8 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { if err != nil { return nil, err } - localAddr = conn.LocalAddr().(*net.UDPAddr) - rconn = newReuseConn(conn) + localAddr := conn.LocalAddr().(*net.UDPAddr) + rconn := newReuseConn(conn) rconn.IncreaseCount()