Skip to content

Commit

Permalink
fix(f3): poll the lease by repeatedly participating instead of checki…
Browse files Browse the repository at this point in the history
…ng progress

Previously, the flow was:

1. Get ticket.
2. Participate.
3. Repeatedly poll progress.

The new flow is:

1. Get ticket.
2. Repeatedly participate, using the returned lease as an indicator of progress.

That way, if the lotus node reboots we'll eventually re-tell them about
the lease.

fixes filecoin-project/go-f3#719
  • Loading branch information
Stebalien committed Oct 25, 2024
1 parent 703333c commit 1ae4fa7
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 72 deletions.
134 changes: 67 additions & 67 deletions chain/lf3/participation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"

"github.com/filecoin-project/lotus/api"
Expand All @@ -37,7 +36,6 @@ const (
type F3ParticipationAPI interface {
F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign
F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
F3GetProgress(ctx context.Context) (gpbft.Instant, error)
F3GetManifest(ctx context.Context) (*manifest.Manifest, error)
}

Expand All @@ -46,7 +44,6 @@ type Participant struct {
participant address.Address
backoff *backoff.Backoff
maxCheckProgressAttempts int
previousTicket api.F3ParticipationTicket
leaseTerm uint64

runningCtx context.Context
Expand Down Expand Up @@ -92,21 +89,18 @@ func (p *Participant) run(ctx context.Context) (_err error) {
}
}()

var ticket api.F3ParticipationTicket
for ctx.Err() == nil {
var err error
start := time.Now()
ticket, err := p.tryGetF3ParticipationTicket(ctx)
ticket, err = p.tryGetF3ParticipationTicket(ctx, ticket)
if err != nil {
return err
}
lease, participating, err := p.tryF3Participate(ctx, ticket)
err = p.tryParticipate(ctx, ticket)
if err != nil {
return err
}
if participating {
if err := p.awaitLeaseExpiry(ctx, lease); err != nil {
return err
}
}
const minPeriod = 500 * time.Millisecond
if sinceLastLoop := time.Since(start); sinceLastLoop < minPeriod {
select {
Expand All @@ -120,10 +114,10 @@ func (p *Participant) run(ctx context.Context) (_err error) {
return ctx.Err()
}

func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3ParticipationTicket, error) {
func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context, previousTicket api.F3ParticipationTicket) (api.F3ParticipationTicket, error) {
p.backoff.Reset()
for ctx.Err() == nil {
switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, p.previousTicket, p.leaseTerm); {
switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, previousTicket, p.leaseTerm); {
case ctx.Err() != nil:
return api.F3ParticipationTicket{}, ctx.Err()
case errors.Is(err, api.ErrF3Disabled):
Expand All @@ -142,25 +136,51 @@ func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3Pa
return api.F3ParticipationTicket{}, ctx.Err()
}

func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) {
func (p *Participant) getManifest(ctx context.Context) (*manifest.Manifest, error) {
p.backoff.Reset()
for ctx.Err() == nil {
switch manifest, err := p.node.F3GetManifest(ctx); {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return nil, xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
log.Errorw("Error when fetching F3 manifest. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
case manifest == nil:
// Can happen if we reboot and have no manifest.
log.Warnw("Received no F3 manifest from lotus. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration())
default:
return manifest, nil
}
p.backOff(ctx)
}
return nil, ctx.Err()
}

func (p *Participant) tryParticipate(ctx context.Context, ticket api.F3ParticipationTicket) error {
p.backoff.Reset()
renewLeaseWithin := p.leaseTerm / 2
var (
manifest *manifest.Manifest
haveLease bool
)
for ctx.Err() == nil {
switch lease, err := p.node.F3Participate(ctx, ticket); {
lease, err := p.node.F3Participate(ctx, ticket)
switch {
case ctx.Err() != nil:
return api.F3ParticipationLease{}, false, ctx.Err()
return ctx.Err()
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
return xerrors.Errorf("attempting F3 participation with ticket: %w", err)
case errors.Is(err, api.ErrF3ParticipationTicketExpired):
log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
return api.F3ParticipationLease{}, false, nil
return nil
case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting):
log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
return api.F3ParticipationLease{}, false, nil
return nil
case errors.Is(err, api.ErrF3ParticipationTicketInvalid):
log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
p.backOff(ctx)
return api.F3ParticipationLease{}, false, nil
return nil
case errors.Is(err, api.ErrF3ParticipationIssuerMismatch):
log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. Retrying F3 participation after backoff.", "backoff", p.backoff.Duration(), "err", err)
p.backOff(ctx)
Expand All @@ -171,69 +191,49 @@ func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3Partici
p.backOff(ctx)
continue
case err != nil:
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
return nil
}
log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
p.backOff(ctx)
continue
case lease.ValidityTerm <= renewLeaseWithin:
return nil
default:
// we succeeded so reset the backoff.
p.backoff.Reset()
}

// Log the first time we give out the lease.
if !haveLease {
log.Infow("Successfully acquired F3 participation lease.",
"issuer", lease.Issuer,
"not-before", lease.FromInstance,
"not-after", lease.ToInstance(),
)
p.previousTicket = ticket
return lease, true, nil
haveLease = true
}
}
return api.F3ParticipationLease{}, false, ctx.Err()
}

func (p *Participant) awaitLeaseExpiry(ctx context.Context, lease api.F3ParticipationLease) error {
p.backoff.Reset()
renewLeaseWithin := p.leaseTerm / 2
for ctx.Err() == nil {
manifest, err := p.node.F3GetManifest(ctx)
switch {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
return nil
// Fetch the manifest if necessary.
if manifest == nil || lease.Network != manifest.NetworkName {
manifest, err = p.getManifest(ctx)
if err != nil {
return err
}
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
p.backOff(ctx)
continue
case manifest == nil || manifest.NetworkName != lease.Network:
// If we got an unexpected manifest, or no manifest, go back to the
// beginning and try to get another ticket. Switching from having a manifest
// to having no manifest can theoretically happen if the lotus node reboots
// and has no static manifest.
return nil
}
switch progress, err := p.node.F3GetProgress(ctx); {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
if manifest.NetworkName != lease.Network {
log.Warnf("Got a manifest for network %q while waiting for a lease on network %q. Getting another ticket.", manifest.NetworkName, lease.Network)
return nil
}
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
p.backOff(ctx)
case progress.ID+renewLeaseWithin >= lease.ToInstance():
log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance())
return nil
default:
remainingInstanceLease := lease.ToInstance() - progress.ID
waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment
if waitTime == 0 {
waitTime = 100 * time.Millisecond
}
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime)
p.backOffFor(ctx, waitTime)
}

// Wait until we think we may need to renew the lease.
waitTime := time.Duration(lease.ValidityTerm-renewLeaseWithin) * manifest.CatchUpAlignment
if waitTime == 0 {
waitTime = 100 * time.Millisecond
}
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", lease.ValidityTerm, waitTime)
p.backOffFor(ctx, waitTime)
}
return ctx.Err()
}
Expand Down
17 changes: 12 additions & 5 deletions chain/lf3/participation_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,25 @@ func (l *leaser) participate(ticket api.F3ParticipationTicket) (api.F3Participat
l.mutex.Lock()
defer l.mutex.Unlock()
currentLease, found := l.leases[newLease.MinerID]
if found && currentLease.Network == newLease.Network && currentLease.FromInstance > newLease.FromInstance {
// For safety, strictly require lease start instance to never decrease.
return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting
}
if !found {
if found {
// short-circuite for reparticipation.
if currentLease == newLease {
return newLease, nil
}
if currentLease.Network == newLease.Network && currentLease.FromInstance > newLease.FromInstance {
// For safety, strictly require lease start instance to never decrease.
return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting
}
} else {
log.Infof("started participating in F3 for miner %d", newLease.MinerID)
}
l.leases[newLease.MinerID] = newLease
select {
case l.notifyParticipation <- struct{}{}:
default:
}
newLease.ValidityTerm = instant.ID - newLease.FromInstance
newLease.FromInstance = instant.ID
return newLease, nil
}

Expand Down

0 comments on commit 1ae4fa7

Please sign in to comment.