Skip to content

Commit

Permalink
netbs: Add an integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Nov 8, 2022
1 parent 53e43a4 commit 888f97a
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 3 deletions.
5 changes: 5 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,11 @@ workflows:
suite: itest-deals_publish
target: "./itests/deals_publish_test.go"

- test:
name: test-itest-deals_remote_retrieval
suite: itest-deals_remote_retrieval
target: "./itests/deals_remote_retrieval_test.go"

- test:
name: test-itest-deals_retry_deal_no_funds
suite: itest-deals_retry_deal_no_funds
Expand Down
Binary file modified build/openrpc/full.json.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.5.0
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.1
Expand Down Expand Up @@ -214,7 +215,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/huin/goupnp v1.0.3 // indirect
Expand Down
104 changes: 104 additions & 0 deletions itests/deals_remote_retrieval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package itests

import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"os"
"path"
"testing"
"time"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/ipld/go-car"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/api"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/itests/kit"
)

func TestNetStoreRetrieval(t *testing.T) {
kit.QuietMiningLogs()

blocktime := 5 * time.Millisecond
ctx := context.Background()

full, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blocktime)

time.Sleep(5 * time.Second)

// For these tests where the block time is artificially short, just use
// a deal start epoch that is guaranteed to be far enough in the future
// so that the deal starts sealing in time
dealStartEpoch := abi.ChainEpoch(2 << 12)

rseed := 7

dh := kit.NewDealHarness(t, full, miner, miner)
dealCid, res, _ := dh.MakeOnlineDeal(context.Background(), kit.MakeFullDealParams{
Rseed: rseed,
StartEpoch: dealStartEpoch,
UseCARFileForStorageDeal: true,
})

// create deal store
id := uuid.New()
rstore := bstore.NewMemorySync()

au, err := url.Parse(full.ListenURL)
require.NoError(t, err)

switch au.Scheme {
case "http":
au.Scheme = "ws"
case "https":
au.Scheme = "wss"
}

au.Path = path.Join(au.Path, "/rest/v0/store/"+id.String())

conn, _, err := websocket.DefaultDialer.Dial(au.String(), nil)
require.NoError(t, err)

_ = bstore.HandleNetBstoreWS(ctx, rstore, conn)

dh.PerformRetrievalWithOrder(ctx, dealCid, res.Root, false, func(offer api.QueryOffer, address address.Address) api.RetrievalOrder {
order := offer.Order(address)

order.RemoteStore = &id

return order
})

// check blockstore blocks
carv1FilePath, _ := kit.CreateRandomCARv1(t, rseed, 200)
cb, err := os.ReadFile(carv1FilePath)
require.NoError(t, err)

cr, err := car.NewCarReader(bytes.NewReader(cb))
require.NoError(t, err)

var blocks int
for {
cb, err := cr.Next()
if err == io.EOF {
fmt.Println("blocks: ", blocks)
return
}
require.NoError(t, err)

sb, err := rstore.Get(ctx, cb.Cid())
require.NoError(t, err)
require.EqualValues(t, cb.RawData(), sb.RawData())

blocks++
}
}
16 changes: 15 additions & 1 deletion itests/kit/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-fil-markets/storagemarket"
Expand Down Expand Up @@ -308,6 +309,12 @@ func (dh *DealHarness) StartSealingWaiting(ctx context.Context) {
}

func (dh *DealHarness) PerformRetrieval(ctx context.Context, deal *cid.Cid, root cid.Cid, carExport bool, offers ...api.QueryOffer) (path string) {
return dh.PerformRetrievalWithOrder(ctx, deal, root, carExport, func(offer api.QueryOffer, a address.Address) api.RetrievalOrder {
return offer.Order(a)
}, offers...)
}

func (dh *DealHarness) PerformRetrievalWithOrder(ctx context.Context, deal *cid.Cid, root cid.Cid, carExport bool, makeOrder func(api.QueryOffer, address.Address) api.RetrievalOrder, offers ...api.QueryOffer) (path string) {
var offer api.QueryOffer
if len(offers) == 0 {
// perform retrieval.
Expand All @@ -331,7 +338,9 @@ func (dh *DealHarness) PerformRetrieval(ctx context.Context, deal *cid.Cid, root
updates, err := dh.client.ClientGetRetrievalUpdates(updatesCtx)
require.NoError(dh.t, err)

retrievalRes, err := dh.client.ClientRetrieve(ctx, offer.Order(caddr))
order := makeOrder(offer, caddr)

retrievalRes, err := dh.client.ClientRetrieve(ctx, order)
require.NoError(dh.t, err)
consumeEvents:
for {
Expand All @@ -357,6 +366,11 @@ consumeEvents:
}
cancel()

if order.RemoteStore != nil {
// if we're retrieving into a remote store, skip export
return ""
}

require.NoError(dh.t, dh.client.ClientExport(ctx,
api.ExportRef{
Root: root,
Expand Down
1 change: 1 addition & 0 deletions itests/kit/node_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type TestFullNode struct {
// ListenAddr is the address on which an API server is listening, if an
// API server is created for this Node.
ListenAddr multiaddr.Multiaddr
ListenURL string
DefaultKey *key.Key

options nodeOpts
Expand Down
2 changes: 1 addition & 1 deletion itests/kit/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
require.NoError(t, err)
t.Cleanup(stop)
f.ListenAddr, f.FullNode = maddr, cl
f.ListenAddr, f.ListenURL, f.FullNode = maddr, srv.URL, cl

return f
}
Expand Down

0 comments on commit 888f97a

Please sign in to comment.