From e908f53eae9fa960ead7aa1cc6facebf01a90896 Mon Sep 17 00:00:00 2001 From: zl Date: Tue, 8 Mar 2022 10:19:34 +0800 Subject: [PATCH] add piece idempotent (#196) --- storage-sealing/input.go | 56 +++++++++++++++++++++++++------------- storage-sealing/sealing.go | 21 +++++++++++++- 2 files changed, 57 insertions(+), 20 deletions(-) mode change 100644 => 100755 storage-sealing/input.go mode change 100644 => 100755 storage-sealing/sealing.go diff --git a/storage-sealing/input.go b/storage-sealing/input.go old mode 100644 new mode 100755 index 94745c68..3c24e140 --- a/storage-sealing/input.go +++ b/storage-sealing/input.go @@ -296,41 +296,59 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec } m.inputLk.Lock() - if _, exist := m.pendingPieces[proposalCID(deal)]; exist { + if pp, exist := m.pendingPieces[proposalCID(deal)]; exist { m.inputLk.Unlock() - return api.SectorOffset{}, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal)) + + // we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful + for { + res, err := pp.waitAddPieceResp(ctx) + if err != nil { + return api.SectorOffset{}, err + } + // there was an error waiting for a pre-existing add piece call, let's retry + if res.err != nil { + m.inputLk.Lock() + pp = m.addPendingPiece(ctx, size, data, deal, sp) + m.inputLk.Unlock() + continue + } + // all good, return the response + return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err + } } - resCh := make(chan struct { - sn abi.SectorNumber - offset abi.UnpaddedPieceSize - err error - }, 1) + pp := m.addPendingPiece(ctx, size, data, deal, sp) + m.inputLk.Unlock() - m.pendingPieces[proposalCID(deal)] = &pendingPiece{ + res, err := pp.waitAddPieceResp(ctx) + if err != nil { + return api.SectorOffset{}, err + } + return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err +} + +func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal types.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece { + doneCh := make(chan struct{}) + pp := &pendingPiece{ + doneCh: doneCh, size: size, deal: deal, data: data, assigned: false, - accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { - resCh <- struct { - sn abi.SectorNumber - offset abi.UnpaddedPieceSize - err error - }{sn: sn, offset: offset, err: err} - }, + } + pp.accepted = func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { + pp.resp = &pieceAcceptResp{sn, offset, err} + close(pp.doneCh) } + m.pendingPieces[proposalCID(deal)] = pp go func() { - defer m.inputLk.Unlock() if err := m.updateInput(ctx, sp); err != nil { log.Errorf("%+v", err) } }() - res := <-resCh - - return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err + return pp } func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error { diff --git a/storage-sealing/sealing.go b/storage-sealing/sealing.go old mode 100644 new mode 100755 index 66959692..df6e8c3f --- a/storage-sealing/sealing.go +++ b/storage-sealing/sealing.go @@ -78,7 +78,7 @@ type SealingAPI interface { MessagerSendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (string, error) //for market - GetUnPackedDeals(ctx context.Context, miner address.Address, spec *market2.GetDealSpec) ([]*market2.DealInfoIncludePath, error) //perm:read + GetUnPackedDeals(ctx context.Context, miner address.Address, spec *market2.GetDealSpec) ([]*market2.DealInfoIncludePath, error) //perm:read MarkDealsAsPacking(ctx context.Context, miner address.Address, deals []abi.DealID) error //perm:write UpdateDealOnPacking(ctx context.Context, miner address.Address, dealId abi.DealID, sectorid abi.SectorNumber, offset abi.PaddedPieceSize) error //perm:write } @@ -149,7 +149,16 @@ func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi return expiration >= dealEnd, nil } +type pieceAcceptResp struct { + sn abi.SectorNumber + offset abi.UnpaddedPieceSize + err error +} + type pendingPiece struct { + doneCh chan struct{} + resp *pieceAcceptResp + size abi.UnpaddedPieceSize deal types2.PieceDealInfo @@ -159,6 +168,16 @@ type pendingPiece struct { accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error) } +func (pp *pendingPiece) waitAddPieceResp(ctx context.Context) (*pieceAcceptResp, error) { + select { + case <-pp.doneCh: + res := pp.resp + return res, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, metaDataService *service.MetadataService, sectorInfoService *service.SectorInfoService, logService *service.LogService, sealer sectorstorage.SectorManager, sc types2.SectorIDCounter, verif ffiwrapper.Verifier, prov ffiwrapper.Prover, pcp PreCommitPolicy, gc types2.GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel, networkParams *config.NetParamsConfig, pieceStorage piecestorage.IPieceStorage) *Sealing { s := &Sealing{ api: api,