Skip to content

Commit

Permalink
removed combined blockstore
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Jul 3, 2021
1 parent 40b30d1 commit 303d5d6
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 46 deletions.
88 changes: 42 additions & 46 deletions node/impl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ type API struct {

Imports dtypes.ClientImportMgr

CombinedBstore dtypes.ClientBlockstore // TODO: try to remove
DataTransfer dtypes.ClientDataTransfer
Host host.Host
DataTransfer dtypes.ClientDataTransfer
Host host.Host

// TODO How do we inject the Repo Path here ?
}
Expand Down Expand Up @@ -125,28 +124,14 @@ func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isSt
return nil, xerrors.New("stateless storage deals can only be initiated with storage price of 0")
}
} else if params.Data.TransferType == storagemarket.TTGraphsync {
importIDs, err := a.imgr().List()
c, err := a.imgr().CARV2FilePathFor(params.Data.Root)
if err != nil {
return nil, xerrors.Errorf("failed to fetch import IDs: %w", err)
return nil, xerrors.Errorf("failed to find CARv2 file path: %w", err)
}

for _, importID := range importIDs {
info, err := a.imgr().Info(importID)
if err != nil {
continue
}
if info.Labels[importmgr.LRootCid] == "" {
continue
}
c, err := cid.Parse(info.Labels[importmgr.LRootCid])
if err != nil {
continue
}
if c.Equals(params.Data.Root) {
CARV2FilePath = info.Labels[importmgr.LCARv2FilePath]
break
}
if c == "" {
return nil, xerrors.New("no CARv2 file path for deal")
}
CARV2FilePath = c
}

walletKey, err := a.StateAccountKey(ctx, params.Wallet, types.EmptyTSK)
Expand Down Expand Up @@ -413,29 +398,14 @@ func (a *API) newDealInfoWithTransfer(transferCh *api.DataTransferChannel, v sto

func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag
importIDs, err := a.imgr().List()
carv2Path, err := a.imgr().CARV2FilePathFor(root)
if err != nil {
return false, xerrors.Errorf("failed to list imports: %w", err)
return false, err
}

for _, importID := range importIDs {
info, err := a.imgr().Info(importID)
if err != nil {
continue
}
if info.Labels[importmgr.LRootCid] == "" {
continue
}
c, err := cid.Parse(info.Labels[importmgr.LRootCid])
if err != nil {
continue
}
if c.Equals(root) {
return true, nil
}
if len(carv2Path) == 0 {
return false, nil
}

return false, nil
return true, nil
}

func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) {
Expand Down Expand Up @@ -1006,11 +976,24 @@ func (w *lenWriter) Write(p []byte) (n int, err error) {
}

func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, error) {
dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore)))
carv2FilePath, err := a.imgr().CARV2FilePathFor(root)
if err != nil {
return api.DataSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err)
}
if len(carv2FilePath) == 0 {
return api.DataSize{}, xerrors.New("no CARv2 file for root")
}

rdOnly, err := blockstore.OpenReadOnly(carv2FilePath, false)
if err != nil {
return api.DataSize{}, xerrors.Errorf("failed to open read only blockstore: %w", err)
}
defer rdOnly.Close() //nolint:errcheck

dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly)))
w := lenWriter(0)

err := car.WriteCar(ctx, dag, []cid.Cid{root}, &w)
err = car.WriteCar(ctx, dag, []cid.Cid{root}, &w)
if err != nil {
return api.DataSize{}, err
}
Expand All @@ -1024,12 +1007,25 @@ func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, e
}

func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) {
dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore)))
carv2FilePath, err := a.imgr().CARV2FilePathFor(root)
if err != nil {
return api.DataCIDSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err)
}
if len(carv2FilePath) == 0 {
return api.DataCIDSize{}, xerrors.New("no CARv2 file for root")
}

rdOnly, err := blockstore.OpenReadOnly(carv2FilePath, false)
if err != nil {
return api.DataCIDSize{}, xerrors.Errorf("failed to open read only blockstore: %w", err)
}
defer rdOnly.Close() //nolint:errcheck

dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly)))
w := &writer.Writer{}
bw := bufio.NewWriterSize(w, int(writer.CommPBuf))

err := car.WriteCar(ctx, dag, []cid.Cid{root}, w)
err = car.WriteCar(ctx, dag, []cid.Cid{root}, w)
if err != nil {
return api.DataCIDSize{}, err
}
Expand Down
27 changes: 27 additions & 0 deletions node/repo/importmgr/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"

"github.com/filecoin-project/go-fil-markets/shared"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -123,6 +124,32 @@ func (m *Mgr) Remove(id uint64) error {
return nil
}

func (m *Mgr) CARV2FilePathFor(dagRoot cid.Cid) (string, error) {
importIDs, err := m.List()
if err != nil {
return "", xerrors.Errorf("failed to fetch import IDs: %w", err)
}

for _, importID := range importIDs {
info, err := m.Info(importID)
if err != nil {
continue
}
if info.Labels[LRootCid] == "" {
continue
}
c, err := cid.Parse(info.Labels[LRootCid])
if err != nil {
continue
}
if c.Equals(dagRoot) {
return info.Labels[LCARv2FilePath], nil
}
}

return "", nil
}

func (m *Mgr) NewTempFile(id uint64) (string, error) {
file, err := ioutil.TempFile(m.repoPath, fmt.Sprintf("%d", id))
if err != nil {
Expand Down

0 comments on commit 303d5d6

Please sign in to comment.