diff --git a/core/commands/log.go b/core/commands/log.go index 028061d1376..b4be7354edf 100644 --- a/core/commands/log.go +++ b/core/commands/log.go @@ -1,10 +1,12 @@ package commands import ( + "bytes" "fmt" "io" cmds "github.com/ipfs/go-ipfs/commands" + e "github.com/ipfs/go-ipfs/core/commands/e" "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit" logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log" @@ -110,3 +112,23 @@ Outputs event log messages (not other log messages) as they are generated. res.SetOutput(r) }, } + +func stringListMarshaler(res cmds.Response) (io.Reader, error) { + v, err := unwrapOutput(res.Output()) + if err != nil { + return nil, err + } + + list, ok := v.(*stringList) + if !ok { + return nil, e.TypeErr(list, v) + } + + buf := new(bytes.Buffer) + for _, s := range list.Strings { + buf.WriteString(s) + buf.WriteString("\n") + } + + return buf, nil +} diff --git a/core/commands/root.go b/core/commands/root.go index 900530a216b..4c53c1226f8 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -137,7 +137,7 @@ var rootSubcommands = map[string]*cmds.Command{ "p2p": lgc.NewCommand(P2PCmd), "refs": lgc.NewCommand(RefsCmd), "resolve": ResolveCmd, - "swarm": lgc.NewCommand(SwarmCmd), + "swarm": SwarmCmd, "tar": lgc.NewCommand(TarCmd), "file": lgc.NewCommand(unixfs.UnixFSCmd), "update": lgc.NewCommand(ExternalBinary()), diff --git a/core/commands/swarm.go b/core/commands/swarm.go index 685a8113b59..295f955f430 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -1,7 +1,6 @@ package commands import ( - "bytes" "errors" "fmt" "io" @@ -9,16 +8,18 @@ import ( "sort" "strings" - cmds "github.com/ipfs/go-ipfs/commands" - e "github.com/ipfs/go-ipfs/core/commands/e" - repo "github.com/ipfs/go-ipfs/repo" + "github.com/ipfs/go-ipfs/commands" + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/core/commands/e" + "github.com/ipfs/go-ipfs/repo" "github.com/ipfs/go-ipfs/repo/fsrepo" - swarm "gx/ipfs/QmPQoCVRHaGD25VffyB7DFV5qP65hFSQJdSDy75P1vYBKe/go-libp2p-swarm" + "gx/ipfs/QmPQoCVRHaGD25VffyB7DFV5qP65hFSQJdSDy75P1vYBKe/go-libp2p-swarm" mafilter "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter" - cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit" + "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit" iaddr "gx/ipfs/QmSzdvo9aPzLj4HXWTcgGAp8N84tZc8LbLmFZFwUb1dpWk/go-ipfs-addr" - config "gx/ipfs/QmVBUpxsHh53rNcufqxMpLAmz37eGyLJUaexDy1W9YkiNk/go-ipfs-config" + "gx/ipfs/QmVBUpxsHh53rNcufqxMpLAmz37eGyLJUaexDy1W9YkiNk/go-ipfs-config" + "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" @@ -64,49 +65,40 @@ var swarmPeersCmd = &cmds.Command{ cmdkit.BoolOption("latency", "Also list information about latency to each peer"), cmdkit.BoolOption("direction", "Also list information about the direction of connection"), }, - Run: func(req cmds.Request, res cmds.Response) { - - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return - } + verbose, _ := req.Options["verbose"].(bool) + latency, _ := req.Options["latency"].(bool) + streams, _ := req.Options["streams"].(bool) + direction, _ := req.Options["direction"].(bool) - verbose, _, _ := req.Option("verbose").Bool() - latency, _, _ := req.Option("latency").Bool() - streams, _, _ := req.Option("streams").Bool() - direction, _, _ := req.Option("direction").Bool() + conns, err := api.Swarm().Peers(req.Context) + if err != nil { + return err + } - conns := n.PeerHost.Network().Conns() var out connInfos for _, c := range conns { - pid := c.RemotePeer() - addr := c.RemoteMultiaddr() ci := connInfo{ - Addr: addr.String(), - Peer: pid.Pretty(), + Addr: c.Address().String(), + Peer: c.ID().Pretty(), } - /* - // FIXME(steb): - swcon, ok := c.(*swarm.Conn) - if ok { - ci.Muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn()) - } - */ - if verbose || direction { // set direction - ci.Direction = c.Stat().Direction + ci.Direction = c.Direction() } if verbose || latency { - lat := n.Peerstore.LatencyEWMA(pid) + lat, err := c.Latency() + if err != nil { + return err + } + if lat == 0 { ci.Latency = "n/a" } else { @@ -114,10 +106,13 @@ var swarmPeersCmd = &cmds.Command{ } } if verbose || streams { - strs := c.GetStreams() + strs, err := c.Streams() + if err != nil { + return err + } for _, s := range strs { - ci.Streams = append(ci.Streams, streamInfo{Protocol: string(s.Protocol())}) + ci.Streams = append(ci.Streams, streamInfo{Protocol: string(s)}) } } sort.Sort(&ci) @@ -125,50 +120,43 @@ var swarmPeersCmd = &cmds.Command{ } sort.Sort(&out) - res.SetOutput(&out) + return cmds.EmitOnce(res, &out) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func(res cmds.Response) (io.Reader, error) { - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { ci, ok := v.(*connInfos) if !ok { - return nil, e.TypeErr(ci, v) + return e.TypeErr(ci, v) } - buf := new(bytes.Buffer) pipfs := ma.ProtocolWithCode(ma.P_IPFS).Name for _, info := range ci.Peers { ids := fmt.Sprintf("/%s/%s", pipfs, info.Peer) if strings.HasSuffix(info.Addr, ids) { - fmt.Fprintf(buf, "%s", info.Addr) + fmt.Fprintf(w, "%s", info.Addr) } else { - fmt.Fprintf(buf, "%s%s", info.Addr, ids) + fmt.Fprintf(w, "%s%s", info.Addr, ids) } if info.Latency != "" { - fmt.Fprintf(buf, " %s", info.Latency) + fmt.Fprintf(w, " %s", info.Latency) } if info.Direction != inet.DirUnknown { - fmt.Fprintf(buf, " %s", directionString(info.Direction)) + fmt.Fprintf(w, " %s", directionString(info.Direction)) } - - fmt.Fprintln(buf) + fmt.Fprintln(w) for _, s := range info.Streams { if s.Protocol == "" { s.Protocol = "" } - fmt.Fprintf(buf, " %s\n", s.Protocol) + fmt.Fprintf(w, " %s\n", s.Protocol) } } - return buf, nil - }, + return nil + }), }, Type: connInfos{}, } @@ -237,41 +225,32 @@ var swarmAddrsCmd = &cmds.Command{ "local": swarmAddrsLocalCmd, "listen": swarmAddrsListenCmd, }, - Run: func(req cmds.Request, res cmds.Response) { - - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + addrs, err := api.Swarm().KnownAddrs(req.Context) + if err != nil { + return err } - addrs := make(map[string][]string) - ps := n.PeerHost.Network().Peerstore() - for _, p := range ps.Peers() { + out := make(map[string][]string) + for p, paddrs := range addrs { s := p.Pretty() - for _, a := range ps.Addrs(p) { - addrs[s] = append(addrs[s], a.String()) + for _, a := range paddrs { + out[s] = append(out[s], a.String()) } - sort.Sort(sort.StringSlice(addrs[s])) } - res.SetOutput(&addrMap{Addrs: addrs}) + return cmds.EmitOnce(res, &addrMap{Addrs: out}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: func(res cmds.Response) (io.Reader, error) { - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { m, ok := v.(*addrMap) if !ok { - return nil, e.TypeErr(m, v) + return e.TypeErr(m, v) } // sort the ids first @@ -281,16 +260,15 @@ var swarmAddrsCmd = &cmds.Command{ } sort.Sort(sort.StringSlice(ids)) - buf := new(bytes.Buffer) for _, p := range ids { paddrs := m.Addrs[p] - buf.WriteString(fmt.Sprintf("%s (%d)\n", p, len(paddrs))) + fmt.Fprintf(w, "%s (%d)\n", p, len(paddrs)) for _, addr := range paddrs { - buf.WriteString("\t" + addr + "\n") + fmt.Fprintf(w, "\t"+addr+"\n") } } - return buf, nil - }, + return nil + }), }, Type: addrMap{}, } @@ -305,36 +283,37 @@ var swarmAddrsLocalCmd = &cmds.Command{ Options: []cmdkit.Option{ cmdkit.BoolOption("id", "Show peer ID in addresses."), }, - Run: func(req cmds.Request, res cmds.Response) { - iCtx := req.InvocContext() - n, err := iCtx.GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + showid, _ := req.Options["id"].(bool) + self, err := api.Key().Self(req.Context) + if err != nil { + return err } - showid, _, _ := req.Option("id").Bool() - id := n.Identity.Pretty() + maddrs, err := api.Swarm().LocalAddrs(req.Context) + if err != nil { + return err + } var addrs []string - for _, addr := range n.PeerHost.Addrs() { + for _, addr := range maddrs { saddr := addr.String() if showid { - saddr = path.Join(saddr, "ipfs", id) + saddr = path.Join(saddr, "ipfs", self.ID().Pretty()) } addrs = append(addrs, saddr) } sort.Sort(sort.StringSlice(addrs)) - res.SetOutput(&stringList{addrs}) + return cmds.EmitOnce(res, &stringList{addrs}) }, Type: stringList{}, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, } @@ -345,24 +324,16 @@ var swarmAddrsListenCmd = &cmds.Command{ 'ipfs swarm addrs listen' lists all interface addresses the node is listening on. `, }, - Run: func(req cmds.Request, res cmds.Response) { - - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + return err } var addrs []string - maddrs, err := n.PeerHost.Network().InterfaceListenAddresses() + maddrs, err := api.Swarm().ListenAddrs(req.Context) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } for _, addr := range maddrs { @@ -370,11 +341,11 @@ var swarmAddrsListenCmd = &cmds.Command{ } sort.Sort(sort.StringSlice(addrs)) - res.SetOutput(&stringList{addrs}) + return cmds.EmitOnce(res, &stringList{addrs}) }, Type: stringList{}, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, } @@ -392,53 +363,34 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3 Arguments: []cmdkit.Argument{ cmdkit.StringArg("address", true, true, "Address of peer to connect to.").EnableStdin(), }, - Run: func(req cmds.Request, res cmds.Response) { - ctx := req.Context() - - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - addrs := req.Arguments() - - if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + return err } - // FIXME(steb): Nasty - swrm, ok := n.PeerHost.Network().(*swarm.Swarm) - if !ok { - res.SetError(fmt.Errorf("peerhost network was not swarm"), cmdkit.ErrNormal) - return - } + addrs := req.Arguments pis, err := peersWithAddresses(addrs) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } output := make([]string, len(pis)) for i, pi := range pis { - swrm.Backoff().Clear(pi.ID) - output[i] = "connect " + pi.ID.Pretty() - err := n.PeerHost.Connect(ctx, pi) + err := api.Swarm().Connect(req.Context, pi) if err != nil { - res.SetError(fmt.Errorf("%s failure: %s", output[i], err), cmdkit.ErrNormal) - return + return fmt.Errorf("%s failure: %s", output[i], err) } output[i] += " success" } - res.SetOutput(&stringList{output}) + return cmds.EmitOnce(res, &stringList{output}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, Type: stringList{}, } @@ -459,91 +411,35 @@ it will reconnect. Arguments: []cmdkit.Argument{ cmdkit.StringArg("address", true, true, "Address of peer to disconnect from.").EnableStdin(), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - addrs := req.Arguments() - - if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrClient) - return + return err } - iaddrs, err := parseAddresses(addrs) + iaddrs, err := parseAddresses(req.Arguments) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } output := make([]string, len(iaddrs)) for i, addr := range iaddrs { - taddr := addr.Transport() - id := addr.ID() - output[i] = "disconnect " + id.Pretty() + output[i] = "disconnect " + addr.ID().Pretty() - net := n.PeerHost.Network() - - if taddr == nil { - if net.Connectedness(id) != inet.Connected { - output[i] += " failure: not connected" - } else if err := net.ClosePeer(id); err != nil { - output[i] += " failure: " + err.Error() - } else { - output[i] += " success" - } + if err := api.Swarm().Disconnect(req.Context, addr.Multiaddr()); err != nil { + output[i] += " failure: " + err.Error() } else { - found := false - for _, conn := range net.ConnsToPeer(id) { - if !conn.RemoteMultiaddr().Equal(taddr) { - continue - } - - if err := conn.Close(); err != nil { - output[i] += " failure: " + err.Error() - } else { - output[i] += " success" - } - found = true - break - } - - if !found { - output[i] += " failure: conn not found" - } + output[i] += " success" } } - res.SetOutput(&stringList{output}) + return cmds.EmitOnce(res, &stringList{output}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, Type: stringList{}, } -func stringListMarshaler(res cmds.Response) (io.Reader, error) { - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } - - list, ok := v.(*stringList) - if !ok { - return nil, e.TypeErr(list, v) - } - - buf := new(bytes.Buffer) - for _, s := range list.Strings { - buf.WriteString(s) - buf.WriteString("\n") - } - - return buf, nil -} - // parseAddresses is a function that takes in a slice of string peer addresses // (multiaddr + peerid) and returns slices of multiaddrs and peerids. func parseAddresses(addrs []string) (iaddrs []iaddr.IPFSAddr, err error) { @@ -608,38 +504,34 @@ Filters default to those specified under the "Swarm.AddrFilters" config key. "add": swarmFiltersAddCmd, "rm": swarmFiltersRmCmd, }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + return ErrNotOnline } // FIXME(steb) swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { - res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal) - return + return errors.New("failed to cast network to swarm network") } var output []string for _, f := range swrm.Filters.Filters() { s, err := mafilter.ConvertIPNet(f) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } output = append(output, s) } - res.SetOutput(&stringList{output}) + return cmds.EmitOnce(res, &stringList{output}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, Type: stringList{}, } @@ -656,63 +548,54 @@ add your filters to the ipfs config file. Arguments: []cmdkit.Argument{ cmdkit.StringArg("address", true, true, "Multiaddr to filter.").EnableStdin(), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + return ErrNotOnline } // FIXME(steb) swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { - res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal) - return + return errors.New("failed to cast network to swarm network") } - if len(req.Arguments()) == 0 { - res.SetError(errors.New("no filters to add"), cmdkit.ErrClient) - return + if len(req.Arguments) == 0 { + return errors.New("no filters to add") } - r, err := fsrepo.Open(req.InvocContext().ConfigRoot) + r, err := fsrepo.Open(env.(*commands.Context).ConfigRoot) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } defer r.Close() cfg, err := r.Config() if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - for _, arg := range req.Arguments() { + for _, arg := range req.Arguments { mask, err := mafilter.NewMask(arg) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } swrm.Filters.AddDialFilter(mask) } - added, err := filtersAdd(r, cfg, req.Arguments()) + added, err := filtersAdd(r, cfg, req.Arguments) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - + return err } - res.SetOutput(&stringList{added}) + return cmds.EmitOnce(res, &stringList{added}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, Type: stringList{}, } @@ -729,37 +612,32 @@ remove your filters from the ipfs config file. Arguments: []cmdkit.Argument{ cmdkit.StringArg("address", true, true, "Multiaddr filter to remove.").EnableStdin(), }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } if n.PeerHost == nil { - res.SetError(ErrNotOnline, cmdkit.ErrNormal) - return + return ErrNotOnline } swrm, ok := n.PeerHost.Network().(*swarm.Swarm) if !ok { - res.SetError(errors.New("failed to cast network to swarm network"), cmdkit.ErrNormal) - return + return errors.New("failed to cast network to swarm network") } - r, err := fsrepo.Open(req.InvocContext().ConfigRoot) + r, err := fsrepo.Open(env.(*commands.Context).ConfigRoot) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } defer r.Close() cfg, err := r.Config() if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - if req.Arguments()[0] == "all" || req.Arguments()[0] == "*" { + if req.Arguments[0] == "all" || req.Arguments[0] == "*" { fs := swrm.Filters.Filters() for _, f := range fs { swrm.Filters.Remove(f) @@ -767,35 +645,30 @@ remove your filters from the ipfs config file. removed, err := filtersRemoveAll(r, cfg) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - res.SetOutput(&stringList{removed}) - - return + return cmds.EmitOnce(res, &stringList{removed}) } - for _, arg := range req.Arguments() { + for _, arg := range req.Arguments { mask, err := mafilter.NewMask(arg) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } swrm.Filters.Remove(mask) } - removed, err := filtersRemove(r, cfg, req.Arguments()) + removed, err := filtersRemove(r, cfg, req.Arguments) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - res.SetOutput(&stringList{removed}) + return cmds.EmitOnce(res, &stringList{removed}) }, - Marshalers: cmds.MarshalerMap{ - cmds.Text: stringListMarshaler, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeEncoder(stringListEncoder), }, Type: stringList{}, } diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index bb7afd61a51..8a0410a2dae 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -67,3 +67,8 @@ func (api *CoreAPI) Pin() coreiface.PinAPI { func (api *CoreAPI) Dht() coreiface.DhtAPI { return (*DhtAPI)(api) } + +// Swarm returns the SwarmAPI interface implementation backed by the go-ipfs node +func (api *CoreAPI) Swarm() coreiface.SwarmAPI { + return &SwarmAPI{api} +} diff --git a/core/coreapi/interface/coreapi.go b/core/coreapi/interface/coreapi.go index 0053d472e6b..0b153b6f9a1 100644 --- a/core/coreapi/interface/coreapi.go +++ b/core/coreapi/interface/coreapi.go @@ -34,6 +34,9 @@ type CoreAPI interface { // Dht returns an implementation of Dht API Dht() DhtAPI + // Swarm returns an implementation of Swarm API + Swarm() SwarmAPI + // ResolvePath resolves the path using Unixfs resolver ResolvePath(context.Context, Path) (ResolvedPath, error) diff --git a/core/coreapi/interface/key.go b/core/coreapi/interface/key.go index 4305ae20d20..cc6dc890044 100644 --- a/core/coreapi/interface/key.go +++ b/core/coreapi/interface/key.go @@ -33,6 +33,9 @@ type KeyAPI interface { // List lists keys stored in keystore List(ctx context.Context) ([]Key, error) + // Self returns the 'main' node key + Self(ctx context.Context) (Key, error) + // Remove removes keys from keystore. Returns ipns path of the removed key Remove(ctx context.Context, name string) (Key, error) } diff --git a/core/coreapi/interface/swarm.go b/core/coreapi/interface/swarm.go new file mode 100644 index 00000000000..8b464e5c1f0 --- /dev/null +++ b/core/coreapi/interface/swarm.go @@ -0,0 +1,57 @@ +package iface + +import ( + "context" + "errors" + "time" + + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" + net "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net" +) + +var ( + ErrNotConnected = errors.New("not connected") + ErrConnNotFound = errors.New("conn not found") +) + +// ConnectionInfo contains information about a peer +type ConnectionInfo interface { + // ID returns PeerID + ID() peer.ID + + // Address returns the multiaddress via which we are connected with the peer + Address() ma.Multiaddr + + // Direction returns which way the connection was established + Direction() net.Direction + + // Latency returns last known round trip time to the peer + Latency() (time.Duration, error) + + // Streams returns list of streams established with the peer + Streams() ([]protocol.ID, error) +} + +// SwarmAPI specifies the interface to libp2p swarm +type SwarmAPI interface { + // Connect to a given peer + Connect(context.Context, pstore.PeerInfo) error + + // Disconnect from a given address + Disconnect(context.Context, ma.Multiaddr) error + + // Peers returns the list of peers we are connected to + Peers(context.Context) ([]ConnectionInfo, error) + + // KnownAddrs returns the list of all addresses this node is aware of + KnownAddrs(context.Context) (map[peer.ID][]ma.Multiaddr, error) + + // LocalAddrs returns the list of announced listening addresses + LocalAddrs(context.Context) ([]ma.Multiaddr, error) + + // ListenAddrs returns the list of all listening addresses + ListenAddrs(context.Context) ([]ma.Multiaddr, error) +} diff --git a/core/coreapi/key.go b/core/coreapi/key.go index 9bc4e1aef19..9fe207785ba 100644 --- a/core/coreapi/key.go +++ b/core/coreapi/key.go @@ -216,3 +216,7 @@ func (api *KeyAPI) Remove(ctx context.Context, name string) (coreiface.Key, erro return &key{"", pid}, nil } + +func (api *KeyAPI) Self(ctx context.Context) (coreiface.Key, error) { + return &key{"self", api.node.Identity}, nil +} diff --git a/core/coreapi/swarm.go b/core/coreapi/swarm.go new file mode 100644 index 00000000000..47641c9dc1f --- /dev/null +++ b/core/coreapi/swarm.go @@ -0,0 +1,176 @@ +package coreapi + +import ( + "context" + "sort" + "time" + + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + + swarm "gx/ipfs/QmPQoCVRHaGD25VffyB7DFV5qP65hFSQJdSDy75P1vYBKe/go-libp2p-swarm" + iaddr "gx/ipfs/QmSzdvo9aPzLj4HXWTcgGAp8N84tZc8LbLmFZFwUb1dpWk/go-ipfs-addr" + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" + inet "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net" + net "gx/ipfs/QmfDPh144WGBqRxZb1TGDHerbMnZATrHZggAPw7putNnBq/go-libp2p-net" +) + +type SwarmAPI struct { + *CoreAPI +} + +type connInfo struct { + api *CoreAPI + conn net.Conn + dir net.Direction + + addr ma.Multiaddr + peer peer.ID + muxer string +} + +func (api *SwarmAPI) Connect(ctx context.Context, pi pstore.PeerInfo) error { + if api.node.PeerHost == nil { + return coreiface.ErrOffline + } + + if swrm, ok := api.node.PeerHost.Network().(*swarm.Swarm); ok { + swrm.Backoff().Clear(pi.ID) + } + + return api.node.PeerHost.Connect(ctx, pi) +} + +func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error { + if api.node.PeerHost == nil { + return coreiface.ErrOffline + } + + ia, err := iaddr.ParseMultiaddr(ma.Multiaddr(addr)) + if err != nil { + return err + } + + taddr := ia.Transport() + id := ia.ID() + net := api.node.PeerHost.Network() + + if taddr == nil { + if net.Connectedness(id) != inet.Connected { + return coreiface.ErrNotConnected + } else if err := net.ClosePeer(id); err != nil { + return err + } + } else { + for _, conn := range net.ConnsToPeer(id) { + if !conn.RemoteMultiaddr().Equal(taddr) { + continue + } + + return conn.Close() + } + + return coreiface.ErrConnNotFound + } + + return nil +} + +func (api *SwarmAPI) KnownAddrs(context.Context) (map[peer.ID][]ma.Multiaddr, error) { + if api.node.PeerHost == nil { + return nil, coreiface.ErrOffline + } + + addrs := make(map[peer.ID][]ma.Multiaddr) + ps := api.node.PeerHost.Network().Peerstore() + for _, p := range ps.Peers() { + for _, a := range ps.Addrs(p) { + addrs[p] = append(addrs[p], a) + } + sort.Slice(addrs[p], func(i, j int) bool { + return addrs[p][i].String() < addrs[p][j].String() + }) + } + + return addrs, nil +} + +func (api *SwarmAPI) LocalAddrs(context.Context) ([]ma.Multiaddr, error) { + if api.node.PeerHost == nil { + return nil, coreiface.ErrOffline + } + + return api.node.PeerHost.Addrs(), nil +} + +func (api *SwarmAPI) ListenAddrs(context.Context) ([]ma.Multiaddr, error) { + if api.node.PeerHost == nil { + return nil, coreiface.ErrOffline + } + + return api.node.PeerHost.Network().InterfaceListenAddresses() +} + +func (api *SwarmAPI) Peers(context.Context) ([]coreiface.ConnectionInfo, error) { + if api.node.PeerHost == nil { + return nil, coreiface.ErrOffline + } + + conns := api.node.PeerHost.Network().Conns() + + var out []coreiface.ConnectionInfo + for _, c := range conns { + pid := c.RemotePeer() + addr := c.RemoteMultiaddr() + + ci := &connInfo{ + api: api.CoreAPI, + conn: c, + dir: c.Stat().Direction, + + addr: addr, + peer: pid, + } + + /* + // FIXME(steb): + swcon, ok := c.(*swarm.Conn) + if ok { + ci.muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn()) + } + */ + + out = append(out, ci) + } + + return out, nil +} + +func (ci *connInfo) ID() peer.ID { + return ci.peer +} + +func (ci *connInfo) Address() ma.Multiaddr { + return ci.addr +} + +func (ci *connInfo) Direction() net.Direction { + return ci.dir +} + +func (ci *connInfo) Latency() (time.Duration, error) { + return ci.api.node.Peerstore.LatencyEWMA(peer.ID(ci.ID())), nil +} + +func (ci *connInfo) Streams() ([]protocol.ID, error) { + streams := ci.conn.GetStreams() + + out := make([]protocol.ID, len(streams)) + for i, s := range streams { + out[i] = s.Protocol() + } + + return out, nil +} diff --git a/test/sharness/lib/test-lib.sh b/test/sharness/lib/test-lib.sh index 8a54a6abf14..05cfc57d1fa 100644 --- a/test/sharness/lib/test-lib.sh +++ b/test/sharness/lib/test-lib.sh @@ -199,8 +199,11 @@ test_set_address_vars() { ' if ipfs swarm addrs local >/dev/null 2>&1; then + test_expect_success "get swarm addresses" ' + ipfs swarm addrs local > addrs_out + ' + test_expect_success "set swarm address vars" ' - ipfs swarm addrs local > addrs_out && SWARM_MADDR=$(grep "127.0.0.1" addrs_out) && SWARM_PORT=$(port_from_maddr $SWARM_MADDR) '