Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new dataapi server #955

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions disperser/cmd/dataapi/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"fmt"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
Expand All @@ -13,6 +15,7 @@ import (
)

type Config struct {
ServerVersion uint
AwsClientConfig aws.ClientConfig
BlobstoreConfig blobstore.Config
EthClientConfig geth.EthClientConfig
Expand All @@ -37,6 +40,11 @@ type Config struct {
}

func NewConfig(ctx *cli.Context) (Config, error) {
version := ctx.GlobalUint(flags.DataApiServerVersionFlag.Name)
if version != 1 && version != 2 {
return Config{}, fmt.Errorf("unknown server version %d, must be in [1, 2]", version)
}

loggerConfig, err := common.ReadLoggerCLIConfig(ctx, flags.FlagPrefix)
if err != nil {
return Config{}, err
Expand All @@ -56,6 +64,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
ServerMode: ctx.GlobalString(flags.ServerModeFlag.Name),
ServerVersion: version,
PrometheusConfig: prometheus.Config{
ServerURL: ctx.GlobalString(flags.PrometheusServerURLFlag.Name),
Username: ctx.GlobalString(flags.PrometheusServerUsernameFlag.Name),
Expand Down
8 changes: 8 additions & 0 deletions disperser/cmd/dataapi/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ var (
Value: "9100",
EnvVar: common.PrefixEnvVar(envVarPrefix, "METRICS_HTTP_PORT"),
}
DataApiServerVersionFlag = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "dataapi-version"),
Usage: "DataApi server version. Options are 1 and 2.",
Required: false,
Value: 1,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DATA_API_VERSION"),
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -156,6 +163,7 @@ var requiredFlags = []cli.Flag{
var optionalFlags = []cli.Flag{
ServerModeFlag,
MetricsHTTPPort,
DataApiServerVersionFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
31 changes: 30 additions & 1 deletion disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/disperser/cmd/dataapi/flags"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/dataapi"
"github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus"
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
"github.com/Layr-Labs/eigensdk-go/logging"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/urfave/cli"
Expand Down Expand Up @@ -127,6 +129,33 @@ func RunDataApi(ctx *cli.Context) error {
logger.Info("Enabled metrics for Data Access API", "socket", httpSocket)
}

if config.ServerVersion == 2 {
blobMetadataStorev2 := blobstorev2.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName)
serverv2 := dataapi.NewServerV2(
dataapi.Config{
ServerMode: config.ServerMode,
SocketAddr: config.SocketAddr,
AllowOrigins: config.AllowOrigins,
DisperserHostname: config.DisperserHostname,
ChurnerHostname: config.ChurnerHostname,
BatcherHealthEndpt: config.BatcherHealthEndpt,
},
blobMetadataStorev2,
promClient,
subgraphClient,
tx,
chainState,
indexedChainState,
logger,
metrics,
)
return runServer(serverv2, logger)
}

return runServer(server, logger)
}

func runServer[T dataapi.ServerInterface](server T, logger logging.Logger) error {
// Setup channel to listen for termination signals
quit := make(chan os.Signal, 1)
// catch SIGINT (Ctrl+C) and SIGTERM (e.g., from `kill`)
Expand All @@ -142,7 +171,7 @@ func RunDataApi(ctx *cli.Context) error {
// Block until a signal is received.
<-quit
logger.Info("Shutting down server...")
err = server.Shutdown()
err := server.Shutdown()

if err != nil {
logger.Errorf("Failed to shutdown server: %v", err)
Expand Down
178 changes: 178 additions & 0 deletions disperser/dataapi/server_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package dataapi

import (
"errors"
"net/http"
"os"
"time"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/dataapi/docs"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/gin-contrib/cors"
"github.com/gin-contrib/logger"
"github.com/gin-gonic/gin"
swaggerfiles "github.com/swaggo/files"
ginswagger "github.com/swaggo/gin-swagger"
)

type ServerInterface interface {
Start() error
Shutdown() error
}

type serverv2 struct {
serverMode string
socketAddr string
allowOrigins []string
logger logging.Logger

blobMetadataStore *blobstore.BlobMetadataStore
subgraphClient SubgraphClient
chainReader core.Reader
chainState core.ChainState
indexedChainState core.IndexedChainState
promClient PrometheusClient
metrics *Metrics
}

func NewServerV2(
config Config,
blobMetadataStore *blobstore.BlobMetadataStore,
promClient PrometheusClient,
subgraphClient SubgraphClient,
chainReader core.Reader,
chainState core.ChainState,
indexedChainState core.IndexedChainState,
logger logging.Logger,
metrics *Metrics,
) *serverv2 {
return &serverv2{
logger: logger.With("component", "DataAPIServerV2"),
serverMode: config.ServerMode,
socketAddr: config.SocketAddr,
allowOrigins: config.AllowOrigins,
blobMetadataStore: blobMetadataStore,
promClient: promClient,
subgraphClient: subgraphClient,
chainReader: chainReader,
chainState: chainState,
indexedChainState: indexedChainState,
metrics: metrics,
}
}

func (s *serverv2) Start() error {
if s.serverMode == gin.ReleaseMode {
// optimize performance and disable debug features.
gin.SetMode(gin.ReleaseMode)
}

router := gin.New()
basePath := "/api/v2"
docs.SwaggerInfo.BasePath = basePath
docs.SwaggerInfo.Host = os.Getenv("SWAGGER_HOST")
v2 := router.Group(basePath)
{
feed := v2.Group("/feed")
{
// Blob feed
feed.GET("/blobs", s.FetchBlobsHandler)
feed.GET("/blobs/:blob_key", s.FetchBlobHandler)
// Batch feed
feed.GET("/batches", s.FetchBatchesHandler)
feed.GET("/batches/:batch_header_hash", s.FetchBatchHandler)
}
operators := v2.Group("/operators")
{
operators.GET("/non-signers", s.FetchNonSingers)
operators.GET("/stake", s.FetchOperatorsStake)
operators.GET("/nodeinfo", s.FetchOperatorsNodeInfo)
operators.GET("/reachability", s.CheckOperatorsReachability)
}
metrics := v2.Group("/metrics")
{
metrics.GET("/overview", s.FetchMetricsOverviewHandler)
metrics.GET("/throughput", s.FetchMetricsThroughputHandler)
}
swagger := v2.Group("/swagger")
{
swagger.GET("/*any", ginswagger.WrapHandler(swaggerfiles.Handler))
}
}

router.GET("/", func(g *gin.Context) {
g.JSON(http.StatusAccepted, gin.H{"status": "OK"})
})

router.Use(logger.SetLogger(
logger.WithSkipPath([]string{"/"}),
))

config := cors.DefaultConfig()
config.AllowOrigins = s.allowOrigins
config.AllowCredentials = true
config.AllowMethods = []string{"GET", "POST", "HEAD", "OPTIONS"}

if s.serverMode != gin.ReleaseMode {
config.AllowOrigins = []string{"*"}
}
router.Use(cors.New(config))

srv := &http.Server{
Addr: s.socketAddr,
Handler: router,
ReadTimeout: 5 * time.Second,
ReadHeaderTimeout: 5 * time.Second,
WriteTimeout: 20 * time.Second,
IdleTimeout: 120 * time.Second,
}

errChan := run(s.logger, srv)
return <-errChan
}

func (s *serverv2) Shutdown() error {
return nil
}

func (s *serverv2) FetchBlobsHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchBlobsHandler unimplemented"))
}

func (s *serverv2) FetchBlobHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchBlobHandler unimplemented"))
}

func (s *serverv2) FetchBatchesHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchBatchesHandler unimplemented"))
}

func (s *serverv2) FetchBatchHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchBatchHandler unimplemented"))
}

func (s *serverv2) FetchOperatorsStake(c *gin.Context) {
errorResponse(c, errors.New("FetchOperatorsStake unimplemented"))
}

func (s *serverv2) FetchOperatorsNodeInfo(c *gin.Context) {
errorResponse(c, errors.New("FetchOperatorsNodeInfo unimplemented"))
}

func (s *serverv2) CheckOperatorsReachability(c *gin.Context) {
errorResponse(c, errors.New("CheckOperatorsReachability unimplemented"))
}

func (s *serverv2) FetchNonSingers(c *gin.Context) {
errorResponse(c, errors.New("FetchNonSingers unimplemented"))
}

func (s *serverv2) FetchMetricsOverviewHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchMetricsOverviewHandler unimplemented"))
}

func (s *serverv2) FetchMetricsThroughputHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchMetricsThroughputHandler unimplemented"))
}
Loading