Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

fix: seal sector: add piece idempotent #196

Merged
merged 1 commit into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 37 additions & 19 deletions storage-sealing/input.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -296,41 +296,59 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
}

m.inputLk.Lock()
if _, exist := m.pendingPieces[proposalCID(deal)]; exist {
if pp, exist := m.pendingPieces[proposalCID(deal)]; exist {
m.inputLk.Unlock()
return api.SectorOffset{}, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal))

// we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful
for {
res, err := pp.waitAddPieceResp(ctx)
if err != nil {
return api.SectorOffset{}, err
}
// there was an error waiting for a pre-existing add piece call, let's retry
if res.err != nil {
m.inputLk.Lock()
pp = m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
continue
}
// all good, return the response
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
}

resCh := make(chan struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}, 1)
pp := m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()

m.pendingPieces[proposalCID(deal)] = &pendingPiece{
res, err := pp.waitAddPieceResp(ctx)
if err != nil {
return api.SectorOffset{}, err
}
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}

func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal types.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece {
doneCh := make(chan struct{})
pp := &pendingPiece{
doneCh: doneCh,
size: size,
deal: deal,
data: data,
assigned: false,
accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
resCh <- struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}{sn: sn, offset: offset, err: err}
},
}
pp.accepted = func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
pp.resp = &pieceAcceptResp{sn, offset, err}
close(pp.doneCh)
}

m.pendingPieces[proposalCID(deal)] = pp
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}
}()

res := <-resCh

return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
return pp
}

func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error {
Expand Down
21 changes: 20 additions & 1 deletion storage-sealing/sealing.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type SealingAPI interface {
MessagerSendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (string, error)

//for market
GetUnPackedDeals(ctx context.Context, miner address.Address, spec *market2.GetDealSpec) ([]*market2.DealInfoIncludePath, error) //perm:read
GetUnPackedDeals(ctx context.Context, miner address.Address, spec *market2.GetDealSpec) ([]*market2.DealInfoIncludePath, error) //perm:read
MarkDealsAsPacking(ctx context.Context, miner address.Address, deals []abi.DealID) error //perm:write
UpdateDealOnPacking(ctx context.Context, miner address.Address, dealId abi.DealID, sectorid abi.SectorNumber, offset abi.PaddedPieceSize) error //perm:write
}
Expand Down Expand Up @@ -149,7 +149,16 @@ func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi
return expiration >= dealEnd, nil
}

type pieceAcceptResp struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}

type pendingPiece struct {
doneCh chan struct{}
resp *pieceAcceptResp

size abi.UnpaddedPieceSize
deal types2.PieceDealInfo

Expand All @@ -159,6 +168,16 @@ type pendingPiece struct {
accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error)
}

func (pp *pendingPiece) waitAddPieceResp(ctx context.Context) (*pieceAcceptResp, error) {
select {
case <-pp.doneCh:
res := pp.resp
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, metaDataService *service.MetadataService, sectorInfoService *service.SectorInfoService, logService *service.LogService, sealer sectorstorage.SectorManager, sc types2.SectorIDCounter, verif ffiwrapper.Verifier, prov ffiwrapper.Prover, pcp PreCommitPolicy, gc types2.GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel, networkParams *config.NetParamsConfig, pieceStorage piecestorage.IPieceStorage) *Sealing {
s := &Sealing{
api: api,
Expand Down