Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Commit and Precommit batcher cannot share a getSectorDeadline method #6416

Merged
merged 4 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/params_nerpanet.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const UpgradeOrangeHeight = 300

const UpgradeTrustHeight = 600
const UpgradeNorwegianHeight = 201000
const UpgradeActorsV4Height = 203000
const UpgradeTurboHeight = 203000
const UpgradeHyperdriveHeight = 999999999

func init() {
Expand Down
85 changes: 54 additions & 31 deletions extern/storage-sealing/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"sync"
"time"

"github.com/filecoin-project/go-state-types/network"

"github.com/filecoin-project/lotus/chain/actors"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -34,6 +38,7 @@ type CommitBatcherApi interface {

StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error)
}

type AggregateInput struct {
Expand All @@ -51,9 +56,9 @@ type CommitBatcher struct {
getConfig GetSealingConfigFunc
prover ffiwrapper.Prover

deadlines map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes
cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes

notify, stop, stopped chan struct{}
force chan chan []sealiface.CommitBatchRes
Expand All @@ -70,9 +75,9 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
getConfig: getConfig,
prover: prov,

deadlines: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},
cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},

notify: make(chan struct{}, 1),
force: make(chan chan []sealiface.CommitBatchRes),
Expand Down Expand Up @@ -132,30 +137,30 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
return nil
}

var deadline time.Time
var cutoff time.Time
for sn := range b.todo {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
for sn := range b.waiting {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}

if deadline.IsZero() {
if cutoff.IsZero() {
return time.After(maxWait)
}

deadline = deadline.Add(-slack)
if deadline.Before(now) {
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
}

wait := deadline.Sub(now)
wait := cutoff.Sub(now)
if wait > maxWait {
wait = maxWait
}
Expand Down Expand Up @@ -208,7 +213,7 @@ func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBa

delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.deadlines, sn)
delete(b.cutoffs, sn)
}
}

Expand Down Expand Up @@ -369,16 +374,15 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i

// register commit, wait for batch message, return message CID
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) {
_, curEpoch, err := b.api.ChainHead(b.mctx)
sn := s.SectorNumber

cu, err := b.getCommitCutoff(s)
if err != nil {
log.Errorf("getting chain head: %s", err)
return sealiface.CommitBatchRes{}, nil
return sealiface.CommitBatchRes{}, err
}

sn := s.SectorNumber

b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.cutoffs[sn] = cu
b.todo[sn] = in

sent := make(chan sealiface.CommitBatchRes, 1)
Expand Down Expand Up @@ -452,24 +456,43 @@ func (b *CommitBatcher) Stop(ctx context.Context) error {
}
}

func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
deadlineEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
// TODO: If this returned epochs, it would make testing much easier
func (b *CommitBatcher) getCommitCutoff(si SectorInfo) (time.Time, error) {
arajasek marked this conversation as resolved.
Show resolved Hide resolved
tok, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil {
return time.Now(), xerrors.Errorf("getting chain head: %s", err)
}

nv, err := b.api.StateNetworkVersion(b.mctx, tok)
if err != nil {
log.Errorf("getting network version: %s", err)
return time.Now(), xerrors.Errorf("getting network version: %s", err)
}

pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, si.SectorNumber, tok)
if err != nil {
log.Errorf("getting precommit info: %s", err)
return time.Now(), err
}

cutoffEpoch := pci.PreCommitEpoch + policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), si.SectorType)

for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}

startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < deadlineEpoch {
deadlineEpoch = startEpoch
if startEpoch < cutoffEpoch {
cutoffEpoch = startEpoch
}
}

if deadlineEpoch <= curEpoch {
return time.Now()
if cutoffEpoch <= curEpoch {
return time.Now(), nil
}

return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second), nil
}

func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) {
Expand Down
62 changes: 43 additions & 19 deletions extern/storage-sealing/precommit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"sync"
"time"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -40,9 +43,9 @@ type PreCommitBatcher struct {
feeCfg FeeConfig
getConfig GetSealingConfigFunc

deadlines map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]*preCommitEntry
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes
cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]*preCommitEntry
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes

notify, stop, stopped chan struct{}
force chan chan []sealiface.PreCommitBatchRes
Expand All @@ -58,9 +61,9 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
feeCfg: feeCfg,
getConfig: getConfig,

deadlines: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]*preCommitEntry{},
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},
cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]*preCommitEntry{},
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},

notify: make(chan struct{}, 1),
force: make(chan chan []sealiface.PreCommitBatchRes),
Expand Down Expand Up @@ -120,30 +123,30 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
return nil
}

var deadline time.Time
var cutoff time.Time
for sn := range b.todo {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
for sn := range b.waiting {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}

if deadline.IsZero() {
if cutoff.IsZero() {
return time.After(maxWait)
}

deadline = deadline.Add(-slack)
if deadline.Before(now) {
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
}

wait := deadline.Sub(now)
wait := cutoff.Sub(now)
if wait > maxWait {
wait = maxWait
}
Expand Down Expand Up @@ -191,7 +194,7 @@ func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCo

delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.deadlines, sn)
delete(b.cutoffs, sn)
}
}

Expand Down Expand Up @@ -254,7 +257,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
sn := s.SectorNumber

b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.cutoffs[sn] = getPreCommitCutoff(curEpoch, s)
b.todo[sn] = &preCommitEntry{
deposit: deposit,
pci: in,
Expand Down Expand Up @@ -330,3 +333,24 @@ func (b *PreCommitBatcher) Stop(ctx context.Context) error {
return ctx.Err()
}
}

// TODO: If this returned epochs, it would make testing much easier
func getPreCommitCutoff(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
cutoffEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}

startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < cutoffEpoch {
cutoffEpoch = startEpoch
}
}

if cutoffEpoch <= curEpoch {
return time.Now()
}

return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
}