Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Add Provide RPC #37

Merged
merged 11 commits into from
Aug 10, 2022
13 changes: 11 additions & 2 deletions client/contentrouting.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"time"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -18,8 +19,16 @@ func NewContentRoutingClient(c DelegatedRoutingClient) *ContentRoutingClient {
return &ContentRoutingClient{client: c}
}

func (c *ContentRoutingClient) Provide(context.Context, cid.Cid, bool) error {
return routing.ErrNotSupported
func (c *ContentRoutingClient) Provide(ctx context.Context, key cid.Cid, announce bool) error {
// If 'true' is
// passed, it also announces it, otherwise it is just kept in the local
// accounting of which objects are being provided.
if !announce {
return nil
}

_, err := c.client.Provide(ctx, key, 24*time.Hour)
return err
}

func (c *ContentRoutingClient) FindProvidersAsync(ctx context.Context, key cid.Cid, numResults int) <-chan peer.AddrInfo {
Expand Down
6 changes: 5 additions & 1 deletion client/contentrouting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func (t TestDelegatedRoutingClient) PutIPNSAsync(ctx context.Context, id []byte,
panic("not supported")
}

func (t TestDelegatedRoutingClient) Provide(ctx context.Context, key cid.Cid, provider peer.AddrInfo, ttl time.Duration) (<-chan time.Duration, error) {
func (t TestDelegatedRoutingClient) ProvideAsync(ctx context.Context, key cid.Cid, ttl time.Duration) (<-chan time.Duration, error) {
panic("not supported")
}

func (t TestDelegatedRoutingClient) Provide(ctx context.Context, key cid.Cid, tl time.Duration) (time.Duration, error) {
panic("not supported")
}

Expand Down
20 changes: 19 additions & 1 deletion client/findproviders.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package client

import (
"context"
"errors"
"time"

"github.com/ipfs/go-cid"
proto "github.com/ipfs/go-delegated-routing/gen/proto"
ipns "github.com/ipfs/go-ipns"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/edelweiss/values"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
record "github.com/libp2p/go-libp2p-record"
"github.com/multiformats/go-multiaddr"
Expand All @@ -23,18 +25,34 @@ type DelegatedRoutingClient interface {
GetIPNSAsync(ctx context.Context, id []byte) (<-chan GetIPNSAsyncResult, error)
PutIPNS(ctx context.Context, id []byte, record []byte) error
PutIPNSAsync(ctx context.Context, id []byte, record []byte) (<-chan PutIPNSAsyncResult, error)
Provide(ctx context.Context, key cid.Cid, provider peer.AddrInfo, ttl time.Duration) (<-chan time.Duration, error)
Provide(ctx context.Context, key cid.Cid, ttl time.Duration) (time.Duration, error)
ProvideAsync(ctx context.Context, key cid.Cid, ttl time.Duration) (<-chan time.Duration, error)
}

type Client struct {
client proto.DelegatedRouting_Client
validator record.Validator

provider *Provider
identity crypto.PrivKey
}

var _ DelegatedRoutingClient = (*Client)(nil)

func NewClient(c proto.DelegatedRouting_Client) *Client {
return &Client{client: c, validator: ipns.Validator{}}
}

// SetIdentity sets an identity for providing content. the `Provide` methods will not work without it set.
func (c *Client) SetIdentity(p *Provider, identity crypto.PrivKey) error {
willscott marked this conversation as resolved.
Show resolved Hide resolved
if !p.Peer.ID.MatchesPublicKey(identity.GetPublic()) {
return errors.New("identity does not match provider")
}
c.provider = p
c.identity = identity
return nil
}

func (fp *Client) FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error) {
resps, err := fp.client.FindProviders(ctx, cidsToFindProvidersRequest(key))
if err != nil {
Expand Down
115 changes: 102 additions & 13 deletions client/provide.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func (tp *TransferProtocol) ToProto() proto.TransferProtocol {
}
}

func parseProtocol(tp *proto.TransferProtocol) TransferProtocol {
func parseProtocol(tp *proto.TransferProtocol) (TransferProtocol, error) {
if tp.Bitswap != nil {
return TransferProtocol{Codec: multicodec.TransportBitswap}
return TransferProtocol{Codec: multicodec.TransportBitswap}, nil
} else if tp.GraphSyncFILv1 != nil {
pl := GraphSyncFILv1{
PieceCID: cid.Cid(tp.GraphSyncFILv1.PieceCID),
Expand All @@ -88,26 +88,26 @@ func parseProtocol(tp *proto.TransferProtocol) TransferProtocol {
}
plBytes, err := cbor.Marshal(&pl)
if err != nil {
willscott marked this conversation as resolved.
Show resolved Hide resolved
return TransferProtocol{}
return TransferProtocol{}, err
}
return TransferProtocol{
Codec: multicodec.TransportGraphsyncFilecoinv1,
Payload: plBytes,
}
}, nil
}
return TransferProtocol{}
return TransferProtocol{}, nil
}

// ProvideRequest is a message indicating a provider can provide a Key for a given TTL
type ProvideRequest struct {
Key cid.Cid
Provider
*Provider
Timestamp int64
AdvisoryTTL time.Duration
Signature []byte
}

var provideSchema, _ = ipld.LoadSchemaBytes([]byte(`
var provideSchema, provideSchemaErr = ipld.LoadSchemaBytes([]byte(`
willscott marked this conversation as resolved.
Show resolved Hide resolved
type ProvideRequest struct {
Key &Any
Provider Provider
Expand Down Expand Up @@ -147,6 +147,10 @@ func (pr *ProvideRequest) Sign(key crypto.PrivKey) error {
pr.Timestamp = time.Now().Unix()
pr.Signature = []byte{}

if key == nil {
return errors.New("no key provided")
}

sid, err := peer.IDFromPrivateKey(key)
if err != nil {
return err
Expand All @@ -160,6 +164,10 @@ func (pr *ProvideRequest) Sign(key crypto.PrivKey) error {
bindnode.TypedBytesConverter(&ma, bytesToMA, maToBytes),
}

if provideSchemaErr != nil {
return provideSchemaErr
}

node := bindnode.Wrap(pr, provideSchema.TypeByName("ProvideRequest"), opts...)
nodeRepr := node.Representation()
outBuf := bytes.NewBuffer(nil)
Expand Down Expand Up @@ -190,6 +198,10 @@ func (pr *ProvideRequest) Verify() error {
bindnode.TypedBytesConverter(&ma, bytesToMA, maToBytes),
}

if provideSchemaErr != nil {
return provideSchemaErr
}

node := bindnode.Wrap(pr, provideSchema.TypeByName("ProvideRequest"), opts...)
nodeRepr := node.Representation()
outBuf := bytes.NewBuffer(nil)
Expand Down Expand Up @@ -220,9 +232,13 @@ func (pr *ProvideRequest) IsSigned() bool {
}

func ParseProvideRequest(req *proto.ProvideRequest) (*ProvideRequest, error) {
prov, err := parseProvider(&req.Provider)
if err != nil {
return nil, err
}
pr := ProvideRequest{
Key: cid.Cid(req.Key),
Provider: parseProvider(&req.Provider),
Provider: prov,
AdvisoryTTL: time.Duration(req.AdvisoryTTL),
Timestamp: int64(req.Timestamp),
Signature: req.Signature,
Expand All @@ -234,24 +250,97 @@ func ParseProvideRequest(req *proto.ProvideRequest) (*ProvideRequest, error) {
return &pr, nil
}

func parseProvider(p *proto.Provider) Provider {
func parseProvider(p *proto.Provider) (*Provider, error) {
prov := Provider{
Peer: parseProtoNodeToAddrInfo(p.ProviderNode)[0],
ProviderProto: make([]TransferProtocol, 0),
}
for _, tp := range p.ProviderProto {
prov.ProviderProto = append(prov.ProviderProto, parseProtocol(&tp))
proto, err := parseProtocol(&tp)
if err != nil {
return nil, err
}
prov.ProviderProto = append(prov.ProviderProto, proto)
}
return prov
return &prov, nil
}

type ProvideAsyncResult struct {
AdvisoryTTL time.Duration
Err error
}

// Provide makes a provide request to a delegated router
func (fp *Client) Provide(ctx context.Context, req *ProvideRequest) (<-chan ProvideAsyncResult, error) {
func (fp *Client) Provide(ctx context.Context, key cid.Cid, ttl time.Duration) (time.Duration, error) {
req := ProvideRequest{
Key: key,
Provider: fp.provider,
AdvisoryTTL: ttl,
Timestamp: time.Now().Unix(),
}
if err := req.Sign(fp.identity); err != nil {
return 0, err
}

record, err := fp.ProvideSignedRecord(ctx, &req)
if err != nil {
return 0, err
}

var d time.Duration
var set bool
for resp := range record {
if resp.Err == nil {
set = true
if resp.AdvisoryTTL > d {
d = resp.AdvisoryTTL
}
} else if resp.Err != nil {
err = resp.Err
}
}

if set {
return d, nil
} else if err == nil {
return 0, fmt.Errorf("no response")
}
return 0, err
}

func (fp *Client) ProvideAsync(ctx context.Context, key cid.Cid, ttl time.Duration) (<-chan time.Duration, error) {
req := ProvideRequest{
Key: key,
Provider: fp.provider,
AdvisoryTTL: ttl,
Timestamp: time.Now().Unix(),
}
ch := make(chan time.Duration, 1)

if err := req.Sign(fp.identity); err != nil {
close(ch)
return ch, err
}

record, err := fp.ProvideSignedRecord(ctx, &req)
if err != nil {
close(ch)
return ch, err
}
go func() {
defer close(ch)
for resp := range record {
if resp.Err != nil {
logger.Infof("dropping partial provide failure (%v)", err)
} else {
ch <- resp.AdvisoryTTL
}
}
}()
return ch, nil
}

// ProvideAsync makes a provide request to a delegated router
func (fp *Client) ProvideSignedRecord(ctx context.Context, req *ProvideRequest) (<-chan ProvideAsyncResult, error) {
if !req.IsSigned() {
return nil, errors.New("request is not signed")
}
Expand Down
26 changes: 13 additions & 13 deletions test/provide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ipfs/go-delegated-routing/client"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
multiaddr "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
)

Expand All @@ -28,27 +29,26 @@ func TestProvideRoundtrip(t *testing.T) {

testMH, _ := multihash.Encode([]byte("test"), multihash.IDENTITY)
testCid := cid.NewCidV1(cid.Raw, testMH)
req := client.ProvideRequest{
Key: testCid,
Provider: client.Provider{
Peer: peer.AddrInfo{ID: pID},
},
AdvisoryTTL: time.Hour,
}
if _, err = c.Provide(context.Background(), &req); err == nil {

if _, err = c.Provide(context.Background(), testCid, time.Hour); err == nil {
t.Fatal("should get sync error on unsigned provide request.")
}

if err = req.Sign(priv); err != nil {
if err := c.SetIdentity(&client.Provider{
Peer: peer.AddrInfo{
ID: pID,
Addrs: []multiaddr.Multiaddr{},
},
ProviderProto: []client.TransferProtocol{},
}, priv); err != nil {
t.Fatal(err)
}
rc, err := c.Provide(context.Background(), &req)
rc, err := c.Provide(context.Background(), testCid, 2*time.Hour)
if err != nil {
t.Fatal(err)
}

res := <-rc
if res.Err != nil {
t.Fatal(err)
if rc != time.Hour {
t.Fatal("should have gotten back the the fixed server ttl")
}
}