Skip to content

Commit

Permalink
Merge pull request #62 from filecoin-project/masih/find_candidates_async
Browse files Browse the repository at this point in the history
Implement asyc find candidate over IPNI `ndjson` APIs
  • Loading branch information
hannahhoward authored Feb 7, 2023
2 parents 8ef9302 + bc6dd19 commit cb1eb2f
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 48 deletions.
19 changes: 19 additions & 0 deletions cmd/lassie/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,25 @@ type explicitCandidateFinder struct {
provider peer.AddrInfo
}

func (e explicitCandidateFinder) FindCandidatesAsync(ctx context.Context, c cid.Cid) (<-chan types.FindCandidatesResult, error) {
rs, err := e.FindCandidates(ctx, c)
if err != nil {
return nil, err
}
switch len(rs) {
case 0:
return nil, nil
default:
rch := make(chan types.FindCandidatesResult, len(rs))
for _, r := range rs {
rch <- types.FindCandidatesResult{
Candidate: r,
}
}
return rch, nil
}
}

func (e explicitCandidateFinder) FindCandidates(_ context.Context, c cid.Cid) ([]types.RetrievalCandidate, error) {
return []types.RetrievalCandidate{
{
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ require (
github.com/ipni/storetheindex v0.5.4
github.com/libp2p/go-libp2p v0.23.2
github.com/multiformats/go-multiaddr v0.7.0
github.com/multiformats/go-multicodec v0.6.0
github.com/multiformats/go-multihash v0.2.1
github.com/prometheus/client_golang v1.14.0
github.com/rvagg/go-prioritywaitqueue v1.0.3
github.com/stretchr/testify v1.8.1
Expand Down Expand Up @@ -129,8 +131,6 @@ require (
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
github.com/multiformats/go-multicodec v0.6.0 // indirect
github.com/multiformats/go-multihash v0.2.1 // indirect
github.com/multiformats/go-multistream v0.3.3 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/nxadm/tail v1.4.8 // indirect
Expand Down
10 changes: 8 additions & 2 deletions pkg/eventrecorder/eventrecorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"golang.org/x/net/context"
)

var testCid1 cid.Cid = mustCid("bafybeihrqe2hmfauph5yfbd6ucv7njqpiy4tvbewlvhzjl4bhnyiu6h7pm")
var testCid1 = mustCid("bafybeihrqe2hmfauph5yfbd6ucv7njqpiy4tvbewlvhzjl4bhnyiu6h7pm")

func TestEventRecorder(t *testing.T) {
var req datamodel.Node
Expand Down Expand Up @@ -63,6 +63,7 @@ func TestEventRecorder(t *testing.T) {

select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-receivedChan:
}

Expand Down Expand Up @@ -102,6 +103,7 @@ func TestEventRecorder(t *testing.T) {

select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-receivedChan:
}

Expand Down Expand Up @@ -135,6 +137,7 @@ func TestEventRecorder(t *testing.T) {

select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-receivedChan:
}
qt.Assert(t, req.Length(), qt.Equals, int64(1))
Expand Down Expand Up @@ -165,6 +168,7 @@ func TestEventRecorder(t *testing.T) {

select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-receivedChan:
}

Expand Down Expand Up @@ -196,6 +200,7 @@ func TestEventRecorder(t *testing.T) {

select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-receivedChan:
}

Expand All @@ -222,6 +227,7 @@ func TestEventRecorder(t *testing.T) {

select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-receivedChan:
}

Expand All @@ -245,7 +251,7 @@ func TestEventRecorder(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
er := eventrecorder.NewEventRecorder(ctx, "test-instance", fmt.Sprintf("%s/test-path/here", ts.URL), authHeaderValue)
id, err := types.NewRetrievalID()
Expand Down
177 changes: 137 additions & 40 deletions pkg/indexerlookup/candidatefinder.go
Original file line number Diff line number Diff line change
@@ -1,90 +1,187 @@
package indexerlookup

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"time"
"path"

"github.com/filecoin-project/index-provider/metadata"
"github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-log/v2"
"github.com/ipni/storetheindex/api/v0/finder/model"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
)

var (
_ retriever.CandidateFinder = (*IndexerCandidateFinder)(nil)

logger = log.Logger("indexerlookup")
)

type IndexerCandidateFinder struct {
c *http.Client
baseUrl string
*options
}

func NewCandidateFinder(url string) *IndexerCandidateFinder {
return &IndexerCandidateFinder{
c: &http.Client{
Timeout: time.Minute,
},
baseUrl: url,
func NewCandidateFinder(o ...Option) (*IndexerCandidateFinder, error) {
opts, err := newOptions(o...)
if err != nil {
return nil, err
}
return &IndexerCandidateFinder{
options: opts,
}, nil
}

func (idxf *IndexerCandidateFinder) sendRequest(req *http.Request) (*model.FindResponse, error) {
req.Header.Set("Content-Type", "application/json")
resp, err := idxf.c.Do(req)
func (idxf *IndexerCandidateFinder) sendJsonRequest(req *http.Request) (*model.FindResponse, error) {
req.Header.Set("Accept", "application/json")
resp, err := idxf.httpClient.Do(req)
if err != nil {
logger.Debugw("Failed to perform json lookup", "err", err)
return nil, err
}
// Handle failed requests
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return &model.FindResponse{}, nil
switch resp.StatusCode {
case http.StatusOK:
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
logger.Debugw("Failed to read response JSON response body", "err", err)
return nil, err
}
return nil, fmt.Errorf("batch find query failed: %v", http.StatusText(resp.StatusCode))
return model.UnmarshalFindResponse(b)
case http.StatusNotFound:
return &model.FindResponse{}, nil
default:
return nil, fmt.Errorf("batch find query failed: %s", http.StatusText(resp.StatusCode))
}

defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

return model.UnmarshalFindResponse(b)
}

func (idxf *IndexerCandidateFinder) FindCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) {
u := fmt.Sprint(idxf.baseUrl, "/multihash/", cid.Hash().B58String())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
req, err := idxf.newFindHttpRequest(ctx, cid)
if err != nil {
return nil, err
}

parsedResp, err := idxf.sendRequest(req)
parsedResp, err := idxf.sendJsonRequest(req)
if err != nil {
return nil, err
}
hash := string(cid.Hash())
// turn parsedResp into records.
var matches []types.RetrievalCandidate

indices := rand.Perm(len(parsedResp.MultihashResults))
for _, i := range indices {
multihashResult := parsedResp.MultihashResults[i]

if !(string(multihashResult.Multihash) == hash) {
if !bytes.Equal(cid.Hash(), multihashResult.Multihash) {
continue
}
for _, val := range multihashResult.ProviderResults {
// filter out any results that aren't filecoin graphsync
var dtm metadata.GraphsyncFilecoinV1
if err := dtm.UnmarshalBinary(val.Metadata); err != nil {
continue
if hasTransportGraphsyncFilecoinv1(val) {
matches = append(matches, types.RetrievalCandidate{
RootCid: cid,
MinerPeer: val.Provider,
})
}

matches = append(matches, types.RetrievalCandidate{
RootCid: cid,
MinerPeer: val.Provider,
})
}
}
return matches, nil
}

func hasTransportGraphsyncFilecoinv1(pr model.ProviderResult) bool {
if len(pr.Metadata) == 0 {
return false
}
// Metadata may contain more than one protocol, sorted by ascending order of their protocol ID.
// Therefore, decode the metadata as metadata.Metadata, then check if it supports Graphsync.
// See: https://github.com/ipni/specs/blob/main/IPNI.md#metadata
dtm := metadata.Default.New()
if err := dtm.UnmarshalBinary(pr.Metadata); err != nil {
logger.Debugw("Failed to unmarshal metadata", "err", err)
return false
}
return dtm.Get(multicodec.TransportGraphsyncFilecoinv1) != nil
}

func (idxf *IndexerCandidateFinder) FindCandidatesAsync(ctx context.Context, c cid.Cid) (<-chan types.FindCandidatesResult, error) {
req, err := idxf.newFindHttpRequest(ctx, c)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/x-ndjson")
resp, err := idxf.httpClient.Do(req)
if err != nil {
logger.Debugw("Failed to perform streaming lookup", "err", err)
return nil, err
}
switch resp.StatusCode {
case http.StatusOK:
defer resp.Body.Close()
return idxf.decodeProviderResultStream(ctx, c, resp.Body)
case http.StatusNotFound:
return nil, nil
default:
return nil, fmt.Errorf("batch find query failed: %v", http.StatusText(resp.StatusCode))
}
}

func (idxf *IndexerCandidateFinder) newFindHttpRequest(ctx context.Context, c cid.Cid) (*http.Request, error) {
endpoint := idxf.findByMultihashEndpoint(c.Hash())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return nil, err
}
if idxf.httpUserAgent != "" {
req.Header.Set("User-Agent", idxf.httpUserAgent)
}
return req, nil
}

func (idxf *IndexerCandidateFinder) decodeProviderResultStream(ctx context.Context, c cid.Cid, from io.Reader) (<-chan types.FindCandidatesResult, error) {
rch := make(chan types.FindCandidatesResult, idxf.asyncResultsChanBuffer)
go func() {
defer close(rch)
scanner := bufio.NewScanner(from)
for {
var r types.FindCandidatesResult
select {
case <-ctx.Done():
r.Err = ctx.Err()
rch <- r
return
default:
if scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
var pr model.ProviderResult
if r.Err = json.Unmarshal(line, &pr); r.Err != nil {
rch <- r
return
}
r.Candidate.MinerPeer = pr.Provider
r.Candidate.RootCid = c
rch <- r
} else if r.Err = scanner.Err(); r.Err != nil {
rch <- r
return
}
}
}
}()
return rch, nil
}

func (idxf *IndexerCandidateFinder) findByMultihashEndpoint(mh multihash.Multihash) string {
// TODO: Replace with URL.JoinPath once minimum go version in CI is updated to 1.19; like this:
// return idxf.httpEndpoint.JoinPath("multihash", mh.B58String()).String()
return idxf.httpEndpoint.String() + path.Join("/multihash", mh.B58String())
}
Loading

0 comments on commit cb1eb2f

Please sign in to comment.