Skip to content

Commit

Permalink
Integrate finality exchange (#433)
Browse files Browse the repository at this point in the history
* Integrate finality exchange

I've also changed the finality exchange start/stop methods to take
contexts for consistency.

* Add the ability to explicitly pause/resume F3 instances

Mostly useful for testing, but we may want to offer users a command to
do the same thing.

* Record the error when a received certificate is rejected as illegal.

* Test F3 pause/resume/catchup

1. If 2/3 nodes are paused, we should stop producing instances.
2. If we resume one node, we should start coming to agreement again.
3. If we resume the final node, even after a long delay, it should catch up.
4. Finally, the final node should be able to participate in consensus.
  • Loading branch information
Stebalien authored Jul 9, 2024
1 parent d56edf1 commit f08fbf9
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 32 deletions.
1 change: 1 addition & 0 deletions certexchange/polling/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (p *Poller) Poll(ctx context.Context, peer peer.ID) (*PollResult, error) {
)
if err != nil {
res.Status = PollIllegal
res.Error = err
return res, nil
}
if err := p.Store.Put(ctx, cert); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions certexchange/polling/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func TestPoller(t *testing.T) {
Store: serverCs,
}

require.NoError(t, server.Start())
t.Cleanup(func() { require.NoError(t, server.Stop()) })
require.NoError(t, server.Start(ctx))
t.Cleanup(func() { require.NoError(t, server.Stop(context.Background())) })

clientDs := ds_sync.MutexWrap(datastore.NewMapDatastore())
clientCs, err := certstore.CreateStore(ctx, clientDs, 0, cg.PowerTable)
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestPoller(t *testing.T) {
}

// Stop the server, and make sure we get a failure.
require.NoError(t, server.Stop())
require.NoError(t, server.Stop(ctx))

{
res, err := poller.Poll(ctx, serverHost.ID())
Expand Down
6 changes: 3 additions & 3 deletions certexchange/polling/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ type Subscriber struct {
stop context.CancelFunc
}

func (s *Subscriber) Start() error {
func (s *Subscriber) Start(startCtx context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
s.stop = cancel

var err error

s.peerTracker = newPeerTracker()
s.poller, err = NewPoller(ctx, &s.Client, s.Store, s.SignatureVerifier)
s.poller, err = NewPoller(startCtx, &s.Client, s.Store, s.SignatureVerifier)
if err != nil {
return err
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func (s *Subscriber) Start() error {
return nil
}

func (s *Subscriber) Stop() error {
func (s *Subscriber) Stop(stopCtx context.Context) error {
if s.stop != nil {
s.stop()
s.wg.Wait()
Expand Down
8 changes: 4 additions & 4 deletions certexchange/polling/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func TestSubscriber(t *testing.T) {
require.NoError(t, mocknet.LinkAll())

for _, server := range servers {
require.NoError(t, server.Start())
t.Cleanup(func() { require.NoError(t, server.Stop()) })
require.NoError(t, server.Start(ctx))
t.Cleanup(func() { require.NoError(t, server.Stop(context.Background())) })
}

clientDs := ds_sync.MutexWrap(datastore.NewMapDatastore())
Expand All @@ -73,9 +73,9 @@ func TestSubscriber(t *testing.T) {
InitialPollInterval: 100 * time.Millisecond,
}

require.NoError(t, subscriber.Start())
require.NoError(t, subscriber.Start(ctx))

t.Cleanup(func() { require.NoError(t, subscriber.Stop()) })
t.Cleanup(func() { require.NoError(t, subscriber.Stop(context.Background())) })

require.NoError(t, mocknet.ConnectAllButSelf())

Expand Down
4 changes: 2 additions & 2 deletions certexchange/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func TestClientServer(t *testing.T) {
NetworkName: testNetworkName,
}

require.NoError(t, server.Start())
t.Cleanup(func() { require.NoError(t, server.Stop()) })
require.NoError(t, server.Start(ctx))
t.Cleanup(func() { require.NoError(t, server.Stop(context.Background())) })

require.NoError(t, mocknet.ConnectAllButSelf())

Expand Down
4 changes: 2 additions & 2 deletions certexchange/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *Server) handleRequest(ctx context.Context, stream network.Stream) (_err
}

// Start the server.
func (s *Server) Start() error {
func (s *Server) Start(startCtx context.Context) error {
s.runningLk.Lock()
defer s.runningLk.Unlock()
if s.stopFunc != nil {
Expand Down Expand Up @@ -156,7 +156,7 @@ func (s *Server) Start() error {
}

// Stop the server.
func (s *Server) Stop() error {
func (s *Server) Stop(stopCtx context.Context) error {
// Ask the handlers to cancel/stop.
s.runningLk.RLock()
if s.stopFunc != nil {
Expand Down
118 changes: 100 additions & 18 deletions f3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package f3

import (
"context"
"fmt"
"sync"
"time"

"github.com/filecoin-project/go-f3/certexchange"
certexpoll "github.com/filecoin-project/go-f3/certexchange/polling"
"github.com/filecoin-project/go-f3/certs"
"github.com/filecoin-project/go-f3/certstore"
"github.com/filecoin-project/go-f3/ec"
Expand Down Expand Up @@ -40,6 +43,8 @@ type F3 struct {
cs *certstore.Store
manifest *manifest.Manifest
runner *gpbftRunner
certsub *certexpoll.Subscriber
certserv *certexchange.Server
}

// New creates and setups f3 with libp2p
Expand Down Expand Up @@ -183,9 +188,8 @@ func (m *F3) Start(startCtx context.Context) (_err error) {
defer func() {
m.mu.Lock()
defer m.mu.Unlock()
if m.runner != nil {
_err = multierr.Append(_err, m.runner.Stop(context.Background()))
m.runner = nil
if err := m.stopInternal(context.Background()); err != nil {
_err = multierr.Append(_err, err)
}
}()

Expand Down Expand Up @@ -232,38 +236,116 @@ func (m *F3) Stop(stopCtx context.Context) (_err error) {
)
}

func (m *F3) reconfigure(ctx context.Context, manifest *manifest.Manifest) error {
func (m *F3) reconfigure(ctx context.Context, manifest *manifest.Manifest) (_err error) {
m.mu.Lock()
defer m.mu.Unlock()

if err := m.stopInternal(ctx); err != nil {
// Log but don't abort.
log.Errorw("failed to properly stop F3 while reconfiguring", "error", err)
}

if manifest == nil {
return nil
}

// If we have a new manifest, reconfigure.
if m.manifest == nil || m.manifest.NetworkName != manifest.NetworkName {
m.cs = nil
m.manifest = manifest
}

return m.resumeInternal(m.runningCtx)
}

// Pause stops the GPBFT runner and the certificate exchange subscriber and
// server while holding the instance lock. The manifest and certificate store
// are left in place so that Resume can restart from them.
func (m *F3) Pause() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	ctx := m.runningCtx
	return m.stopInternal(ctx)
}

// Resume restarts the certificate exchange server, the certificate exchange
// subscriber and the GPBFT runner using the currently configured manifest,
// reusing the already-open certificate store when there is one.
//
// It returns an error if no manifest has been configured yet, or if any of the
// components fails to start.
func (m *F3) Resume() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	// resumeInternal dereferences m.manifest unconditionally; without this
	// guard, resuming before the first manifest arrives would panic.
	if m.manifest == nil {
		return fmt.Errorf("cannot resume F3: no manifest configured")
	}

	return m.resumeInternal(m.runningCtx)
}

func (m *F3) stopInternal(ctx context.Context) error {
var err error
if m.runner != nil {
if err := m.runner.Stop(ctx); err != nil {
return err
// Log and ignore shutdown errors.
if serr := m.runner.Stop(ctx); serr != nil {
err = multierr.Append(err, fmt.Errorf("failed to stop gpbft: %w", serr))
}
m.runner = nil
}
if manifest == nil {
return nil
if m.certsub != nil {
if serr := m.certsub.Stop(ctx); serr != nil {
err = multierr.Append(err, fmt.Errorf("failed to stop certificate exchange subscriber: %w", serr))
}
m.certsub = nil
}
if m.certserv != nil {
if serr := m.certserv.Stop(ctx); serr != nil {

err = multierr.Append(err, fmt.Errorf("failed to stop certificate exchange server: %w", serr))
}
m.certserv = nil
}
return err
}

func (m *F3) resumeInternal(ctx context.Context) error {
runnerEc := m.ec
if len(manifest.PowerUpdate) > 0 {
runnerEc = ec.WithModifiedPower(m.ec, manifest.PowerUpdate)
if len(m.manifest.PowerUpdate) > 0 {
runnerEc = ec.WithModifiedPower(m.ec, m.manifest.PowerUpdate)
}

cs, err := openCertstore(m.runningCtx, runnerEc, m.ds, manifest)
if err != nil {
return xerrors.Errorf("failed to open certstore: %w", err)
// We don't reset this field if we only pause/resume.
if m.cs == nil {
cs, err := openCertstore(m.runningCtx, runnerEc, m.ds, m.manifest)
if err != nil {
return xerrors.Errorf("failed to open certstore: %w", err)
}

m.cs = cs
}

m.cs = cs
m.manifest = manifest
m.runner, err = newRunner(
m.certserv = &certexchange.Server{
NetworkName: m.manifest.NetworkName,
RequestTimeout: m.manifest.ServerRequestTimeout,
Host: m.host,
Store: m.cs,
}
if err := m.certserv.Start(ctx); err != nil {
return err
}

m.certsub = &certexpoll.Subscriber{
Client: certexchange.Client{
Host: m.host,
NetworkName: m.manifest.NetworkName,
RequestTimeout: m.manifest.ClientRequestTimeout,
},
Store: m.cs,
SignatureVerifier: m.verifier,
InitialPollInterval: m.manifest.ECPeriod,
MaximumPollInterval: m.manifest.MaximumPollInterval,
MinimumPollInterval: m.manifest.MinimumPollInterval,
}
if err := m.certsub.Start(ctx); err != nil {
return err
}

if runner, err := newRunner(
ctx, m.cs, runnerEc, m.pubsub, m.verifier,
m.busBroadcast.Publish, m.manifest,
)
if err != nil {
); err != nil {
return err
} else {
m.runner = runner
}

return m.runner.Start(ctx)
Expand Down
14 changes: 14 additions & 0 deletions manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ type ManifestProvider interface {

type Version string

// CxConfig holds the certificate exchange (CX) parameters of a manifest:
// request timeouts for the client and server, and the bounds of the polling
// interval.
type CxConfig struct {
// Request timeout for the certificate exchange client.
ClientRequestTimeout time.Duration
// Request timeout for the certificate exchange server.
ServerRequestTimeout time.Duration
// Minimum CX polling interval.
MinimumPollInterval time.Duration
// Maximum CX polling interval.
MaximumPollInterval time.Duration
}

type GpbftConfig struct {
Delta time.Duration
DeltaBackOffExponent float64
Expand Down Expand Up @@ -86,6 +98,8 @@ type Manifest struct {
*GpbftConfig
// EC-specific parameters
*EcConfig
// Certificate Exchange specific parameters
*CxConfig
}

func LocalDevnetManifest() *Manifest {
Expand Down
6 changes: 6 additions & 0 deletions manifest/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ var base manifest.Manifest = manifest.Manifest{
ECPeriod: 30 * time.Second,
BaseDecisionBackoffTable: []float64{1.3, 1.69, 2.2, 2.86, 3.71, 4.83, 6.27, 8.16, 10.6, 13.79, 15.},
},
CxConfig: &manifest.CxConfig{
ClientRequestTimeout: 10 * time.Second,
ServerRequestTimeout: time.Minute,
MinimumPollInterval: 30 * time.Second,
MaximumPollInterval: 2 * time.Minute,
},
}

func TestManifest_Serialization(t *testing.T) {
Expand Down
58 changes: 58 additions & 0 deletions test/f3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,48 @@ func TestSimpleF3(t *testing.T) {
env.waitForInstanceNumber(5, 10*time.Second, false)
}

// TestPauseResumeCatchup exercises pausing and resuming F3 nodes in a
// three-node network:
//
//  1. With two of three nodes paused, no new instances should be produced.
//  2. Resuming one node should let the network reach agreement again.
//  3. Resuming the last node after a long delay should let it catch up.
//  4. The network should keep progressing after the originally healthy node
//     is paused, i.e. the caught-up node participates in consensus.
func TestPauseResumeCatchup(t *testing.T) {
	env := newTestEnvironment(t, 3, false)

	env.connectAll()
	env.start()
	env.waitForInstanceNumber(1, 10*time.Second, true)

	// Pausing two nodes should pause the network.
	env.pauseNode(1)
	env.pauseNode(2)

	oldInstance := env.nodes[0].currentGpbftInstance()
	time.Sleep(time.Second)
	newInstance := env.nodes[0].currentGpbftInstance()
	require.Equal(t, oldInstance, newInstance)

	// Resuming node 1 should continue agreeing on instances. Both instance
	// values were sampled before the resume, so there is nothing new to
	// compare here; progress is checked by waiting for the next instance.
	env.resumeNode(1)
	resumeInstance := newInstance + 1
	env.waitForInstanceNumber(resumeInstance, 10*time.Second, false)

	// Wait until we're far enough that pure GPBFT catchup should be impossible.
	targetInstance := resumeInstance + env.manifest.CommiteeLookback
	env.waitForInstanceNumber(targetInstance, 30*time.Second, false)

	// Node 2 is still paused, so it must not have reached the resume instance.
	pausedInstance := env.nodes[2].currentGpbftInstance()
	require.Less(t, pausedInstance, resumeInstance)

	env.resumeNode(2)

	// Everyone should catch up eventually
	env.waitForInstanceNumber(targetInstance, 30*time.Second, true)

	// Pause the "good" node.
	env.pauseNode(0)
	node0failInstance := env.nodes[0].currentGpbftInstance()

	// We should be able to make progress with the remaining nodes.
	env.waitForInstanceNumber(node0failInstance+3, 30*time.Second, false)
}

func TestDynamicManifest_WithoutChanges(t *testing.T) {
env := newTestEnvironment(t, 2, true)

Expand Down Expand Up @@ -133,6 +175,12 @@ var base manifest.Manifest = manifest.Manifest{
ECDelayMultiplier: 1.0,
BaseDecisionBackoffTable: []float64{1., 1.2},
},
CxConfig: &manifest.CxConfig{
ClientRequestTimeout: 10 * time.Second,
ServerRequestTimeout: time.Minute,
MinimumPollInterval: 100 * time.Millisecond,
MaximumPollInterval: time.Second,
},
}

type testNode struct {
Expand Down Expand Up @@ -378,6 +426,16 @@ func (e *testEnv) start() {
e.newHeadEveryPeriod(e.manifest.ECPeriod)
}

// pauseNode pauses the F3 instance of the node at index i and fails the test
// immediately if pausing returns an error.
func (e *testEnv) pauseNode(i int) {
	err := e.nodes[i].f3.Pause()
	require.NoError(e.t, err)
}

// resumeNode resumes the F3 instance of the node at index i and fails the test
// immediately if resuming returns an error.
func (e *testEnv) resumeNode(i int) {
	err := e.nodes[i].f3.Resume()
	require.NoError(e.t, err)
}

func (e *testEnv) startNode(i int) {
n := e.nodes[i]
require.NoError(e.t, n.f3.Start(e.testCtx))
Expand Down

0 comments on commit f08fbf9

Please sign in to comment.