Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

Commit

Permalink
add piece idempotent (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
zl03jsj authored Mar 8, 2022
1 parent 11e31f0 commit e908f53
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 20 deletions.
56 changes: 37 additions & 19 deletions storage-sealing/input.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -296,41 +296,59 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
}

m.inputLk.Lock()
if _, exist := m.pendingPieces[proposalCID(deal)]; exist {
if pp, exist := m.pendingPieces[proposalCID(deal)]; exist {
m.inputLk.Unlock()
return api.SectorOffset{}, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal))

// we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful
for {
res, err := pp.waitAddPieceResp(ctx)
if err != nil {
return api.SectorOffset{}, err
}
// there was an error waiting for a pre-existing add piece call, let's retry
if res.err != nil {
m.inputLk.Lock()
pp = m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
continue
}
// all good, return the response
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
}

resCh := make(chan struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}, 1)
pp := m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()

m.pendingPieces[proposalCID(deal)] = &pendingPiece{
res, err := pp.waitAddPieceResp(ctx)
if err != nil {
return api.SectorOffset{}, err
}
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}

func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal types.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece {
doneCh := make(chan struct{})
pp := &pendingPiece{
doneCh: doneCh,
size: size,
deal: deal,
data: data,
assigned: false,
accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
resCh <- struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}{sn: sn, offset: offset, err: err}
},
}
pp.accepted = func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
pp.resp = &pieceAcceptResp{sn, offset, err}
close(pp.doneCh)
}

m.pendingPieces[proposalCID(deal)] = pp
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}
}()

res := <-resCh

return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
return pp
}

func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error {
Expand Down
21 changes: 20 additions & 1 deletion storage-sealing/sealing.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type SealingAPI interface {
MessagerSendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (string, error)

//for market
GetUnPackedDeals(ctx context.Context, miner address.Address, spec *market2.GetDealSpec) ([]*market2.DealInfoIncludePath, error) //perm:read
GetUnPackedDeals(ctx context.Context, miner address.Address, spec *market2.GetDealSpec) ([]*market2.DealInfoIncludePath, error) //perm:read
MarkDealsAsPacking(ctx context.Context, miner address.Address, deals []abi.DealID) error //perm:write
UpdateDealOnPacking(ctx context.Context, miner address.Address, dealId abi.DealID, sectorid abi.SectorNumber, offset abi.PaddedPieceSize) error //perm:write
}
Expand Down Expand Up @@ -149,7 +149,16 @@ func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi
return expiration >= dealEnd, nil
}

type pieceAcceptResp struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}

type pendingPiece struct {
doneCh chan struct{}
resp *pieceAcceptResp

size abi.UnpaddedPieceSize
deal types2.PieceDealInfo

Expand All @@ -159,6 +168,16 @@ type pendingPiece struct {
accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error)
}

func (pp *pendingPiece) waitAddPieceResp(ctx context.Context) (*pieceAcceptResp, error) {
select {
case <-pp.doneCh:
res := pp.resp
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, metaDataService *service.MetadataService, sectorInfoService *service.SectorInfoService, logService *service.LogService, sealer sectorstorage.SectorManager, sc types2.SectorIDCounter, verif ffiwrapper.Verifier, prov ffiwrapper.Prover, pcp PreCommitPolicy, gc types2.GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel, networkParams *config.NetParamsConfig, pieceStorage piecestorage.IPieceStorage) *Sealing {
s := &Sealing{
api: api,
Expand Down

0 comments on commit e908f53

Please sign in to comment.