From 10bb065f76a588b1d7bc97707a7825a483dddf5c Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 25 Oct 2024 17:04:50 +0000 Subject: [PATCH] fix(f3): poll the lease by repeatedly participating instead of checking progress (#12640) Previously, the flow was: 1. Get ticket. 2. Participate. 3. Repeatedly poll progress. The new flow is: 1. Get ticket. 2. Repeatedly participate, using the returned lease as an indicator of progress. That way, if the lotus node reboots we'll eventually re-tell them about the lease. fixes https://github.com/filecoin-project/go-f3/issues/719 --- CHANGELOG.md | 4 +- chain/lf3/participation.go | 134 +++++++++++++------------- chain/lf3/participation_lease.go | 17 +++- chain/lf3/participation_lease_test.go | 3 +- chain/lf3/participation_test.go | 79 ++++++++++++++- 5 files changed, 157 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 461e9d7d30d..1e36d0b9f8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,6 @@ - Update `EthGetBlockByNumber` to return a pointer to ethtypes.EthBlock or nil for null rounds. ([filecoin-project/lotus#12529](https://github.com/filecoin-project/lotus/pull/12529)) - Reduce size of embedded genesis CAR files by removing WASM actor blocks and compressing with zstd. This reduces the `lotus` binary size by approximately 10 MiB. 
([filecoin-project/lotus#12439](https://github.com/filecoin-project/lotus/pull/12439)) - Add ChainSafe operated Calibration archival node to the bootstrap list ([filecoin-project/lotus#12517](https://github.com/filecoin-project/lotus/pull/12517)) -- Fix hotloop in F3 pariticpation API ([filecoin-project/lotus#12575](https://github.com/filecoin-project/lotus/pull/12575)) - `lotus chain head` now supports a `--height` flag to print just the epoch number of the current chain head ([filecoin-project/lotus#12609](https://github.com/filecoin-project/lotus/pull/12609)) - `lotus-shed indexes inspect-indexes` now performs a comprehensive comparison of the event index data for each message by comparing the AMT root CID from the message receipt with the root of a reconstructed AMT. Previously `inspect-indexes` simply compared event counts, comparing AMT roots confirms all the event data is byte-perfect. ([filecoin-project/lotus#12570](https://github.com/filecoin-project/lotus/pull/12570)) - Expose APIs to list the miner IDs that are currently participating in F3 via node. ([filecoin-project/lotus#12608](https://github.com/filecoin-project/lotus/pull/12608)) @@ -18,9 +17,10 @@ - Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567)) - Event APIs (Eth events and actor events) should only return reverted events if client queries by specific block hash / tipset. Eth and actor event subscription APIs should always return reverted events to enable accurate observation of real-time changes. ([filecoin-project/lotus#12585](https://github.com/filecoin-project/lotus/pull/12585)) - Add logic to check if the miner's owner address is delegated (f4 address). 
If it is delegated, the `lotus-shed sectors termination-estimate` command now sends the termination state call using the worker ID. This fix resolves the issue where termination-estimate did not function correctly for miners with delegated owner addresses. ([filecoin-project/lotus#12569](https://github.com/filecoin-project/lotus/pull/12569)) +- Fix hotloop in F3 participation API ([filecoin-project/lotus#12575](https://github.com/filecoin-project/lotus/pull/12575)) - Fix a bug in F3 participation API where valid leases may get removed due to dynamic manifest update. ([filecoin-project/lotus#12597](https://github.com/filecoin-project/lotus/pull/12597)) - - Change the F3 participation ticket encoding to allow parity testing across non-go implementations, where a ticket issued by Lotus may need to be decoded by, for example, Forest . The changes also enforce the minimum instance participation of 1 for miners. ([filecoin-project/lotus#12615](https://github.com/filecoin-project/lotus/pull/12615)) +- Fix issue where F3 wouldn't start participating again if Lotus restarted without restarting the Miner ([filecoin-project/lotus#12640](https://github.com/filecoin-project/lotus/pull/12640)). 
## Deps diff --git a/chain/lf3/participation.go b/chain/lf3/participation.go index 7d1b10bd808..280b44b9450 100644 --- a/chain/lf3/participation.go +++ b/chain/lf3/participation.go @@ -11,7 +11,6 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-f3/manifest" "github.com/filecoin-project/lotus/api" @@ -37,7 +36,6 @@ const ( type F3ParticipationAPI interface { F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error) - F3GetProgress(ctx context.Context) (gpbft.Instant, error) F3GetManifest(ctx context.Context) (*manifest.Manifest, error) } @@ -46,7 +44,6 @@ type Participant struct { participant address.Address backoff *backoff.Backoff maxCheckProgressAttempts int - previousTicket api.F3ParticipationTicket leaseTerm uint64 runningCtx context.Context @@ -92,21 +89,18 @@ func (p *Participant) run(ctx context.Context) (_err error) { } }() + var ticket api.F3ParticipationTicket for ctx.Err() == nil { + var err error start := time.Now() - ticket, err := p.tryGetF3ParticipationTicket(ctx) + ticket, err = p.tryGetF3ParticipationTicket(ctx, ticket) if err != nil { return err } - lease, participating, err := p.tryF3Participate(ctx, ticket) + err = p.tryParticipate(ctx, ticket) if err != nil { return err } - if participating { - if err := p.awaitLeaseExpiry(ctx, lease); err != nil { - return err - } - } const minPeriod = 500 * time.Millisecond if sinceLastLoop := time.Since(start); sinceLastLoop < minPeriod { select { @@ -120,10 +114,10 @@ func (p *Participant) run(ctx context.Context) (_err error) { return ctx.Err() } -func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3ParticipationTicket, error) { +func (p *Participant) 
tryGetF3ParticipationTicket(ctx context.Context, previousTicket api.F3ParticipationTicket) (api.F3ParticipationTicket, error) { p.backoff.Reset() for ctx.Err() == nil { - switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, p.previousTicket, p.leaseTerm); { + switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, previousTicket, p.leaseTerm); { case ctx.Err() != nil: return api.F3ParticipationTicket{}, ctx.Err() case errors.Is(err, api.ErrF3Disabled): @@ -142,25 +136,51 @@ func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3Pa return api.F3ParticipationTicket{}, ctx.Err() } -func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) { +func (p *Participant) getManifest(ctx context.Context) (*manifest.Manifest, error) { + p.backoff.Reset() + for ctx.Err() == nil { + switch manifest, err := p.node.F3GetManifest(ctx); { + case errors.Is(err, api.ErrF3Disabled): + log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err) + return nil, xerrors.Errorf("awaiting F3 participation lease expiry: %w", err) + case err != nil: + log.Errorw("Error when fetching F3 manifest. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err) + case manifest == nil: + // Can happen if we reboot and have no manifest. + log.Warnw("Received no F3 manifest from lotus. 
Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration()) + default: + return manifest, nil + } + p.backOff(ctx) + } + return nil, ctx.Err() +} + +func (p *Participant) tryParticipate(ctx context.Context, ticket api.F3ParticipationTicket) error { p.backoff.Reset() + renewLeaseWithin := p.leaseTerm / 2 + var ( + manifest *manifest.Manifest + haveLease bool + ) for ctx.Err() == nil { - switch lease, err := p.node.F3Participate(ctx, ticket); { + lease, err := p.node.F3Participate(ctx, ticket) + switch { case ctx.Err() != nil: - return api.F3ParticipationLease{}, false, ctx.Err() + return ctx.Err() case errors.Is(err, api.ErrF3Disabled): log.Errorw("Cannot participate in F3 as it is disabled.", "err", err) - return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err) + return xerrors.Errorf("attempting F3 participation with ticket: %w", err) case errors.Is(err, api.ErrF3ParticipationTicketExpired): log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err) - return api.F3ParticipationLease{}, false, nil + return nil case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting): log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err) - return api.F3ParticipationLease{}, false, nil + return nil case errors.Is(err, api.ErrF3ParticipationTicketInvalid): log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err) p.backOff(ctx) - return api.F3ParticipationLease{}, false, nil + return nil case errors.Is(err, api.ErrF3ParticipationIssuerMismatch): log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. 
Retrying F3 participation after backoff.", "backoff", p.backoff.Duration(), "err", err) p.backOff(ctx) @@ -171,69 +191,49 @@ func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3Partici p.backOff(ctx) continue case err != nil: + if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) { + log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err) + return nil + } log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err) p.backOff(ctx) continue + case lease.ValidityTerm <= renewLeaseWithin: + return nil default: + // we succeeded so reset the backoff. + p.backoff.Reset() + } + + // Log the first time we give out the lease. + if !haveLease { log.Infow("Successfully acquired F3 participation lease.", "issuer", lease.Issuer, "not-before", lease.FromInstance, "not-after", lease.ToInstance(), ) - p.previousTicket = ticket - return lease, true, nil + haveLease = true } - } - return api.F3ParticipationLease{}, false, ctx.Err() -} -func (p *Participant) awaitLeaseExpiry(ctx context.Context, lease api.F3ParticipationLease) error { - p.backoff.Reset() - renewLeaseWithin := p.leaseTerm / 2 - for ctx.Err() == nil { - manifest, err := p.node.F3GetManifest(ctx) - switch { - case errors.Is(err, api.ErrF3Disabled): - log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err) - return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err) - case err != nil: - if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) { - log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err) - return nil + // Fetch the manifest if necessary. 
+ if manifest == nil || lease.Network != manifest.NetworkName { + manifest, err = p.getManifest(ctx) + if err != nil { + return err } - log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err) - p.backOff(ctx) - continue - case manifest == nil || manifest.NetworkName != lease.Network: - // If we got an unexpected manifest, or no manifest, go back to the - // beginning and try to get another ticket. Switching from having a manifest - // to having no manifest can theoretically happen if the lotus node reboots - // and has no static manifest. - return nil - } - switch progress, err := p.node.F3GetProgress(ctx); { - case errors.Is(err, api.ErrF3Disabled): - log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err) - return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err) - case err != nil: - if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) { - log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err) + if manifest.NetworkName != lease.Network { + log.Warnf("Got a manifest for network %q while waiting for a lease on network %q. Getting another ticket.", manifest.NetworkName, lease.Network) return nil } - log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err) - p.backOff(ctx) - case progress.ID+renewLeaseWithin >= lease.ToInstance(): - log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). 
Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance()) - return nil - default: - remainingInstanceLease := lease.ToInstance() - progress.ID - waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment - if waitTime == 0 { - waitTime = 100 * time.Millisecond - } - log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime) - p.backOffFor(ctx, waitTime) } + + // Wait until we think we may need to renew the lease. + waitTime := time.Duration(lease.ValidityTerm-renewLeaseWithin) * manifest.CatchUpAlignment + if waitTime == 0 { + waitTime = 100 * time.Millisecond + } + log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", lease.ValidityTerm, waitTime) + p.backOffFor(ctx, waitTime) } return ctx.Err() } diff --git a/chain/lf3/participation_lease.go b/chain/lf3/participation_lease.go index 6fd03949c05..ab3d1c60135 100644 --- a/chain/lf3/participation_lease.go +++ b/chain/lf3/participation_lease.go @@ -101,11 +101,16 @@ func (l *leaser) participate(ticket api.F3ParticipationTicket) (api.F3Participat l.mutex.Lock() defer l.mutex.Unlock() currentLease, found := l.leases[newLease.MinerID] - if found && currentLease.Network == newLease.Network && currentLease.FromInstance > newLease.FromInstance { - // For safety, strictly require lease start instance to never decrease. - return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting - } - if !found { + if found { + // short-circuite for reparticipation. + if currentLease == newLease { + return newLease, nil + } + if currentLease.Network == newLease.Network && currentLease.FromInstance > newLease.FromInstance { + // For safety, strictly require lease start instance to never decrease. 
+ return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting + } + } else { log.Infof("started participating in F3 for miner %d", newLease.MinerID) } l.leases[newLease.MinerID] = newLease @@ -113,6 +118,8 @@ func (l *leaser) participate(ticket api.F3ParticipationTicket) (api.F3Participat case l.notifyParticipation <- struct{}{}: default: } + newLease.ValidityTerm = newLease.ToInstance() - instant.ID + newLease.FromInstance = instant.ID return newLease, nil } diff --git a/chain/lf3/participation_lease_test.go b/chain/lf3/participation_lease_test.go index 5c045768e86..c9dc11ac29e 100644 --- a/chain/lf3/participation_lease_test.go +++ b/chain/lf3/participation_lease_test.go @@ -32,7 +32,8 @@ func TestLeaser(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(123), lease.MinerID) require.Equal(t, issuer.String(), lease.Issuer) - require.Equal(t, uint64(5), lease.ValidityTerm) // Current instance (10) + offset (5) + require.Equal(t, uint64(10), lease.FromInstance) // Current instance (10) + offset (5) + require.Equal(t, uint64(5), lease.ValidityTerm) // Current instance (10) + offset (5) }) t.Run("get participants", func(t *testing.T) { progress.currentInstance = 11 diff --git a/chain/lf3/participation_test.go b/chain/lf3/participation_test.go index 79564d467eb..cfc2276c6fa 100644 --- a/chain/lf3/participation_test.go +++ b/chain/lf3/participation_test.go @@ -7,10 +7,10 @@ import ( "time" "github.com/jpillora/backoff" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-f3/manifest" "github.com/filecoin-project/lotus/api" @@ -41,10 +41,6 @@ func (m *manifestFailAPI) F3GetOrRenewParticipationTicket(ctx context.Context, m } } -func (m *manifestFailAPI) F3GetProgress(ctx context.Context) (gpbft.Instant, error) { - return gpbft.Instant{}, nil -} - func (m *manifestFailAPI) F3Participate(ctx 
context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error) { return api.F3ParticipationLease{ Network: "test", @@ -73,3 +69,76 @@ func TestParticipantManifestFailure(t *testing.T) { <-api.manifestRequested require.NoError(t, p.Stop(context.Background())) } + +type repeatedParticipateAPI struct { + secondTicket chan struct{} + instance uint64 + t *testing.T +} + +func (m *repeatedParticipateAPI) F3GetManifest(ctx context.Context) (*manifest.Manifest, error) { + return &manifest.Manifest{ + NetworkName: "test", + CatchUpAlignment: time.Millisecond, + }, nil +} + +func (m *repeatedParticipateAPI) F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) { + switch string(previous) { + case "first ticket": + return api.F3ParticipationTicket("second ticket"), nil + case "": + return api.F3ParticipationTicket("first ticket"), nil + default: + panic("unexpected ticket") + } +} + +func (m *repeatedParticipateAPI) F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error) { + switch string(ticket) { + case "first ticket": + case "second ticket": + // This is 6, not 5, because we expect one final call to participate before getting + // a new ticket. 
+ assert.Equal(m.t, uint64(6), m.instance) + close(m.secondTicket) + return api.F3ParticipationLease{}, api.ErrF3ParticipationIssuerMismatch + default: + m.t.Errorf("unexpected f3 ticket: %s", string(ticket)) + return api.F3ParticipationLease{}, api.ErrF3Disabled + } + + if m.instance >= 10 { + m.t.Error("did not expect the participant to continue past the half-way point") + return api.F3ParticipationLease{}, api.ErrF3Disabled + } + + lease := api.F3ParticipationLease{ + Network: "test", + Issuer: "foobar", + MinerID: 0, + FromInstance: m.instance, + ValidityTerm: 10 - m.instance, + } + m.instance++ + + return lease, nil +} + +// Make sure we keep calling participate until our validity term drops to half (5) of the initial +// term (10). At that point, the participant should request a new ticket. +func TestParticipantRepeat(t *testing.T) { + api := &repeatedParticipateAPI{secondTicket: make(chan struct{}), t: t} + addr, err := address.NewIDAddress(1000) + require.NoError(t, err) + + p := lf3.NewParticipant(context.Background(), api, dtypes.MinerAddress(addr), + &backoff.Backoff{ + Min: 1 * time.Second, + Max: 1 * time.Minute, + Factor: 1.5, + }, 13, 10) + require.NoError(t, p.Start(context.Background())) + <-api.secondTicket + require.NoError(t, p.Stop(context.Background())) +}