diff --git a/disperser/cmd/dataapi/config.go b/disperser/cmd/dataapi/config.go index 11ed9c8c33..2dfcca2e2b 100644 --- a/disperser/cmd/dataapi/config.go +++ b/disperser/cmd/dataapi/config.go @@ -1,6 +1,8 @@ package main import ( + "fmt" + "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/common/geth" @@ -13,6 +15,7 @@ import ( ) type Config struct { + ServerVersion uint AwsClientConfig aws.ClientConfig BlobstoreConfig blobstore.Config EthClientConfig geth.EthClientConfig @@ -37,6 +40,11 @@ type Config struct { } func NewConfig(ctx *cli.Context) (Config, error) { + version := ctx.GlobalUint(flags.DataApiServerVersionFlag.Name) + if version != 1 && version != 2 { + return Config{}, fmt.Errorf("unknown server version %d, must be in [1, 2]", version) + } + loggerConfig, err := common.ReadLoggerCLIConfig(ctx, flags.FlagPrefix) if err != nil { return Config{}, err @@ -56,6 +64,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name), EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name), ServerMode: ctx.GlobalString(flags.ServerModeFlag.Name), + ServerVersion: version, PrometheusConfig: prometheus.Config{ ServerURL: ctx.GlobalString(flags.PrometheusServerURLFlag.Name), Username: ctx.GlobalString(flags.PrometheusServerUsernameFlag.Name), diff --git a/disperser/cmd/dataapi/flags/flags.go b/disperser/cmd/dataapi/flags/flags.go index 8a77bcfb17..b38070dfb2 100644 --- a/disperser/cmd/dataapi/flags/flags.go +++ b/disperser/cmd/dataapi/flags/flags.go @@ -132,6 +132,13 @@ var ( Value: "9100", EnvVar: common.PrefixEnvVar(envVarPrefix, "METRICS_HTTP_PORT"), } + DataApiServerVersionFlag = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "dataapi-version"), + Usage: "DataApi server version. Options are 1 and 2.", + Required: false, + Value: 1, + EnvVar: common.PrefixEnvVar(envVarPrefix, "DATA_API_VERSION"), + } ) var requiredFlags = []cli.Flag{ @@ -156,6 +163,7 @@ var requiredFlags = []cli.Flag{ var optionalFlags = []cli.Flag{ ServerModeFlag, MetricsHTTPPort, + DataApiServerVersionFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index 2f46b05aa7..7053639e0c 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -16,9 +16,11 @@ import ( "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/disperser/cmd/dataapi/flags" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" + blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/disperser/dataapi" "github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus" "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" + "github.com/Layr-Labs/eigensdk-go/logging" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/urfave/cli" @@ -127,6 +129,33 @@ func RunDataApi(ctx *cli.Context) error { logger.Info("Enabled metrics for Data Access API", "socket", httpSocket) } + if config.ServerVersion == 2 { + blobMetadataStorev2 := blobstorev2.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName) + serverv2 := dataapi.NewServerV2( + dataapi.Config{ + ServerMode: config.ServerMode, + SocketAddr: config.SocketAddr, + AllowOrigins: config.AllowOrigins, + DisperserHostname: config.DisperserHostname, + ChurnerHostname: config.ChurnerHostname, + BatcherHealthEndpt: config.BatcherHealthEndpt, + }, + blobMetadataStorev2, + promClient, + subgraphClient, + tx, + chainState, + indexedChainState, + logger, + metrics, + ) + return runServer(serverv2, logger) + } + + return runServer(server, logger) +} + +func runServer[T dataapi.ServerInterface](server T, logger logging.Logger) error { // Setup channel to listen for termination signals quit := make(chan os.Signal, 1) // catch SIGINT (Ctrl+C) and SIGTERM (e.g., from `kill`) @@ -142,7 +171,7 @@ func RunDataApi(ctx *cli.Context) error { // Block until a signal is received. <-quit logger.Info("Shutting down server...") - err = server.Shutdown() + err := server.Shutdown() if err != nil { logger.Errorf("Failed to shutdown server: %v", err) diff --git a/disperser/dataapi/server_v2.go b/disperser/dataapi/server_v2.go new file mode 100644 index 0000000000..cd5377b49c --- /dev/null +++ b/disperser/dataapi/server_v2.go @@ -0,0 +1,178 @@ +package dataapi + +import ( + "errors" + "net/http" + "os" + "time" + + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/disperser/dataapi/docs" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/gin-contrib/cors" + "github.com/gin-contrib/logger" + "github.com/gin-gonic/gin" + swaggerfiles "github.com/swaggo/files" + ginswagger "github.com/swaggo/gin-swagger" +) + +type ServerInterface interface { + Start() error + Shutdown() error +} + +type serverv2 struct { + serverMode string + socketAddr string + allowOrigins []string + logger logging.Logger + + blobMetadataStore *blobstore.BlobMetadataStore + subgraphClient SubgraphClient + chainReader core.Reader + chainState core.ChainState + indexedChainState core.IndexedChainState + promClient PrometheusClient + metrics *Metrics +} + +func NewServerV2( + config Config, + blobMetadataStore *blobstore.BlobMetadataStore, + promClient PrometheusClient, + subgraphClient SubgraphClient, + chainReader core.Reader, + chainState core.ChainState, + indexedChainState core.IndexedChainState, + logger logging.Logger, + metrics *Metrics, +) *serverv2 { + return &serverv2{ + logger: logger.With("component", "DataAPIServerV2"), + serverMode: config.ServerMode, + socketAddr: config.SocketAddr, + allowOrigins: config.AllowOrigins, + blobMetadataStore: blobMetadataStore, + promClient: promClient, + subgraphClient: subgraphClient, + chainReader: chainReader, + chainState: chainState, + indexedChainState: indexedChainState, + metrics: metrics, + } +} + +func (s *serverv2) Start() error { + if s.serverMode == gin.ReleaseMode { + // optimize performance and disable debug features. + gin.SetMode(gin.ReleaseMode) + } + + router := gin.New() + basePath := "/api/v2" + docs.SwaggerInfo.BasePath = basePath + docs.SwaggerInfo.Host = os.Getenv("SWAGGER_HOST") + v2 := router.Group(basePath) + { + feed := v2.Group("/feed") + { + // Blob feed + feed.GET("/blobs", s.FetchBlobsHandler) + feed.GET("/blobs/:blob_key", s.FetchBlobHandler) + // Batch feed + feed.GET("/batches", s.FetchBatchesHandler) + feed.GET("/batches/:batch_header_hash", s.FetchBatchHandler) + } + operators := v2.Group("/operators") + { + operators.GET("/non-signers", s.FetchNonSingers) + operators.GET("/stake", s.FetchOperatorsStake) + operators.GET("/nodeinfo", s.FetchOperatorsNodeInfo) + operators.GET("/reachability", s.CheckOperatorsReachability) + } + metrics := v2.Group("/metrics") + { + metrics.GET("/overview", s.FetchMetricsOverviewHandler) + metrics.GET("/throughput", s.FetchMetricsThroughputHandler) + } + swagger := v2.Group("/swagger") + { + swagger.GET("/*any", ginswagger.WrapHandler(swaggerfiles.Handler)) + } + } + + router.GET("/", func(g *gin.Context) { + g.JSON(http.StatusAccepted, gin.H{"status": "OK"}) + }) + + router.Use(logger.SetLogger( + logger.WithSkipPath([]string{"/"}), + )) + + config := cors.DefaultConfig() + config.AllowOrigins = s.allowOrigins + config.AllowCredentials = true + config.AllowMethods = []string{"GET", "POST", "HEAD", "OPTIONS"} + + if s.serverMode != gin.ReleaseMode { + config.AllowOrigins = []string{"*"} + } + router.Use(cors.New(config)) + + srv := &http.Server{ + Addr: s.socketAddr, + Handler: router, + ReadTimeout: 5 * time.Second, + ReadHeaderTimeout: 5 * time.Second, + WriteTimeout: 20 * time.Second, + IdleTimeout: 120 * time.Second, + } + + errChan := run(s.logger, srv) + return <-errChan +} + +func (s *serverv2) Shutdown() error { + return nil +} + +func (s *serverv2) FetchBlobsHandler(c *gin.Context) { + errorResponse(c, errors.New("FetchBlobsHandler unimplemented")) +} + +func (s *serverv2) FetchBlobHandler(c *gin.Context) { + errorResponse(c, errors.New("FetchBlobHandler unimplemented")) +} + +func (s *serverv2) FetchBatchesHandler(c *gin.Context) { + errorResponse(c, errors.New("FetchBatchesHandler unimplemented")) +} + +func (s *serverv2) FetchBatchHandler(c *gin.Context) { + errorResponse(c, errors.New("FetchBatchHandler unimplemented")) +} + +func (s *serverv2) FetchOperatorsStake(c *gin.Context) { + errorResponse(c, errors.New("FetchOperatorsStake unimplemented")) +} + +func (s *serverv2) FetchOperatorsNodeInfo(c *gin.Context) { + errorResponse(c, errors.New("FetchOperatorsNodeInfo unimplemented")) +} + +func (s *serverv2) CheckOperatorsReachability(c *gin.Context) { + errorResponse(c, errors.New("CheckOperatorsReachability unimplemented")) +} + +func (s *serverv2) FetchNonSingers(c *gin.Context) { + errorResponse(c, errors.New("FetchNonSingers unimplemented")) +} + +func (s *serverv2) FetchMetricsOverviewHandler(c *gin.Context) { + errorResponse(c, errors.New("FetchMetricsOverviewHandler unimplemented")) +} + +func (s *serverv2) FetchMetricsThroughputHandler(c *gin.Context) { + errorResponse(c, errors.New("FetchMetricsThroughputHandler unimplemented")) +}