Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fund miners with the aggregate fee when ProveCommitting #6428

Merged
merged 5 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/params_nerpanet.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const UpgradeOrangeHeight = 300

const UpgradeTrustHeight = 600
const UpgradeNorwegianHeight = 201000
const UpgradeActorsV4Height = 203000
const UpgradeTurboHeight = 203000
const UpgradeHyperdriveHeight = 999999999

func init() {
Expand Down
30 changes: 30 additions & 0 deletions chain/actors/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package policy
import (
"sort"

"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/chain/actors"
Expand Down Expand Up @@ -367,3 +369,31 @@ func GetDeclarationsMax(nwVer network.Version) int {
panic("unsupported network version")
}
}

func AggregateNetworkFee(nwVer network.Version, aggregateSize int, baseFee abi.TokenAmount) abi.TokenAmount {
switch actors.VersionForNetwork(nwVer) {

case actors.Version0:

return big.Zero()

case actors.Version2:

return big.Zero()

case actors.Version3:

return big.Zero()

case actors.Version4:

return big.Zero()

case actors.Version5:

return miner5.AggregateNetworkFee(aggregateSize, baseFee)

default:
panic("unsupported network version")
}
}
17 changes: 17 additions & 0 deletions chain/actors/policy/policy.go.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package policy
import (
"sort"

"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/chain/actors"
Expand Down Expand Up @@ -246,3 +248,18 @@ func GetDeclarationsMax(nwVer network.Version) int {
panic("unsupported network version")
}
}

func AggregateNetworkFee(nwVer network.Version, aggregateSize int, baseFee abi.TokenAmount) abi.TokenAmount {
switch actors.VersionForNetwork(nwVer) {
{{range .versions}}
case actors.Version{{.}}:
{{if (le . 4)}}
return big.Zero()
{{else}}
return miner{{.}}.AggregateNetworkFee(aggregateSize, baseFee)
{{end}}
{{end}}
default:
panic("unsupported network version")
}
}
101 changes: 69 additions & 32 deletions extern/storage-sealing/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"sync"
"time"

"github.com/filecoin-project/go-state-types/network"

"github.com/filecoin-project/lotus/chain/actors"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

Expand All @@ -31,9 +35,11 @@ type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
ChainBaseFee(context.Context, TipSetToken) (abi.TokenAmount, error)

StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error)
}

type AggregateInput struct {
Expand All @@ -51,9 +57,9 @@ type CommitBatcher struct {
getConfig GetSealingConfigFunc
prover ffiwrapper.Prover

deadlines map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes
cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes

notify, stop, stopped chan struct{}
force chan chan []sealiface.CommitBatchRes
Expand All @@ -70,9 +76,9 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
getConfig: getConfig,
prover: prov,

deadlines: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},
cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},

notify: make(chan struct{}, 1),
force: make(chan chan []sealiface.CommitBatchRes),
Expand Down Expand Up @@ -132,30 +138,30 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
return nil
}

var deadline time.Time
var cutoff time.Time
for sn := range b.todo {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
for sn := range b.waiting {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}

if deadline.IsZero() {
if cutoff.IsZero() {
return time.After(maxWait)
}

deadline = deadline.Add(-slack)
if deadline.Before(now) {
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
}

wait := deadline.Sub(now)
wait := cutoff.Sub(now)
if wait > maxWait {
wait = maxWait
}
Expand Down Expand Up @@ -208,7 +214,7 @@ func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBa

delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.deadlines, sn)
delete(b.cutoffs, sn)
}
}

Expand Down Expand Up @@ -285,7 +291,20 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
}

goodFunds := big.Add(b.feeCfg.MaxCommitGasFee, collateral)
bf, err := b.api.ChainBaseFee(b.mctx, tok)
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get base fee: %w", err)
}

nv, err := b.api.StateNetworkVersion(b.mctx, tok)
if err != nil {
log.Errorf("getting network version: %s", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err)
}

aggFee := policy.AggregateNetworkFee(nv, len(infos), bf)

goodFunds := big.Add(b.feeCfg.MaxCommitGasFee, big.Add(collateral, aggFee))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm.... shouldn't we have a new field for this? Or shouldn't the max fee be a function of the number of sectors, or something like that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#6420 does that


from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral)
if err != nil {
Expand Down Expand Up @@ -369,16 +388,15 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i

// register commit, wait for batch message, return message CID
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) {
_, curEpoch, err := b.api.ChainHead(b.mctx)
sn := s.SectorNumber

cu, err := b.getCommitCutoff(s)
if err != nil {
log.Errorf("getting chain head: %s", err)
return sealiface.CommitBatchRes{}, nil
return sealiface.CommitBatchRes{}, err
}

sn := s.SectorNumber

b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.cutoffs[sn] = cu
b.todo[sn] = in

sent := make(chan sealiface.CommitBatchRes, 1)
Expand Down Expand Up @@ -452,24 +470,43 @@ func (b *CommitBatcher) Stop(ctx context.Context) error {
}
}

func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
deadlineEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
// TODO: If this returned epochs, it would make testing much easier
func (b *CommitBatcher) getCommitCutoff(si SectorInfo) (time.Time, error) {
tok, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil {
return time.Now(), xerrors.Errorf("getting chain head: %s", err)
}

nv, err := b.api.StateNetworkVersion(b.mctx, tok)
if err != nil {
log.Errorf("getting network version: %s", err)
return time.Now(), xerrors.Errorf("getting network version: %s", err)
}

pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, si.SectorNumber, tok)
if err != nil {
log.Errorf("getting precommit info: %s", err)
return time.Now(), err
}

cutoffEpoch := pci.PreCommitEpoch + policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), si.SectorType)

for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}

startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < deadlineEpoch {
deadlineEpoch = startEpoch
if startEpoch < cutoffEpoch {
cutoffEpoch = startEpoch
}
}

if deadlineEpoch <= curEpoch {
return time.Now()
if cutoffEpoch <= curEpoch {
return time.Now(), nil
}

return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second), nil
}

func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) {
Expand Down
62 changes: 43 additions & 19 deletions extern/storage-sealing/precommit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"sync"
"time"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -40,9 +43,9 @@ type PreCommitBatcher struct {
feeCfg FeeConfig
getConfig GetSealingConfigFunc

deadlines map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]*preCommitEntry
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes
cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]*preCommitEntry
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes

notify, stop, stopped chan struct{}
force chan chan []sealiface.PreCommitBatchRes
Expand All @@ -58,9 +61,9 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
feeCfg: feeCfg,
getConfig: getConfig,

deadlines: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]*preCommitEntry{},
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},
cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]*preCommitEntry{},
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},

notify: make(chan struct{}, 1),
force: make(chan chan []sealiface.PreCommitBatchRes),
Expand Down Expand Up @@ -120,30 +123,30 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
return nil
}

var deadline time.Time
var cutoff time.Time
for sn := range b.todo {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
for sn := range b.waiting {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}

if deadline.IsZero() {
if cutoff.IsZero() {
return time.After(maxWait)
}

deadline = deadline.Add(-slack)
if deadline.Before(now) {
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
}

wait := deadline.Sub(now)
wait := cutoff.Sub(now)
if wait > maxWait {
wait = maxWait
}
Expand Down Expand Up @@ -191,7 +194,7 @@ func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCo

delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.deadlines, sn)
delete(b.cutoffs, sn)
}
}

Expand Down Expand Up @@ -254,7 +257,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
sn := s.SectorNumber

b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.cutoffs[sn] = getPreCommitCutoff(curEpoch, s)
b.todo[sn] = &preCommitEntry{
deposit: deposit,
pci: in,
Expand Down Expand Up @@ -330,3 +333,24 @@ func (b *PreCommitBatcher) Stop(ctx context.Context) error {
return ctx.Err()
}
}

// TODO: If this returned epochs, it would make testing much easier
func getPreCommitCutoff(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
cutoffEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}

startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < cutoffEpoch {
cutoffEpoch = startEpoch
}
}

if cutoffEpoch <= curEpoch {
return time.Now()
}

return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
}
Loading