Skip to content

Commit

Permalink
fix: account for DDO in UI (#1892)
Browse files Browse the repository at this point in the history
* account for DDO in UI

* add DDO to sealing page

* fix flaky tests
  • Loading branch information
LexLuthr authored Mar 21, 2024
1 parent 3575da1 commit 203c05b
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 29 deletions.
18 changes: 10 additions & 8 deletions db/directdeals.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package db
import (
"context"
"database/sql"
"fmt"
"strings"

"github.com/filecoin-project/boost/db/fielddef"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
"github.com/google/uuid"
"github.com/graph-gophers/graphql-go"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -109,13 +108,16 @@ func (d *DirectDealsDB) ByPieceCID(ctx context.Context, pieceCid cid.Cid) ([]*ty
return d.list(ctx, 0, 0, "PieceCID=?", pieceCid.String())
}

func (d *DirectDealsDB) BySectorID(ctx context.Context, sectorID abi.SectorID) ([]*types.DirectDeal, error) {
addr, err := address.NewIDAddress(uint64(sectorID.Miner))
if err != nil {
return nil, fmt.Errorf("creating address from ID %d: %w", sectorID.Miner, err)
}
func (d *DirectDealsDB) BySectorID(ctx context.Context, sectorID abi.SectorNumber) ([]*types.DirectDeal, error) {
return d.list(ctx, 0, 0, "SectorID=?", sectorID)
}

return d.list(ctx, 0, 0, "ProviderAddress=? AND SectorID=?", addr.String(), sectorID.Number)
func (d *DirectDealsDB) ActiveByPieceAllocID(ctx context.Context, piece cid.Cid, alloc verifreg.AllocationId) ([]*types.DirectDeal, error) {
whereArgs := []interface{}{}
whereArgs = append(whereArgs, piece.String())
whereArgs = append(whereArgs, uint64(alloc))
whereArgs = append(whereArgs, dealcheckpoints.Complete.String())
return d.list(ctx, 0, 0, "PieceCID=? AND AllocationID=? AND Checkpoint != ?", whereArgs...)
}

func (d *DirectDealsDB) ListAll(ctx context.Context) ([]*types.DirectDeal, error) {
Expand Down
1 change: 1 addition & 0 deletions gql/resolver_dealpublish.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type basicDealResolver struct {
PublishCid string
Transfer dealTransfer
Message string
IsDirect bool
}

type dealPublishResolver struct {
Expand Down
56 changes: 55 additions & 1 deletion gql/resolver_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ func (r *resolver) PieceStatus(ctx context.Context, args struct{ PieceCid string
return nil, err
}

// Get DDO deals by piece CID
ddoDeals, err := r.directDealsDB.ByPieceCID(ctx, pieceCid)
if err != nil {
return nil, err
}

// Convert local index directory deals to graphQL format
var pids []*pieceInfoDeal
for _, dl := range pieceInfo.Deals {
Expand Down Expand Up @@ -320,7 +326,7 @@ func (r *resolver) PieceStatus(ctx context.Context, args struct{ PieceCid string
}

// Convert boost deals to graphQL format
deals := make([]*pieceDealResolver, 0, len(boostDeals)+len(legacyDeals))
deals := make([]*pieceDealResolver, 0, len(boostDeals)+len(legacyDeals)+len(ddoDeals))
for _, dl := range boostDeals {
bd := propToBasicDeal(dl.ClientDealProposal.Proposal)
bd.IsLegacy = false
Expand Down Expand Up @@ -401,6 +407,54 @@ func (r *resolver) PieceStatus(ctx context.Context, args struct{ PieceCid string
})
}

for _, dl := range ddoDeals {
bd := basicDealResolver{
ClientAddress: dl.Client.String(),
ProviderAddress: dl.Provider.String(),
PieceCid: dl.PieceCID.String(),
PieceSize: gqltypes.Uint64(dl.PieceSize),
ProviderCollateral: gqltypes.Uint64(0),
StartEpoch: gqltypes.Uint64(dl.StartEpoch),
EndEpoch: gqltypes.Uint64(dl.EndEpoch),
IsDirect: true,
IsLegacy: false,
ID: graphql.ID(dl.ID.String()),
}
bd.CreatedAt = graphql.Time{Time: dl.CreatedAt}
bd.ClientPeerID = dl.Client.String()
bd.DealDataRoot = ""
bd.PublishCid = ""
bd.Transfer = dealTransfer{
Type: "",
Size: gqltypes.Uint64(dl.Length),
}
bd.Message = dl.Checkpoint.String()

provAddr, err := address.NewFromString(bd.ProviderAddress)
if err != nil {
return nil, fmt.Errorf("parsing actor address %s", bd.ProviderAddress)
}
minerId, err := address.IDFromAddress(provAddr)
if err != nil {
return nil, fmt.Errorf("getting actor id from address %s", bd.ProviderAddress)
}
sector := &sectorResolver{
ID: gqltypes.Uint64(dl.SectorID),
Offset: gqltypes.Uint64(dl.Offset),
Length: gqltypes.Uint64(dl.Length),
}
deals = append(deals, &pieceDealResolver{
Deal: &bd,
Sector: sector,
ss: &sealStatusReporter{
mma: r.mma,
sector: sector,
ssm: r.ssm,
minerID: abi.ActorID(minerId),
},
})
}

// Get the state of the piece's index
idxStatus, err := r.getIndexStatus(pieceInfo, pmErr)
if err != nil {
Expand Down
49 changes: 49 additions & 0 deletions gql/resolver_sealingpipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package gql

import (
"context"
"errors"
"fmt"

gqltypes "github.com/filecoin-project/boost/gql/types"
smtypes "github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/chain/types"
Expand Down Expand Up @@ -130,6 +134,7 @@ type waitDeal struct {
ID graphql.ID
Size gqltypes.Uint64
IsLegacy bool
IsDirect bool
}

type waitDealSector struct {
Expand Down Expand Up @@ -187,6 +192,50 @@ func (r *resolver) populateWaitDealsSectors(ctx context.Context, sectorNumbers [

publishCid := p.DealInfo.PublishCid
if publishCid == nil {
// This is likely a direct deal
var directDeals []*smtypes.DirectDeal
if p.DealInfo.PieceActivationManifest.VerifiedAllocationKey != nil {
vkey := p.DealInfo.PieceActivationManifest.VerifiedAllocationKey
activeDeals, err := r.directDealsDB.ActiveByPieceAllocID(ctx, p.DealInfo.PieceCID(), verifreg.AllocationId(vkey.ID))
if err != nil {
return nil, err
}
// Match the client address
for _, deal := range activeDeals {
deal := deal
ac, err := r.fullNode.StateLookupID(ctx, deal.Client, types.EmptyTSK)
if err != nil {
return nil, err
}
w, err := address.IDFromAddress(ac)
if err != nil {
return nil, fmt.Errorf("converting client address to ID: %w", err)
}

wid := abi.ActorID(w)

if wid != vkey.Client {
log.Info("Client mismatch for deal", deal.ID)
continue
}
directDeals = append(directDeals, deal)
}
// We should have only 1 deal left here as import check prevents more than 1 active deal
// with for same client and allocationID
if len(directDeals) > 1 {
return nil, errors.New("more than 1 active deal with same client ID and allocation ID found")
}
if len(directDeals) == 1 {
d := directDeals[0]
deals = append(deals, &waitDeal{
ID: graphql.ID(d.ID.String()),
Size: gqltypes.Uint64(p.DealInfo.PieceActivationManifest.Size),
IsDirect: true,
})
used += uint64(p.DealInfo.PieceActivationManifest.Size)
continue
}
}
continue
}

Expand Down
2 changes: 2 additions & 0 deletions gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type DealBasic {
PublishCid: String!
Transfer: TransferParams!
Message: String!
IsDirect: Boolean!
}

type DealList {
Expand Down Expand Up @@ -310,6 +311,7 @@ type WaitDeal {
ID: ID!
Size: Uint64!
IsLegacy: Boolean!
IsDirect: Boolean!
}

type WaitDealsSector {
Expand Down
2 changes: 1 addition & 1 deletion itests/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func FullNodeAndMiner(t *testing.T, ensemble *kit.Ensemble) (*kit.TestFullNode,
ensemble.FullNode(&fullNode, fnOpts...).Miner(&miner, &fullNode, minerOpts...)
if defaultEnsemble {
ensemble.Start()
blockTime := 20 * time.Millisecond
blockTime := 100 * time.Millisecond
ensemble.BeginMining(blockTime)
}

Expand Down
13 changes: 11 additions & 2 deletions react/src/Components.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,18 @@ export function PageContainer(props) {
}

export function ShortDealLink(props) {
const linkBase = props.isLegacy ? "/legacy-deals/" : "/deals/"
let linkBase = "/deals/"
if (props.isLegacy){
linkBase = "/legacy-deals/"
return <Link to={linkBase + props.id}>
<ShortDealID id={props.id} />
</Link>
}
if (props.isDirect){
linkBase = "/direct-deals/"
}
return <Link to={linkBase + props.id}>
{props.id}
<ShortDealID id={props.id} />
</Link>
}

Expand Down
2 changes: 1 addition & 1 deletion react/src/DealPublish.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ function DealsTable(props) {
<tr key={deal.ID}>
<td>{moment(deal.CreatedAt).fromNow()}</td>
<td className="deal-id">
<ShortDealLink id={deal.ID} isLegacy={deal.IsLegacy} />
<ShortDealLink id={deal.ID} isLegacy={deal.IsLegacy} isDirect={deal.IsDirect} />
</td>
<td className="size">{humanFileSize(deal.Transfer.Size)}</td>
<td className="piece-size">{humanFileSize(deal.PieceSize)}</td>
Expand Down
12 changes: 7 additions & 5 deletions react/src/LID.js
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ function PieceStatus({pieceCid, pieceStatus, searchQuery}) {
<th>CreatedAt</th>
<th>Deal ID</th>
<th>Legacy Deal</th>
<th>Direct Deal</th>
<th>Sector Number</th>
<th>Piece Offset</th>
<th>Piece Length</th>
Expand All @@ -688,12 +689,13 @@ function PieceStatus({pieceCid, pieceStatus, searchQuery}) {
{pieceStatus.Deals.map(deal => (
<tr key={deal.Deal.ID}>
<td>{moment(deal.Deal.CreatedAt).format(dateFormat)}</td>
<td><ShortDealLink id={deal.Deal.ID} isLegacy={deal.Deal.IsLegacy} /></td>
<td><ShortDealLink id={deal.Deal.ID} isLegacy={deal.Deal.IsLegacy} isDirect={deal.Deal.IsDirect}/></td>
<td>{deal.Deal.IsLegacy ? 'Yes' : 'No'}</td>
<td>{deal.Sector.ID+''}</td>
<td>{deal.Sector.Offset+''}</td>
<td>{deal.Sector.Length+''}</td>
<td><SealStatus status={deal.SealStatus} /></td>
<td>{deal.Deal.IsDirect ? 'Yes' : 'No'}</td>
<td>{deal.Sector.ID + ''}</td>
<td>{deal.Sector.Offset + ''}</td>
<td>{deal.Sector.Length + ''}</td>
<td><SealStatus status={deal.SealStatus}/></td>
</tr>
))}
</tbody>
Expand Down
8 changes: 1 addition & 7 deletions react/src/SealingPipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,7 @@ function WaitDealsSizes(props) {
{props.deals.map(deal => (
<tr key={deal.ID}>
<td className="deal-id">
{deal.IsLegacy ? (
<Link to={"/legacy-deals/" + deal.ID}>
<div className="short-deal-id">{deal.ID.substring(0, 12) + '…'}</div>
</Link>
) : (
<ShortDealLink id={deal.ID} />
)}
<ShortDealLink id={deal.ID} isLegacy={deal.IsLegacy} isDirect={deal.IsDirect}/>
</td>
<td className="deal-size">{humanFileSize(deal.Size)}</td>
</tr>
Expand Down
4 changes: 4 additions & 0 deletions react/src/gql.js
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ const PieceStatusQuery = gql`
IsLegacy
CreatedAt
DealDataRoot
IsDirect
}
Sector {
ID
Expand Down Expand Up @@ -658,6 +659,7 @@ const SealingPipelineQuery = gql`
ID
Size
IsLegacy
IsDirect
}
}
SnapDealsWaitDealsSectors {
Expand All @@ -668,6 +670,7 @@ const SealingPipelineQuery = gql`
ID
Size
IsLegacy
IsDirect
}
}
SectorStates {
Expand Down Expand Up @@ -791,6 +794,7 @@ const DealPublishQuery = gql`
}
ClientAddress
PieceSize
IsDirect
}
}
}
Expand Down
17 changes: 15 additions & 2 deletions storagemarket/direct_deals_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,21 @@ func (ddp *DirectDealsProvider) Accept(ctx context.Context, entry *types.DirectD

log.Infow("found allocation for client", "allocation", spew.Sdump(allocation))

// TODO: validate the deal proposal and check for deal acceptance (allocation id, term, start epoch, end epoch, etc.)
// TermMin ; TermMax
allActive, err := ddp.directDealsDB.ListActive(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get active deals: %w", err)
}

for _, deal := range allActive {
// We should only process 1 deal for each allocation at one time to avoid trying to claim
// same allocation for 2 sectors at once
if entry.Client == deal.Client && entry.AllocationID == deal.AllocationID {
return &api.ProviderDealRejectionInfo{
Accepted: false,
Reason: fmt.Sprintf("another deal for client %s and allocation %d is already in process", entry.Client, entry.AllocationID),
}, nil
}
}

return &api.ProviderDealRejectionInfo{
Accepted: true,
Expand Down
5 changes: 3 additions & 2 deletions storagemarket/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/pipeline/piece"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -645,11 +646,11 @@ func (p *Provider) AddPieceToSector(ctx context.Context, deal smtypes.ProviderDe
return nil, fmt.Errorf("deal.PublishCid can't be nil")
}

sdInfo := lapi.PieceDealInfo{
sdInfo := piece.PieceDealInfo{
DealID: deal.ChainDealID,
DealProposal: &deal.ClientDealProposal.Proposal,
PublishCid: deal.PublishCID,
DealSchedule: lapi.DealSchedule{
DealSchedule: piece.DealSchedule{
StartEpoch: deal.ClientDealProposal.Proposal.StartEpoch,
EndEpoch: deal.ClientDealProposal.Proposal.EndEpoch,
},
Expand Down

0 comments on commit 203c05b

Please sign in to comment.