Handle disconnected peer #117

Closed
iulianpascalau opened this issue Oct 17, 2018 · 14 comments
Labels
kind/bug A bug in existing code (including security flaws)

Comments

@iulianpascalau
Contributor

Hello

I have the following use case:
Peer A connects to peer B. Peer B does not wish to retain the connection to peer A and closes it (calling net.Conn.Close() on the new connection from peer A). After a while, peer A tries to connect to peer B again and the following message appears on peer B:
09:54:07.886 ERROR pubsub: already have connection to peer: <peer.ID 16*E91Gqs> pubsub.go:205

Searching through the source code, I found that the type PubSubNotif only handles the Connected event. In the use case above I used floodsub as a means to disseminate data. FloodSub uses PubSub, which has a channel called peerDead that is never used.

I have also tried using GossipSub with the same error being displayed.

Am I missing something? Big thanks!

@vyzo
Collaborator

vyzo commented Oct 17, 2018

We wait for the stream handler to report the peer as dead, but if there are no messages this will not happen.
This is a bug.
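
For illustration, here is a minimal stdlib-only sketch of how a peer set that only learns about connections, and never about disconnections, ends up rejecting a reconnect. The names peerID and peerTracker are hypothetical stand-ins and are not the go-floodsub types; the real code reacts to network notifications and stream handlers rather than direct method calls.

```go
package main

import "sync"

// peerID is a hypothetical stand-in for libp2p's peer.ID.
type peerID string

// peerTracker is a hypothetical model of the set of peers that
// pubsub believes it is connected to.
type peerTracker struct {
	mu    sync.Mutex
	peers map[peerID]struct{}
}

func newPeerTracker() *peerTracker {
	return &peerTracker{peers: make(map[peerID]struct{})}
}

// Connected records a peer. It returns false when the peer is already
// tracked, which models the "already have connection to peer" case.
func (t *peerTracker) Connected(p peerID) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.peers[p]; ok {
		return false
	}
	t.peers[p] = struct{}{}
	return true
}

// Disconnected forgets a peer so that a later reconnect is accepted.
// If this is never called, the stale entry blocks every reconnect.
func (t *peerTracker) Disconnected(p peerID) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.peers, p)
}
```

Calling Connected twice for the same peer without a Disconnected in between returns false the second time; calling Disconnected first lets the reconnect succeed.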

@vyzo vyzo added the kind/bug A bug in existing code (including security flaws) label Oct 17, 2018
@vyzo
Collaborator

vyzo commented Oct 17, 2018

@iulianpascalau
Contributor Author

Ok, missed that file. Thanks.

@vyzo
Collaborator

vyzo commented Oct 17, 2018

Do you have a small repro case? That would help figure out what is happening.

@iulianpascalau
Contributor Author

Will do

@iulianpascalau
Contributor Author

Sorry for the long code posted.

The test code below creates 1+noOfNodes nodes; the first (nodeMain) keeps at most 4 connections. In the goroutine marked as the data race condition, nodeMain closes the first connection it has. The error log appeared only after I introduced this goroutine.

Further tests (not shown here) using the go-libp2p-kad-dht package, which also bootstraps nodes, produced the same error log.

package testing

import (
	"context"
	"crypto/ecdsa"
	"fmt"
	"math/rand"
	"testing"
	"time"

	"github.com/btcsuite/btcd/btcec"
	"github.com/ipfs/go-ipfs-addr"
	"github.com/libp2p/go-conn-security-multistream"
	"github.com/libp2p/go-floodsub"
	"github.com/libp2p/go-libp2p-crypto"
	cr "github.com/libp2p/go-libp2p-crypto"
	"github.com/libp2p/go-libp2p-host"
	"github.com/libp2p/go-libp2p-metrics"
	"github.com/libp2p/go-libp2p-net"
	"github.com/libp2p/go-libp2p-peer"
	"github.com/libp2p/go-libp2p-peerstore"
	"github.com/libp2p/go-libp2p-peerstore/pstoremem"
	"github.com/libp2p/go-libp2p-secio"
	"github.com/libp2p/go-libp2p-swarm"
	"github.com/libp2p/go-libp2p-transport-upgrader"
	"github.com/libp2p/go-libp2p/p2p/host/basic"
	"github.com/libp2p/go-tcp-transport"
	"github.com/multiformats/go-multiaddr"
	"github.com/whyrusleeping/go-smux-multistream"
	"github.com/whyrusleeping/go-smux-yamux"
)

// notifier receives network events and enforces the connection limit
type notifier struct{}

// Listen is called when network starts listening on an addr
func (n *notifier) Listen(netw net.Network, ma multiaddr.Multiaddr) {}

// ListenClose is called when network stops listening on an addr
func (n *notifier) ListenClose(netw net.Network, ma multiaddr.Multiaddr) {}

// Connected is called when a connection opened
func (n *notifier) Connected(netw net.Network, conn net.Conn) {
	// retain at most 4 connections
	if len(netw.Conns()) > 4 {
		conn.Close()
		fmt.Printf("Connection refused for peer: %v!\n", conn.RemotePeer().Pretty())
	}
}

// Disconnected is called when a connection closed
func (cn *notifier) Disconnected(netw net.Network, conn net.Conn) {}

// OpenedStream is called when a stream opened
func (cn *notifier) OpenedStream(netw net.Network, stream net.Stream) {}

// ClosedStream is called when a stream was closed
func (cn *notifier) ClosedStream(netw net.Network, stream net.Stream) {}


type Messenger struct {
	ctx     context.Context
	p2pNode host.Host
	pubsub  *floodsub.PubSub
}

//helper func
func genUpgrader(n *swarm.Swarm) *stream.Upgrader {
	id := n.LocalPeer()
	pk := n.Peerstore().PrivKey(id)
	secMuxer := new(csms.SSMuxer)
	secMuxer.AddTransport(secio.ID, &secio.Transport{
		LocalID:    id,
		PrivateKey: pk,
	})

	stMuxer := multistream.NewBlankTransport()
	stMuxer.AddTransport("/yamux/1.0.0", sm_yamux.DefaultTransport)

	return &stream.Upgrader{
		Secure:  secMuxer,
		Muxer:   stMuxer,
		Filters: n.Filters,
	}
}

//helper func
func genSwarm(ctx context.Context, sKey crypto.PrivKey, pKey crypto.PubKey, id peer.ID, addr multiaddr.Multiaddr) (*swarm.Swarm, error) {
	ps := peerstore.NewPeerstore(pstoremem.NewKeyBook(), pstoremem.NewAddrBook(), pstoremem.NewPeerMetadata())
	ps.AddPubKey(id, pKey)
	ps.AddPrivKey(id, sKey)
	s := swarm.NewSwarm(ctx, id, ps, metrics.NewBandwidthCounter())

	tcpTransport := tcp.NewTCPTransport(genUpgrader(s))
	tcpTransport.DisableReuseport = false

	err := s.AddTransport(tcpTransport)
	if err != nil {
		return nil, err
	}

	err = s.Listen(addr)
	if err != nil {
		return nil, err
	}

	s.Peerstore().AddAddrs(id, s.ListenAddresses(), peerstore.PermanentAddrTTL)

	return s, nil
}

// NewMessenger creates a new messenger having as seed only the port
func NewMessenger(port int, refuseConns bool) (*Messenger, error) {
	mesgr := Messenger{ctx: context.Background()}

	//create private, public keys + id
	r := rand.New(rand.NewSource(int64(port)))

	prvKey, err := ecdsa.GenerateKey(btcec.S256(), r)

	if err != nil {
		panic(err)
	}

	k := (*cr.Secp256k1PrivateKey)(prvKey)

	sKey := k
	pKey := k.GetPublic()
	id, err := peer.IDFromPublicKey(pKey)
	if err != nil{
		return nil, err
	}

	addr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port))
	if err != nil {
		return nil, err
	}

	//create host
	netw, err := genSwarm(mesgr.ctx, sKey, pKey, id, addr)
	if err != nil {
		return nil, err
	}
	mesgr.p2pNode = basichost.New(netw)

	//create pubsub
	pubsub, err := floodsub.NewFloodSub(mesgr.ctx, mesgr.p2pNode) //floodsub.NewGossipSub(ctx, node.p2pNode)
	if err != nil {
		return nil, err
	}
	mesgr.pubsub = pubsub

	if refuseConns{
		mesgr.p2pNode.Network().Notify(&notifier{})
	}

	return &mesgr, nil
}

func (mesgr *Messenger)ConnectToAddr(addr string) error{
	pinfo, err := mesgr.parseAddressIpfs(addr)
	if err != nil {
		return err
	}

	if err := mesgr.p2pNode.Connect(context.Background(), *pinfo); err != nil {
		return err
	}

	return nil
}

// parseAddressIpfs translates the string containing the address of the node to a PeerInfo object
func (mesgr *Messenger) parseAddressIpfs(address string) (*peerstore.PeerInfo, error) {
	addr, err := ipfsaddr.ParseString(address)
	if err != nil {
		return nil, err
	}

	pinfo, err := peerstore.InfoFromP2pAddr(addr.Multiaddr())
	if err != nil {
		return nil, err
	}

	return pinfo, nil
}

func (mesgr *Messenger) FirstAddress() string{
	return mesgr.p2pNode.Addrs()[0].String() + "/ipfs/" + mesgr.p2pNode.ID().Pretty()
}

func TestPubSub(t *testing.T){
	//create node1 that refuses connections
	nodeMain, err := NewMessenger(4000, true)
	if err != nil {
		t.Fail()
	}

	noOfNodes := 1

	subscriberMain, err := nodeMain.pubsub.Subscribe("test")
	if err != nil {
		t.Fail()
	}

	go func() {
		for {
			msg, err := subscriberMain.Next(nodeMain.ctx)
			if err != nil {
				// stop reading once the subscription or context ends
				return
			}

			fmt.Printf("Got message: %v\n", msg.GetData())
		}
	}()

	// create a data race condition: asynchronously close the main node's first connection every 100 ms
	go func(){
		for{
			if len(nodeMain.p2pNode.Network().Conns()) > 0{
				nodeMain.p2pNode.Network().Conns()[0].Close()
			}

			time.Sleep(time.Millisecond * 100)

		}
	}()


	nodes := make([]*Messenger, 0)

	for i := 0; i < noOfNodes; i++{
		node, err := NewMessenger(4001 + i, false)
		if err != nil{
			t.Fail()
		}

		subscriber, err := node.pubsub.Subscribe("test")
		if err != nil {
			t.Fail()
		}

		go func() {
			for {
				msg, err := subscriber.Next(node.ctx)
				if err != nil {
					// stop reading once the subscription or context ends
					return
				}

				fmt.Printf("Got message: %v\n", msg.GetData())
			}
		}()

		nodes = append(nodes, node)
	}


	//try ten times
	for i := 0; i < 10; i++{
		for j := 0; j < len(nodes); j++{
			nodes[j].ConnectToAddr(nodeMain.FirstAddress())
		}

		time.Sleep(time.Second)
	}
}

@vyzo
Collaborator

vyzo commented Oct 17, 2018

You can greatly simplify by using the libp2p constructor for making hosts: libp2p.New.

@Stebalien
Member

This is a very noisy bug...

@iulianpascalau
Contributor Author

I was wondering: as a fix for this issue, wouldn't it be better to have a single channel for adding and removing peers? newPeers would be merged with peerDead, and the channel's element type would be a composite struct holding peer.ID (for peerDead), inet.Stream (for newPeers) and an indicator of the operation. The new channel would then give a chronological sequence of add, remove, add, remove and so on.
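
A rough sketch of what such a merged channel could look like, using only the standard library. All names here (peerEvent, peerOp, applyEvents) are hypothetical, and the stream field is a plain string standing in for inet.Stream; this is one possible shape for the idea, not code from the library.

```go
package main

// peerID is a hypothetical stand-in for peer.ID.
type peerID string

// peerOp indicates which operation an event carries.
type peerOp int

const (
	opAdd peerOp = iota
	opRemove
)

// peerEvent is a hypothetical composite type that merges what the
// separate newPeers and peerDead channels would carry.
type peerEvent struct {
	op     peerOp
	id     peerID
	stream string // placeholder for inet.Stream; only meaningful for opAdd
}

// applyEvents consumes events in channel order until the channel is
// closed and returns the resulting peer set. Because adds and removes
// travel on the same channel, their relative order is preserved.
func applyEvents(events <-chan peerEvent) map[peerID]string {
	peers := make(map[peerID]string)
	for ev := range events {
		switch ev.op {
		case opAdd:
			peers[ev.id] = ev.stream
		case opRemove:
			delete(peers, ev.id)
		}
	}
	return peers
}
```

With two separate channels, a select can pick a pending add before an earlier remove for the same peer; a single channel removes that ambiguity for events sent by one producer.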

@vyzo
Collaborator

vyzo commented Oct 25, 2018

I think we also have a race in that we declare the peer dead both from the reader and the writer.
It is possible that the reader dies, the peer reconnects, and then we send a message with the old writer, which fails and declares the peer dead again.
Seems like we have to rework our dead peer logic.
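
To make the race concrete, here is a small sketch of one way to ignore a stale dead report: tag each connection with a generation number and drop reports that carry an old one. This is only an assumption about how such a rework could look, not the fix that was later merged; peerTable, deadReport and the generation counter are all hypothetical names. The table is not goroutine-safe and is meant to be owned by a single event loop.

```go
package main

// peerID is a hypothetical stand-in for peer.ID.
type peerID string

// deadReport is a hypothetical message sent by a reader or writer
// when it gives up on a peer. gen identifies the connection it served.
type deadReport struct {
	id  peerID
	gen uint64
}

// peerTable maps each connected peer to the generation of its
// current connection.
type peerTable struct {
	gens map[peerID]uint64
	next uint64
}

func newPeerTable() *peerTable {
	return &peerTable{gens: make(map[peerID]uint64)}
}

// connect registers a (re)connection and returns the generation to
// hand to that connection's reader and writer.
func (t *peerTable) connect(id peerID) uint64 {
	t.next++
	t.gens[id] = t.next
	return t.next
}

// handleDead removes the peer only if the report refers to the live
// connection. It returns true when the peer was removed.
func (t *peerTable) handleDead(r deadReport) bool {
	cur, ok := t.gens[r.id]
	if !ok || cur != r.gen {
		return false // stale or duplicate report
	}
	delete(t.gens, r.id)
	return true
}

// connected reports whether the peer is currently tracked.
func (t *peerTable) connected(id peerID) bool {
	_, ok := t.gens[id]
	return ok
}
```

In the scenario above, the reader's report removes the peer, the reconnect gets a new generation, and the old writer's late report no longer matches, so the fresh connection survives.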

@vyzo
Collaborator

vyzo commented Dec 14, 2018

Oops, wrong issue; reopened.

@vyzo
Collaborator

vyzo commented Dec 14, 2018

@iulianpascalau can you try again? We just merged a fix for handling multiple connections in #132.

@iulianpascalau
Contributor Author

Hello, I fetched the new version directly from the master branch and ran the same test 10 times; the error log did not appear in any of the runs.
We can consider this issue also closed.
Big thanks!

@vyzo
Collaborator

vyzo commented Dec 14, 2018

Excellent!

@vyzo vyzo closed this as completed Dec 14, 2018