diff --git a/go.mod b/go.mod index 50668a7b..d81ff424 100644 --- a/go.mod +++ b/go.mod @@ -15,12 +15,14 @@ require ( github.com/prometheus/client_golang v1.16.0 github.com/sirupsen/logrus v1.9.1 github.com/spf13/cobra v1.6.1 + github.com/stretchr/testify v1.8.4 gopkg.in/yaml.v2 v2.4.0 ) require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatih/color v1.16.0 // indirect github.com/ferranbt/fastssz v0.1.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect @@ -46,6 +48,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect @@ -68,4 +71,5 @@ require ( golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/pkg/api/handler.go b/pkg/api/handler.go index d88aa4b4..319600f3 100644 --- a/pkg/api/handler.go +++ b/pkg/api/handler.go @@ -51,6 +51,7 @@ func (h *Handler) Register(ctx context.Context, router *httprouter.Router) error router.GET("/eth/v1/beacon/blocks/:block_id/root", h.wrappedHandler(h.handleEthV1BeaconBlocksRoot)) router.GET("/eth/v1/beacon/states/:state_id/finality_checkpoints", h.wrappedHandler(h.handleEthV1BeaconStatesFinalityCheckpoints)) router.GET("/eth/v1/beacon/deposit_snapshot", h.wrappedHandler(h.handleEthV1BeaconDepositSnapshot)) + router.GET("/eth/v1/beacon/blob_sidecars/:block_id", h.wrappedHandler(h.handleEthV1BeaconBlobSidecars)) router.GET("/eth/v1/config/spec", h.wrappedHandler(h.handleEthV1ConfigSpec)) router.GET("/eth/v1/config/deposit_contract", h.wrappedHandler(h.handleEthV1ConfigDepositContract)) @@ -587,3 +588,34 @@ func (h *Handler) handleEthV1BeaconDepositSnapshot(ctx context.Context, r *http. }, }), nil } + +func (h *Handler) handleEthV1BeaconBlobSidecars(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) { + if err := ValidateContentType(contentType, []ContentType{ContentTypeJSON}); err != nil { + return NewUnsupportedMediaTypeResponse(nil), err + } + + id, err := eth.NewBlockIdentifier(p.ByName("block_id")) + if err != nil { + return NewBadRequestResponse(nil), err + } + + sidecars, err := h.eth.BlobSidecars(ctx, id) + if err != nil { + return NewInternalServerErrorResponse(nil), err + } + + rsp := NewSuccessResponse(ContentTypeResolvers{ + ContentTypeJSON: func() ([]byte, error) { + return json.Marshal(sidecars) + }, + }) + + switch id.Type() { + case eth.BlockIDFinalized, eth.BlockIDRoot: + rsp.SetCacheControl("public, s-max-age=6000") + default: + rsp.SetCacheControl("public, s-max-age=15") + } + + return rsp, nil +} diff --git a/pkg/beacon/config.go b/pkg/beacon/config.go index 453de0f3..d818eb9b 100644 --- a/pkg/beacon/config.go +++ b/pkg/beacon/config.go @@ -24,11 +24,13 @@ type Config struct { // Cache configuration holds configuration for the caches. type CacheConfig struct { // Blocks holds the block cache configuration. - Blocks store.Config `yaml:"blocks" default:"{\"MaxItems\": 200}"` + Blocks store.Config `yaml:"blocks" default:"{\"MaxItems\": 30}"` // States holds the state cache configuration. States store.Config `yaml:"states" default:"{\"MaxItems\": 5}"` // DepositSnapshots holds the deposit snapshot cache configuration. - DepositSnapshots store.Config `yaml:"deposit_snapshots" default:"{\"MaxItems\": 50}"` + DepositSnapshots store.Config `yaml:"deposit_snapshots" default:"{\"MaxItems\": 30}"` + // BlobSidecars holds the blob sidecar cache configuration. + BlobSidecars store.Config `yaml:"blob_sidecars" default:"{\"MaxItems\": 30}"` } type FrontendConfig struct { diff --git a/pkg/beacon/default.go b/pkg/beacon/default.go index 05b3b057..a201c8ed 100644 --- a/pkg/beacon/default.go +++ b/pkg/beacon/default.go @@ -9,6 +9,7 @@ import ( v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/deneb" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/chuckpreslar/emission" "github.com/ethpandaops/beacon/pkg/beacon" @@ -38,6 +39,7 @@ type Default struct { blocks *store.Block states *store.BeaconState depositSnapshots *store.DepositSnapshot + blobSidecars *store.BlobSidecar spec *state.Spec genesis *v1.Genesis @@ -79,6 +81,7 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C blocks: store.NewBlock(log, config.Caches.Blocks, namespace), states: store.NewBeaconState(log, config.Caches.States, namespace), depositSnapshots: store.NewDepositSnapshot(log, config.Caches.DepositSnapshots, namespace), + blobSidecars: store.NewBlobSidecar(log, config.Caches.BlobSidecars, namespace), servingMutex: sync.Mutex{}, historicalMutex: sync.Mutex{}, @@ -560,6 +563,10 @@ func (d *Default) GetBlockByStateRoot(ctx context.Context, stateRoot phase0.Root return block, nil } +func (d *Default) GetBlobSidecarsBySlot(ctx context.Context, slot phase0.Slot) ([]*deneb.BlobSidecar, error) { + return d.blobSidecars.GetBySlot(slot) +} + func (d *Default) GetBeaconStateBySlot(ctx context.Context, slot phase0.Slot) (*[]byte, error) { block, err := d.GetBlockBySlot(ctx, slot) if err != nil { diff --git a/pkg/beacon/download.go b/pkg/beacon/download.go index 1224f084..7074c020 100644 --- a/pkg/beacon/download.go +++ b/pkg/beacon/download.go @@ -311,30 +311,9 @@ func (d *Default) fetchBundle(ctx context.Context, root phase0.Root, upstream *N } if d.shouldDownloadStates() { - // If the state already exists, don't bother downloading it again. - existingState, err := d.states.GetByStateRoot(stateRoot) - if err == nil && existingState != nil { - d.log.Infof("Successfully fetched bundle from %s", upstream.Config.Name) - - return block, nil - } - - beaconState, err := upstream.Beacon.FetchRawBeaconState(ctx, eth.SlotAsString(slot), "application/octet-stream") - if err != nil { - return nil, fmt.Errorf("failed to fetch beacon state: %w", err) - } - - if beaconState == nil { - return nil, errors.New("beacon state is nil") - } - - expiresAt := time.Now().Add(FinalityHaltedServingPeriod) - if slot == phase0.Slot(0) { - expiresAt = time.Now().Add(999999 * time.Hour) - } - - if err := d.states.Add(stateRoot, &beaconState, expiresAt, slot); err != nil { - return nil, fmt.Errorf("failed to store beacon state: %w", err) + // Download and store beacon state + if err = d.downloadAndStoreBeaconState(ctx, stateRoot, slot, upstream); err != nil { + return nil, fmt.Errorf("failed to download and store beacon state: %w", err) } } @@ -342,16 +321,59 @@ func (d *Default) fetchBundle(ctx context.Context, root phase0.Root, upstream *N epoch := phase0.Epoch(slot / d.spec.SlotsPerEpoch) // Download and store deposit snapshots - if err := d.downloadAndStoreDepositSnapshot(ctx, epoch, upstream); err != nil { + if err = d.downloadAndStoreDepositSnapshot(ctx, epoch, upstream); err != nil { return nil, fmt.Errorf("failed to download and store deposit snapshot: %w", err) } } + sp, err := upstream.Beacon.Spec() + if err != nil { + return nil, fmt.Errorf("failed to fetch spec from upstream node: %w", err) + } + + denebFork, err := sp.ForkEpochs.GetByName("DENEB") + if err == nil && denebFork != nil { + if denebFork.Active(slot, sp.SlotsPerEpoch) { + // Download and store blob sidecars + if err := d.downloadAndStoreBlobSidecars(ctx, slot, upstream); err != nil { + return nil, fmt.Errorf("failed to download and store blob sidecars: %w", err) + } + } + } + d.log.Infof("Successfully fetched bundle from %s", upstream.Config.Name) return block, nil } +func (d *Default) downloadAndStoreBeaconState(ctx context.Context, stateRoot phase0.Root, slot phase0.Slot, node *Node) error { + // If the state already exists, don't bother downloading it again. + existingState, err := d.states.GetByStateRoot(stateRoot) + if err == nil && existingState != nil { + return nil + } + + beaconState, err := node.Beacon.FetchRawBeaconState(ctx, eth.SlotAsString(slot), "application/octet-stream") + if err != nil { + return fmt.Errorf("failed to fetch beacon state: %w", err) + } + + if beaconState == nil { + return errors.New("beacon state is nil") + } + + expiresAt := time.Now().Add(FinalityHaltedServingPeriod) + if slot == phase0.Slot(0) { + expiresAt = time.Now().Add(999999 * time.Hour) + } + + if err := d.states.Add(stateRoot, &beaconState, expiresAt, slot); err != nil { + return fmt.Errorf("failed to store beacon state: %w", err) + } + + return nil +} + func (d *Default) downloadAndStoreDepositSnapshot(ctx context.Context, epoch phase0.Epoch, node *Node) error { // Check if we already have the deposit snapshot. if _, err := d.depositSnapshots.GetByEpoch(epoch); err == nil { @@ -386,3 +408,37 @@ func (d *Default) downloadAndStoreDepositSnapshot(ctx context.Context, epoch pha return nil } + +func (d *Default) downloadAndStoreBlobSidecars(ctx context.Context, slot phase0.Slot, node *Node) error { + // Check if we already have the blob sidecars. + if _, err := d.blobSidecars.GetBySlot(slot); err == nil { + return nil + } + + // Download the blob sidecars from our upstream. + blobSidecars, err := node.Beacon.FetchBeaconBlockBlobs(ctx, eth.SlotAsString(slot)) + if err != nil { + return err + } + + if blobSidecars == nil { + return errors.New("invalid blob sidecars") + } + + // Store for the FinalityHaltedServingPeriod to ensure we have them in case of non-finality. + // We'll let the store handle purging old items. + expiresAt := time.Now().Add(FinalityHaltedServingPeriod) + + if err := d.blobSidecars.Add(slot, blobSidecars, expiresAt); err != nil { + return fmt.Errorf("failed to store blob sidecars: %w", err) + } + + d.log. + WithFields(logrus.Fields{ + "slot": slot, + "node": node.Config.Name, + }). + Infof("Downloaded and stored blob sidecar for slot %d", slot) + + return nil +} diff --git a/pkg/beacon/finality_provider.go b/pkg/beacon/finality_provider.go index 8d26db22..f3eb0c88 100644 --- a/pkg/beacon/finality_provider.go +++ b/pkg/beacon/finality_provider.go @@ -5,6 +5,7 @@ import ( v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/deneb" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethpandaops/beacon/pkg/beacon/api/types" "github.com/ethpandaops/beacon/pkg/beacon/state" @@ -47,6 +48,8 @@ type FinalityProvider interface { GetBeaconStateByStateRoot(ctx context.Context, root phase0.Root) (*[]byte, error) // GetBeaconStateByRoot returns the beacon sate with the given root. GetBeaconStateByRoot(ctx context.Context, root phase0.Root) (*[]byte, error) + // GetBlobSidecarsBySlot returns the blob sidecars for the given slot. + GetBlobSidecarsBySlot(ctx context.Context, slot phase0.Slot) ([]*deneb.BlobSidecar, error) // ListFinalizedSlots returns a slice of finalized slots. ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error) // GetEpochBySlot returns the epoch for the given slot. diff --git a/pkg/beacon/store/blob_sidecars.go b/pkg/beacon/store/blob_sidecars.go new file mode 100644 index 00000000..d0ed611d --- /dev/null +++ b/pkg/beacon/store/blob_sidecars.go @@ -0,0 +1,63 @@ +package store + +import ( + "errors" + "time" + + "github.com/attestantio/go-eth2-client/spec/deneb" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/ethpandaops/checkpointz/pkg/cache" + "github.com/ethpandaops/checkpointz/pkg/eth" + "github.com/sirupsen/logrus" +) + +type BlobSidecar struct { + store *cache.TTLMap + log logrus.FieldLogger +} + +func NewBlobSidecar(log logrus.FieldLogger, config Config, namespace string) *BlobSidecar { + d := &BlobSidecar{ + log: log.WithField("component", "beacon/store/blob_sidecar"), + store: cache.NewTTLMap(config.MaxItems, "blob_sidecar", namespace), + } + + d.store.OnItemDeleted(func(key string, value interface{}, expiredAt time.Time) { + d.log.WithField("key", key).WithField("expired_at", expiredAt.String()).Debug("Blob sidecar was deleted from the cache") + }) + + d.store.EnableMetrics(namespace) + + return d +} + +func (d *BlobSidecar) Add(slot phase0.Slot, sidecars []*deneb.BlobSidecar, expiresAt time.Time) error { + d.store.Add(eth.SlotAsString(slot), sidecars, expiresAt, false) + + d.log.WithFields( + logrus.Fields{ + "slot": eth.SlotAsString(slot), + "expires_at": expiresAt.String(), + }, + ).Debug("Added blob sidecar") + + return nil +} + +func (d *BlobSidecar) GetBySlot(slot phase0.Slot) ([]*deneb.BlobSidecar, error) { + data, _, err := d.store.Get(eth.SlotAsString(slot)) + if err != nil { + return nil, err + } + + return d.parseSidecar(data) +} + +func (d *BlobSidecar) parseSidecar(data interface{}) ([]*deneb.BlobSidecar, error) { + sidecar, ok := data.([]*deneb.BlobSidecar) + if !ok { + return nil, errors.New("invalid blob sidecar type") + } + + return sidecar, nil +} diff --git a/pkg/beacon/store/blob_sidecars_test.go b/pkg/beacon/store/blob_sidecars_test.go new file mode 100644 index 00000000..18bc9434 --- /dev/null +++ b/pkg/beacon/store/blob_sidecars_test.go @@ -0,0 +1,47 @@ +package store + +import ( + "testing" + "time" + + "github.com/attestantio/go-eth2-client/spec/deneb" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" +) + +func TestBlobSidecarAddAndGet(t *testing.T) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 10} + namespace := "test_a" + blobSidecarStore := NewBlobSidecar(logger, config, namespace) + + slot := phase0.Slot(100) + expiresAt := time.Now().Add(10 * time.Minute) + sidecars := []*deneb.BlobSidecar{ + { + Blob: deneb.Blob{}, + }, + } + + err := blobSidecarStore.Add(slot, sidecars, expiresAt) + assert.NoError(t, err) + + retrievedSidecars, err := blobSidecarStore.GetBySlot(slot) + assert.NoError(t, err) + assert.NotNil(t, retrievedSidecars) + assert.Equal(t, sidecars, retrievedSidecars) +} + +func TestBlobSidecarGetBySlotNotFound(t *testing.T) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 10} + namespace := "test_b" + blobSidecarStore := NewBlobSidecar(logger, config, namespace) + + slot := phase0.Slot(200) + + retrievedSidecars, err := blobSidecarStore.GetBySlot(slot) + assert.Error(t, err) + assert.Nil(t, retrievedSidecars) +} diff --git a/pkg/service/eth/eth.go b/pkg/service/eth/eth.go index f1981f9f..113f7c78 100644 --- a/pkg/service/eth/eth.go +++ b/pkg/service/eth/eth.go @@ -6,6 +6,7 @@ import ( v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/deneb" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethpandaops/beacon/pkg/beacon/api/types" "github.com/ethpandaops/beacon/pkg/beacon/state" @@ -432,3 +433,110 @@ func (h *Handler) BlockRoot(ctx context.Context, blockID BlockIdentifier) (phase return phase0.Root{}, fmt.Errorf("invalid block id: %v", blockID.String()) } } + +// BlobSidecars returns the blob sidecars for the given block ID. +func (h *Handler) BlobSidecars(ctx context.Context, blockID BlockIdentifier) ([]*deneb.BlobSidecar, error) { + var err error + + const call = "blob_sidecars" + + h.metrics.ObserveCall(call, blockID.Type().String()) + + defer func() { + if err != nil { + h.metrics.ObserveErrorCall(call, blockID.Type().String()) + } + }() + + slot := phase0.Slot(0) + + switch blockID.Type() { + case BlockIDGenesis: + block, err := h.provider.GetBlockBySlot(ctx, phase0.Slot(0)) + if err != nil { + return nil, err + } + + if block == nil { + return nil, fmt.Errorf("no genesis block") + } + + sl, err := block.Slot() + if err != nil { + return nil, err + } + + slot = sl + case BlockIDSlot: + sslot, err := NewSlotFromString(blockID.Value()) + if err != nil { + return nil, err + } + + block, err := h.provider.GetBlockBySlot(ctx, sslot) + if err != nil { + return nil, err + } + + if block == nil { + return nil, fmt.Errorf("no block for slot %v", sslot) + } + + sl, err := block.Slot() + if err != nil { + return nil, err + } + + slot = sl + case BlockIDRoot: + root, err := blockID.AsRoot() + if err != nil { + return nil, err + } + + block, err := h.provider.GetBlockByRoot(ctx, root) + if err != nil { + return nil, err + } + + if block == nil { + return nil, fmt.Errorf("no block for root %v", root) + } + + sl, err := block.Slot() + if err != nil { + return nil, err + } + + slot = sl + case BlockIDFinalized: + finality, err := h.provider.Finalized(ctx) + if err != nil { + return nil, err + } + + if finality == nil || finality.Finalized == nil { + return nil, fmt.Errorf("no finality") + } + + block, err := h.provider.GetBlockByRoot(ctx, finality.Finalized.Root) + if err != nil { + return nil, err + } + + if block == nil { + return nil, fmt.Errorf("no block for finalized root %v", finality.Finalized.Root) + } + + sl, err := block.Slot() + if err != nil { + return nil, err + } + + slot = sl + default: + return nil, fmt.Errorf("invalid block id: %v", blockID.String()) + } + + return h.provider.GetBlobSidecarsBySlot(ctx, slot) +}