Skip to content

Commit

Permalink
Merge pull request #5505 from filecoin-project/fix/dont-publish-expir…
Browse files Browse the repository at this point in the history
…ed-deals

Dont publish expired deals
  • Loading branch information
magik6k authored Feb 2, 2021
2 parents 987f410 + 3ced11c commit 20e92da
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 38 deletions.
59 changes: 47 additions & 12 deletions markets/storageadapter/dealpublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

type dealPublisherAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
}
Expand Down Expand Up @@ -223,32 +224,66 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) {
return
}

// onComplete is called when the publish message has been sent or there
// was an error
onComplete := func(pd *pendingDeal, msgCid cid.Cid, err error) {
// Send the publish result on the pending deal's Result channel
res := publishResult{
msgCid: msgCid,
err: err,
}
select {
case <-p.ctx.Done():
case <-pd.ctx.Done():
case pd.Result <- res:
}
}

// Validate each deal to make sure it can be published
validated := make([]*pendingDeal, 0, len(ready))
deals := make([]market2.ClientDealProposal, 0, len(ready))
for _, pd := range ready {
// Validate the deal
if err := p.validateDeal(pd.deal); err != nil {
// Validation failed, complete immediately with an error
go onComplete(pd, cid.Undef, err)
continue
}

validated = append(validated, pd)
deals = append(deals, pd.deal)
}

// Send the publish message
msgCid, err := p.publishDealProposals(deals)

// Signal that each deal has been published
for _, pd := range ready {
pd := pd
go func() {
res := publishResult{
msgCid: msgCid,
err: err,
}
select {
case <-p.ctx.Done():
case pd.Result <- res:
}
}()
for _, pd := range validated {
go onComplete(pd, msgCid, err)
}
}

// validateDeal checks that the deal proposal start epoch hasn't already
// elapsed
func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error {
head, err := p.api.ChainHead(p.ctx)
if err != nil {
return err
}
if head.Height() > deal.Proposal.StartEpoch {
return xerrors.Errorf(
"cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch)
}
return nil
}

// Sends the publish message
func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) {
if len(deals) == 0 {
return cid.Undef, nil
}

log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals))

provider := deals[0].Proposal.Provider
Expand Down
84 changes: 58 additions & 26 deletions markets/storageadapter/dealpublisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import (

func TestDealPublisher(t *testing.T) {
testCases := []struct {
name string
publishPeriod time.Duration
maxDealsPerMsg uint64
dealCountWithinPublishPeriod int
expiredWithinPublishPeriod int
dealCountAfterPublishPeriod int
expectedDealsPerMsg []int
name string
publishPeriod time.Duration
maxDealsPerMsg uint64
dealCountWithinPublishPeriod int
ctxCancelledWithinPublishPeriod int
expiredDeals int
dealCountAfterPublishPeriod int
expectedDealsPerMsg []int
}{{
name: "publish one deal within publish period",
publishPeriod: 10 * time.Millisecond,
Expand Down Expand Up @@ -61,22 +62,30 @@ func TestDealPublisher(t *testing.T) {
dealCountWithinPublishPeriod: 3,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1, 1},
}, {
name: "ignore deals with cancelled context",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 2,
ctxCancelledWithinPublishPeriod: 2,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1},
}, {
name: "ignore expired deals",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 2,
expiredWithinPublishPeriod: 2,
expiredDeals: 2,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1},
}, {
name: "zero config",
publishPeriod: 0,
maxDealsPerMsg: 0,
dealCountWithinPublishPeriod: 2,
expiredWithinPublishPeriod: 0,
dealCountAfterPublishPeriod: 2,
expectedDealsPerMsg: []int{1, 1, 1, 1},
name: "zero config",
publishPeriod: 0,
maxDealsPerMsg: 0,
dealCountWithinPublishPeriod: 2,
ctxCancelledWithinPublishPeriod: 0,
dealCountAfterPublishPeriod: 2,
expectedDealsPerMsg: []int{1, 1, 1, 1},
}}

for _, tc := range testCases {
Expand All @@ -96,31 +105,37 @@ func TestDealPublisher(t *testing.T) {

// Keep a record of the deals that were submitted to be published
var dealsToPublish []market.ClientDealProposal
publishDeal := func(expired bool) {
publishDeal := func(ctxCancelled bool, expired bool) {
pctx := ctx
var cancel context.CancelFunc
if expired {
if ctxCancelled {
pctx, cancel = context.WithCancel(ctx)
cancel()
}

startEpoch := abi.ChainEpoch(20)
if expired {
startEpoch = abi.ChainEpoch(5)
}
deal := market.ClientDealProposal{
Proposal: market0.DealProposal{
PieceCID: generateCids(1)[0],
Client: client,
Provider: provider,
PieceCID: generateCids(1)[0],
Client: client,
Provider: provider,
StartEpoch: startEpoch,
EndEpoch: abi.ChainEpoch(120),
},
ClientSignature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: []byte("signature data"),
},
}
if !expired {
if !ctxCancelled && !expired {
dealsToPublish = append(dealsToPublish, deal)
}
go func() {
_, err := dp.Publish(pctx, deal)
if expired {
if ctxCancelled || expired {
require.Error(t, err)
} else {
require.NoError(t, err)
Expand All @@ -130,18 +145,21 @@ func TestDealPublisher(t *testing.T) {

// Publish deals within publish period
for i := 0; i < tc.dealCountWithinPublishPeriod; i++ {
publishDeal(false)
publishDeal(false, false)
}
for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ {
publishDeal(true, false)
}
for i := 0; i < tc.expiredWithinPublishPeriod; i++ {
publishDeal(true)
for i := 0; i < tc.expiredDeals; i++ {
publishDeal(false, true)
}

// Wait until publish period has elapsed
time.Sleep(2 * tc.publishPeriod)

// Publish deals after publish period
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
publishDeal(false)
publishDeal(false, false)
}

// For each message that was expected to be sent
Expand Down Expand Up @@ -223,6 +241,20 @@ func newDPAPI(t *testing.T, worker address.Address) *dpAPI {
}
}

func (d *dpAPI) ChainHead(ctx context.Context) (*types.TipSet, error) {
dummyCid, err := cid.Parse("bafkqaaa")
require.NoError(d.t, err)
return types.NewTipSet([]*types.BlockHeader{{
Miner: tutils.NewActorAddr(d.t, "miner"),
Height: abi.ChainEpoch(10),
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
}})
}

func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) {
d.stateMinerInfoCalls <- address
return miner.MinerInfo{Worker: d.worker}, nil
Expand Down

0 comments on commit 20e92da

Please sign in to comment.