Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix flaky TestDealPublisher and re-enable #6991

Merged
merged 1 commit into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions markets/storageadapter/dealpublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
Expand Down Expand Up @@ -203,9 +204,9 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)",
pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg)

// If the maximum number of deals per message has been reached,
// send a publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg {
// If the maximum number of deals per message has been reached or we're not batching, send a
// publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg || p.publishPeriod == 0 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly necessary, but publishing immediately when the publish period is zero (i.e. when we're not batching) makes testing a bit easier (I think?).

log.Infof("publish deals queue has reached max size of %d, publishing deals", p.maxDealsPerPublishMsg)
p.publishAllDeals()
return
Expand All @@ -218,7 +219,7 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
func (p *DealPublisher) waitForMoreDeals() {
// Check if we're already waiting for deals
if !p.publishPeriodStart.IsZero() {
elapsed := time.Since(p.publishPeriodStart)
elapsed := build.Clock.Since(p.publishPeriodStart)
log.Infof("%s elapsed of / %s until publish deals queue is published",
elapsed, p.publishPeriod)
return
Expand All @@ -227,11 +228,11 @@ func (p *DealPublisher) waitForMoreDeals() {
// Set a timeout to wait for more deals to arrive
log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod)
ctx, cancel := context.WithCancel(p.ctx)
p.publishPeriodStart = time.Now()
p.publishPeriodStart = build.Clock.Now()
p.cancelWaitForMoreDeals = cancel

go func() {
timer := time.NewTimer(p.publishPeriod)
timer := build.Clock.Timer(p.publishPeriod)
select {
case <-ctx.Done():
timer.Stop()
Expand Down
54 changes: 49 additions & 5 deletions markets/storageadapter/dealpublisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"github.com/filecoin-project/go-state-types/crypto"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/ipfs/go-cid"
"github.com/raulk/clock"

"github.com/stretchr/testify/require"

tutils "github.com/filecoin-project/specs-actors/v2/support/testing"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
Expand All @@ -25,7 +27,11 @@ import (
)

func TestDealPublisher(t *testing.T) {
t.Skip("this test randomly fails in various subtests; see issue #6799")
oldClock := build.Clock
t.Cleanup(func() { build.Clock = oldClock })
mc := clock.NewMock()
build.Clock = mc

testCases := []struct {
name string
publishPeriod time.Duration
Expand Down Expand Up @@ -92,6 +98,7 @@ func TestDealPublisher(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
mc.Set(time.Now())
dpapi := newDPAPI(t)

// Create a deal publisher
Expand All @@ -116,14 +123,51 @@ func TestDealPublisher(t *testing.T) {
}

// Wait until publish period has elapsed
time.Sleep(2 * tc.publishPeriod)
if tc.publishPeriod > 0 {
// If we expect deals to get stuck in the queue, wait until that happens
if tc.maxDealsPerMsg != 0 && tc.dealCountWithinPublishPeriod%int(tc.maxDealsPerMsg) != 0 {
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()
return !dp.publishPeriodStart.IsZero()
}, time.Second, time.Millisecond, "failed to queue deals")
}

// Then wait to send
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()

// Advance if necessary.
if mc.Since(dp.publishPeriodStart) <= tc.publishPeriod {
dp.lk.Unlock()
mc.Set(dp.publishPeriodStart.Add(tc.publishPeriod + 1))
dp.lk.Lock()
}

return len(dp.pending) == 0
}, time.Second, time.Millisecond, "failed to send pending messages")
}

// Publish deals after publish period
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
deal := publishDeal(t, dp, false, false)
dealsToPublish = append(dealsToPublish, deal)
}

if tc.publishPeriod > 0 && tc.dealCountAfterPublishPeriod > 0 {
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()
if mc.Since(dp.publishPeriodStart) <= tc.publishPeriod {
dp.lk.Unlock()
mc.Set(dp.publishPeriodStart.Add(tc.publishPeriod + 1))
dp.lk.Lock()
}
return len(dp.pending) == 0
}, time.Second, time.Millisecond, "failed to send pending messages")
}

checkPublishedDeals(t, dpapi, dealsToPublish, tc.expectedDealsPerMsg)
})
}
Expand All @@ -133,7 +177,7 @@ func TestForcePublish(t *testing.T) {
dpapi := newDPAPI(t)

// Create a deal publisher
start := time.Now()
start := build.Clock.Now()
publishPeriod := time.Hour
dp := newDealPublisher(dpapi, nil, PublishMsgConfig{
Period: publishPeriod,
Expand All @@ -152,15 +196,15 @@ func TestForcePublish(t *testing.T) {
dealsToPublish = append(dealsToPublish, deal)

// Allow a moment for them to be queued
time.Sleep(10 * time.Millisecond)
build.Clock.Sleep(10 * time.Millisecond)

// Should be two deals in the pending deals list
// (deal with cancelled context is ignored)
pendingInfo := dp.PendingDeals()
require.Len(t, pendingInfo.Deals, 2)
require.Equal(t, publishPeriod, pendingInfo.PublishPeriod)
require.True(t, pendingInfo.PublishPeriodStart.After(start))
require.True(t, pendingInfo.PublishPeriodStart.Before(time.Now()))
require.True(t, pendingInfo.PublishPeriodStart.Before(build.Clock.Now()))

// Force publish all pending deals
dp.ForcePublishPendingDeals()
Expand Down
6 changes: 3 additions & 3 deletions markets/storageadapter/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
}

p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
curTime := time.Now()
for time.Since(curTime) < addPieceRetryTimeout {
curTime := build.Clock.Now()
for build.Clock.Since(curTime) < addPieceRetryTimeout {
if !xerrors.Is(err, sealing.ErrTooManySectorsSealing) {
if err != nil {
log.Errorf("failed to addPiece for deal %d, err: %v", deal.DealID, err)
}
break
}
select {
case <-time.After(addPieceRetryWait):
case <-build.Clock.After(addPieceRetryWait):
p, offset, err = n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
case <-ctx.Done():
return nil, xerrors.New("context expired while waiting to retry AddPiece")
Expand Down