From 1373218519ab9d8ed7ae2229438e3470e019ce83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 10 Mar 2018 18:54:58 +0100 Subject: [PATCH 1/8] coreapi: swarm interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/interface/swarm.go | 37 +++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 core/coreapi/interface/swarm.go diff --git a/core/coreapi/interface/swarm.go b/core/coreapi/interface/swarm.go new file mode 100644 index 00000000000..1ec260e07a6 --- /dev/null +++ b/core/coreapi/interface/swarm.go @@ -0,0 +1,37 @@ +package iface + +import ( + "time" + + "context" + ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" + peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" +) + +// PeerInfo contains information about a peer +type PeerInfo interface { + // ID returns PeerID + ID() peer.ID + + // Address returns the multiaddress via which we are connected with the peer + Address() ma.Multiaddr + + // Latency returns last known round trip time to the peer + Latency() time.Duration + + // Streams returns list of streams established with the peer + // TODO: should this return multicodecs? + Streams() []string +} + +// SwarmAPI specifies the interface to libp2p swarm +type SwarmAPI interface { + // Connect to a given address + Connect(context.Context, ma.Multiaddr) error + + // Disconnect from a given address + Disconnect(context.Context, ma.Multiaddr) error + + // Peers returns the list of peers we are connected to + Peers(context.Context) ([]PeerInfo, error) +} From 8358c8d0410c9da896ade974c9d08212b1f2b39a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 10 Mar 2018 19:02:57 +0100 Subject: [PATCH 2/8] coreapi: implement swarm api MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/coreapi.go | 5 + core/coreapi/interface/coreapi.go | 3 + core/coreapi/interface/swarm.go | 6 +- core/coreapi/swarm.go | 147 ++++++++++++++++++++++++++++++ 4 files changed, 158 insertions(+), 3 deletions(-) create mode 100644 core/coreapi/swarm.go diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index bb7afd61a51..8a0410a2dae 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -67,3 +67,8 @@ func (api *CoreAPI) Pin() coreiface.PinAPI { func (api *CoreAPI) Dht() coreiface.DhtAPI { return (*DhtAPI)(api) } + +// Swarm returns the SwarmAPI interface implementation backed by the go-ipfs node +func (api *CoreAPI) Swarm() coreiface.SwarmAPI { + return &SwarmAPI{api} +} diff --git a/core/coreapi/interface/coreapi.go b/core/coreapi/interface/coreapi.go index 0053d472e6b..0b153b6f9a1 100644 --- a/core/coreapi/interface/coreapi.go +++ b/core/coreapi/interface/coreapi.go @@ -34,6 +34,9 @@ type CoreAPI interface { // Dht returns an implementation of Dht API Dht() DhtAPI + // Swarm returns an implementation of Swarm API + Swarm() SwarmAPI + // ResolvePath resolves the path using Unixfs resolver ResolvePath(context.Context, Path) (ResolvedPath, error) diff --git a/core/coreapi/interface/swarm.go b/core/coreapi/interface/swarm.go index 1ec260e07a6..1f0b1216f63 100644 --- a/core/coreapi/interface/swarm.go +++ b/core/coreapi/interface/swarm.go @@ -1,9 +1,9 @@ package iface import ( + "context" "time" - "context" ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" ) @@ -17,11 +17,11 @@ type PeerInfo interface { Address() ma.Multiaddr // Latency returns last known round trip time to the peer - Latency() time.Duration + Latency(context.Context) (time.Duration, error) // Streams returns list of streams established with the peer // TODO: should this return multicodecs? - Streams() []string + Streams(context.Context) ([]string, error) } // SwarmAPI specifies the interface to libp2p swarm diff --git a/core/coreapi/swarm.go b/core/coreapi/swarm.go new file mode 100644 index 00000000000..4af0ce61aa9 --- /dev/null +++ b/core/coreapi/swarm.go @@ -0,0 +1,147 @@ +package coreapi + +import ( + "context" + "errors" + "fmt" + "time" + + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + + iaddr "gx/ipfs/QmQViVWBHbU6HmYjXcdNq7tVASCNgdg64ZGcauuDkLCivW/go-ipfs-addr" + swarm "gx/ipfs/QmSwZMWwFZSUpe5muU2xgTUwppH24KfMwdPXiwbEp2c6G5/go-libp2p-swarm" + ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" + pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore" + net "gx/ipfs/QmXfkENeeBvh3zYA51MaSdGUdBjhQ99cP5WQe8zgr6wchG/go-libp2p-net" + peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" +) + +type SwarmAPI struct { + *CoreAPI +} + +type connInfo struct { + api *CoreAPI + conn net.Conn + + addr ma.Multiaddr + peer peer.ID + muxer string +} + +func (api *SwarmAPI) Connect(ctx context.Context, addr ma.Multiaddr) error { + if api.node.PeerHost == nil { + return coreiface.ErrOffline + } + + snet, ok := api.node.PeerHost.Network().(*swarm.Network) + if !ok { + return fmt.Errorf("peerhost network was not swarm") + } + + swrm := snet.Swarm() + + ia, err := iaddr.ParseMultiaddr(ma.Multiaddr(addr)) + if err != nil { + return err + } + + pi := pstore.PeerInfo{ + ID: ia.ID(), + Addrs: []ma.Multiaddr{ia.Transport()}, + } + + swrm.Backoff().Clear(pi.ID) + + return api.node.PeerHost.Connect(ctx, pi) +} + +func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error { + if api.node.PeerHost == nil { + return coreiface.ErrOffline + } + + ia, err := iaddr.ParseMultiaddr(ma.Multiaddr(addr)) + if err != nil { + return err + } + + taddr := ia.Transport() + + found := false + conns := api.node.PeerHost.Network().ConnsToPeer(ia.ID()) + for _, conn := range conns { + if !conn.RemoteMultiaddr().Equal(taddr) { + continue + } + + if err := conn.Close(); err != nil { + return err + } + found = true + break + } + + if !found { + return errors.New("conn not found") + } + + return nil +} + +func (api *SwarmAPI) Peers(context.Context) ([]coreiface.PeerInfo, error) { + if api.node.PeerHost == nil { + return nil, coreiface.ErrOffline + } + + conns := api.node.PeerHost.Network().Conns() + + var out []coreiface.PeerInfo + for _, c := range conns { + pid := c.RemotePeer() + addr := c.RemoteMultiaddr() + + ci := &connInfo{ + api: api.CoreAPI, + conn: c, + + addr: addr, + peer: pid, + } + + swcon, ok := c.(*swarm.Conn) + if ok { + ci.muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn()) + } + + out = append(out, ci) + } + + return out, nil +} + +func (ci *connInfo) ID() peer.ID { + return ci.ID() +} + +func (ci *connInfo) Address() ma.Multiaddr { + return ci.addr +} + +func (ci *connInfo) Latency(context.Context) (time.Duration, error) { + return ci.api.node.Peerstore.LatencyEWMA(peer.ID(ci.ID())), nil +} + +func (ci *connInfo) Streams(context.Context) ([]string, error) { + streams, err := ci.conn.GetStreams() + if err != nil { + return nil, err + } + + out := make([]string, len(streams)) + for i, s := range streams { + out[i] = string(s.Protocol()) + } + + return out, nil +} From e7f493b5696f427fd29c5de58ac960ab55a9ac56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 5 Apr 2018 20:22:49 +0200 Subject: [PATCH 3/8] fix infinite loop in connInfo.ID MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/interface/swarm.go | 4 ++-- core/coreapi/swarm.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/coreapi/interface/swarm.go b/core/coreapi/interface/swarm.go index 1f0b1216f63..92817e6f4e9 100644 --- a/core/coreapi/interface/swarm.go +++ b/core/coreapi/interface/swarm.go @@ -4,8 +4,8 @@ import ( "context" "time" - ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" - peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" ) // PeerInfo contains information about a peer diff --git a/core/coreapi/swarm.go b/core/coreapi/swarm.go index 4af0ce61aa9..3a9ffcf3692 100644 --- a/core/coreapi/swarm.go +++ b/core/coreapi/swarm.go @@ -8,12 +8,12 @@ import ( coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" - iaddr "gx/ipfs/QmQViVWBHbU6HmYjXcdNq7tVASCNgdg64ZGcauuDkLCivW/go-ipfs-addr" - swarm "gx/ipfs/QmSwZMWwFZSUpe5muU2xgTUwppH24KfMwdPXiwbEp2c6G5/go-libp2p-swarm" - ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" - pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore" - net "gx/ipfs/QmXfkENeeBvh3zYA51MaSdGUdBjhQ99cP5WQe8zgr6wchG/go-libp2p-net" - peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + net "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net" + pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" + swarm "gx/ipfs/QmeDpqUwwdye8ABKVMPXKuWwPVURFdqTqssbTUB39E2Nwd/go-libp2p-swarm" + iaddr "gx/ipfs/QmePSRaGafvmURQwQkHPDBJsaGwKXC1WpBBHVCQxdr8FPn/go-ipfs-addr" ) type SwarmAPI struct { @@ -121,7 +121,7 @@ func (api *SwarmAPI) Peers(context.Context) ([]coreiface.PeerInfo, error) { } func (ci *connInfo) ID() peer.ID { - return ci.ID() + return ci.peer } func (ci *connInfo) Address() ma.Multiaddr { From 44bab585a5a79ff8f7a084996dcb012f6cd600a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 11 Sep 2018 13:36:55 +0200 Subject: [PATCH 4/8] coreapi swarm: swarm refactor fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/swarm.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/core/coreapi/swarm.go b/core/coreapi/swarm.go index 3a9ffcf3692..bc208e26ae9 100644 --- a/core/coreapi/swarm.go +++ b/core/coreapi/swarm.go @@ -34,13 +34,11 @@ func (api *SwarmAPI) Connect(ctx context.Context, addr ma.Multiaddr) error { return coreiface.ErrOffline } - snet, ok := api.node.PeerHost.Network().(*swarm.Network) + swrm, ok := api.node.PeerHost.Network().(*swarm.Swarm) if !ok { return fmt.Errorf("peerhost network was not swarm") } - swrm := snet.Swarm() - ia, err := iaddr.ParseMultiaddr(ma.Multiaddr(addr)) if err != nil { return err @@ -109,10 +107,13 @@ func (api *SwarmAPI) Peers(context.Context) ([]coreiface.PeerInfo, error) { peer: pid, } - swcon, ok := c.(*swarm.Conn) - if ok { - ci.muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn()) - } + /* + // FIXME(steb): + swcon, ok := c.(*swarm.Conn) + if ok { + ci.muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn()) + } + */ out = append(out, ci) } @@ -133,10 +134,7 @@ func (ci *connInfo) Latency(context.Context) (time.Duration, error) { } func (ci *connInfo) Streams(context.Context) ([]string, error) { - streams, err := ci.conn.GetStreams() - if err != nil { - return nil, err - } + streams := ci.conn.GetStreams() out := make([]string, len(streams)) for i, s := range streams { From 30d42f4550ec80c9d77f0d0cd1421f622fcffcce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 11 Sep 2018 20:50:15 +0200 Subject: [PATCH 5/8] swarm cmd: port to new cmd lib MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/commands/log.go | 22 +++ core/commands/root.go | 2 +- core/commands/swarm.go | 322 ++++++++++++-------------------- core/coreapi/interface/swarm.go | 2 +- core/coreapi/swarm.go | 10 +- 5 files changed, 151 insertions(+), 207 deletions(-) diff --git a/core/commands/log.go b/core/commands/log.go index 028061d1376..b4be7354edf 100644 --- a/core/commands/log.go +++ b/core/commands/log.go @@ -1,10 +1,12 @@ package commands import ( + "bytes" "fmt" "io" cmds "github.com/ipfs/go-ipfs/commands" + e "github.com/ipfs/go-ipfs/core/commands/e" "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit" logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log" @@ -110,3 +112,23 @@ Outputs event log messages (not other log messages) as they are generated. res.SetOutput(r) }, } + +func stringListMarshaler(res cmds.Response) (io.Reader, error) { + v, err := unwrapOutput(res.Output()) + if err != nil { + return nil, err + } + + list, ok := v.(*stringList) + if !ok { + return nil, e.TypeErr(list, v) + } + + buf := new(bytes.Buffer) + for _, s := range list.Strings { + buf.WriteString(s) + buf.WriteString("\n") + } + + return buf, nil +} diff --git a/core/commands/root.go b/core/commands/root.go index 0268136f610..c141b61c2e3 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -136,7 +136,7 @@ var rootSubcommands = map[string]*cmds.Command{ "p2p": lgc.NewCommand(P2PCmd), "refs": lgc.NewCommand(RefsCmd), "resolve": ResolveCmd, - "swarm": lgc.NewCommand(SwarmCmd), + "swarm": SwarmCmd, "tar": lgc.NewCommand(TarCmd), "file": lgc.NewCommand(unixfs.UnixFSCmd), "update": lgc.NewCommand(ExternalBinary()), diff --git a/core/commands/swarm.go b/core/commands/swarm.go index 685a8113b59..6779f8aee18 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -1,7 +1,6 @@ package commands import ( - "bytes" "errors" "fmt" "io" @@ -9,18 +8,20 @@ import ( "sort" "strings" - cmds "github.com/ipfs/go-ipfs/commands" - e "github.com/ipfs/go-ipfs/core/commands/e" - repo "github.com/ipfs/go-ipfs/repo" + "github.com/ipfs/go-ipfs/commands" + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/core/commands/e" + "github.com/ipfs/go-ipfs/repo" "github.com/ipfs/go-ipfs/repo/fsrepo" - swarm "gx/ipfs/QmPQoCVRHaGD25VffyB7DFV5qP65hFSQJdSDy75P1vYBKe/go-libp2p-swarm" + "gx/ipfs/QmPQoCVRHaGD25VffyB7DFV5qP65hFSQJdSDy75P1vYBKe/go-libp2p-swarm" mafilter "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" - cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit" + "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit" iaddr "gx/ipfs/QmSzdvo9aPzLj4HXWTcgGAp8N84tZc8LbLmFZFwUb1dpWk/go-ipfs-addr" - config "gx/ipfs/QmVBUpxsHh53rNcufqxMpLAmz37eGyLJUaexDy1W9YkiNk/go-ipfs-config" + "gx/ipfs/QmVBUpxsHh53rNcufqxMpLAmz37eGyLJUaexDy1W9YkiNk/go-ipfs-config" + "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" - peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" inet "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net" ) @@ -64,23 +65,20 @@ var swarmPeersCmd = &cmds.Command{ cmdkit.BoolOption("latency", "Also list information about latency to each peer"), cmdkit.BoolOption("direction", "Also list information about the direction of connection"), }, - Run: func(req cmds.Request, res cmds.Response) { - - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + return err } - verbose, _, _ := req.Option("verbose").Bool() - latency, _, _ := req.Option("latency").Bool() - streams, _, _ := req.Option("streams").Bool() - direction, _, _ := req.Option("direction").Bool() + verbose, _ := req.Options["verbose"].(bool) + latency, _ := req.Options["latency"].(bool) + streams, _ := req.Options["streams"].(bool) + direction, _ := req.Options["direction"].(bool) conns := n.PeerHost.Network().Conns() var out connInfos @@ -125,50 +123,43 @@ var swarmPeersCmd = &cmds.Command{ } sort.Sort(&out) - res.SetOutput(&out) + return cmds.EmitOnce(res, &out) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func(res cmds.Response) (io.Reader, error) { - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { ci, ok := v.(*connInfos) if !ok { - return nil, e.TypeErr(ci, v) + return e.TypeErr(ci, v) } - buf := new(bytes.Buffer) pipfs := ma.ProtocolWithCode(ma.P_IPFS).Name for _, info := range ci.Peers { ids := fmt.Sprintf("/%s/%s", pipfs, info.Peer) if strings.HasSuffix(info.Addr, ids) { - fmt.Fprintf(buf, "%s", info.Addr) + fmt.Fprintf(w, "%s", info.Addr) } else { - fmt.Fprintf(buf, "%s%s", info.Addr, ids) + fmt.Fprintf(w, "%s%s", info.Addr, ids) } if info.Latency != "" { - fmt.Fprintf(buf, " %s", info.Latency) + fmt.Fprintf(w, " %s", info.Latency) } if info.Direction != inet.DirUnknown { - fmt.Fprintf(buf, " %s", directionString(info.Direction)) + fmt.Fprintf(w, " %s", directionString(info.Direction)) } - - fmt.Fprintln(buf) + fmt.Fprintln(w) for _, s := range info.Streams { if s.Protocol == "" { s.Protocol = "" } - fmt.Fprintf(buf, " %s\n", s.Protocol) + fmt.Fprintf(w, " %s\n", s.Protocol) } } - return buf, nil - }, + return nil + }), }, Type: connInfos{}, } @@ -237,17 +228,14 @@ var swarmAddrsCmd = &cmds.Command{ "local": swarmAddrsLocalCmd, "listen": swarmAddrsListenCmd, }, - Run: func(req cmds.Request, res cmds.Response) { - - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + return err } addrs := make(map[string][]string) @@ -260,18 +248,13 @@ var swarmAddrsCmd = &cmds.Command{ sort.Sort(sort.StringSlice(addrs[s])) } - res.SetOutput(&addrMap{Addrs: addrs}) + return cmds.EmitOnce(res, &addrMap{Addrs: addrs}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func(res cmds.Response) (io.Reader, error) { - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { m, ok := v.(*addrMap) if !ok { - return nil, e.TypeErr(m, v) + return e.TypeErr(m, v) } // sort the ids first @@ -281,16 +264,15 @@ var swarmAddrsCmd = &cmds.Command{ } sort.Sort(sort.StringSlice(ids)) - buf := new(bytes.Buffer) for _, p := range ids { paddrs := m.Addrs[p] - buf.WriteString(fmt.Sprintf("%s (%d)\n", p, len(paddrs))) + fmt.Fprintf(w, "%s (%d)\n", p, len(paddrs)) for _, addr := range paddrs { - buf.WriteString("\t" + addr + "\n") + fmt.Fprintf(w, "\t"+addr+"\n") } } - return buf, nil - }, + return nil + }), }, Type: addrMap{}, } @@ -305,20 +287,17 @@ var swarmAddrsLocalCmd = &cmds.Command{ Options: []cmdkit.Option{ cmdkit.BoolOption("id", "Show peer ID in addresses."), }, - Run: func(req cmds.Request, res cmds.Response) { - iCtx := req.InvocContext() - n, err := iCtx.GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + return err } - showid, _, _ := req.Option("id").Bool() + showid, _ := req.Options["id"].(bool) id := n.Identity.Pretty() var addrs []string @@ -330,11 +309,11 @@ var swarmAddrsLocalCmd = &cmds.Command{ addrs = append(addrs, saddr) } sort.Sort(sort.StringSlice(addrs)) - res.SetOutput(&stringList{addrs}) + return cmds.EmitOnce(res, &stringList{addrs}) }, Type: stringList{}, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, } @@ -345,24 +324,20 @@ var swarmAddrsListenCmd = &cmds.Command{ 'ipfs swarm addrs listen' lists all interface addresses the node is listening on. `, }, - Run: func(req cmds.Request, res cmds.Response) { - - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + return err } var addrs []string maddrs, err := n.PeerHost.Network().InterfaceListenAddresses() if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } for _, addr := range maddrs { @@ -370,11 +345,11 @@ var swarmAddrsListenCmd = &cmds.Command{ } sort.Sort(sort.StringSlice(addrs)) - res.SetOutput(&stringList{addrs}) + return cmds.EmitOnce(res, &stringList{addrs}) }, Type: stringList{}, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, } @@ -392,33 +367,27 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3 Arguments: []cmdkit.Argument{ cmdkit.StringArg("address", true, true, "Address of peer to connect to.").EnableStdin(), }, - Run: func(req cmds.Request, res cmds.Response) { - ctx := req.Context() - - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - addrs := req.Arguments() + addrs := req.Arguments if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + return err } // FIXME(steb): Nasty swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { - res.SetError(fmt.Errorf("peerhost network was not swarm"), cmdkit.ErrNormal) - return + return fmt.Errorf("peerhost network was not swarm") } pis, err := peersWithAddresses(addrs) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } output := make([]string, len(pis)) @@ -427,18 +396,17 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3 output[i] = "connect " + pi.ID.Pretty() - err := n.PeerHost.Connect(ctx, pi) + err := n.PeerHost.Connect(req.Context, pi) if err != nil { - res.SetError(fmt.Errorf("%s failure: %s", output[i], err), cmdkit.ErrNormal) - return + return fmt.Errorf("%s failure: %s", output[i], err) } output[i] += " success" } - res.SetOutput(&stringList{output}) + return cmds.EmitOnce(res, &stringList{addrs}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, Type: stringList{}, } @@ -459,24 +427,21 @@ it will reconnect. Arguments: []cmdkit.Argument{ cmdkit.StringArg("address", true, true, "Address of peer to disconnect from.").EnableStdin(), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - addrs := req.Arguments() + addrs := req.Arguments if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + return ErrNotOnline } iaddrs, err := parseAddresses(addrs) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } output := make([]string, len(iaddrs)) @@ -516,34 +481,14 @@ it will reconnect. } } } - res.SetOutput(&stringList{output}) + return cmds.EmitOnce(res, &stringList{output}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, Type: stringList{}, } -func stringListMarshaler(res cmds.Response) (io.Reader, error) { - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - - list, ok := v.(*stringList) - if !ok { - return nil, e.TypeErr(list, v) - } - - buf := new(bytes.Buffer) - for _, s := range list.Strings { - buf.WriteString(s) - buf.WriteString("\n") - } - - return buf, nil -} - // parseAddresses is a function that takes in a slice of string peer addresses // (multiaddr + peerid) and returns slices of multiaddrs and peerids. func parseAddresses(addrs []string) (iaddrs []iaddr.IPFSAddr, err error) { @@ -608,38 +553,34 @@ Filters default to those specified under the "Swarm.AddrFilters" config key. "add": swarmFiltersAddCmd, "rm": swarmFiltersRmCmd, }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + return ErrNotOnline } // FIXME(steb) swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { - res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal) - return + return errors.New("failed to cast network to swarm network") } var output []string for _, f := range swrm.Filters.Filters() { s, err := mafilter.ConvertIPNet(f) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } output = append(output, s) } - res.SetOutput(&stringList{output}) + return cmds.EmitOnce(res, &stringList{output}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, Type: stringList{}, } @@ -656,63 +597,54 @@ add your filters to the ipfs config file. Arguments: []cmdkit.Argument{ cmdkit.StringArg("address", true, true, "Multiaddr to filter.").EnableStdin(), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + return ErrNotOnline } // FIXME(steb) swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { - res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal) - return + return errors.New("failed to cast network to swarm network") } - if len(req.Arguments()) == 0 { - res.SetError(errors.New("no filters to add"), cmdkit.ErrClient) - return + if len(req.Arguments) == 0 { + return errors.New("no filters to add") } - r, err := fsrepo.Open(req.InvocContext().ConfigRoot) + r, err := fsrepo.Open(env.(*commands.Context).ConfigRoot) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } defer r.Close() cfg, err := r.Config() if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - for _, arg := range req.Arguments() { + for _, arg := range req.Arguments { mask, err := mafilter.NewMask(arg) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } swrm.Filters.AddDialFilter(mask) } - added, err := filtersAdd(r, cfg, req.Arguments()) + added, err := filtersAdd(r, cfg, req.Arguments) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - + return err } - res.SetOutput(&stringList{added}) + return cmds.EmitOnce(res, &stringList{added}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, Type: stringList{}, } @@ -729,37 +661,32 @@ remove your filters from the ipfs config file. Arguments: []cmdkit.Argument{ cmdkit.StringArg("address", true, true, "Multiaddr filter to remove.").EnableStdin(), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + return ErrNotOnline } swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { - res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal) - return + return errors.New("failed to cast network to swarm network") } - r, err := fsrepo.Open(req.InvocContext().ConfigRoot) + r, err := fsrepo.Open(env.(*commands.Context).ConfigRoot) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } defer r.Close() cfg, err := r.Config() if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - if req.Arguments()[0] == "all" || req.Arguments()[0] == "*" { + if req.Arguments[0] == "all" || req.Arguments[0] == "*" { fs := swrm.Filters.Filters() for _, f := range fs { swrm.Filters.Remove(f) @@ -767,35 +694,30 @@ remove your filters from the ipfs config file. removed, err := filtersRemoveAll(r, cfg) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - res.SetOutput(&stringList{removed}) - - return + return cmds.EmitOnce(res, &stringList{removed}) } - for _, arg := range req.Arguments() { + for _, arg := range req.Arguments { mask, err := mafilter.NewMask(arg) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } swrm.Filters.Remove(mask) } - removed, err := filtersRemove(r, cfg, req.Arguments()) + removed, err := filtersRemove(r, cfg, req.Arguments) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - res.SetOutput(&stringList{removed}) + return cmds.EmitOnce(res, &stringList{removed}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, Type: stringList{}, } diff --git a/core/coreapi/interface/swarm.go b/core/coreapi/interface/swarm.go index 92817e6f4e9..2492f26967f 100644 --- a/core/coreapi/interface/swarm.go +++ b/core/coreapi/interface/swarm.go @@ -4,8 +4,8 @@ import ( "context" "time" - peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" ) // PeerInfo contains information about a peer diff --git a/core/coreapi/swarm.go b/core/coreapi/swarm.go index bc208e26ae9..044473b6636 100644 --- a/core/coreapi/swarm.go +++ b/core/coreapi/swarm.go @@ -8,12 +8,12 @@ import ( coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" - peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + swarm "gx/ipfs/QmPQoCVRHaGD25VffyB7DFV5qP65hFSQJdSDy75P1vYBKe/go-libp2p-swarm" + iaddr "gx/ipfs/QmSzdvo9aPzLj4HXWTcgGAp8N84tZc8LbLmFZFwUb1dpWk/go-ipfs-addr" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" - net "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net" - pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" - swarm "gx/ipfs/QmeDpqUwwdye8ABKVMPXKuWwPVURFdqTqssbTUB39E2Nwd/go-libp2p-swarm" - iaddr "gx/ipfs/QmePSRaGafvmURQwQkHPDBJsaGwKXC1WpBBHVCQxdr8FPn/go-ipfs-addr" + peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" + net "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net" ) type SwarmAPI struct { From df9f10189210804bb4bdc9b99d0e796a20f701e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 17 Sep 2018 16:45:59 +0200 Subject: [PATCH 6/8] coreapi swarm: rewire connect/disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/commands/swarm.go | 63 +++++---------------------------- core/coreapi/interface/swarm.go | 23 +++++++----- core/coreapi/swarm.go | 51 ++++++++++++-------------- 3 files changed, 46 insertions(+), 91 deletions(-) diff --git a/core/commands/swarm.go b/core/commands/swarm.go index 6779f8aee18..b02f379f879 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -368,23 +368,13 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3 cmdkit.StringArg("address", true, true, "Address of peer to connect to.").EnableStdin(), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env) if err != nil { return err } addrs := req.Arguments - if n.PeerHost == nil { - return err - } - - // FIXME(steb): Nasty - swrm, ok := n.PeerHost.Network().(*swarm.Swarm) - if !ok { - return fmt.Errorf("peerhost network was not swarm") - } - pis, err := peersWithAddresses(addrs) if err != nil { return err @@ -392,18 +382,16 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3 output := make([]string, len(pis)) for i, pi := range pis { - swrm.Backoff().Clear(pi.ID) - output[i] = "connect " + pi.ID.Pretty() - err := n.PeerHost.Connect(req.Context, pi) + err := api.Swarm().Connect(req.Context, pi) if err != nil { return fmt.Errorf("%s failure: %s", output[i], err) } output[i] += " success" } - return cmds.EmitOnce(res, &stringList{addrs}) + return cmds.EmitOnce(res, &stringList{output}) }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeEncoder(stringListEncoder), @@ -428,57 +416,24 @@ it will reconnect. cmdkit.StringArg("address", true, true, "Address of peer to disconnect from.").EnableStdin(), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env) if err != nil { return err } - addrs := req.Arguments - - if n.PeerHost == nil { - return ErrNotOnline - } - - iaddrs, err := parseAddresses(addrs) + iaddrs, err := parseAddresses(req.Arguments) if err != nil { return err } output := make([]string, len(iaddrs)) for i, addr := range iaddrs { - taddr := addr.Transport() - id := addr.ID() - output[i] = "disconnect " + id.Pretty() - - net := n.PeerHost.Network() + output[i] = "disconnect " + addr.ID().Pretty() - if taddr == nil { - if net.Connectedness(id) != inet.Connected { - output[i] += " failure: not connected" - } else if err := net.ClosePeer(id); err != nil { - output[i] += " failure: " + err.Error() - } else { - output[i] += " success" - } + if err := api.Swarm().Disconnect(req.Context, addr.Multiaddr()); err != nil { + output[i] += " failure: " + err.Error() } else { - found := false - for _, conn := range net.ConnsToPeer(id) { - if !conn.RemoteMultiaddr().Equal(taddr) { - continue - } - - if err := conn.Close(); err != nil { - output[i] += " failure: " + err.Error() - } else { - output[i] += " success" - } - found = true - break - } - - if !found { - output[i] += " failure: conn not found" - } + output[i] += " success" } } return cmds.EmitOnce(res, &stringList{output}) diff --git a/core/coreapi/interface/swarm.go b/core/coreapi/interface/swarm.go index 2492f26967f..7bd009f16f9 100644 --- a/core/coreapi/interface/swarm.go +++ b/core/coreapi/interface/swarm.go @@ -2,14 +2,22 @@ package iface import ( "context" + "errors" "time" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" - peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" ) -// PeerInfo contains information about a peer -type PeerInfo interface { +var ( + ErrNotConnected = errors.New("not connected") + ErrConnNotFound = errors.New("conn not found") + ) + +// ConnectionInfo contains information about a peer +type ConnectionInfo interface { // ID returns PeerID ID() peer.ID @@ -20,18 +28,17 @@ type PeerInfo interface { Latency(context.Context) (time.Duration, error) // Streams returns list of streams established with the peer - // TODO: should this return multicodecs? - Streams(context.Context) ([]string, error) + Streams(context.Context) ([]protocol.ID, error) } // SwarmAPI specifies the interface to libp2p swarm type SwarmAPI interface { - // Connect to a given address - Connect(context.Context, ma.Multiaddr) error + // Connect to a given peer + Connect(context.Context, pstore.PeerInfo) error // Disconnect from a given address Disconnect(context.Context, ma.Multiaddr) error // Peers returns the list of peers we are connected to - Peers(context.Context) ([]PeerInfo, error) + Peers(context.Context) ([]ConnectionInfo, error) } diff --git a/core/coreapi/swarm.go b/core/coreapi/swarm.go index 044473b6636..de072c8cdee 100644 --- a/core/coreapi/swarm.go +++ b/core/coreapi/swarm.go @@ -2,7 +2,6 @@ package coreapi import ( "context" - "errors" "fmt" "time" @@ -11,8 +10,10 @@ import ( swarm "gx/ipfs/QmPQoCVRHaGD25VffyB7DFV5qP65hFSQJdSDy75P1vYBKe/go-libp2p-swarm" iaddr "gx/ipfs/QmSzdvo9aPzLj4HXWTcgGAp8N84tZc8LbLmFZFwUb1dpWk/go-ipfs-addr" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" + inet "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net" net "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net" ) @@ -29,7 +30,7 @@ type connInfo struct { muxer string } -func (api *SwarmAPI) Connect(ctx context.Context, addr ma.Multiaddr) error { +func (api *SwarmAPI) Connect(ctx context.Context, pi pstore.PeerInfo) error { if api.node.PeerHost == nil { return coreiface.ErrOffline } @@ -39,16 +40,6 @@ func (api *SwarmAPI) Connect(ctx context.Context, addr ma.Multiaddr) error { return fmt.Errorf("peerhost network was not swarm") } - ia, err := iaddr.ParseMultiaddr(ma.Multiaddr(addr)) - if err != nil { - return err - } - - pi := pstore.PeerInfo{ - ID: ia.ID(), - Addrs: []ma.Multiaddr{ia.Transport()}, - } - swrm.Backoff().Clear(pi.ID) return api.node.PeerHost.Connect(ctx, pi) @@ -65,36 +56,38 @@ func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error { } taddr := ia.Transport() + id := ia.ID() + net := api.node.PeerHost.Network() - found := false - conns := api.node.PeerHost.Network().ConnsToPeer(ia.ID()) - for _, conn := range conns { - if !conn.RemoteMultiaddr().Equal(taddr) { - continue + if taddr == nil { + if net.Connectedness(id) != inet.Connected { + return coreiface.ErrNotConnected + } else if err := net.ClosePeer(id); err != nil { + return err } + } else { + for _, conn := range net.ConnsToPeer(id) { + if !conn.RemoteMultiaddr().Equal(taddr) { + continue + } - if err := conn.Close(); err != nil { - return err + return conn.Close() } - found = true - break - } - if !found { - return errors.New("conn not found") + return coreiface.ErrConnNotFound } return nil } -func (api *SwarmAPI) Peers(context.Context) ([]coreiface.PeerInfo, error) { +func (api *SwarmAPI) Peers(context.Context) ([]coreiface.ConnectionInfo, error) { if api.node.PeerHost == nil { return nil, coreiface.ErrOffline } conns := api.node.PeerHost.Network().Conns() - var out []coreiface.PeerInfo + var out []coreiface.ConnectionInfo for _, c := range conns { pid := c.RemotePeer() addr := c.RemoteMultiaddr() @@ -133,12 +126,12 @@ func (ci *connInfo) Latency(context.Context) (time.Duration, error) { return ci.api.node.Peerstore.LatencyEWMA(peer.ID(ci.ID())), nil } -func (ci *connInfo) Streams(context.Context) ([]string, error) { +func (ci *connInfo) Streams(context.Context) ([]protocol.ID, error) { streams := ci.conn.GetStreams() - out := make([]string, len(streams)) + out := make([]protocol.ID, len(streams)) for i, s := range streams { - out[i] = string(s.Protocol()) + out[i] = s.Protocol() } return out, nil From 6fa2ab0d411689073ed45ee9c17303d9d6e60e2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 17 Sep 2018 19:53:15 +0200 Subject: [PATCH 7/8] coreapi swarm: rewire address listing cmds MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/commands/swarm.go | 84 ++++++++++++++++----------------- core/coreapi/interface/key.go | 3 ++ core/coreapi/interface/swarm.go | 10 +++- core/coreapi/key.go | 4 ++ core/coreapi/swarm.go | 42 +++++++++++++++++ test/sharness/lib/test-lib.sh | 5 +- 6 files changed, 102 insertions(+), 46 deletions(-) diff --git a/core/commands/swarm.go b/core/commands/swarm.go index b02f379f879..09fe464b03a 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -21,7 +21,7 @@ import ( "gx/ipfs/QmVBUpxsHh53rNcufqxMpLAmz37eGyLJUaexDy1W9YkiNk/go-ipfs-config" "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" - "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" inet "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net" ) @@ -66,45 +66,39 @@ var swarmPeersCmd = &cmds.Command{ cmdkit.BoolOption("direction", "Also list information about the direction of connection"), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env) if err != nil { return err } - if n.PeerHost == nil { - return err - } - verbose, _ := req.Options["verbose"].(bool) latency, _ := req.Options["latency"].(bool) streams, _ := req.Options["streams"].(bool) direction, _ := req.Options["direction"].(bool) - conns := n.PeerHost.Network().Conns() + conns, err := api.Swarm().Peers(req.Context) + if err != nil { + return err + } + var out connInfos for _, c := range conns { - pid := c.RemotePeer() - addr := c.RemoteMultiaddr() ci := connInfo{ - Addr: addr.String(), - Peer: pid.Pretty(), + Addr: c.Address().String(), + Peer: c.ID().Pretty(), } - /* - // FIXME(steb): - swcon, ok := c.(*swarm.Conn) - if ok { - ci.Muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn()) - } - */ - if verbose || direction { // set direction - ci.Direction = c.Stat().Direction + ci.Direction = c.Direction() } if verbose || latency { - lat := n.Peerstore.LatencyEWMA(pid) + lat, err := c.Latency(req.Context) + if err != nil { + return err + } + if lat == 0 { ci.Latency = "n/a" } else { @@ -112,10 +106,13 @@ var swarmPeersCmd = &cmds.Command{ } } if verbose || streams { - strs := c.GetStreams() + strs, err := c.Streams(req.Context) + if err != nil { + return err + } for _, s := range strs { - ci.Streams = append(ci.Streams, streamInfo{Protocol: string(s.Protocol())}) + ci.Streams = append(ci.Streams, streamInfo{Protocol: string(s)}) } } sort.Sort(&ci) @@ -229,26 +226,25 @@ var swarmAddrsCmd = &cmds.Command{ "listen": swarmAddrsListenCmd, }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env) if err != nil { return err } - if n.PeerHost == nil { + addrs, err := api.Swarm().KnownAddrs(req.Context) + if err != nil { return err } - addrs := make(map[string][]string) - ps := n.PeerHost.Network().Peerstore() - for _, p := range ps.Peers() { + out := make(map[string][]string) + for p, paddrs := range addrs { s := p.Pretty() - for _, a := range ps.Addrs(p) { - addrs[s] = append(addrs[s], a.String()) + for _, a := range paddrs { + out[s] = append(out[s], a.String()) } - sort.Sort(sort.StringSlice(addrs[s])) } - return cmds.EmitOnce(res, &addrMap{Addrs: addrs}) + return cmds.EmitOnce(res, &addrMap{Addrs: out}) }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { @@ -288,23 +284,27 @@ var swarmAddrsLocalCmd = &cmds.Command{ cmdkit.BoolOption("id", "Show peer ID in addresses."), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env) if err != nil { return err } - if n.PeerHost == nil { + showid, _ := req.Options["id"].(bool) + self, err := api.Key().Self(req.Context) + if err != nil { return err } - showid, _ := req.Options["id"].(bool) - id := n.Identity.Pretty() + maddrs, err := api.Swarm().LocalAddrs(req.Context) + if err != nil { + return err + } var addrs []string - for _, addr := range n.PeerHost.Addrs() { + for _, addr := range maddrs { saddr := addr.String() if showid { - saddr = path.Join(saddr, "ipfs", id) + saddr = path.Join(saddr, "ipfs", self.ID().Pretty()) } addrs = append(addrs, saddr) } @@ -325,17 +325,13 @@ var swarmAddrsListenCmd = &cmds.Command{ `, }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env) if err != nil { return err } - if n.PeerHost == nil { - return err - } - var addrs []string - maddrs, err := n.PeerHost.Network().InterfaceListenAddresses() + maddrs, err := api.Swarm().ListenAddrs(req.Context) if err != nil { return err } diff --git a/core/coreapi/interface/key.go b/core/coreapi/interface/key.go index 4305ae20d20..cc6dc890044 100644 --- a/core/coreapi/interface/key.go +++ b/core/coreapi/interface/key.go @@ -33,6 +33,9 @@ type KeyAPI interface { // List lists keys stored in keystore List(ctx context.Context) ([]Key, error) + // Self returns the 'main' node key + Self(ctx context.Context) (Key, error) + // Remove removes keys from keystore. Returns ipns path of the removed key Remove(ctx context.Context, name string) (Key, error) } diff --git a/core/coreapi/interface/swarm.go b/core/coreapi/interface/swarm.go index 7bd009f16f9..caa6a70e327 100644 --- a/core/coreapi/interface/swarm.go +++ b/core/coreapi/interface/swarm.go @@ -9,12 +9,13 @@ import ( "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" + net "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net" ) var ( ErrNotConnected = errors.New("not connected") ErrConnNotFound = errors.New("conn not found") - ) +) // ConnectionInfo contains information about a peer type ConnectionInfo interface { @@ -24,6 +25,9 @@ type ConnectionInfo interface { // Address returns the multiaddress via which we are connected with the peer Address() ma.Multiaddr + // Direction returns which way the connection was established + Direction() net.Direction + // Latency returns last known round trip time to the peer Latency(context.Context) (time.Duration, error) @@ -41,4 +45,8 @@ type SwarmAPI interface { // Peers returns the list of peers we are connected to Peers(context.Context) ([]ConnectionInfo, error) + + KnownAddrs(context.Context) (map[peer.ID][]ma.Multiaddr, error) + LocalAddrs(context.Context) ([]ma.Multiaddr, error) + ListenAddrs(context.Context) ([]ma.Multiaddr, error) } diff --git a/core/coreapi/key.go b/core/coreapi/key.go index 9bc4e1aef19..9fe207785ba 100644 --- a/core/coreapi/key.go +++ b/core/coreapi/key.go @@ -216,3 +216,7 @@ func (api *KeyAPI) Remove(ctx context.Context, name string) (coreiface.Key, erro return &key{"", pid}, nil } + +func (api *KeyAPI) Self(ctx context.Context) (coreiface.Key, error) { + return &key{"self", api.node.Identity}, nil +} diff --git a/core/coreapi/swarm.go b/core/coreapi/swarm.go index de072c8cdee..b780682b25e 100644 --- a/core/coreapi/swarm.go +++ b/core/coreapi/swarm.go @@ -3,6 +3,7 @@ package coreapi import ( "context" "fmt" + "sort" "time" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" @@ -24,6 +25,7 @@ type SwarmAPI struct { type connInfo struct { api *CoreAPI conn net.Conn + dir net.Direction addr ma.Multiaddr peer peer.ID @@ -80,6 +82,41 @@ func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error { return nil } +func (api *SwarmAPI) KnownAddrs(context.Context) (map[peer.ID][]ma.Multiaddr, error) { + if api.node.PeerHost == nil { + return nil, coreiface.ErrOffline + } + + addrs := make(map[peer.ID][]ma.Multiaddr) + ps := api.node.PeerHost.Network().Peerstore() + for _, p := range ps.Peers() { + for _, a := range ps.Addrs(p) { + addrs[p] = append(addrs[p], a) + } + sort.Slice(addrs[p], func(i, j int) bool { + return addrs[p][i].String() < addrs[p][j].String() + }) + } + + return addrs, nil +} + +func (api *SwarmAPI) LocalAddrs(context.Context) ([]ma.Multiaddr, error) { + if api.node.PeerHost == nil { + return nil, coreiface.ErrOffline + } + + return api.node.PeerHost.Addrs(), nil +} + +func (api *SwarmAPI) ListenAddrs(context.Context) ([]ma.Multiaddr, error) { + if api.node.PeerHost == nil { + return nil, coreiface.ErrOffline + } + + return api.node.PeerHost.Network().InterfaceListenAddresses() +} + func (api *SwarmAPI) Peers(context.Context) ([]coreiface.ConnectionInfo, error) { if api.node.PeerHost == nil { return nil, coreiface.ErrOffline @@ -95,6 +132,7 @@ func (api *SwarmAPI) Peers(context.Context) ([]coreiface.ConnectionInfo, error) ci := &connInfo{ api: api.CoreAPI, conn: c, + dir: c.Stat().Direction, addr: addr, peer: pid, @@ -122,6 +160,10 @@ func (ci *connInfo) Address() ma.Multiaddr { return ci.addr } +func (ci *connInfo) Direction() net.Direction { + return ci.dir +} + func (ci *connInfo) Latency(context.Context) (time.Duration, error) { return ci.api.node.Peerstore.LatencyEWMA(peer.ID(ci.ID())), nil } diff --git a/test/sharness/lib/test-lib.sh b/test/sharness/lib/test-lib.sh index 8a54a6abf14..05cfc57d1fa 100644 --- a/test/sharness/lib/test-lib.sh +++ b/test/sharness/lib/test-lib.sh @@ -199,8 +199,11 @@ test_set_address_vars() { ' if ipfs swarm addrs local >/dev/null 2>&1; then + test_expect_success "get swarm addresses" ' + ipfs swarm addrs local > addrs_out + ' + test_expect_success "set swarm address vars" ' - ipfs swarm addrs local > addrs_out && SWARM_MADDR=$(grep "127.0.0.1" addrs_out) && SWARM_PORT=$(port_from_maddr $SWARM_MADDR) ' From b2f6c3e3ed145725c7c9a1e9b6cdb705c8c87bc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 2 Oct 2018 12:31:50 +0200 Subject: [PATCH 8/8] coreapi swarm: missing docs, review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/commands/swarm.go | 4 ++-- core/coreapi/interface/swarm.go | 9 +++++++-- core/coreapi/swarm.go | 12 ++++-------- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/commands/swarm.go b/core/commands/swarm.go index 09fe464b03a..295f955f430 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -94,7 +94,7 @@ var swarmPeersCmd = &cmds.Command{ } if verbose || latency { - lat, err := c.Latency(req.Context) + lat, err := c.Latency() if err != nil { return err } @@ -106,7 +106,7 @@ var swarmPeersCmd = &cmds.Command{ } } if verbose || streams { - strs, err := c.Streams(req.Context) + strs, err := c.Streams() if err != nil { return err } diff --git a/core/coreapi/interface/swarm.go b/core/coreapi/interface/swarm.go index caa6a70e327..8b464e5c1f0 100644 --- a/core/coreapi/interface/swarm.go +++ b/core/coreapi/interface/swarm.go @@ -29,10 +29,10 @@ type ConnectionInfo interface { Direction() net.Direction // Latency returns last known round trip time to the peer - Latency(context.Context) (time.Duration, error) + Latency() (time.Duration, error) // Streams returns list of streams established with the peer - Streams(context.Context) ([]protocol.ID, error) + Streams() ([]protocol.ID, error) } // SwarmAPI specifies the interface to libp2p swarm @@ -46,7 +46,12 @@ type SwarmAPI interface { // Peers returns the list of peers we are connected to Peers(context.Context) ([]ConnectionInfo, error) + // KnownAddrs returns the list of all addresses this node is aware of KnownAddrs(context.Context) (map[peer.ID][]ma.Multiaddr, error) + + // LocalAddrs returns the list of announced listening addresses LocalAddrs(context.Context) ([]ma.Multiaddr, error) + + // ListenAddrs returns the list of all listening addresses ListenAddrs(context.Context) ([]ma.Multiaddr, error) } diff --git a/core/coreapi/swarm.go b/core/coreapi/swarm.go index b780682b25e..47641c9dc1f 100644 --- a/core/coreapi/swarm.go +++ b/core/coreapi/swarm.go @@ -2,7 +2,6 @@ package coreapi import ( "context" - "fmt" "sort" "time" @@ -37,13 +36,10 @@ func (api *SwarmAPI) Connect(ctx context.Context, pi pstore.PeerInfo) error { return coreiface.ErrOffline } - swrm, ok := api.node.PeerHost.Network().(*swarm.Swarm) - if !ok { - return fmt.Errorf("peerhost network was not swarm") + if swrm, ok := api.node.PeerHost.Network().(*swarm.Swarm); ok { + swrm.Backoff().Clear(pi.ID) } - swrm.Backoff().Clear(pi.ID) - return api.node.PeerHost.Connect(ctx, pi) } @@ -164,11 +160,11 @@ func (ci *connInfo) Direction() net.Direction { return ci.dir } -func (ci *connInfo) Latency(context.Context) (time.Duration, error) { +func (ci *connInfo) Latency() (time.Duration, error) { return ci.api.node.Peerstore.LatencyEWMA(peer.ID(ci.ID())), nil } -func (ci *connInfo) Streams(context.Context) ([]protocol.ID, error) { +func (ci *connInfo) Streams() ([]protocol.ID, error) { streams := ci.conn.GetStreams() out := make([]protocol.ID, len(streams))