Skip to content

Commit

Permalink
Implement GPBFT message compression using zstd (#793)
Browse files Browse the repository at this point in the history
Add the ability to compress GPBFT messages controllable via manifest.
Implement benchmarks to compare vanilla CBOR and ZSTD encoding.

Basic local run:
```
BenchmarkCborEncoding-12    	   47173	     25491 ns/op	  135409 B/op	      87 allocs/op
BenchmarkCborDecoding-12    	   64550	     18078 ns/op	   61728 B/op	     209 allocs/op
BenchmarkZstdEncoding-12    	   29061	     41489 ns/op	  193455 B/op	      88 allocs/op
BenchmarkZstdDecoding-12    	   66172	     17924 ns/op	  176517 B/op	     211 allocs/op
```

Fixes #786
  • Loading branch information
masih authored Dec 13, 2024
1 parent fde5157 commit b04ede1
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 14 deletions.
26 changes: 26 additions & 0 deletions f3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,31 @@ func TestF3DynamicManifest_WithPauseAndRebootstrap(t *testing.T) {
require.Equal(t, env.manifest.BootstrapEpoch-env.manifest.EC.Finality, cert0.ECChain.Base().Epoch)
}

func TestF3DynamicManifest_RebootstrapWithCompression(t *testing.T) {
env := newTestEnvironment(t).withNodes(2).withDynamicManifest().start()
env.waitForInstanceNumber(10, 30*time.Second, true)

env.manifest.Pause = true
env.updateManifest()

env.waitForNodesStopped()

env.manifest.BootstrapEpoch = 956
env.manifest.PubSub.CompressionEnabled = true
env.manifest.Pause = false
env.updateManifest()
env.waitForManifest()

env.clock.Add(1 * time.Minute)

env.waitForInstanceNumber(3, 30*time.Second, true)
env.requireEqualManifests(true)

cert0, err := env.nodes[0].f3.GetCert(env.testCtx, 0)
require.NoError(t, err)
require.Equal(t, env.manifest.BootstrapEpoch-env.manifest.EC.Finality, cert0.ECChain.Base().Epoch)
}

func TestF3LateBootstrap(t *testing.T) {
env := newTestEnvironment(t).withNodes(2).start()

Expand Down Expand Up @@ -286,6 +311,7 @@ var base = manifest.Manifest{
EC: manifest.DefaultEcConfig,
CertificateExchange: manifest.DefaultCxConfig,
CatchUpAlignment: manifest.DefaultCatchUpAlignment,
PubSub: manifest.DefaultPubSubConfig,
}

type testNode struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/klauspost/compress v1.17.11
github.com/libp2p/go-libp2p v0.37.2
github.com/libp2p/go-libp2p-pubsub v0.11.0
github.com/marcboeker/go-duckdb v1.8.2
Expand Down Expand Up @@ -67,7 +68,6 @@ require (
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down
35 changes: 22 additions & 13 deletions host.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package f3

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -51,7 +50,8 @@ type gpbftRunner struct {
msgsMutex sync.Mutex
selfMessages map[uint64]map[roundPhase][]*gpbft.GMessage

inputs gpbftInputs
inputs gpbftInputs
msgEncoding gMessageEncoding
}

type roundPhase struct {
Expand Down Expand Up @@ -132,6 +132,15 @@ func newRunner(
return nil, fmt.Errorf("creating participant: %w", err)
}
runner.participant = p

if runner.manifest.PubSub.CompressionEnabled {
runner.msgEncoding, err = newZstdGMessageEncoding()
if err != nil {
return nil, err
}
} else {
runner.msgEncoding = &cborGMessageEncoding{}
}
return runner, nil
}

Expand Down Expand Up @@ -443,13 +452,12 @@ func (h *gpbftRunner) BroadcastMessage(ctx context.Context, msg *gpbft.GMessage)
if h.topic == nil {
return pubsub.ErrTopicClosed
}
var bw bytes.Buffer
err = msg.MarshalCBOR(&bw)
encoded, err := h.msgEncoding.Encode(msg)
if err != nil {
return fmt.Errorf("marshalling GMessage for broadcast: %w", err)
return fmt.Errorf("encoding GMessage for broadcast: %w", err)
}

err = h.topic.Publish(ctx, bw.Bytes())
err = h.topic.Publish(ctx, encoded)
if err != nil {
return fmt.Errorf("publishing message: %w", err)
}
Expand All @@ -464,11 +472,11 @@ func (h *gpbftRunner) rebroadcastMessage(msg *gpbft.GMessage) error {
if h.topic == nil {
return pubsub.ErrTopicClosed
}
var bw bytes.Buffer
if err := msg.MarshalCBOR(&bw); err != nil {
return fmt.Errorf("marshalling GMessage for broadcast: %w", err)
encoded, err := h.msgEncoding.Encode(msg)
if err != nil {
return fmt.Errorf("encoding GMessage for broadcast: %w", err)
}
if err := h.topic.Publish(h.runningCtx, bw.Bytes()); err != nil {
if err := h.topic.Publish(h.runningCtx, encoded); err != nil {
return fmt.Errorf("publishing message: %w", err)
}
return nil
Expand All @@ -481,12 +489,13 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
recordValidationTime(ctx, start, _result)
}(time.Now())

var gmsg gpbft.GMessage
if err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data)); err != nil {
gmsg, err := h.msgEncoding.Decode(msg.Data)
if err != nil {
log.Debugw("failed to decode message", "from", msg.GetFrom(), "err", err)
return pubsub.ValidationReject
}

switch validatedMessage, err := h.participant.ValidateMessage(&gmsg); {
switch validatedMessage, err := h.participant.ValidateMessage(gmsg); {
case errors.Is(err, gpbft.ErrValidationInvalid):
log.Debugf("validation error during validation: %+v", err)
return pubsub.ValidationReject
Expand Down
16 changes: 16 additions & 0 deletions manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ var (
MaximumPollInterval: 4 * DefaultEcConfig.Period,
}

DefaultPubSubConfig = PubSubConfig{
CompressionEnabled: false,
}

// Default instance alignment when catching up.
DefaultCatchUpAlignment = DefaultEcConfig.Period / 2
)
Expand Down Expand Up @@ -194,6 +198,12 @@ func (e *EcConfig) Validate() error {
return nil
}

type PubSubConfig struct {
CompressionEnabled bool
}

func (p *PubSubConfig) Validate() error { return nil }

// Manifest identifies the specific configuration for the F3 instance currently running.
type Manifest struct {
// Pause stops the participation in F3.
Expand Down Expand Up @@ -227,6 +237,8 @@ type Manifest struct {
EC EcConfig
// Certificate Exchange specific parameters
CertificateExchange CxConfig
// PubSubConfig specifies the pubsub related configuration.
PubSub PubSubConfig
}

func (m *Manifest) Equal(o *Manifest) bool {
Expand Down Expand Up @@ -289,6 +301,9 @@ func (m *Manifest) Validate() error {
if err := m.CertificateExchange.Validate(); err != nil {
return fmt.Errorf("invalid manifest: invalid certificate exchange config: %w", err)
}
if err := m.PubSub.Validate(); err != nil {
return fmt.Errorf("invalid manifest: invalid pubsub config: %w", err)
}

return nil
}
Expand All @@ -305,6 +320,7 @@ func LocalDevnetManifest() *Manifest {
Gpbft: DefaultGpbftConfig,
CertificateExchange: DefaultCxConfig,
CatchUpAlignment: DefaultCatchUpAlignment,
PubSub: DefaultPubSubConfig,
}
return m
}
Expand Down
75 changes: 75 additions & 0 deletions msg_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package f3

import (
"bytes"

"github.com/filecoin-project/go-f3/gpbft"
"github.com/klauspost/compress/zstd"
)

var (
_ gMessageEncoding = (*cborGMessageEncoding)(nil)
_ gMessageEncoding = (*zstdGMessageEncoding)(nil)
)

type gMessageEncoding interface {
Encode(*gpbft.GMessage) ([]byte, error)
Decode([]byte) (*gpbft.GMessage, error)
}

type cborGMessageEncoding struct{}

func (c *cborGMessageEncoding) Encode(m *gpbft.GMessage) ([]byte, error) {
var buf bytes.Buffer
if err := m.MarshalCBOR(&buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (c *cborGMessageEncoding) Decode(v []byte) (*gpbft.GMessage, error) {
r := bytes.NewReader(v)
var msg gpbft.GMessage
if err := msg.UnmarshalCBOR(r); err != nil {
return nil, err
}
return &msg, nil
}

type zstdGMessageEncoding struct {
cborEncoding cborGMessageEncoding
compressor *zstd.Encoder
decompressor *zstd.Decoder
}

func newZstdGMessageEncoding() (*zstdGMessageEncoding, error) {
writer, err := zstd.NewWriter(nil)
if err != nil {
return nil, err
}
reader, err := zstd.NewReader(nil)
if err != nil {
return nil, err
}
return &zstdGMessageEncoding{
compressor: writer,
decompressor: reader,
}, nil
}

func (c *zstdGMessageEncoding) Encode(m *gpbft.GMessage) ([]byte, error) {
cborEncoded, err := c.cborEncoding.Encode(m)
if err != nil {
return nil, err
}
compressed := c.compressor.EncodeAll(cborEncoded, make([]byte, 0, len(cborEncoded)))
return compressed, err
}

func (c *zstdGMessageEncoding) Decode(v []byte) (*gpbft.GMessage, error) {
cborEncoded, err := c.decompressor.DecodeAll(v, make([]byte, 0, len(v)))
if err != nil {
return nil, err
}
return c.cborEncoding.Decode(cborEncoded)
}
Loading

0 comments on commit b04ede1

Please sign in to comment.