From 1ae4fa713758b81fc7f402f34e466f20bc12b20b Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 24 Oct 2024 13:19:37 -0700 Subject: [PATCH] fix(f3): poll the lease by repeatedly participating instead of checking progress Previously, the flow was: 1. Get ticket. 2. Participate. 3. Repeatedly poll progress. The new flow is: 1. Get ticket. 2. Repeatedly participate, using the returned lease as an indicator of progress. That way, if the lotus node reboots we'll eventually re-tell them about the lease. fixes https://github.com/filecoin-project/go-f3/issues/719 --- chain/lf3/participation.go | 134 +++++++++++++++---------------- chain/lf3/participation_lease.go | 17 ++-- 2 files changed, 79 insertions(+), 72 deletions(-) diff --git a/chain/lf3/participation.go b/chain/lf3/participation.go index 7d1b10bd808..280b44b9450 100644 --- a/chain/lf3/participation.go +++ b/chain/lf3/participation.go @@ -11,7 +11,6 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-f3/manifest" "github.com/filecoin-project/lotus/api" @@ -37,7 +36,6 @@ const ( type F3ParticipationAPI interface { F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error) - F3GetProgress(ctx context.Context) (gpbft.Instant, error) F3GetManifest(ctx context.Context) (*manifest.Manifest, error) } @@ -46,7 +44,6 @@ type Participant struct { participant address.Address backoff *backoff.Backoff maxCheckProgressAttempts int - previousTicket api.F3ParticipationTicket leaseTerm uint64 runningCtx context.Context @@ -92,21 +89,18 @@ func (p *Participant) run(ctx context.Context) (_err error) { } }() + var ticket api.F3ParticipationTicket for ctx.Err() == nil { + var err error start := time.Now() - ticket, err := p.tryGetF3ParticipationTicket(ctx) + ticket, err = p.tryGetF3ParticipationTicket(ctx, ticket) if err != nil { return err } - lease, participating, err := p.tryF3Participate(ctx, ticket) + err = p.tryParticipate(ctx, ticket) if err != nil { return err } - if participating { - if err := p.awaitLeaseExpiry(ctx, lease); err != nil { - return err - } - } const minPeriod = 500 * time.Millisecond if sinceLastLoop := time.Since(start); sinceLastLoop < minPeriod { select { @@ -120,10 +114,10 @@ func (p *Participant) run(ctx context.Context) (_err error) { return ctx.Err() } -func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3ParticipationTicket, error) { +func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context, previousTicket api.F3ParticipationTicket) (api.F3ParticipationTicket, error) { p.backoff.Reset() for ctx.Err() == nil { - switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, p.previousTicket, p.leaseTerm); { + switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, previousTicket, p.leaseTerm); { case ctx.Err() != nil: return api.F3ParticipationTicket{}, ctx.Err() case errors.Is(err, api.ErrF3Disabled): @@ -142,25 +136,51 @@ func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3Pa return api.F3ParticipationTicket{}, ctx.Err() } -func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) { +func (p *Participant) getManifest(ctx context.Context) (*manifest.Manifest, error) { + p.backoff.Reset() + for ctx.Err() == nil { + switch manifest, err := p.node.F3GetManifest(ctx); { + case errors.Is(err, api.ErrF3Disabled): + log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err) + return nil, xerrors.Errorf("awaiting F3 participation lease expiry: %w", err) + case err != nil: + log.Errorw("Error when fetching F3 manifest. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err) + case manifest == nil: + // Can happen if we reboot and have no manifest. + log.Warnw("Received no F3 manifest from lotus. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration()) + default: + return manifest, nil + } + p.backOff(ctx) + } + return nil, ctx.Err() +} + +func (p *Participant) tryParticipate(ctx context.Context, ticket api.F3ParticipationTicket) error { p.backoff.Reset() + renewLeaseWithin := p.leaseTerm / 2 + var ( + manifest *manifest.Manifest + haveLease bool + ) for ctx.Err() == nil { - switch lease, err := p.node.F3Participate(ctx, ticket); { + lease, err := p.node.F3Participate(ctx, ticket) + switch { case ctx.Err() != nil: - return api.F3ParticipationLease{}, false, ctx.Err() + return ctx.Err() case errors.Is(err, api.ErrF3Disabled): log.Errorw("Cannot participate in F3 as it is disabled.", "err", err) - return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err) + return xerrors.Errorf("attempting F3 participation with ticket: %w", err) case errors.Is(err, api.ErrF3ParticipationTicketExpired): log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err) - return api.F3ParticipationLease{}, false, nil + return nil case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting): log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err) - return api.F3ParticipationLease{}, false, nil + return nil case errors.Is(err, api.ErrF3ParticipationTicketInvalid): log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err) p.backOff(ctx) - return api.F3ParticipationLease{}, false, nil + return nil case errors.Is(err, api.ErrF3ParticipationIssuerMismatch): log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. Retrying F3 participation after backoff.", "backoff", p.backoff.Duration(), "err", err) p.backOff(ctx) @@ -171,69 +191,49 @@ func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3Partici p.backOff(ctx) continue case err != nil: + if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) { + log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err) + return nil + } log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err) p.backOff(ctx) continue + case lease.ValidityTerm <= renewLeaseWithin: + return nil default: + // we succeeded so reset the backoff. + p.backoff.Reset() + } + + // Log the first time we give out the lease. + if !haveLease { log.Infow("Successfully acquired F3 participation lease.", "issuer", lease.Issuer, "not-before", lease.FromInstance, "not-after", lease.ToInstance(), ) - p.previousTicket = ticket - return lease, true, nil + haveLease = true } - } - return api.F3ParticipationLease{}, false, ctx.Err() -} -func (p *Participant) awaitLeaseExpiry(ctx context.Context, lease api.F3ParticipationLease) error { - p.backoff.Reset() - renewLeaseWithin := p.leaseTerm / 2 - for ctx.Err() == nil { - manifest, err := p.node.F3GetManifest(ctx) - switch { - case errors.Is(err, api.ErrF3Disabled): - log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err) - return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err) - case err != nil: - if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) { - log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err) - return nil + // Fetch the manifest if necessary. + if manifest == nil || lease.Network != manifest.NetworkName { + manifest, err = p.getManifest(ctx) + if err != nil { + return err } - log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err) - p.backOff(ctx) - continue - case manifest == nil || manifest.NetworkName != lease.Network: - // If we got an unexpected manifest, or no manifest, go back to the - // beginning and try to get another ticket. Switching from having a manifest - // to having no manifest can theoretically happen if the lotus node reboots - // and has no static manifest. - return nil - } - switch progress, err := p.node.F3GetProgress(ctx); { - case errors.Is(err, api.ErrF3Disabled): - log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err) - return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err) - case err != nil: - if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) { - log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err) + if manifest.NetworkName != lease.Network { + log.Warnf("Got a manifest for network %q while waiting for a lease on network %q. Getting another ticket.", manifest.NetworkName, lease.Network) return nil } - log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err) - p.backOff(ctx) - case progress.ID+renewLeaseWithin >= lease.ToInstance(): - log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance()) - return nil - default: - remainingInstanceLease := lease.ToInstance() - progress.ID - waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment - if waitTime == 0 { - waitTime = 100 * time.Millisecond - } - log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime) - p.backOffFor(ctx, waitTime) } + + // Wait until we think we may need to renew the lease. + waitTime := time.Duration(lease.ValidityTerm-renewLeaseWithin) * manifest.CatchUpAlignment + if waitTime == 0 { + waitTime = 100 * time.Millisecond + } + log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", lease.ValidityTerm, waitTime) + p.backOffFor(ctx, waitTime) } return ctx.Err() } diff --git a/chain/lf3/participation_lease.go b/chain/lf3/participation_lease.go index 6fd03949c05..b987e4c6364 100644 --- a/chain/lf3/participation_lease.go +++ b/chain/lf3/participation_lease.go @@ -101,11 +101,16 @@ func (l *leaser) participate(ticket api.F3ParticipationTicket) (api.F3Participat l.mutex.Lock() defer l.mutex.Unlock() currentLease, found := l.leases[newLease.MinerID] - if found && currentLease.Network == newLease.Network && currentLease.FromInstance > newLease.FromInstance { - // For safety, strictly require lease start instance to never decrease. - return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting - } - if !found { + if found { + // short-circuite for reparticipation. + if currentLease == newLease { + return newLease, nil + } + if currentLease.Network == newLease.Network && currentLease.FromInstance > newLease.FromInstance { + // For safety, strictly require lease start instance to never decrease. + return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting + } + } else { log.Infof("started participating in F3 for miner %d", newLease.MinerID) } l.leases[newLease.MinerID] = newLease @@ -113,6 +118,8 @@ func (l *leaser) participate(ticket api.F3ParticipationTicket) (api.F3Participat case l.notifyParticipation <- struct{}{}: default: } + newLease.ValidityTerm = instant.ID - newLease.FromInstance + newLease.FromInstance = instant.ID return newLease, nil }