Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Serve blob_sidecar #155

Merged
merged 14 commits into from
Feb 20, 2024
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.1
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.4
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/ferranbt/fastssz v0.1.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
Expand All @@ -46,6 +48,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
Expand All @@ -68,4 +71,5 @@ require (
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
32 changes: 32 additions & 0 deletions pkg/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (h *Handler) Register(ctx context.Context, router *httprouter.Router) error
router.GET("/eth/v1/beacon/blocks/:block_id/root", h.wrappedHandler(h.handleEthV1BeaconBlocksRoot))
router.GET("/eth/v1/beacon/states/:state_id/finality_checkpoints", h.wrappedHandler(h.handleEthV1BeaconStatesFinalityCheckpoints))
router.GET("/eth/v1/beacon/deposit_snapshot", h.wrappedHandler(h.handleEthV1BeaconDepositSnapshot))
router.GET("/eth/v1/beacon/blob_sidecars/:block_id", h.wrappedHandler(h.handleEthV1BeaconBlobSidecars))

router.GET("/eth/v1/config/spec", h.wrappedHandler(h.handleEthV1ConfigSpec))
router.GET("/eth/v1/config/deposit_contract", h.wrappedHandler(h.handleEthV1ConfigDepositContract))
Expand Down Expand Up @@ -587,3 +588,34 @@ func (h *Handler) handleEthV1BeaconDepositSnapshot(ctx context.Context, r *http.
},
}), nil
}

func (h *Handler) handleEthV1BeaconBlobSidecars(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) {
if err := ValidateContentType(contentType, []ContentType{ContentTypeJSON}); err != nil {
return NewUnsupportedMediaTypeResponse(nil), err
}

id, err := eth.NewBlockIdentifier(p.ByName("block_id"))
if err != nil {
return NewBadRequestResponse(nil), err
}

sidecars, err := h.eth.BlobSidecars(ctx, id)
if err != nil {
return NewInternalServerErrorResponse(nil), err
}

rsp := NewSuccessResponse(ContentTypeResolvers{
ContentTypeJSON: func() ([]byte, error) {
return json.Marshal(sidecars)
},
})

switch id.Type() {
case eth.BlockIDFinalized, eth.BlockIDRoot:
rsp.SetCacheControl("public, s-max-age=6000")
default:
rsp.SetCacheControl("public, s-max-age=15")
}

return rsp, nil
}
6 changes: 4 additions & 2 deletions pkg/beacon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ type Config struct {
// Cache configuration holds configuration for the caches.
type CacheConfig struct {
// Blocks holds the block cache configuration.
Blocks store.Config `yaml:"blocks" default:"{\"MaxItems\": 200}"`
Blocks store.Config `yaml:"blocks" default:"{\"MaxItems\": 30}"`
// States holds the state cache configuration.
States store.Config `yaml:"states" default:"{\"MaxItems\": 5}"`
// DepositSnapshots holds the deposit snapshot cache configuration.
DepositSnapshots store.Config `yaml:"deposit_snapshots" default:"{\"MaxItems\": 50}"`
DepositSnapshots store.Config `yaml:"deposit_snapshots" default:"{\"MaxItems\": 30}"`
// BlobSidecars holds the blob sidecar cache configuration.
BlobSidecars store.Config `yaml:"blob_sidecars" default:"{\"MaxItems\": 30}"`
}

type FrontendConfig struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/beacon/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/chuckpreslar/emission"
"github.com/ethpandaops/beacon/pkg/beacon"
Expand Down Expand Up @@ -38,6 +39,7 @@ type Default struct {
blocks *store.Block
states *store.BeaconState
depositSnapshots *store.DepositSnapshot
blobSidecars *store.BlobSidecar

spec *state.Spec
genesis *v1.Genesis
Expand Down Expand Up @@ -79,6 +81,7 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C
blocks: store.NewBlock(log, config.Caches.Blocks, namespace),
states: store.NewBeaconState(log, config.Caches.States, namespace),
depositSnapshots: store.NewDepositSnapshot(log, config.Caches.DepositSnapshots, namespace),
blobSidecars: store.NewBlobSidecar(log, config.Caches.BlobSidecars, namespace),

servingMutex: sync.Mutex{},
historicalMutex: sync.Mutex{},
Expand Down Expand Up @@ -560,6 +563,10 @@ func (d *Default) GetBlockByStateRoot(ctx context.Context, stateRoot phase0.Root
return block, nil
}

func (d *Default) GetBlobSidecarsBySlot(ctx context.Context, slot phase0.Slot) ([]*deneb.BlobSidecar, error) {
return d.blobSidecars.GetBySlot(slot)
}

func (d *Default) GetBeaconStateBySlot(ctx context.Context, slot phase0.Slot) (*[]byte, error) {
block, err := d.GetBlockBySlot(ctx, slot)
if err != nil {
Expand Down
106 changes: 81 additions & 25 deletions pkg/beacon/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,47 +311,69 @@ func (d *Default) fetchBundle(ctx context.Context, root phase0.Root, upstream *N
}

if d.shouldDownloadStates() {
// If the state already exists, don't bother downloading it again.
existingState, err := d.states.GetByStateRoot(stateRoot)
if err == nil && existingState != nil {
d.log.Infof("Successfully fetched bundle from %s", upstream.Config.Name)

return block, nil
}

beaconState, err := upstream.Beacon.FetchRawBeaconState(ctx, eth.SlotAsString(slot), "application/octet-stream")
if err != nil {
return nil, fmt.Errorf("failed to fetch beacon state: %w", err)
}

if beaconState == nil {
return nil, errors.New("beacon state is nil")
}

expiresAt := time.Now().Add(FinalityHaltedServingPeriod)
if slot == phase0.Slot(0) {
expiresAt = time.Now().Add(999999 * time.Hour)
}

if err := d.states.Add(stateRoot, &beaconState, expiresAt, slot); err != nil {
return nil, fmt.Errorf("failed to store beacon state: %w", err)
// Download and store beacon state
if err = d.downloadAndStoreBeaconState(ctx, stateRoot, slot, upstream); err != nil {
return nil, fmt.Errorf("failed to download and store beacon state: %w", err)
}
}

if slot != phase0.Slot(0) {
epoch := phase0.Epoch(slot / d.spec.SlotsPerEpoch)

// Download and store deposit snapshots
if err := d.downloadAndStoreDepositSnapshot(ctx, epoch, upstream); err != nil {
if err = d.downloadAndStoreDepositSnapshot(ctx, epoch, upstream); err != nil {
return nil, fmt.Errorf("failed to download and store deposit snapshot: %w", err)
}
}

sp, err := upstream.Beacon.Spec()
if err != nil {
return nil, fmt.Errorf("failed to fetch spec from upstream node: %w", err)
}

denebFork, err := sp.ForkEpochs.GetByName("DENEB")
if err == nil && denebFork != nil {
if denebFork.Active(slot, sp.SlotsPerEpoch) {
// Download and store blob sidecars
if err := d.downloadAndStoreBlobSidecars(ctx, slot, upstream); err != nil {
return nil, fmt.Errorf("failed to download and store blob sidecars: %w", err)
}
}
}

d.log.Infof("Successfully fetched bundle from %s", upstream.Config.Name)

return block, nil
}

func (d *Default) downloadAndStoreBeaconState(ctx context.Context, stateRoot phase0.Root, slot phase0.Slot, node *Node) error {
// If the state already exists, don't bother downloading it again.
existingState, err := d.states.GetByStateRoot(stateRoot)
if err == nil && existingState != nil {
return nil
}

beaconState, err := node.Beacon.FetchRawBeaconState(ctx, eth.SlotAsString(slot), "application/octet-stream")
if err != nil {
return fmt.Errorf("failed to fetch beacon state: %w", err)
}

if beaconState == nil {
return errors.New("beacon state is nil")
}

expiresAt := time.Now().Add(FinalityHaltedServingPeriod)
if slot == phase0.Slot(0) {
expiresAt = time.Now().Add(999999 * time.Hour)
}

if err := d.states.Add(stateRoot, &beaconState, expiresAt, slot); err != nil {
return fmt.Errorf("failed to store beacon state: %w", err)
}

return nil
}

func (d *Default) downloadAndStoreDepositSnapshot(ctx context.Context, epoch phase0.Epoch, node *Node) error {
// Check if we already have the deposit snapshot.
if _, err := d.depositSnapshots.GetByEpoch(epoch); err == nil {
Expand Down Expand Up @@ -386,3 +408,37 @@ func (d *Default) downloadAndStoreDepositSnapshot(ctx context.Context, epoch pha

return nil
}

func (d *Default) downloadAndStoreBlobSidecars(ctx context.Context, slot phase0.Slot, node *Node) error {
// Check if we already have the blob sidecars.
if _, err := d.blobSidecars.GetBySlot(slot); err == nil {
return nil
}

// Download the blob sidecars from our upstream.
blobSidecars, err := node.Beacon.FetchBeaconBlockBlobs(ctx, eth.SlotAsString(slot))
if err != nil {
return err
}

if blobSidecars == nil {
return errors.New("invalid blob sidecars")
}

// Store for the FinalityHaltedServingPeriod to ensure we have them in case of non-finality.
// We'll let the store handle purging old items.
expiresAt := time.Now().Add(FinalityHaltedServingPeriod)

if err := d.blobSidecars.Add(slot, blobSidecars, expiresAt); err != nil {
return fmt.Errorf("failed to store blob sidecars: %w", err)
}

d.log.
WithFields(logrus.Fields{
"slot": slot,
"node": node.Config.Name,
}).
Infof("Downloaded and stored blob sidecar for slot %d", slot)

return nil
}
3 changes: 3 additions & 0 deletions pkg/beacon/finality_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/beacon/pkg/beacon/api/types"
"github.com/ethpandaops/beacon/pkg/beacon/state"
Expand Down Expand Up @@ -47,6 +48,8 @@ type FinalityProvider interface {
GetBeaconStateByStateRoot(ctx context.Context, root phase0.Root) (*[]byte, error)
// GetBeaconStateByRoot returns the beacon sate with the given root.
GetBeaconStateByRoot(ctx context.Context, root phase0.Root) (*[]byte, error)
// GetBlobSidecarsBySlot returns the blob sidecars for the given slot.
GetBlobSidecarsBySlot(ctx context.Context, slot phase0.Slot) ([]*deneb.BlobSidecar, error)
// ListFinalizedSlots returns a slice of finalized slots.
ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error)
// GetEpochBySlot returns the epoch for the given slot.
Expand Down
63 changes: 63 additions & 0 deletions pkg/beacon/store/blob_sidecars.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package store

import (
"errors"
"time"

"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/checkpointz/pkg/cache"
"github.com/ethpandaops/checkpointz/pkg/eth"
"github.com/sirupsen/logrus"
)

type BlobSidecar struct {
store *cache.TTLMap
log logrus.FieldLogger
}

func NewBlobSidecar(log logrus.FieldLogger, config Config, namespace string) *BlobSidecar {
d := &BlobSidecar{
log: log.WithField("component", "beacon/store/blob_sidecar"),
store: cache.NewTTLMap(config.MaxItems, "blob_sidecar", namespace),
}

d.store.OnItemDeleted(func(key string, value interface{}, expiredAt time.Time) {
d.log.WithField("key", key).WithField("expired_at", expiredAt.String()).Debug("Blob sidecar was deleted from the cache")
})

d.store.EnableMetrics(namespace)

return d
}

func (d *BlobSidecar) Add(slot phase0.Slot, sidecars []*deneb.BlobSidecar, expiresAt time.Time) error {
d.store.Add(eth.SlotAsString(slot), sidecars, expiresAt, false)

d.log.WithFields(
logrus.Fields{
"slot": eth.SlotAsString(slot),
"expires_at": expiresAt.String(),
},
).Debug("Added blob sidecar")

return nil
}

func (d *BlobSidecar) GetBySlot(slot phase0.Slot) ([]*deneb.BlobSidecar, error) {
data, _, err := d.store.Get(eth.SlotAsString(slot))
if err != nil {
return nil, err
}

return d.parseSidecar(data)
}

func (d *BlobSidecar) parseSidecar(data interface{}) ([]*deneb.BlobSidecar, error) {
sidecar, ok := data.([]*deneb.BlobSidecar)
if !ok {
return nil, errors.New("invalid blob sidecar type")
}

return sidecar, nil
}
47 changes: 47 additions & 0 deletions pkg/beacon/store/blob_sidecars_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package store

import (
"testing"
"time"

"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

func TestBlobSidecarAddAndGet(t *testing.T) {
logger, _ := test.NewNullLogger()
config := Config{MaxItems: 10}
namespace := "test_a"
blobSidecarStore := NewBlobSidecar(logger, config, namespace)

slot := phase0.Slot(100)
expiresAt := time.Now().Add(10 * time.Minute)
sidecars := []*deneb.BlobSidecar{
{
Blob: deneb.Blob{},
},
}

err := blobSidecarStore.Add(slot, sidecars, expiresAt)
assert.NoError(t, err)

retrievedSidecars, err := blobSidecarStore.GetBySlot(slot)
assert.NoError(t, err)
assert.NotNil(t, retrievedSidecars)
assert.Equal(t, sidecars, retrievedSidecars)
}

func TestBlobSidecarGetBySlotNotFound(t *testing.T) {
logger, _ := test.NewNullLogger()
config := Config{MaxItems: 10}
namespace := "test_b"
blobSidecarStore := NewBlobSidecar(logger, config, namespace)

slot := phase0.Slot(200)

retrievedSidecars, err := blobSidecarStore.GetBySlot(slot)
assert.Error(t, err)
assert.Nil(t, retrievedSidecars)
}
Loading
Loading