From 7047e1be4861d24b516c65c474ef37309966a40e Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 19 Dec 2024 16:08:54 -0800 Subject: [PATCH 01/14] Get writes working first --- tools/traffic/config/config.go | 35 +-- tools/traffic/config/flags.go | 8 +- tools/traffic/config/worker_config.go | 3 +- tools/traffic/generator_v2.go | 148 ++-------- tools/traffic/table/blob_metadata.go | 9 +- tools/traffic/table/blob_store_test.go | 3 +- tools/traffic/workers/blob_reader.go | 233 ---------------- tools/traffic/workers/blob_reader_test.go | 151 ----------- tools/traffic/workers/blob_status_tracker.go | 256 ------------------ .../workers/blob_status_tracker_test.go | 205 -------------- tools/traffic/workers/blob_writer.go | 39 +-- tools/traffic/workers/blob_writer_test.go | 7 +- tools/traffic/workers/unconfirmed_key.go | 12 +- 13 files changed, 85 insertions(+), 1024 deletions(-) delete mode 100644 tools/traffic/workers/blob_reader.go delete mode 100644 tools/traffic/workers/blob_reader_test.go delete mode 100644 tools/traffic/workers/blob_status_tracker.go delete mode 100644 tools/traffic/workers/blob_status_tracker_test.go diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index 9702c7be86..d415f75431 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -2,14 +2,12 @@ package config import ( "errors" - "fmt" "time" - "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/api/clients/v2" + "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/retriever" - - "github.com/Layr-Labs/eigenda/common" "github.com/urfave/cli" ) @@ -20,17 +18,14 @@ type Config struct { LoggingConfig common.LoggerConfig // Configuration for the disperser client. - DisperserClientConfig *clients.Config + DisperserClientConfig *clients.DisperserClientConfig - // Configuration for the retriever client. - RetrievalClientConfig *retriever.Config + // Signer private key + SignerPrivateKey string // Configuration for the graph. TheGraphConfig *thegraph.Config - // Configuration for the EigenDA client. - EigenDAClientConfig *clients.EigenDAClientConfig - // Configures the traffic generator workers. WorkerConfig WorkerConfig @@ -62,14 +57,13 @@ func NewConfig(ctx *cli.Context) (*Config, error) { } config := &Config{ - DisperserClientConfig: &clients.Config{ + DisperserClientConfig: &clients.DisperserClientConfig{ Hostname: ctx.GlobalString(HostnameFlag.Name), Port: ctx.GlobalString(GrpcPortFlag.Name), - Timeout: ctx.Duration(TimeoutFlag.Name), UseSecureGrpcFlag: ctx.GlobalBool(UseSecureGrpcFlag.Name), }, - RetrievalClientConfig: retrieverConfig, + SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name), TheGraphConfig: &thegraph.Config{ Endpoint: ctx.String(TheGraphUrlFlag.Name), @@ -77,12 +71,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) { MaxRetries: ctx.Int(TheGraphRetriesFlag.Name), }, - EigenDAClientConfig: &clients.EigenDAClientConfig{ - RPC: fmt.Sprintf("%s:%s", ctx.GlobalString(HostnameFlag.Name), ctx.GlobalString(GrpcPortFlag.Name)), - SignerPrivateKeyHex: ctx.String(SignerPrivateKeyFlag.Name), - DisableTLS: ctx.GlobalBool(DisableTLSFlag.Name), - }, - LoggingConfig: *loggerConfig, MetricsHTTPPort: ctx.GlobalString(MetricsHTTPPortFlag.Name), @@ -107,19 +95,12 @@ func NewConfig(ctx *cli.Context) (*Config, error) { RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name), StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name), - EigenDAServiceManager: retrieverConfig.EigenDAServiceManagerAddr, - SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name), - CustomQuorums: customQuorumsUint8, + CustomQuorums: customQuorumsUint8, MetricsBlacklist: ctx.StringSlice(MetricsBlacklistFlag.Name), MetricsFuzzyBlacklist: ctx.StringSlice(MetricsFuzzyBlacklistFlag.Name), }, } - err = config.EigenDAClientConfig.CheckAndSetDefaults() - if err != nil { - return nil, err - } - return config, nil } diff --git a/tools/traffic/config/flags.go b/tools/traffic/config/flags.go index c4218e52cf..1b9709f78b 100644 --- a/tools/traffic/config/flags.go +++ b/tools/traffic/config/flags.go @@ -1,14 +1,13 @@ package config import ( + "time" + + "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/Layr-Labs/eigenda/encoding/kzg" "github.com/Layr-Labs/eigenda/indexer" - "github.com/Layr-Labs/eigenda/retriever/flags" - "time" - - "github.com/Layr-Labs/eigenda/common" "github.com/urfave/cli" ) @@ -261,7 +260,6 @@ var Flags []cli.Flag func init() { Flags = append(requiredFlags, optionalFlags...) - Flags = append(Flags, flags.RetrieverFlags(envPrefix)...) Flags = append(Flags, kzg.CLIFlags(envPrefix)...) Flags = append(Flags, common.LoggerCLIFlags(envPrefix, FlagPrefix)...) Flags = append(Flags, geth.EthClientFlags(envPrefix)...) diff --git a/tools/traffic/config/worker_config.go b/tools/traffic/config/worker_config.go index 78eb7a6fc2..0c6ee0eeb2 100644 --- a/tools/traffic/config/worker_config.go +++ b/tools/traffic/config/worker_config.go @@ -38,8 +38,7 @@ type WorkerConfig struct { // The address of the EigenDA service manager smart contract, in hex. EigenDAServiceManager string - // The private key to use for signing requests. - SignerPrivateKey string + // Custom quorum numbers to use for the traffic generator. CustomQuorums []uint8 diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index beec5e393b..68c1a6a9d5 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -9,23 +9,13 @@ import ( "syscall" "time" - "github.com/Layr-Labs/eigenda/common/geth" - "github.com/Layr-Labs/eigenda/core/auth" - "github.com/Layr-Labs/eigenda/core/eth" - "github.com/Layr-Labs/eigenda/core/thegraph" - "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" - retrivereth "github.com/Layr-Labs/eigenda/retriever/eth" + "github.com/Layr-Labs/eigenda/api/clients/v2" + "github.com/Layr-Labs/eigenda/common" + auth "github.com/Layr-Labs/eigenda/core/auth/v2" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" "github.com/Layr-Labs/eigenda/tools/traffic/workers" "github.com/Layr-Labs/eigensdk-go/logging" - gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/core" ) // Generator simulates read/write traffic to the DA service. @@ -37,7 +27,7 @@ import ( // └------------┘ └------------┘ // // The traffic generator is built from three principal components: one or more writers -// that write blobs, a statusTracker that polls the dispenser service until blobs are confirmed, +// that write blobs, a statusTracker that polls the disperser service until blobs are confirmed, // and one or more readers that read blobs. // // When a writer finishes writing a blob, it sends information about that blob to the statusTracker. @@ -50,12 +40,10 @@ type Generator struct { generatorMetrics metrics.Metrics logger *logging.Logger disperserClient clients.DisperserClient - eigenDAClient *clients.EigenDAClient - config *config.Config + // eigenDAClient *clients.EigenDAClient #TODO: Add this back in when the client is implemented + config *config.Config - writers []*workers.BlobWriter - statusTracker *workers.BlobStatusTracker - readers []*workers.BlobReader + writers []*workers.BlobWriter } func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { @@ -64,17 +52,22 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { return nil, err } - var signer core.BlobRequestSigner - if config.EigenDAClientConfig.SignerPrivateKeyHex != "" { - signer = auth.NewLocalBlobRequestSigner(config.EigenDAClientConfig.SignerPrivateKeyHex) - } + // var signer corev2.BlobRequestSigner - logger2 := log.NewLogger(log.NewTerminalHandler(os.Stderr, true)) - client, err := clients.NewEigenDAClient(logger2, *config.EigenDAClientConfig) - if err != nil { - return nil, err + var signer *auth.LocalBlobRequestSigner + if config.SignerPrivateKey != "" { + signer = auth.NewLocalBlobRequestSigner(config.SignerPrivateKey) + } else { + logger.Error("signer private key is required") + return nil, fmt.Errorf("signer private key is required") } + // logger2 := log.NewLogger(log.NewTerminalHandler(os.Stderr, true)) + // client, err := clients.NewEigenDAClient(logger2, *config.EigenDAClientConfig) + // if err != nil { + // return nil, err + // } + ctx, cancel := context.WithCancel(context.Background()) waitGroup := sync.WaitGroup{} @@ -84,25 +77,14 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { config.WorkerConfig.MetricsBlacklist, config.WorkerConfig.MetricsFuzzyBlacklist) - blobTable := table.NewBlobStore() - - unconfirmedKeyChannel := make(chan *workers.UnconfirmedKey, 100) + uncertifiedKeyChannel := make(chan *workers.UncertifiedKey, 100) // TODO: create a dedicated reservation for traffic generator - disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer) + disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer, nil, nil) if err != nil { cancel() return nil, fmt.Errorf("new disperser-client: %w", err) } - statusVerifier := workers.NewBlobStatusTracker( - &ctx, - &waitGroup, - logger, - &config.WorkerConfig, - unconfirmedKeyChannel, - blobTable, - disperserClient, - generatorMetrics) writers := make([]*workers.BlobWriter, 0) for i := 0; i < int(config.WorkerConfig.NumWriteInstances); i++ { @@ -112,27 +94,11 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { logger, &config.WorkerConfig, disperserClient, - unconfirmedKeyChannel, + uncertifiedKeyChannel, generatorMetrics) writers = append(writers, &writer) } - retriever, chainClient := buildRetriever(config) - - readers := make([]*workers.BlobReader, 0) - for i := 0; i < int(config.WorkerConfig.NumReadInstances); i++ { - reader := workers.NewBlobReader( - &ctx, - &waitGroup, - logger, - &config.WorkerConfig, - retriever, - chainClient, - blobTable, - generatorMetrics) - readers = append(readers, &reader) - } - return &Generator{ ctx: &ctx, cancel: &cancel, @@ -140,83 +106,27 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { generatorMetrics: generatorMetrics, logger: &logger, disperserClient: disperserClient, - eigenDAClient: client, config: config, writers: writers, - statusTracker: &statusVerifier, - readers: readers, }, nil } -// buildRetriever creates a retriever client for the traffic generator. -func buildRetriever(config *config.Config) (clients.RetrievalClient, retrivereth.ChainClient) { - loggerConfig := common.DefaultLoggerConfig() - - logger, err := common.NewLogger(loggerConfig) - if err != nil { - panic(fmt.Sprintf("Unable to instantiate logger: %s", err)) - } - - gethClient, err := geth.NewMultiHomingClient(config.RetrievalClientConfig.EthClientConfig, gethcommon.Address{}, logger) - if err != nil { - panic(fmt.Sprintf("Unable to instantiate geth client: %s", err)) - } - - tx, err := eth.NewReader( - logger, - gethClient, - config.RetrievalClientConfig.BLSOperatorStateRetrieverAddr, - config.RetrievalClientConfig.EigenDAServiceManagerAddr) - if err != nil { - panic(fmt.Sprintf("Unable to instantiate transactor: %s", err)) - } - - cs := eth.NewChainState(tx, gethClient) - - chainState := thegraph.MakeIndexedChainState(*config.TheGraphConfig, cs, logger) - - var assignmentCoordinator core.AssignmentCoordinator = &core.StdAssignmentCoordinator{} - - nodeClient := clients.NewNodeClient(config.NodeClientTimeout) - - config.RetrievalClientConfig.EncoderConfig.LoadG2Points = true - v, err := verifier.NewVerifier(&config.RetrievalClientConfig.EncoderConfig, nil) - if err != nil { - panic(fmt.Sprintf("Unable to build statusTracker: %s", err)) - } - - retriever, err := clients.NewRetrievalClient( - logger, - chainState, - assignmentCoordinator, - nodeClient, - v, - config.RetrievalClientConfig.NumConnections) - - if err != nil { - panic(fmt.Sprintf("Unable to build retriever: %s", err)) - } - - chainClient := retrivereth.NewChainClient(gethClient, logger) - - return retriever, chainClient -} - // Start instantiates goroutines that generate read/write traffic, continues until a SIGTERM is observed. func (generator *Generator) Start() error { generator.generatorMetrics.Start() - generator.statusTracker.Start() + + // generator.statusTracker.Start() for _, writer := range generator.writers { writer.Start() time.Sleep(generator.config.InstanceLaunchInterval) } - for _, reader := range generator.readers { - reader.Start() - time.Sleep(generator.config.InstanceLaunchInterval) - } + // for _, reader := range generator.readers { + // reader.Start() + // time.Sleep(generator.config.InstanceLaunchInterval) + // } signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) diff --git a/tools/traffic/table/blob_metadata.go b/tools/traffic/table/blob_metadata.go index 40a0b415e9..24ee9dc81e 100644 --- a/tools/traffic/table/blob_metadata.go +++ b/tools/traffic/table/blob_metadata.go @@ -1,6 +1,10 @@ package table -import "errors" +import ( + "errors" + + corev2 "github.com/Layr-Labs/eigenda/core/v2" +) // BlobMetadata encapsulates various information about a blob written by the traffic generator. type BlobMetadata struct { @@ -13,6 +17,9 @@ type BlobMetadata struct { // Hash of the batch header that the blob was written in. BatchHeaderHash [32]byte + // Blob header of the blob. + BlobHeader *corev2.BlobHeader + // Checksum of the blob. Checksum [16]byte diff --git a/tools/traffic/table/blob_store_test.go b/tools/traffic/table/blob_store_test.go index 99c9c91a02..37992675be 100644 --- a/tools/traffic/table/blob_store_test.go +++ b/tools/traffic/table/blob_store_test.go @@ -1,10 +1,11 @@ package table import ( + "testing" + tu "github.com/Layr-Labs/eigenda/common/testutils" "github.com/stretchr/testify/assert" "golang.org/x/exp/rand" - "testing" ) // randomMetadata generates a random BlobMetadata instance. diff --git a/tools/traffic/workers/blob_reader.go b/tools/traffic/workers/blob_reader.go deleted file mode 100644 index 033eb9ee64..0000000000 --- a/tools/traffic/workers/blob_reader.go +++ /dev/null @@ -1,233 +0,0 @@ -package workers - -import ( - "context" - "crypto/md5" - "fmt" - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/encoding" - "github.com/Layr-Labs/eigenda/retriever/eth" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/Layr-Labs/eigensdk-go/logging" - gcommon "github.com/ethereum/go-ethereum/common" - "math/big" - "sync" - "time" -) - -// BlobReader reads blobs from the DA network at a configured rate. -type BlobReader struct { - // The context for the generator. All work should cease when this context is cancelled. - ctx *context.Context - - // Tracks the number of active goroutines within the generator. - waitGroup *sync.WaitGroup - - // All logs should be written using this logger. - logger logging.Logger - - // config contains the configuration for the generator. - config *config.WorkerConfig - - retriever clients.RetrievalClient - chainClient eth.ChainClient - - // blobsToRead blobs we are required to read a certain number of times. - blobsToRead *table.BlobStore - - // metrics for the blob reader. - metrics *blobReaderMetrics -} - -type blobReaderMetrics struct { - generatorMetrics metrics.Metrics - fetchBatchHeaderMetric metrics.LatencyMetric - fetchBatchHeaderSuccess metrics.CountMetric - fetchBatchHeaderFailure metrics.CountMetric - readLatencyMetric metrics.LatencyMetric - readSuccessMetric metrics.CountMetric - readFailureMetric metrics.CountMetric - recombinationSuccessMetric metrics.CountMetric - recombinationFailureMetric metrics.CountMetric - validBlobMetric metrics.CountMetric - invalidBlobMetric metrics.CountMetric - operatorSuccessMetrics map[core.OperatorID]metrics.CountMetric - operatorFailureMetrics map[core.OperatorID]metrics.CountMetric - requiredReadPoolSizeMetric metrics.GaugeMetric - optionalReadPoolSizeMetric metrics.GaugeMetric -} - -// NewBlobReader creates a new BlobReader instance. -func NewBlobReader( - ctx *context.Context, - waitGroup *sync.WaitGroup, - logger logging.Logger, - config *config.WorkerConfig, - retriever clients.RetrievalClient, - chainClient eth.ChainClient, - blobStore *table.BlobStore, - generatorMetrics metrics.Metrics) BlobReader { - - return BlobReader{ - ctx: ctx, - waitGroup: waitGroup, - logger: logger, - config: config, - retriever: retriever, - chainClient: chainClient, - blobsToRead: blobStore, - metrics: &blobReaderMetrics{ - generatorMetrics: generatorMetrics, - fetchBatchHeaderMetric: generatorMetrics.NewLatencyMetric("fetch_batch_header"), - fetchBatchHeaderSuccess: generatorMetrics.NewCountMetric("fetch_batch_header_success"), - fetchBatchHeaderFailure: generatorMetrics.NewCountMetric("fetch_batch_header_failure"), - recombinationSuccessMetric: generatorMetrics.NewCountMetric("recombination_success"), - recombinationFailureMetric: generatorMetrics.NewCountMetric("recombination_failure"), - readLatencyMetric: generatorMetrics.NewLatencyMetric("read"), - validBlobMetric: generatorMetrics.NewCountMetric("valid_blob"), - invalidBlobMetric: generatorMetrics.NewCountMetric("invalid_blob"), - readSuccessMetric: generatorMetrics.NewCountMetric("read_success"), - readFailureMetric: generatorMetrics.NewCountMetric("read_failure"), - operatorSuccessMetrics: make(map[core.OperatorID]metrics.CountMetric), - operatorFailureMetrics: make(map[core.OperatorID]metrics.CountMetric), - requiredReadPoolSizeMetric: generatorMetrics.NewGaugeMetric("required_read_pool_size"), - optionalReadPoolSizeMetric: generatorMetrics.NewGaugeMetric("optional_read_pool_size"), - }, - } -} - -// Start begins a blob reader goroutine. -func (r *BlobReader) Start() { - r.waitGroup.Add(1) - ticker := time.NewTicker(r.config.ReadRequestInterval) - go func() { - defer r.waitGroup.Done() - for { - select { - case <-(*r.ctx).Done(): - err := (*r.ctx).Err() - if err != nil { - r.logger.Info("blob reader context closed", "err:", err) - } - return - case <-ticker.C: - r.randomRead() - } - } - }() -} - -// randomRead reads a random blob. -func (r *BlobReader) randomRead() { - metadata := r.blobsToRead.GetNext() - if metadata == nil { - // There are no blobs that we are required to read. - return - } - - r.metrics.requiredReadPoolSizeMetric.Set(float64(r.blobsToRead.Size())) - - ctxTimeout, cancel := context.WithTimeout(*r.ctx, r.config.FetchBatchHeaderTimeout) - defer cancel() - - start := time.Now() - batchHeader, err := r.chainClient.FetchBatchHeader( - ctxTimeout, - gcommon.HexToAddress(r.config.EigenDAServiceManager), - metadata.BatchHeaderHash[:], - big.NewInt(int64(0)), - nil) - if err != nil { - r.logger.Error("failed to get batch header", "err:", err) - r.metrics.fetchBatchHeaderFailure.Increment() - return - } - r.metrics.fetchBatchHeaderMetric.ReportLatency(time.Since(start)) - - r.metrics.fetchBatchHeaderSuccess.Increment() - - ctxTimeout, cancel = context.WithTimeout(*r.ctx, r.config.RetrieveBlobChunksTimeout) - defer cancel() - - start = time.Now() - chunks, err := r.retriever.RetrieveBlobChunks( - ctxTimeout, - metadata.BatchHeaderHash, - uint32(metadata.BlobIndex), - uint(batchHeader.ReferenceBlockNumber), - batchHeader.BlobHeadersRoot, - core.QuorumID(0)) - if err != nil { - r.logger.Error("failed to read chunks", "err:", err) - r.metrics.readFailureMetric.Increment() - return - } - r.metrics.readLatencyMetric.ReportLatency(time.Since(start)) - - r.metrics.readSuccessMetric.Increment() - - assignments := chunks.Assignments - - data, err := r.retriever.CombineChunks(chunks) - if err != nil { - r.logger.Error("failed to combine chunks", "err:", err) - r.metrics.recombinationFailureMetric.Increment() - return - } - r.metrics.recombinationSuccessMetric.Increment() - - r.verifyBlob(metadata, &data) - - indexSet := make(map[encoding.ChunkNumber]bool) - for index := range chunks.Indices { - indexSet[chunks.Indices[index]] = true - } - - for id, assignment := range assignments { - for index := assignment.StartIndex; index < assignment.StartIndex+assignment.NumChunks; index++ { - if indexSet[index] { - r.reportChunk(id) - } else { - r.reportMissingChunk(id) - } - } - } -} - -// reportChunk reports a successful chunk read. -func (r *BlobReader) reportChunk(operatorId core.OperatorID) { - metric, exists := r.metrics.operatorSuccessMetrics[operatorId] - if !exists { - metric = r.metrics.generatorMetrics.NewCountMetric(fmt.Sprintf("operator_%x_returned_chunk", operatorId)) - r.metrics.operatorSuccessMetrics[operatorId] = metric - } - - metric.Increment() -} - -// reportMissingChunk reports a missing chunk. -func (r *BlobReader) reportMissingChunk(operatorId core.OperatorID) { - metric, exists := r.metrics.operatorFailureMetrics[operatorId] - if !exists { - metric = r.metrics.generatorMetrics.NewCountMetric(fmt.Sprintf("operator_%x_witheld_chunk", operatorId)) - r.metrics.operatorFailureMetrics[operatorId] = metric - } - - metric.Increment() -} - -// verifyBlob performs sanity checks on the blob. -func (r *BlobReader) verifyBlob(metadata *table.BlobMetadata, blob *[]byte) { - // Trim off the padding. - truncatedBlob := (*blob)[:metadata.Size] - recomputedChecksum := md5.Sum(truncatedBlob) - - if metadata.Checksum == recomputedChecksum { - r.metrics.validBlobMetric.Increment() - } else { - r.metrics.invalidBlobMetric.Increment() - } -} diff --git a/tools/traffic/workers/blob_reader_test.go b/tools/traffic/workers/blob_reader_test.go deleted file mode 100644 index ea9d5f0773..0000000000 --- a/tools/traffic/workers/blob_reader_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package workers - -import ( - "context" - "crypto/md5" - "sync" - "testing" - "time" - - "github.com/Layr-Labs/eigenda/api/clients" - apiMock "github.com/Layr-Labs/eigenda/api/clients/mock" - "github.com/Layr-Labs/eigenda/common" - tu "github.com/Layr-Labs/eigenda/common/testutils" - binding "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager" - retrieverMock "github.com/Layr-Labs/eigenda/retriever/mock" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "golang.org/x/exp/rand" -) - -// TestBlobReaderNoOptionalReads tests the BlobReader's basic functionality' -func TestBlobReader(t *testing.T) { - tu.InitializeRandom() - - ctx, cancel := context.WithCancel(context.Background()) - waitGroup := sync.WaitGroup{} - logger, err := common.NewLogger(common.DefaultLoggerConfig()) - assert.Nil(t, err) - - blobTable := table.NewBlobStore() - - readerMetrics := metrics.NewMockMetrics() - - chainClient := &retrieverMock.MockChainClient{} - chainClient.On( - "FetchBatchHeader", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything).Return(&binding.BatchHeader{}, nil) - retrievalClient := &apiMock.MockRetrievalClient{} - - blobReader := NewBlobReader( - &ctx, - &waitGroup, - logger, - &config.WorkerConfig{}, - retrievalClient, - chainClient, - blobTable, - readerMetrics) - - blobSize := 1024 - readPermits := 2 - blobCount := 100 - - invalidBlobCount := 0 - - // Insert some blobs into the table. - for i := 0; i < blobCount; i++ { - - key := make([]byte, 32) - _, err = rand.Read(key) - assert.Nil(t, err) - - blobData := make([]byte, blobSize) - _, err = rand.Read(blobData) - assert.Nil(t, err) - - var checksum [16]byte - if i%10 == 0 { - // Simulate an invalid blob - invalidBlobCount++ - _, err = rand.Read(checksum[:]) - assert.Nil(t, err) - } else { - checksum = md5.Sum(blobData) - } - - batchHeaderHash := [32]byte{} - _, err = rand.Read(batchHeaderHash[:]) - assert.Nil(t, err) - - blobMetadata, err := table.NewBlobMetadata( - key, - checksum, - uint(blobSize), - uint(i), - batchHeaderHash, - readPermits) - assert.Nil(t, err) - - // Simplify tracking by hijacking the BlobHeaderLength field to store the blob index, - // which is used as a unique identifier within this test. - chunks := &clients.BlobChunks{BlobHeaderLength: blobMetadata.BlobIndex} - retrievalClient.On("RetrieveBlobChunks", - blobMetadata.BatchHeaderHash, - uint32(blobMetadata.BlobIndex), - mock.Anything, - mock.Anything, - mock.Anything).Return(chunks, nil) - retrievalClient.On("CombineChunks", chunks).Return(blobData, nil) - - blobTable.Add(blobMetadata) - } - - // Do a bunch of reads. - expectedTotalReads := uint(readPermits * blobCount) - for i := uint(0); i < expectedTotalReads; i++ { - blobReader.randomRead() - - chainClient.AssertNumberOfCalls(t, "FetchBatchHeader", int(i+1)) - retrievalClient.AssertNumberOfCalls(t, "RetrieveBlobChunks", int(i+1)) - retrievalClient.AssertNumberOfCalls(t, "CombineChunks", int(i+1)) - - remainingPermits := uint(0) - for _, metadata := range blobTable.GetAll() { - remainingPermits += uint(metadata.RemainingReadPermits) - } - assert.Equal(t, remainingPermits, expectedTotalReads-i-1) - - assert.Equal(t, i+1, uint(readerMetrics.GetCount("read_success"))) - assert.Equal(t, i+1, uint(readerMetrics.GetCount("fetch_batch_header_success"))) - assert.Equal(t, i+1, uint(readerMetrics.GetCount("recombination_success"))) - } - - expectedInvalidBlobs := uint(invalidBlobCount * readPermits) - expectedValidBlobs := expectedTotalReads - expectedInvalidBlobs - - assert.Equal(t, expectedValidBlobs, uint(readerMetrics.GetCount("valid_blob"))) - assert.Equal(t, expectedInvalidBlobs, uint(readerMetrics.GetCount("invalid_blob"))) - assert.Equal(t, uint(0), uint(readerMetrics.GetGaugeValue("required_read_pool_size"))) - assert.Equal(t, uint(0), uint(readerMetrics.GetGaugeValue("optional_read_pool_size"))) - - // Table is empty, so doing a random read should have no effect. - blobReader.randomRead() - - // Give the system a moment to attempt to do work. This should not result in any reads. - time.Sleep(time.Second / 10) - assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("read_success"))) - assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("fetch_batch_header_success"))) - assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("recombination_success"))) - assert.Equal(t, expectedValidBlobs, uint(readerMetrics.GetCount("valid_blob"))) - assert.Equal(t, expectedInvalidBlobs, uint(readerMetrics.GetCount("invalid_blob"))) - - cancel() -} diff --git a/tools/traffic/workers/blob_status_tracker.go b/tools/traffic/workers/blob_status_tracker.go deleted file mode 100644 index 15bcca84bf..0000000000 --- a/tools/traffic/workers/blob_status_tracker.go +++ /dev/null @@ -1,256 +0,0 @@ -package workers - -import ( - "context" - "math/rand" - "sync" - "time" - - "github.com/Layr-Labs/eigenda/api/clients" - "github.com/Layr-Labs/eigenda/api/grpc/disperser" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/Layr-Labs/eigensdk-go/logging" -) - -// BlobStatusTracker periodically polls the disperser service to verify the status of blobs that were recently written. -// When blobs become confirmed, the status tracker updates the blob blobsToRead accordingly. -// This is a thread safe data structure. -type BlobStatusTracker struct { - - // The context for the generator. All work should cease when this context is cancelled. - ctx *context.Context - - // Tracks the number of active goroutines within the generator. - waitGroup *sync.WaitGroup - - // All logs should be written using this logger. - logger logging.Logger - - // config contains the configuration for the generator. - config *config.WorkerConfig - - // Contains confirmed blobs. Blobs are added here when they are confirmed by the disperser service. - confirmedBlobs *table.BlobStore - - // The disperser client used to monitor the disperser service. - disperser clients.DisperserClient - - // The keys of blobs that have not yet been confirmed by the disperser service. - unconfirmedBlobs []*UnconfirmedKey - - // Newly added keys that require verification. - keyChannel chan *UnconfirmedKey - - blobsInFlightMetric metrics.GaugeMetric - getStatusLatencyMetric metrics.LatencyMetric - confirmationLatencyMetric metrics.LatencyMetric - getStatusErrorCountMetric metrics.CountMetric - unknownCountMetric metrics.CountMetric - processingCountMetric metrics.CountMetric - dispersingCountMetric metrics.CountMetric - failedCountMetric metrics.CountMetric - insufficientSignaturesCountMetric metrics.CountMetric - confirmedCountMetric metrics.CountMetric - finalizedCountMetric metrics.CountMetric -} - -// NewBlobStatusTracker creates a new BlobStatusTracker instance. -func NewBlobStatusTracker( - ctx *context.Context, - waitGroup *sync.WaitGroup, - logger logging.Logger, - config *config.WorkerConfig, - keyChannel chan *UnconfirmedKey, - table *table.BlobStore, - disperser clients.DisperserClient, - generatorMetrics metrics.Metrics) BlobStatusTracker { - - return BlobStatusTracker{ - ctx: ctx, - waitGroup: waitGroup, - logger: logger, - config: config, - keyChannel: keyChannel, - confirmedBlobs: table, - disperser: disperser, - unconfirmedBlobs: make([]*UnconfirmedKey, 0), - blobsInFlightMetric: generatorMetrics.NewGaugeMetric("blobs_in_flight"), - getStatusLatencyMetric: generatorMetrics.NewLatencyMetric("get_status"), - confirmationLatencyMetric: generatorMetrics.NewLatencyMetric("confirmation"), - getStatusErrorCountMetric: generatorMetrics.NewCountMetric("get_status_ERROR"), - unknownCountMetric: generatorMetrics.NewCountMetric("get_status_UNKNOWN"), - processingCountMetric: generatorMetrics.NewCountMetric("get_status_PROCESSING"), - dispersingCountMetric: generatorMetrics.NewCountMetric("get_status_DISPERSING"), - failedCountMetric: generatorMetrics.NewCountMetric("get_status_FAILED"), - insufficientSignaturesCountMetric: generatorMetrics.NewCountMetric("get_status_INSUFFICIENT_SIGNATURES"), - confirmedCountMetric: generatorMetrics.NewCountMetric("get_status_CONFIRMED"), - finalizedCountMetric: generatorMetrics.NewCountMetric("get_status_FINALIZED"), - } -} - -// Start begins the status goroutine, which periodically polls -// the disperser service to verify the status of blobs. -func (tracker *BlobStatusTracker) Start() { - tracker.waitGroup.Add(1) - go tracker.monitor() -} - -// monitor periodically polls the disperser service to verify the status of blobs. -func (tracker *BlobStatusTracker) monitor() { - ticker := time.NewTicker(tracker.config.TrackerInterval) - for { - select { - case <-(*tracker.ctx).Done(): - tracker.waitGroup.Done() - return - case key := <-tracker.keyChannel: - tracker.unconfirmedBlobs = append(tracker.unconfirmedBlobs, key) - case <-ticker.C: - tracker.poll() - } - } -} - -// poll checks all unconfirmed keys to see if they have been confirmed by the disperser service. -// If a Key is confirmed, it is added to the blob confirmedBlobs and removed from the list of unconfirmed keys. -func (tracker *BlobStatusTracker) poll() { - - // FUTURE WORK If the number of unconfirmed blobs is high and the time to confirm is high, this is not efficient. - // Revisit this method if there are performance problems. - - nonFinalBlobs := make([]*UnconfirmedKey, 0) - for _, key := range tracker.unconfirmedBlobs { - - blobStatus, err := tracker.getBlobStatus(key) - if err != nil { - tracker.logger.Error("failed to get blob status: ", "err:", err) - // There was an error getting status. Try again later. - nonFinalBlobs = append(nonFinalBlobs, key) - continue - } - - tracker.updateStatusMetrics(blobStatus.Status) - if isBlobStatusTerminal(blobStatus.Status) { - if isBlobStatusConfirmed(blobStatus.Status) { - tracker.forwardToReader(key, blobStatus) - } - } else { - // try again later - nonFinalBlobs = append(nonFinalBlobs, key) - } - } - tracker.unconfirmedBlobs = nonFinalBlobs - tracker.blobsInFlightMetric.Set(float64(len(tracker.unconfirmedBlobs))) -} - -// isBlobStatusTerminal returns true if the status is a terminal status. -func isBlobStatusTerminal(status disperser.BlobStatus) bool { - switch status { - case disperser.BlobStatus_FAILED: - return true - case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: - return true - case disperser.BlobStatus_CONFIRMED: - // Technically this isn't terminal, as confirmed blobs eventually should become finalized. - // But it is terminal from the status tracker's perspective, since we stop tracking the blob - // once it becomes either confirmed or finalized. - return true - case disperser.BlobStatus_FINALIZED: - return true - default: - return false - } -} - -// isBlobStatusConfirmed returns true if the status is a confirmed status. -func isBlobStatusConfirmed(status disperser.BlobStatus) bool { - switch status { - case disperser.BlobStatus_CONFIRMED: - return true - case disperser.BlobStatus_FINALIZED: - return true - default: - return false - } -} - -// updateStatusMetrics updates the metrics for the reported status of a blob. -func (tracker *BlobStatusTracker) updateStatusMetrics(status disperser.BlobStatus) { - switch status { - case disperser.BlobStatus_UNKNOWN: - tracker.unknownCountMetric.Increment() - case disperser.BlobStatus_PROCESSING: - tracker.processingCountMetric.Increment() - case disperser.BlobStatus_DISPERSING: - tracker.dispersingCountMetric.Increment() - case disperser.BlobStatus_FAILED: - tracker.failedCountMetric.Increment() - case disperser.BlobStatus_INSUFFICIENT_SIGNATURES: - tracker.insufficientSignaturesCountMetric.Increment() - case disperser.BlobStatus_CONFIRMED: - tracker.confirmedCountMetric.Increment() - case disperser.BlobStatus_FINALIZED: - tracker.finalizedCountMetric.Increment() - default: - tracker.logger.Error("unknown blob status", "status:", status) - } -} - -// getBlobStatus gets the status of a blob from the disperser service. Returns nil if there was an error -// getting the status. -func (tracker *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) (*disperser.BlobStatusReply, error) { - ctxTimeout, cancel := context.WithTimeout(*tracker.ctx, tracker.config.GetBlobStatusTimeout) - defer cancel() - - start := time.Now() - status, err := tracker.disperser.GetBlobStatus(ctxTimeout, key.Key) - - if err != nil { - tracker.getStatusErrorCountMetric.Increment() - return nil, err - } - tracker.getStatusLatencyMetric.ReportLatency(time.Since(start)) - - return status, nil -} - -// forwardToReader forwards a blob to the reader. Only called once the blob is ready to be read. -func (tracker *BlobStatusTracker) forwardToReader(key *UnconfirmedKey, status *disperser.BlobStatusReply) { - batchHeaderHash := [32]byte(status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash) - blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex() - - confirmationTime := time.Now() - confirmationLatency := confirmationTime.Sub(key.SubmissionTime) - tracker.confirmationLatencyMetric.ReportLatency(confirmationLatency) - - requiredDownloads := tracker.config.RequiredDownloads - var downloadCount int32 - if requiredDownloads < 0 { - // Allow unlimited downloads. - downloadCount = -1 - } else if requiredDownloads == 0 { - // Do not download blob. - return - } else if requiredDownloads < 1 { - // Download blob with probability equal to requiredDownloads. - if rand.Float64() < requiredDownloads { - // Download the blob once. - downloadCount = 1 - } else { - // Do not download blob. - return - } - } else { - // Download blob requiredDownloads times. - downloadCount = int32(requiredDownloads) - } - - blobMetadata, err := table.NewBlobMetadata(key.Key, key.Checksum, key.Size, uint(blobIndex), batchHeaderHash, int(downloadCount)) - if err != nil { - tracker.logger.Error("failed to create blob metadata", "err:", err) - return - } - tracker.confirmedBlobs.Add(blobMetadata) -} diff --git a/tools/traffic/workers/blob_status_tracker_test.go b/tools/traffic/workers/blob_status_tracker_test.go deleted file mode 100644 index 9246e2052c..0000000000 --- a/tools/traffic/workers/blob_status_tracker_test.go +++ /dev/null @@ -1,205 +0,0 @@ -package workers - -import ( - "context" - "fmt" - disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" - "github.com/Layr-Labs/eigenda/common" - tu "github.com/Layr-Labs/eigenda/common/testutils" - "github.com/Layr-Labs/eigenda/tools/traffic/config" - "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/table" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "golang.org/x/exp/rand" - "sync" - "testing" - "time" -) - -func getRandomStatus() disperser_rpc.BlobStatus { - return disperser_rpc.BlobStatus(rand.Intn(7)) -} - -func isStatusTerminal(status disperser_rpc.BlobStatus) bool { - switch status { - case disperser_rpc.BlobStatus_UNKNOWN: - return false - case disperser_rpc.BlobStatus_PROCESSING: - return false - case disperser_rpc.BlobStatus_DISPERSING: - return false - - case disperser_rpc.BlobStatus_INSUFFICIENT_SIGNATURES: - return true - case disperser_rpc.BlobStatus_FAILED: - return true - case disperser_rpc.BlobStatus_FINALIZED: - return true - case disperser_rpc.BlobStatus_CONFIRMED: - return true - default: - panic("unknown status") - } -} - -func isStatusSuccess(status disperser_rpc.BlobStatus) bool { - switch status { - case disperser_rpc.BlobStatus_CONFIRMED: - return true - case disperser_rpc.BlobStatus_FINALIZED: - return true - default: - return false - } -} - -func TestStatusTracker(t *testing.T) { - tu.InitializeRandom() - - ctx, cancel := context.WithCancel(context.Background()) - waitGroup := sync.WaitGroup{} - logger, err := common.NewLogger(common.DefaultLoggerConfig()) - assert.Nil(t, err) - - requiredDownloads := rand.Intn(10) + 1 - config := &config.WorkerConfig{ - RequiredDownloads: float64(requiredDownloads), - } - - blobStore := table.NewBlobStore() - - trackerMetrics := metrics.NewMockMetrics() - - disperserClient := &MockDisperserClient{} - - tracker := NewBlobStatusTracker( - &ctx, - &waitGroup, - logger, - config, - make(chan *UnconfirmedKey), - blobStore, - disperserClient, - trackerMetrics) - - expectedGetStatusCount := 0 - statusCounts := make(map[disperser_rpc.BlobStatus]int) - checksums := make(map[string][16]byte) - sizes := make(map[string]uint) - - statusMap := make(map[string]disperser_rpc.BlobStatus) - - for i := 0; i < 100; i++ { - - // Add some new keys to track. - newKeys := rand.Intn(10) - for j := 0; j < newKeys; j++ { - key := make([]byte, 16) - checksum := [16]byte{} - size := rand.Uint32() - - _, err = rand.Read(key) - assert.Nil(t, err) - _, err = rand.Read(checksum[:]) - assert.Nil(t, err) - - checksums[string(key)] = checksum - sizes[string(key)] = uint(size) - - stringifiedKey := string(key) - statusMap[stringifiedKey] = disperser_rpc.BlobStatus_UNKNOWN - - unconfirmedKey := &UnconfirmedKey{ - Key: key, - Checksum: checksum, - Size: uint(size), - SubmissionTime: time.Now(), - } - - tracker.unconfirmedBlobs = append(tracker.unconfirmedBlobs, unconfirmedKey) - } - - // Reset the mock disperser client. - disperserClient.mock = mock.Mock{} - expectedGetStatusCount = 0 - - // Choose some new statuses to be returned. - // Count the number of status queries we expect to see in this iteration. - for key, status := range statusMap { - var newStatus disperser_rpc.BlobStatus - if isStatusTerminal(status) { - newStatus = status - } else { - // Blobs in a non-terminal status will be queried again. - expectedGetStatusCount += 1 - // Set the next status to be returned. - newStatus = getRandomStatus() - statusMap[key] = newStatus - - statusCounts[newStatus] += 1 - } - disperserClient.mock.On("GetBlobStatus", []byte(key)).Return( - &disperser_rpc.BlobStatusReply{ - Status: newStatus, - Info: &disperser_rpc.BlobInfo{ - BlobVerificationProof: &disperser_rpc.BlobVerificationProof{ - BatchMetadata: &disperser_rpc.BatchMetadata{ - BatchHeaderHash: make([]byte, 32), - }, - }, - }, - }, nil) - } - - // Simulate advancement of time, allowing the tracker to process the new keys. - tracker.poll() - - // Validate the number of calls made to the disperser client. - disperserClient.mock.AssertNumberOfCalls(t, "GetBlobStatus", expectedGetStatusCount) - - // Read the data in the confirmedBlobs into a map for quick lookup. - tableData := make(map[string]*table.BlobMetadata) - for _, metadata := range blobStore.GetAll() { - tableData[string(metadata.Key)] = metadata - } - - blobsInFlight := 0 - for key, status := range statusMap { - metadata, present := tableData[key] - - if !isStatusTerminal(status) { - blobsInFlight++ - } - - if isStatusSuccess(status) { - // Successful blobs should be in the confirmedBlobs. - assert.True(t, present) - } else { - // Non-successful blobs should not be in the confirmedBlobs. - assert.False(t, present) - } - - // Verify metadata. - if present { - assert.Equal(t, checksums[key], metadata.Checksum) - assert.Equal(t, sizes[key], metadata.Size) - assert.Equal(t, requiredDownloads, metadata.RemainingReadPermits) - } - } - - // Verify metrics. - for status, count := range statusCounts { // TODO - metricName := fmt.Sprintf("get_status_%s", status.String()) - assert.Equal(t, float64(count), trackerMetrics.GetCount(metricName), "status: %s", status.String()) - } - if float64(blobsInFlight) != trackerMetrics.GetGaugeValue("blobs_in_flight") { - assert.Equal(t, float64(blobsInFlight), trackerMetrics.GetGaugeValue("blobs_in_flight")) - } - } - - cancel() - tu.ExecuteWithTimeout(func() { - waitGroup.Wait() - }, time.Second) -} diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index e0add4b355..52d95caa6e 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -8,7 +8,8 @@ import ( "sync" "time" - "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/api/clients/v2" + v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" @@ -32,8 +33,8 @@ type BlobWriter struct { // disperser is the client used to send blobs to the disperser. disperser clients.DisperserClient - // Unconfirmed keys are sent here. - unconfirmedKeyChannel chan *UnconfirmedKey + // Uncertified keys are sent here. + uncertifiedKeyChannel chan *UncertifiedKey // fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise. fixedRandomData []byte @@ -55,7 +56,7 @@ func NewBlobWriter( logger logging.Logger, config *config.WorkerConfig, disperser clients.DisperserClient, - unconfirmedKeyChannel chan *UnconfirmedKey, + uncertifiedKeyChannel chan *UncertifiedKey, generatorMetrics metrics.Metrics) BlobWriter { var fixedRandomData []byte @@ -78,7 +79,7 @@ func NewBlobWriter( logger: logger, config: config, disperser: disperser, - unconfirmedKeyChannel: unconfirmedKeyChannel, + uncertifiedKeyChannel: uncertifiedKeyChannel, fixedRandomData: fixedRandomData, writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"), writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"), @@ -128,7 +129,7 @@ func (writer *BlobWriter) writeNextBlob() { checksum := md5.Sum(data) - writer.unconfirmedKeyChannel <- &UnconfirmedKey{ + writer.uncertifiedKeyChannel <- &UncertifiedKey{ Key: key, Checksum: checksum, Size: uint(len(data)), @@ -153,20 +154,24 @@ func (writer *BlobWriter) getRandomData() ([]byte, error) { } // sendRequest sends a blob to a disperser. -func (writer *BlobWriter) sendRequest(data []byte) (key []byte, err error) { +func (writer *BlobWriter) sendRequest(data []byte) (key v2.BlobKey, err error) { ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writer.config.WriteTimeout) defer cancel() - if writer.config.SignerPrivateKey != "" { - _, key, err = writer.disperser.DisperseBlobAuthenticated( - ctxTimeout, - data, - writer.config.CustomQuorums) - } else { - _, key, err = writer.disperser.DisperseBlob( - ctxTimeout, - data, - writer.config.CustomQuorums) + writer.logger.Info("sending blob request", "size", len(data)) + status, key, err := writer.disperser.DisperseBlob( + ctxTimeout, + data, + 0, + writer.config.CustomQuorums, + 0, + ) + if err != nil { + writer.logger.Error("failed to send blob request", "err", err) + return } + + writer.logger.Info("blob request sent", "key", key.Hex(), "status", status.String()) + return } diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index dcd70841c6..23a494b124 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -4,6 +4,9 @@ import ( "context" "crypto/md5" "fmt" + "sync" + "testing" + "github.com/Layr-Labs/eigenda/common" tu "github.com/Layr-Labs/eigenda/common/testutils" "github.com/Layr-Labs/eigenda/disperser" @@ -13,8 +16,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "golang.org/x/exp/rand" - "sync" - "testing" ) func TestBlobWriter(t *testing.T) { @@ -56,7 +57,7 @@ func TestBlobWriter(t *testing.T) { } disperserClient := &MockDisperserClient{} - unconfirmedKeyChannel := make(chan *UnconfirmedKey, 100) + unconfirmedKeyChannel := make(chan *UncertifiedKey, 100) generatorMetrics := metrics.NewMockMetrics() diff --git a/tools/traffic/workers/unconfirmed_key.go b/tools/traffic/workers/unconfirmed_key.go index c86b8f1dcd..9523cd04c1 100644 --- a/tools/traffic/workers/unconfirmed_key.go +++ b/tools/traffic/workers/unconfirmed_key.go @@ -1,11 +1,15 @@ package workers -import "time" +import ( + "time" -// UnconfirmedKey is a Key that has not yet been confirmed by the disperser service. -type UnconfirmedKey struct { + v2 "github.com/Layr-Labs/eigenda/core/v2" +) + +// UncertifiedKey is a Key that has not yet been certified by the disperser service. +type UncertifiedKey struct { // The Key of the blob. - Key []byte + Key v2.BlobKey // The Size of the blob in bytes. Size uint // The Checksum of the blob. From 67f97a06023ce6eef88ca12773a254b5db2f492e Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 19 Dec 2024 17:10:56 -0800 Subject: [PATCH 02/14] save --- api/clients/v2/accountant.go | 1 + api/clients/v2/disperser_client.go | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/api/clients/v2/accountant.go b/api/clients/v2/accountant.go index be0030d3f8..a2084e01e3 100644 --- a/api/clients/v2/accountant.go +++ b/api/clients/v2/accountant.go @@ -104,6 +104,7 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quo } return 0, a.cumulativePayment, nil } + fmt.Println("accoutant calcu", "numSymbols", symbolUsage, "relativeBinRecord", relativeBinRecord.Usage, "overflowBin", overflowBinRecord, "incrementRequired", incrementRequired, "a.cumulativePayment", a.cumulativePayment) return 0, big.NewInt(0), fmt.Errorf("neither reservation nor on-demand payment is available") } diff --git a/api/clients/v2/disperser_client.go b/api/clients/v2/disperser_client.go index 3ef20404be..0aeb03d0e7 100644 --- a/api/clients/v2/disperser_client.go +++ b/api/clients/v2/disperser_client.go @@ -99,10 +99,28 @@ func (c *disperserClient) PopulateAccountant(ctx context.Context) error { return fmt.Errorf("error getting payment state for initializing accountant: %w", err) } + fmt.Println("paymentState", paymentState) + err = c.accountant.SetPaymentState(paymentState) if err != nil { return fmt.Errorf("error setting payment state for accountant: %w", err) } + + fmt.Println("Populate accountant %w", "account", + c.accountant.accountID, + "binRecords", + c.accountant.binRecords, + "cumulativePayment", + c.accountant.cumulativePayment, + "minNumSymbols", + c.accountant.minNumSymbols, + "onDemand", + c.accountant.onDemand, + "pricePerSymbol", + c.accountant.pricePerSymbol, + "reservation", + c.accountant.reservation, + ) return nil } From cb0526d93af1e7f9cbcbfccb819be1cfba5e9948 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:38:36 -0800 Subject: [PATCH 03/14] cleanup --- tools/traffic/generator_v2.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index 68c1a6a9d5..b29971401d 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -16,6 +16,7 @@ import ( "github.com/Layr-Labs/eigenda/tools/traffic/metrics" "github.com/Layr-Labs/eigenda/tools/traffic/workers" "github.com/Layr-Labs/eigensdk-go/logging" + gethcommon "github.com/ethereum/go-ethereum/common" ) // Generator simulates read/write traffic to the DA service. @@ -52,8 +53,6 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { return nil, err } - // var signer corev2.BlobRequestSigner - var signer *auth.LocalBlobRequestSigner if config.SignerPrivateKey != "" { signer = auth.NewLocalBlobRequestSigner(config.SignerPrivateKey) @@ -62,11 +61,12 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { return nil, fmt.Errorf("signer private key is required") } - // logger2 := log.NewLogger(log.NewTerminalHandler(os.Stderr, true)) - // client, err := clients.NewEigenDAClient(logger2, *config.EigenDAClientConfig) - // if err != nil { - // return nil, err - // } + signerAccountId, err := signer.GetAccountID() + if err != nil { + return nil, fmt.Errorf("error getting account ID: %w", err) + } + accountId := gethcommon.HexToAddress(signerAccountId) + logger.Info("Initializing traffic generator", "accountId", accountId) ctx, cancel := context.WithCancel(context.Background()) waitGroup := sync.WaitGroup{} From 6fc53325cd5e09e2bfb5c93804f8a2b28498965b Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:43:03 -0800 Subject: [PATCH 04/14] rm unused logs --- api/clients/v2/accountant.go | 2 +- api/clients/v2/disperser_client.go | 17 ----------------- 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/api/clients/v2/accountant.go b/api/clients/v2/accountant.go index a2084e01e3..9ad17d7f27 100644 --- a/api/clients/v2/accountant.go +++ b/api/clients/v2/accountant.go @@ -104,7 +104,7 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quo } return 0, a.cumulativePayment, nil } - fmt.Println("accoutant calcu", "numSymbols", symbolUsage, "relativeBinRecord", relativeBinRecord.Usage, "overflowBin", overflowBinRecord, "incrementRequired", incrementRequired, "a.cumulativePayment", a.cumulativePayment) + return 0, big.NewInt(0), fmt.Errorf("neither reservation nor on-demand payment is available") } diff --git a/api/clients/v2/disperser_client.go b/api/clients/v2/disperser_client.go index 0aeb03d0e7..4f4415fea4 100644 --- a/api/clients/v2/disperser_client.go +++ b/api/clients/v2/disperser_client.go @@ -99,28 +99,11 @@ func (c *disperserClient) PopulateAccountant(ctx context.Context) error { return fmt.Errorf("error getting payment state for initializing accountant: %w", err) } - fmt.Println("paymentState", paymentState) - err = c.accountant.SetPaymentState(paymentState) if err != nil { return fmt.Errorf("error setting payment state for accountant: %w", err) } - fmt.Println("Populate accountant %w", "account", - c.accountant.accountID, - "binRecords", - c.accountant.binRecords, - "cumulativePayment", - c.accountant.cumulativePayment, - "minNumSymbols", - c.accountant.minNumSymbols, - "onDemand", - c.accountant.onDemand, - "pricePerSymbol", - c.accountant.pricePerSymbol, - "reservation", - c.accountant.reservation, - ) return nil } From 27da9d7b608dbf6b106c9569fc6f5b2a83050d82 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:46:59 -0800 Subject: [PATCH 05/14] Update docker files --- docker-bake.hcl | 2 ++ trafficgenerator.Dockerfile | 4 ++-- trafficgenerator2.Dockerfile | 5 ++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/docker-bake.hcl b/docker-bake.hcl index ecee0f2aef..6fa0c24f20 100644 --- a/docker-bake.hcl +++ b/docker-bake.hcl @@ -44,6 +44,7 @@ group "all" { "churner", "dataapi", "traffic-generator", + "traffic-generator2", "controller", "relay" ] @@ -84,6 +85,7 @@ group "internal-release" { "churner-internal", "dataapi-internal", "traffic-generator-internal", + "traffic-generator2-internal", "controller-internal", "relay-internal" ] diff --git a/trafficgenerator.Dockerfile b/trafficgenerator.Dockerfile index 8d39eaf337..5ffc767c29 100644 --- a/trafficgenerator.Dockerfile +++ b/trafficgenerator.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21.1-alpine3.18 as builder +FROM golang:1.21.1-alpine3.18 AS builder RUN apk add --no-cache make musl-dev linux-headers gcc git jq bash @@ -13,7 +13,7 @@ RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=cache,target=/root/.cache/go-build \ go build -o ./bin/generator ./cmd -FROM alpine:3.18 as generator +FROM alpine:3.18 AS generator COPY --from=builder /app/tools/traffic/bin/generator /usr/local/bin diff --git a/trafficgenerator2.Dockerfile b/trafficgenerator2.Dockerfile index 8f9dc149b7..092d0e6e71 100644 --- a/trafficgenerator2.Dockerfile +++ b/trafficgenerator2.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21.1-alpine3.18 as builder +FROM golang:1.21.1-alpine3.18 AS builder RUN apk add --no-cache make musl-dev linux-headers gcc git jq bash @@ -13,9 +13,8 @@ RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=cache,target=/root/.cache/go-build \ go build -o ./bin/generator ./cmd2 -FROM alpine:3.18 as generator2 +FROM alpine:3.18 AS generator2 COPY --from=builder /app/tools/traffic/bin/generator /usr/local/bin -COPY --from=builder /app/inabox/resources /resources ENTRYPOINT ["generator"] From 83f79e8aaa059955bed2ab2be84b86a8bd0f690e Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Mon, 30 Dec 2024 17:34:11 -0600 Subject: [PATCH 06/14] rename dockerfile --- docker-bake.hcl | 16 ++++++++-------- tools/traffic/config/config.go | 4 ++++ tools/traffic/config/flags.go | 10 ++-------- ....Dockerfile => trafficgenerator-v2.Dockerfile | 0 4 files changed, 14 insertions(+), 16 deletions(-) rename trafficgenerator2.Dockerfile => trafficgenerator-v2.Dockerfile (100%) diff --git a/docker-bake.hcl b/docker-bake.hcl index 6fa0c24f20..2c7f04e4b6 100644 --- a/docker-bake.hcl +++ b/docker-bake.hcl @@ -203,19 +203,19 @@ target "traffic-generator-internal" { ] } -target "traffic-generator2" { +target "traffic-generator-v2" { context = "." - dockerfile = "./trafficgenerator2.Dockerfile" + dockerfile = "./trafficgenerator-v2.Dockerfile" target = "generator2" - tags = ["${REGISTRY}/${REPO}/traffic-generator2:${BUILD_TAG}"] + tags = ["${REGISTRY}/${REPO}/traffic-generator-v2:${BUILD_TAG}"] } -target "traffic-generator2-internal" { - inherits = ["traffic-generator2"] +target "traffic-generator-v2-internal" { + inherits = ["traffic-generator-v2"] tags = [ - "${REGISTRY}/eigenda-traffic-generator2:${BUILD_TAG}", - "${REGISTRY}/eigenda-traffic-generator2:${GIT_SHA}", - "${REGISTRY}/eigenda-traffic-generator2:sha-${GIT_SHORT_SHA}" + "${REGISTRY}/eigenda-traffic-generator-v2:${BUILD_TAG}", + "${REGISTRY}/eigenda-traffic-generator-v2:${GIT_SHA}", + "${REGISTRY}/eigenda-traffic-generator-v2:sha-${GIT_SHORT_SHA}" ] } diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index d415f75431..aea0d36b4a 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -43,6 +43,10 @@ func NewConfig(ctx *cli.Context) (*Config, error) { return nil, err } customQuorums := ctx.GlobalIntSlice(CustomQuorumNumbersFlag.Name) + if len(customQuorums) == 0 { + return nil, errors.New("no custom quorum numbers provided") + } + customQuorumsUint8 := make([]uint8, len(customQuorums)) for i, q := range customQuorums { if q < 0 || q > 255 { diff --git a/tools/traffic/config/flags.go b/tools/traffic/config/flags.go index 1b9709f78b..fc0ec776d9 100644 --- a/tools/traffic/config/flags.go +++ b/tools/traffic/config/flags.go @@ -4,9 +4,6 @@ import ( "time" "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/common/geth" - "github.com/Layr-Labs/eigenda/core/thegraph" - "github.com/Layr-Labs/eigenda/encoding/kzg" "github.com/Layr-Labs/eigenda/indexer" "github.com/urfave/cli" ) @@ -132,7 +129,7 @@ var ( TheGraphUrlFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "the-graph-url"), Usage: "URL of the subgraph instance.", - Required: true, + Required: false, EnvVar: common.PrefixEnvVar(envPrefix, "THE_GRAPH_URL"), } TheGraphPullIntervalFlag = cli.DurationFlag{ @@ -224,7 +221,6 @@ var ( var requiredFlags = []cli.Flag{ HostnameFlag, GrpcPortFlag, - TheGraphUrlFlag, } var optionalFlags = []cli.Flag{ @@ -242,6 +238,7 @@ var optionalFlags = []cli.Flag{ RequiredDownloadsFlag, DisableTLSFlag, MetricsHTTPPortFlag, + TheGraphUrlFlag, TheGraphPullIntervalFlag, TheGraphRetriesFlag, VerifierIntervalFlag, @@ -260,9 +257,6 @@ var Flags []cli.Flag func init() { Flags = append(requiredFlags, optionalFlags...) - Flags = append(Flags, kzg.CLIFlags(envPrefix)...) Flags = append(Flags, common.LoggerCLIFlags(envPrefix, FlagPrefix)...) - Flags = append(Flags, geth.EthClientFlags(envPrefix)...) Flags = append(Flags, indexer.CLIFlags(envPrefix)...) - Flags = append(Flags, thegraph.CLIFlags(envPrefix)...) } diff --git a/trafficgenerator2.Dockerfile b/trafficgenerator-v2.Dockerfile similarity index 100% rename from trafficgenerator2.Dockerfile rename to trafficgenerator-v2.Dockerfile From c1a3ae04dd5ae2243047dd61d40c3370951efea7 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Mon, 30 Dec 2024 17:42:40 -0600 Subject: [PATCH 07/14] Add graceful shutdown, remove uncertified key channel (tmp), get writes working --- tools/traffic/cmd2/main.go | 23 +++++++++- tools/traffic/generator_v2.go | 62 +++++++++++++++++---------- tools/traffic/metrics/metrics.go | 63 ++++++++++++++++++++++------ tools/traffic/workers/blob_writer.go | 55 ++++++++++-------------- 4 files changed, 135 insertions(+), 68 deletions(-) diff --git a/tools/traffic/cmd2/main.go b/tools/traffic/cmd2/main.go index 51b41f56bc..bff1919b3f 100644 --- a/tools/traffic/cmd2/main.go +++ b/tools/traffic/cmd2/main.go @@ -4,6 +4,8 @@ import ( "fmt" "log" "os" + "os/signal" + "syscall" "github.com/Layr-Labs/eigenda/tools/traffic" "github.com/Layr-Labs/eigenda/tools/traffic/config" @@ -40,5 +42,24 @@ func trafficGeneratorMain(ctx *cli.Context) error { panic(fmt.Sprintf("failed to create new traffic generator\n%s", err)) } - return generator.Start() + // Set up signal handling + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + + // Run the generator in a goroutine + errChan := make(chan error, 1) + go func() { + errChan <- generator.Start() + }() + + // Wait for either an error or a signal + select { + case err := <-errChan: + return err + case sig := <-sigChan: + fmt.Printf("\nReceived signal %v, shutting down...\n", sig) + // Call Stop() method for graceful shutdown + generator.Stop() + return nil + } } diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index b29971401d..bb7268f896 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -3,10 +3,7 @@ package traffic import ( "context" "fmt" - "os" - "os/signal" "sync" - "syscall" "time" "github.com/Layr-Labs/eigenda/api/clients/v2" @@ -39,7 +36,7 @@ type Generator struct { cancel *context.CancelFunc waitGroup *sync.WaitGroup generatorMetrics metrics.Metrics - logger *logging.Logger + logger logging.Logger disperserClient clients.DisperserClient // eigenDAClient *clients.EigenDAClient #TODO: Add this back in when the client is implemented config *config.Config @@ -77,9 +74,8 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { config.WorkerConfig.MetricsBlacklist, config.WorkerConfig.MetricsFuzzyBlacklist) - uncertifiedKeyChannel := make(chan *workers.UncertifiedKey, 100) + // uncertifiedKeyChannel := make(chan *workers.UncertifiedKey, 100) - // TODO: create a dedicated reservation for traffic generator disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer, nil, nil) if err != nil { cancel() @@ -94,7 +90,6 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { logger, &config.WorkerConfig, disperserClient, - uncertifiedKeyChannel, generatorMetrics) writers = append(writers, &writer) } @@ -104,35 +99,58 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { cancel: &cancel, waitGroup: &waitGroup, generatorMetrics: generatorMetrics, - logger: &logger, + logger: logger, disperserClient: disperserClient, config: config, writers: writers, }, nil } -// Start instantiates goroutines that generate read/write traffic, continues until a SIGTERM is observed. +// Start instantiates goroutines that generate read/write traffic. func (generator *Generator) Start() error { - + // Start metrics server generator.generatorMetrics.Start() - // generator.statusTracker.Start() - + // Start writers + generator.logger.Info("Starting writers") for _, writer := range generator.writers { + generator.logger.Info("Starting writer", "writer", writer) writer.Start() time.Sleep(generator.config.InstanceLaunchInterval) } - // for _, reader := range generator.readers { - // reader.Start() - // time.Sleep(generator.config.InstanceLaunchInterval) - // } - - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt, syscall.SIGTERM) - <-signals + // Wait for context cancellation to keep the process running + <-(*generator.ctx).Done() + generator.logger.Info("Generator received stop signal") + return nil +} +func (generator *Generator) Stop() error { + // Cancel context to stop all workers (*generator.cancel)() - generator.waitGroup.Wait() - return nil + + // Set a timeout for graceful shutdown + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() + + // Shutdown metrics server + if err := generator.generatorMetrics.Shutdown(); err != nil { + generator.logger.Error("Failed to shutdown metrics server", "err", err) + } + + // Wait for all workers with timeout + done := make(chan struct{}) + go func() { + generator.waitGroup.Wait() + close(done) + }() + + select { + case <-done: + generator.logger.Info("All workers shut down gracefully") + return nil + case <-shutdownCtx.Done(): + generator.logger.Warn("Shutdown timed out, forcing exit") + return fmt.Errorf("shutdown timed out after 10 seconds") + } } diff --git a/tools/traffic/metrics/metrics.go b/tools/traffic/metrics/metrics.go index e24a52dfc8..6a150285e6 100644 --- a/tools/traffic/metrics/metrics.go +++ b/tools/traffic/metrics/metrics.go @@ -1,19 +1,24 @@ package metrics import ( + "context" "fmt" + "net/http" + "strings" + "time" + "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" - "net/http" - "strings" ) // Metrics allows the creation of metrics for the traffic generator. type Metrics interface { // Start starts the metrics server. - Start() + Start() error + // Shutdown shuts down the metrics server. + Shutdown() error // NewLatencyMetric creates a new LatencyMetric instance. Useful for reporting the latency of an operation. NewLatencyMetric(description string) LatencyMetric // NewCountMetric creates a new CountMetric instance. Useful for tracking the count of a type of event. @@ -35,6 +40,8 @@ type metrics struct { metricsBlacklist []string metricsFuzzyBlacklist []string + + shutdown func() error } // NewMetrics creates a new Metrics instance. @@ -69,19 +76,49 @@ func NewMetrics( return metrics } -// Start starts the metrics server. -func (metrics *metrics) Start() { - metrics.logger.Info("Starting metrics server at ", "port", metrics.httpPort) +func (metrics *metrics) Start() error { + metrics.logger.Info("Starting metrics server", "port", metrics.httpPort) addr := fmt.Sprintf(":%s", metrics.httpPort) + // Create mux and add /metrics handler + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor( + metrics.registry, + promhttp.HandlerOpts{}, + )) + + srv := &http.Server{ + Addr: addr, + Handler: mux, + } + go func() { - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.HandlerFor( - metrics.registry, - promhttp.HandlerOpts{}, - )) - err := http.ListenAndServe(addr, mux) - panic(fmt.Sprintf("Prometheus server failed: %s", err)) + metrics.logger.Info("Starting metrics server", "port", metrics.httpPort) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + metrics.logger.Error("Prometheus server failed", "err", err) + } }() + + // Store shutdown function + metrics.shutdown = func() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + metrics.logger.Info("Shutting down metrics server") + if err := srv.Shutdown(ctx); err != nil { + metrics.logger.Error("Metrics server shutdown failed", "err", err) + return err + } + return nil + } + + return nil +} + +func (metrics *metrics) Shutdown() error { + if metrics.shutdown != nil { + return metrics.shutdown() + } + return nil } // NewLatencyMetric creates a new LatencyMetric instance. diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index 52d95caa6e..7c1d8888e5 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -2,7 +2,6 @@ package workers import ( "context" - "crypto/md5" "crypto/rand" "fmt" "sync" @@ -33,9 +32,6 @@ type BlobWriter struct { // disperser is the client used to send blobs to the disperser. disperser clients.DisperserClient - // Uncertified keys are sent here. - uncertifiedKeyChannel chan *UncertifiedKey - // fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise. fixedRandomData []byte @@ -56,7 +52,6 @@ func NewBlobWriter( logger logging.Logger, config *config.WorkerConfig, disperser clients.DisperserClient, - uncertifiedKeyChannel chan *UncertifiedKey, generatorMetrics metrics.Metrics) BlobWriter { var fixedRandomData []byte @@ -74,67 +69,63 @@ func NewBlobWriter( } return BlobWriter{ - ctx: ctx, - waitGroup: waitGroup, - logger: logger, - config: config, - disperser: disperser, - uncertifiedKeyChannel: uncertifiedKeyChannel, - fixedRandomData: fixedRandomData, - writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"), - writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"), - writeFailureMetric: generatorMetrics.NewCountMetric("write_failure"), + ctx: ctx, + waitGroup: waitGroup, + logger: logger, + config: config, + disperser: disperser, + fixedRandomData: fixedRandomData, + writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"), + writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"), + writeFailureMetric: generatorMetrics.NewCountMetric("write_failure"), } } // Start begins the blob writer goroutine. func (writer *BlobWriter) Start() { + writer.logger.Info("Starting blob writer") writer.waitGroup.Add(1) ticker := time.NewTicker(writer.config.WriteRequestInterval) go func() { defer writer.waitGroup.Done() + defer ticker.Stop() for { select { case <-(*writer.ctx).Done(): + writer.logger.Info("context cancelled, stopping blob writer") return case <-ticker.C: - writer.writeNextBlob() + if err := writer.writeNextBlob(); err != nil { + writer.logger.Error("failed to write blob", "err", err) + } } } }() } // writeNextBlob attempts to send a random blob to the disperser. -func (writer *BlobWriter) writeNextBlob() { +func (writer *BlobWriter) writeNextBlob() error { data, err := writer.getRandomData() if err != nil { writer.logger.Error("failed to get random data", "err", err) - return + return err } start := time.Now() - key, err := writer.sendRequest(data) + _, err = writer.sendRequest(data) if err != nil { writer.writeFailureMetric.Increment() writer.logger.Error("failed to send blob request", "err", err) - return - } else { - end := time.Now() - duration := end.Sub(start) - writer.writeLatencyMetric.ReportLatency(duration) + return err } + end := time.Now() + duration := end.Sub(start) + writer.writeLatencyMetric.ReportLatency(duration) writer.writeSuccessMetric.Increment() - checksum := md5.Sum(data) - - writer.uncertifiedKeyChannel <- &UncertifiedKey{ - Key: key, - Checksum: checksum, - Size: uint(len(data)), - SubmissionTime: time.Now(), - } + return nil } // getRandomData returns a slice of random data to be used for a blob. From afcc4c03a7281a0e55953f86154cf8bf7a7f965e Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Mon, 30 Dec 2024 17:47:03 -0600 Subject: [PATCH 08/14] fix --- docker-bake.hcl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-bake.hcl b/docker-bake.hcl index 2c7f04e4b6..9f9b136c13 100644 --- a/docker-bake.hcl +++ b/docker-bake.hcl @@ -44,7 +44,7 @@ group "all" { "churner", "dataapi", "traffic-generator", - "traffic-generator2", + "traffic-generator-v2", "controller", "relay" ] @@ -85,7 +85,7 @@ group "internal-release" { "churner-internal", "dataapi-internal", "traffic-generator-internal", - "traffic-generator2-internal", + "traffic-generator-v2-internal", "controller-internal", "relay-internal" ] From 892c98caba935355960d2889213156d60de29b04 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 9 Jan 2025 21:54:07 -0800 Subject: [PATCH 09/14] fix test --- tools/traffic/cmd2/main.go | 1 - tools/traffic/config/config.go | 9 +--- tools/traffic/generator_v2.go | 2 - tools/traffic/metrics/mock_metrics.go | 8 +++- tools/traffic/workers/blob_writer_test.go | 56 +++++++---------------- tools/traffic/workers/mock_disperser.go | 35 ++++++-------- 6 files changed, 40 insertions(+), 71 deletions(-) diff --git a/tools/traffic/cmd2/main.go b/tools/traffic/cmd2/main.go index bff1919b3f..98fe5a9da3 100644 --- a/tools/traffic/cmd2/main.go +++ b/tools/traffic/cmd2/main.go @@ -58,7 +58,6 @@ func trafficGeneratorMain(ctx *cli.Context) error { return err case sig := <-sigChan: fmt.Printf("\nReceived signal %v, shutting down...\n", sig) - // Call Stop() method for graceful shutdown generator.Stop() return nil } diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index aea0d36b4a..46c7fa4ea6 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -7,13 +7,11 @@ import ( "github.com/Layr-Labs/eigenda/api/clients/v2" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core/thegraph" - "github.com/Layr-Labs/eigenda/retriever" "github.com/urfave/cli" ) // Config configures a traffic generator. type Config struct { - // Logging configuration. LoggingConfig common.LoggerConfig @@ -31,8 +29,10 @@ type Config struct { // The port at which the metrics server listens for HTTP requests. MetricsHTTPPort string + // The timeout for the node client. NodeClientTimeout time.Duration + // The amount of time to sleep after launching each worker thread. InstanceLaunchInterval time.Duration } @@ -55,11 +55,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) { customQuorumsUint8[i] = uint8(q) } - retrieverConfig, err := retriever.NewConfig(ctx) - if err != nil { - return nil, err - } - config := &Config{ DisperserClientConfig: &clients.DisperserClientConfig{ Hostname: ctx.GlobalString(HostnameFlag.Name), diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index bb7268f896..16a2735352 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -74,8 +74,6 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { config.WorkerConfig.MetricsBlacklist, config.WorkerConfig.MetricsFuzzyBlacklist) - // uncertifiedKeyChannel := make(chan *workers.UncertifiedKey, 100) - disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer, nil, nil) if err != nil { cancel() diff --git a/tools/traffic/metrics/mock_metrics.go b/tools/traffic/metrics/mock_metrics.go index 88eb11447b..fb4018a61b 100644 --- a/tools/traffic/metrics/mock_metrics.go +++ b/tools/traffic/metrics/mock_metrics.go @@ -48,8 +48,12 @@ func (m *MockMetrics) GetLatency(description string) time.Duration { return m.latencies[description] } -func (m *MockMetrics) Start() { - // intentional no-op +func (m *MockMetrics) Start() error { + return nil +} + +func (m *MockMetrics) Shutdown() error { + return nil } func (m *MockMetrics) NewLatencyMetric(description string) LatencyMetric { diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index 23a494b124..59c2797e0c 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -2,14 +2,14 @@ package workers import ( "context" - "crypto/md5" "fmt" "sync" "testing" "github.com/Layr-Labs/eigenda/common" tu "github.com/Layr-Labs/eigenda/common/testutils" - "github.com/Layr-Labs/eigenda/disperser" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" @@ -27,22 +27,8 @@ func TestBlobWriter(t *testing.T) { assert.Nil(t, err) dataSize := rand.Uint64()%1024 + 64 - encodedDataSize := len(codec.ConvertByPaddingEmptyByte(make([]byte, dataSize))) - - authenticated := rand.Intn(2) == 0 - var signerPrivateKey string - if authenticated { - signerPrivateKey = "asdf" - } - var functionName string - if authenticated { - functionName = "DisperseBlobAuthenticated" - } else { - functionName = "DisperseBlob" - } randomizeBlobs := rand.Intn(2) == 0 - useCustomQuorum := rand.Intn(2) == 0 var customQuorum []uint8 if useCustomQuorum { @@ -50,15 +36,12 @@ func TestBlobWriter(t *testing.T) { } config := &config.WorkerConfig{ - DataSize: dataSize, - SignerPrivateKey: signerPrivateKey, - RandomizeBlobs: randomizeBlobs, - CustomQuorums: customQuorum, + DataSize: dataSize, + RandomizeBlobs: randomizeBlobs, + CustomQuorums: customQuorum, } disperserClient := &MockDisperserClient{} - unconfirmedKeyChannel := make(chan *UncertifiedKey, 100) - generatorMetrics := metrics.NewMockMetrics() writer := NewBlobWriter( @@ -67,7 +50,6 @@ func TestBlobWriter(t *testing.T) { logger, config, disperserClient, - unconfirmedKeyChannel, generatorMetrics) errorCount := 0 @@ -84,37 +66,33 @@ func TestBlobWriter(t *testing.T) { } // This is the Key that will be assigned to the next blob. - keyToReturn := make([]byte, 32) - _, err = rand.Read(keyToReturn) + var keyToReturn corev2.BlobKey + _, err = rand.Read(keyToReturn[:]) assert.Nil(t, err) - status := disperser.Processing + status := dispv2.Queued disperserClient.mock = mock.Mock{} // reset mock state - disperserClient.mock.On(functionName, mock.Anything, customQuorum).Return(&status, keyToReturn, errorToReturn) + disperserClient.mock.On("DisperseBlob", + mock.Anything, + mock.AnythingOfType("[]uint8"), + mock.AnythingOfType("uint16"), + mock.AnythingOfType("[]uint8"), + mock.AnythingOfType("uint32"), + ).Return(&status, keyToReturn, errorToReturn) // Simulate the advancement of time (i.e. allow the writer to write the next blob). writer.writeNextBlob() - disperserClient.mock.AssertNumberOfCalls(t, functionName, 1) + disperserClient.mock.AssertNumberOfCalls(t, "DisperseBlob", 1) if errorToReturn == nil { - dataSentToDisperser := disperserClient.mock.Calls[0].Arguments.Get(0).([]byte) + dataSentToDisperser := disperserClient.mock.Calls[0].Arguments.Get(1).([]byte) assert.NotNil(t, dataSentToDisperser) // Strip away the extra encoding bytes. We should have data of the expected Size. decodedData := codec.RemoveEmptyByteFromPaddedBytes(dataSentToDisperser) assert.Equal(t, dataSize, uint64(len(decodedData))) - // Verify that the proper data was sent to the unconfirmed Key handler. - checksum := md5.Sum(dataSentToDisperser) - - unconfirmedKey, ok := <-unconfirmedKeyChannel - - assert.True(t, ok) - assert.Equal(t, keyToReturn, unconfirmedKey.Key) - assert.Equal(t, uint(encodedDataSize), unconfirmedKey.Size) - assert.Equal(t, checksum, unconfirmedKey.Checksum) - // Verify that data has the proper amount of randomness. if previousData != nil { if randomizeBlobs { diff --git a/tools/traffic/workers/mock_disperser.go b/tools/traffic/workers/mock_disperser.go index 43d9880ce2..ae0e0f7c29 100644 --- a/tools/traffic/workers/mock_disperser.go +++ b/tools/traffic/workers/mock_disperser.go @@ -3,9 +3,11 @@ package workers import ( "context" - "github.com/Layr-Labs/eigenda/api/clients" - disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" - "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/api/clients/v2" + disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/stretchr/testify/mock" ) @@ -18,29 +20,22 @@ type MockDisperserClient struct { func (m *MockDisperserClient) DisperseBlob( ctx context.Context, data []byte, - customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { + blobVersion corev2.BlobVersion, + quorums []core.QuorumID, + salt uint32) (*dispv2.BlobStatus, corev2.BlobKey, error) { - args := m.mock.Called(data, customQuorums) - return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) + args := m.mock.Called(ctx, data, blobVersion, quorums, salt) + return args.Get(0).(*dispv2.BlobStatus), args.Get(1).(corev2.BlobKey), args.Error(2) } -func (m *MockDisperserClient) DisperseBlobAuthenticated( - ctx context.Context, - data []byte, - customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { - - args := m.mock.Called(data, customQuorums) - return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) -} - -func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { - args := m.mock.Called(key) +func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error) { + args := m.mock.Called(blobKey) return args.Get(0).(*disperser_rpc.BlobStatusReply), args.Error(1) } -func (m *MockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) { - args := m.mock.Called(batchHeaderHash, blobIndex) - return args.Get(0).([]byte), args.Error(1) +func (m *MockDisperserClient) GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error) { + args := m.mock.Called(data) + return args.Get(0).(*disperser_rpc.BlobCommitmentReply), args.Error(1) } func (m *MockDisperserClient) Close() error { From 5581f3e3332b609c874d6cc313c7c5baa0bee59e Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 9 Jan 2025 22:08:28 -0800 Subject: [PATCH 10/14] Simplify further --- tools/traffic/config/config.go | 55 +++++----- tools/traffic/config/flags.go | 121 +--------------------- tools/traffic/config/worker_config.go | 51 --------- tools/traffic/generator_v2.go | 7 +- tools/traffic/metrics/count_metric.go | 5 - tools/traffic/metrics/metrics.go | 46 ++------ tools/traffic/workers/blob_writer.go | 8 +- tools/traffic/workers/blob_writer_test.go | 4 +- 8 files changed, 49 insertions(+), 248 deletions(-) diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index 46c7fa4ea6..687932cd5d 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -24,8 +24,8 @@ type Config struct { // Configuration for the graph. TheGraphConfig *thegraph.Config - // Configures the traffic generator workers. - WorkerConfig WorkerConfig + // Configures the blob writers. + BlobWriterConfig BlobWriterConfig // The port at which the metrics server listens for HTTP requests. MetricsHTTPPort string @@ -37,6 +37,28 @@ type Config struct { InstanceLaunchInterval time.Duration } +// BlobWriterConfig configures the blob writer. +type BlobWriterConfig struct { + // The number of worker threads that generate write traffic. + NumWriteInstances uint + + // The period of the submission rate of new blobs for each write worker thread. + WriteRequestInterval time.Duration + + // The Size of each blob dispersed, in bytes. + DataSize uint64 + + // If true, then each blob will contain unique random data. If false, the same random data + // will be dispersed for each blob by a particular worker thread. + RandomizeBlobs bool + + // The amount of time to wait for a blob to be written. + WriteTimeout time.Duration + + // Custom quorum numbers to use for the traffic generator. + CustomQuorums []uint8 +} + func NewConfig(ctx *cli.Context) (*Config, error) { loggerConfig, err := common.ReadLoggerCLIConfig(ctx, FlagPrefix) if err != nil { @@ -63,41 +85,20 @@ func NewConfig(ctx *cli.Context) (*Config, error) { }, SignerPrivateKey: ctx.String(SignerPrivateKeyFlag.Name), - - TheGraphConfig: &thegraph.Config{ - Endpoint: ctx.String(TheGraphUrlFlag.Name), - PullInterval: ctx.Duration(TheGraphPullIntervalFlag.Name), - MaxRetries: ctx.Int(TheGraphRetriesFlag.Name), - }, - - LoggingConfig: *loggerConfig, + LoggingConfig: *loggerConfig, MetricsHTTPPort: ctx.GlobalString(MetricsHTTPPortFlag.Name), NodeClientTimeout: ctx.Duration(NodeClientTimeoutFlag.Name), InstanceLaunchInterval: ctx.Duration(InstanceLaunchIntervalFlag.Name), - WorkerConfig: WorkerConfig{ + BlobWriterConfig: BlobWriterConfig{ NumWriteInstances: ctx.GlobalUint(NumWriteInstancesFlag.Name), WriteRequestInterval: ctx.Duration(WriteRequestIntervalFlag.Name), DataSize: ctx.GlobalUint64(DataSizeFlag.Name), - RandomizeBlobs: !ctx.GlobalBool(UniformBlobsFlag.Name), + RandomizeBlobs: ctx.GlobalBool(RandomizeBlobsFlag.Name), WriteTimeout: ctx.Duration(WriteTimeoutFlag.Name), - - TrackerInterval: ctx.Duration(VerifierIntervalFlag.Name), - GetBlobStatusTimeout: ctx.Duration(GetBlobStatusTimeoutFlag.Name), - - NumReadInstances: ctx.GlobalUint(NumReadInstancesFlag.Name), - ReadRequestInterval: ctx.Duration(ReadRequestIntervalFlag.Name), - RequiredDownloads: ctx.Float64(RequiredDownloadsFlag.Name), - FetchBatchHeaderTimeout: ctx.Duration(FetchBatchHeaderTimeoutFlag.Name), - RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name), - StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name), - - CustomQuorums: customQuorumsUint8, - - MetricsBlacklist: ctx.StringSlice(MetricsBlacklistFlag.Name), - MetricsFuzzyBlacklist: ctx.StringSlice(MetricsFuzzyBlacklistFlag.Name), + CustomQuorums: customQuorumsUint8, }, } diff --git a/tools/traffic/config/flags.go b/tools/traffic/config/flags.go index fc0ec776d9..a549ae320a 100644 --- a/tools/traffic/config/flags.go +++ b/tools/traffic/config/flags.go @@ -76,22 +76,7 @@ var ( EnvVar: common.PrefixEnvVar(envPrefix, "INSTANCE_LAUNCH_INTERVAL"), } - MetricsBlacklistFlag = cli.StringSliceFlag{ - Name: common.PrefixFlag(FlagPrefix, "metrics-blacklist"), - Usage: "Any metric with a label exactly matching this string will not be sent to the metrics server.", - Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_BLACKLIST"), - } - - MetricsFuzzyBlacklistFlag = cli.StringSliceFlag{ - Name: common.PrefixFlag(FlagPrefix, "metrics-fuzzy-blacklist"), - Usage: "Any metric that contains any string in this list will not be sent to the metrics server.", - Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_FUZZY_BLACKLIST"), - } - - /* Configuration for the blob writer. */ - + /* Configuration for the blob writers. */ NumWriteInstancesFlag = cli.UintFlag{ Name: common.PrefixFlag(FlagPrefix, "num-write-instances"), Usage: "Number of writer instances producing traffic to run in parallel.", @@ -113,11 +98,11 @@ var ( Value: 1024, EnvVar: common.PrefixEnvVar(envPrefix, "DATA_SIZE"), } - UniformBlobsFlag = cli.BoolFlag{ - Name: common.PrefixFlag(FlagPrefix, "uniform-blobs"), + RandomizeBlobsFlag = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "randomize-blobs"), Usage: "If set, do not randomize blobs.", Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "UNIFORM_BLOBS"), + EnvVar: common.PrefixEnvVar(envPrefix, "RANDOMIZE_BLOBS"), } WriteTimeoutFlag = cli.DurationFlag{ Name: common.PrefixFlag(FlagPrefix, "write-timeout"), @@ -126,26 +111,6 @@ var ( Value: 10 * time.Second, EnvVar: common.PrefixEnvVar(envPrefix, "WRITE_TIMEOUT"), } - TheGraphUrlFlag = cli.StringFlag{ - Name: common.PrefixFlag(FlagPrefix, "the-graph-url"), - Usage: "URL of the subgraph instance.", - Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "THE_GRAPH_URL"), - } - TheGraphPullIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "the-graph-pull-interval"), - Usage: "Interval at which to pull data from the subgraph.", - Required: false, - Value: 100 * time.Millisecond, - EnvVar: common.PrefixEnvVar(envPrefix, "THE_GRAPH_PULL_INTERVAL"), - } - TheGraphRetriesFlag = cli.UintFlag{ - Name: common.PrefixFlag(FlagPrefix, "the-graph-retries"), - Usage: "Number of times to retry a subgraph request.", - Required: false, - Value: 5, - EnvVar: common.PrefixEnvVar(envPrefix, "THE_GRAPH_RETRIES"), - } NodeClientTimeoutFlag = cli.DurationFlag{ Name: common.PrefixFlag(FlagPrefix, "node-client-timeout"), Usage: "The timeout for the node client.", @@ -153,69 +118,6 @@ var ( Value: 10 * time.Second, EnvVar: common.PrefixEnvVar(envPrefix, "NODE_CLIENT_TIMEOUT"), } - - /* Configuration for the blob validator. */ - - VerifierIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "verifier-interval"), - Usage: "Amount of time between verifier checks.", - Required: false, - Value: time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "VERIFIER_INTERVAL"), - } - GetBlobStatusTimeoutFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "get-blob-status-timeout"), - Usage: "Amount of time to wait for a blob status to be fetched.", - Required: false, - Value: 5 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "GET_BLOB_STATUS_TIMEOUT"), - } - VerificationChannelCapacityFlag = cli.UintFlag{ - Name: common.PrefixFlag(FlagPrefix, "verification-channel-capacity"), - Usage: "Size of the channel used to communicate between the writer and verifier.", - Required: false, - Value: 1000, - EnvVar: common.PrefixEnvVar(envPrefix, "VERIFICATION_CHANNEL_CAPACITY"), - } - - /* Configuration for the blob reader. */ - - NumReadInstancesFlag = cli.UintFlag{ - Name: common.PrefixFlag(FlagPrefix, "num-read-instances"), - Usage: "Number of reader instances producing traffic to run in parallel.", - Required: false, - Value: 1, - EnvVar: common.PrefixEnvVar(envPrefix, "NUM_READ_INSTANCES"), - } - ReadRequestIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "read-request-interval"), - Usage: "Time between read requests.", - Required: false, - Value: time.Second / 5, - EnvVar: common.PrefixEnvVar(envPrefix, "READ_REQUEST_INTERVAL"), - } - RequiredDownloadsFlag = cli.Float64Flag{ - Name: common.PrefixFlag(FlagPrefix, "required-downloads"), - Usage: "Number of required downloads. Numbers between 0.0 and 1.0 are treated as probabilities, " + - "numbers greater than 1.0 are treated as the number of downloads. -1 allows unlimited downloads.", - Required: false, - Value: 3.0, - EnvVar: common.PrefixEnvVar(envPrefix, "REQUIRED_DOWNLOADS"), - } - FetchBatchHeaderTimeoutFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "fetch-batch-header-timeout"), - Usage: "Amount of time to wait for a batch header to be fetched.", - Required: false, - Value: 5 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "FETCH_BATCH_HEADER_TIMEOUT"), - } - RetrieveBlobChunksTimeoutFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "retrieve-blob-chunks-timeout"), - Usage: "Amount of time to wait for a blob to be retrieved.", - Required: false, - Value: 5 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "RETRIEVE_BLOB_CHUNKS_TIMEOUT"), - } ) var requiredFlags = []cli.Flag{ @@ -225,7 +127,7 @@ var requiredFlags = []cli.Flag{ var optionalFlags = []cli.Flag{ TimeoutFlag, - UniformBlobsFlag, + RandomizeBlobsFlag, InstanceLaunchIntervalFlag, UseSecureGrpcFlag, SignerPrivateKeyFlag, @@ -233,23 +135,10 @@ var optionalFlags = []cli.Flag{ NumWriteInstancesFlag, WriteRequestIntervalFlag, DataSizeFlag, - NumReadInstancesFlag, - ReadRequestIntervalFlag, - RequiredDownloadsFlag, DisableTLSFlag, MetricsHTTPPortFlag, - TheGraphUrlFlag, - TheGraphPullIntervalFlag, - TheGraphRetriesFlag, - VerifierIntervalFlag, NodeClientTimeoutFlag, - FetchBatchHeaderTimeoutFlag, - RetrieveBlobChunksTimeoutFlag, - GetBlobStatusTimeoutFlag, WriteTimeoutFlag, - VerificationChannelCapacityFlag, - MetricsBlacklistFlag, - MetricsFuzzyBlacklistFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/tools/traffic/config/worker_config.go b/tools/traffic/config/worker_config.go index 0c6ee0eeb2..d912156bec 100644 --- a/tools/traffic/config/worker_config.go +++ b/tools/traffic/config/worker_config.go @@ -1,52 +1 @@ package config - -import "time" - -// WorkerConfig configures the traffic generator workers. -type WorkerConfig struct { - // The number of worker threads that generate write traffic. - NumWriteInstances uint - // The period of the submission rate of new blobs for each write worker thread. - WriteRequestInterval time.Duration - // The Size of each blob dispersed, in bytes. - DataSize uint64 - // If true, then each blob will contain unique random data. If false, the same random data - // will be dispersed for each blob by a particular worker thread. - RandomizeBlobs bool - // The amount of time to wait for a blob to be written. - WriteTimeout time.Duration - - // The amount of time between attempts by the status tracker to confirm the status of blobs. - TrackerInterval time.Duration - // The amount of time to wait for a blob status to be fetched. - GetBlobStatusTimeout time.Duration - // The size of the channel used to communicate between the writer and status tracker. - StatusTrackerChannelCapacity uint - - // The number of worker threads that generate read traffic. - NumReadInstances uint - // The period of the submission rate of read requests for each read worker thread. - ReadRequestInterval time.Duration - // For each blob, how many times should it be downloaded? If between 0.0 and 1.0, blob will be downloaded - // 0 or 1 times with the specified probability (e.g. 0.2 means each blob has a 20% chance of being downloaded). - // If greater than 1.0, then each blob will be downloaded the specified number of times. - RequiredDownloads float64 - // The amount of time to wait for a batch header to be fetched. - FetchBatchHeaderTimeout time.Duration - // The amount of time to wait for a blob to be retrieved. - RetrieveBlobChunksTimeout time.Duration - - // The address of the EigenDA service manager smart contract, in hex. - EigenDAServiceManager string - - // Custom quorum numbers to use for the traffic generator. - CustomQuorums []uint8 - - // Any metric with a label exactly matching one of the strings in this list will not be sent to the metrics server. - MetricsBlacklist []string - - // Any metric that contains any string in this list will not be sent to the metrics server. For example, - // including the string "_returned_chunk" will cause all metrics in the form of - // "operator_fb390a64122db3957fb220c3c42d5f71e97ab0c995da4e1e5cc3261602dac527_returned_chunk" to be omitted. - MetricsFuzzyBlacklist []string -} diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index 16a2735352..4f24729856 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -71,8 +71,7 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { generatorMetrics := metrics.NewMetrics( config.MetricsHTTPPort, logger, - config.WorkerConfig.MetricsBlacklist, - config.WorkerConfig.MetricsFuzzyBlacklist) + ) disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer, nil, nil) if err != nil { @@ -81,12 +80,12 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { } writers := make([]*workers.BlobWriter, 0) - for i := 0; i < int(config.WorkerConfig.NumWriteInstances); i++ { + for i := 0; i < int(config.BlobWriterConfig.NumWriteInstances); i++ { writer := workers.NewBlobWriter( &ctx, + &config.BlobWriterConfig, &waitGroup, logger, - &config.WorkerConfig, disperserClient, generatorMetrics) writers = append(writers, &writer) diff --git a/tools/traffic/metrics/count_metric.go b/tools/traffic/metrics/count_metric.go index daa508bb80..114a69c6eb 100644 --- a/tools/traffic/metrics/count_metric.go +++ b/tools/traffic/metrics/count_metric.go @@ -14,15 +14,10 @@ type CountMetric interface { type countMetric struct { metrics *metrics description string - // disabled specifies whether the metrics should behave as a no-op - disabled bool } // Increment increments the count of a type of event. func (metric *countMetric) Increment() { - if metric.disabled { - return - } metric.metrics.count.WithLabelValues(metric.description).Inc() } diff --git a/tools/traffic/metrics/metrics.go b/tools/traffic/metrics/metrics.go index 6a150285e6..224932744d 100644 --- a/tools/traffic/metrics/metrics.go +++ b/tools/traffic/metrics/metrics.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net/http" - "strings" "time" "github.com/Layr-Labs/eigensdk-go/logging" @@ -38,9 +37,6 @@ type metrics struct { httpPort string logger logging.Logger - metricsBlacklist []string - metricsFuzzyBlacklist []string - shutdown func() error } @@ -48,30 +44,20 @@ type metrics struct { func NewMetrics( httpPort string, logger logging.Logger, - metricsBlacklist []string, - metricsFuzzyBlacklist []string) Metrics { +) Metrics { namespace := "eigenda_generator" reg := prometheus.NewRegistry() reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) - if metricsBlacklist == nil { - metricsBlacklist = []string{} - } - if metricsFuzzyBlacklist == nil { - metricsFuzzyBlacklist = []string{} - } - metrics := &metrics{ - count: buildCounterCollector(namespace, reg), - latency: buildLatencyCollector(namespace, reg), - gauge: buildGaugeCollector(namespace, reg), - registry: reg, - httpPort: httpPort, - logger: logger.With("component", "GeneratorMetrics"), - metricsBlacklist: metricsBlacklist, - metricsFuzzyBlacklist: metricsFuzzyBlacklist, + count: buildCounterCollector(namespace, reg), + latency: buildLatencyCollector(namespace, reg), + gauge: buildGaugeCollector(namespace, reg), + registry: reg, + httpPort: httpPort, + logger: logger.With("component", "GeneratorMetrics"), } return metrics } @@ -126,7 +112,6 @@ func (metrics *metrics) NewLatencyMetric(description string) LatencyMetric { return &latencyMetric{ metrics: metrics, description: description, - disabled: metrics.isBlacklisted(description), } } @@ -135,7 +120,6 @@ func (metrics *metrics) NewCountMetric(description string) CountMetric { return &countMetric{ metrics: metrics, description: description, - disabled: metrics.isBlacklisted(description), } } @@ -144,21 +128,5 @@ func (metrics *metrics) NewGaugeMetric(description string) GaugeMetric { return &gaugeMetric{ metrics: metrics, description: description, - disabled: metrics.isBlacklisted(description), - } -} - -// isBlacklisted returns true if the metric name is blacklisted. -func (metrics *metrics) isBlacklisted(metricName string) bool { - for _, blacklisted := range metrics.metricsBlacklist { - if metricName == blacklisted { - return true - } - } - for _, blacklisted := range metrics.metricsFuzzyBlacklist { - if strings.Contains(metricName, blacklisted) { - return true - } } - return false } diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index 7c1d8888e5..22fd96ebea 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -17,6 +17,9 @@ import ( // BlobWriter sends blobs to a disperser at a configured rate. type BlobWriter struct { + // Config contains the configuration for the generator. + config *config.BlobWriterConfig + // The context for the generator. All work should cease when this context is cancelled. ctx *context.Context @@ -26,9 +29,6 @@ type BlobWriter struct { // All logs should be written using this logger. logger logging.Logger - // Config contains the configuration for the generator. - config *config.WorkerConfig - // disperser is the client used to send blobs to the disperser. disperser clients.DisperserClient @@ -48,9 +48,9 @@ type BlobWriter struct { // NewBlobWriter creates a new BlobWriter instance. func NewBlobWriter( ctx *context.Context, + config *config.BlobWriterConfig, waitGroup *sync.WaitGroup, logger logging.Logger, - config *config.WorkerConfig, disperser clients.DisperserClient, generatorMetrics metrics.Metrics) BlobWriter { diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index 59c2797e0c..6e89f8778d 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -35,7 +35,7 @@ func TestBlobWriter(t *testing.T) { customQuorum = []uint8{1, 2, 3} } - config := &config.WorkerConfig{ + config := &config.BlobWriterConfig{ DataSize: dataSize, RandomizeBlobs: randomizeBlobs, CustomQuorums: customQuorum, @@ -46,9 +46,9 @@ func TestBlobWriter(t *testing.T) { writer := NewBlobWriter( &ctx, + config, &waitGroup, logger, - config, disperserClient, generatorMetrics) From bc6e22ca448566f1bcaa609286a4de931f6e6139 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 9 Jan 2025 23:08:12 -0800 Subject: [PATCH 11/14] Use runtime configuration to define groups of blob writers --- tools/traffic/config/config.go | 54 +----- .../config/example_runtime_config.yaml | 21 +++ tools/traffic/config/flags.go | 66 +------- tools/traffic/config/runtime_config.go | 155 ++++++++++++++++++ tools/traffic/config/worker_config.go | 1 - tools/traffic/config/writer_config.go | 25 +++ tools/traffic/generator_v2.go | 149 +++++++++++++---- tools/traffic/metrics/metrics.go | 1 - tools/traffic/workers/blob_writer.go | 85 ++++++++-- tools/traffic/workers/blob_writer_test.go | 1 + 10 files changed, 404 insertions(+), 154 deletions(-) create mode 100644 tools/traffic/config/example_runtime_config.yaml create mode 100644 tools/traffic/config/runtime_config.go delete mode 100644 tools/traffic/config/worker_config.go create mode 100644 tools/traffic/config/writer_config.go diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index 687932cd5d..14adcc5952 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -1,7 +1,6 @@ package config import ( - "errors" "time" "github.com/Layr-Labs/eigenda/api/clients/v2" @@ -24,39 +23,14 @@ type Config struct { // Configuration for the graph. TheGraphConfig *thegraph.Config - // Configures the blob writers. - BlobWriterConfig BlobWriterConfig - // The port at which the metrics server listens for HTTP requests. MetricsHTTPPort string // The timeout for the node client. NodeClientTimeout time.Duration - // The amount of time to sleep after launching each worker thread. - InstanceLaunchInterval time.Duration -} - -// BlobWriterConfig configures the blob writer. -type BlobWriterConfig struct { - // The number of worker threads that generate write traffic. - NumWriteInstances uint - - // The period of the submission rate of new blobs for each write worker thread. - WriteRequestInterval time.Duration - - // The Size of each blob dispersed, in bytes. - DataSize uint64 - - // If true, then each blob will contain unique random data. If false, the same random data - // will be dispersed for each blob by a particular worker thread. - RandomizeBlobs bool - - // The amount of time to wait for a blob to be written. - WriteTimeout time.Duration - - // Custom quorum numbers to use for the traffic generator. - CustomQuorums []uint8 + // Path to the runtime configuration file that defines writer groups. + RuntimeConfigPath string } func NewConfig(ctx *cli.Context) (*Config, error) { @@ -64,18 +38,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) { if err != nil { return nil, err } - customQuorums := ctx.GlobalIntSlice(CustomQuorumNumbersFlag.Name) - if len(customQuorums) == 0 { - return nil, errors.New("no custom quorum numbers provided") - } - - customQuorumsUint8 := make([]uint8, len(customQuorums)) - for i, q := range customQuorums { - if q < 0 || q > 255 { - return nil, errors.New("invalid custom quorum number") - } - customQuorumsUint8[i] = uint8(q) - } config := &Config{ DisperserClientConfig: &clients.DisperserClientConfig{ @@ -89,17 +51,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { MetricsHTTPPort: ctx.GlobalString(MetricsHTTPPortFlag.Name), NodeClientTimeout: ctx.Duration(NodeClientTimeoutFlag.Name), - - InstanceLaunchInterval: ctx.Duration(InstanceLaunchIntervalFlag.Name), - - BlobWriterConfig: BlobWriterConfig{ - NumWriteInstances: ctx.GlobalUint(NumWriteInstancesFlag.Name), - WriteRequestInterval: ctx.Duration(WriteRequestIntervalFlag.Name), - DataSize: ctx.GlobalUint64(DataSizeFlag.Name), - RandomizeBlobs: ctx.GlobalBool(RandomizeBlobsFlag.Name), - WriteTimeout: ctx.Duration(WriteTimeoutFlag.Name), - CustomQuorums: customQuorumsUint8, - }, + RuntimeConfigPath: ctx.GlobalString(RuntimeConfigPathFlag.Name), } return config, nil diff --git a/tools/traffic/config/example_runtime_config.yaml b/tools/traffic/config/example_runtime_config.yaml new file mode 100644 index 0000000000..f5d746b414 --- /dev/null +++ b/tools/traffic/config/example_runtime_config.yaml @@ -0,0 +1,21 @@ +# Example runtime configuration file for the traffic generator +# This file can be modified while the traffic generator is running + +writer_groups: + # 1kb/s writer (1024 bytes, 1 instance) + - name: "small_frequent" + num_write_instances: 1 + write_request_interval: 1s + data_size: 1024 + randomize_blobs: true + write_timeout: 10s + custom_quorums: [1] + + # 2kb/s writer (1024 bytes, 2 instances) + - name: "medium_frequent" + num_write_instances: 2 + write_request_interval: 1s + data_size: 1024 + randomize_blobs: true + write_timeout: 10s + custom_quorums: [1] diff --git a/tools/traffic/config/flags.go b/tools/traffic/config/flags.go index a549ae320a..4d72e8a561 100644 --- a/tools/traffic/config/flags.go +++ b/tools/traffic/config/flags.go @@ -47,12 +47,6 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(envPrefix, "SIGNER_PRIVATE_KEY_HEX"), } - CustomQuorumNumbersFlag = cli.IntSliceFlag{ - Name: common.PrefixFlag(FlagPrefix, "custom-quorum-numbers"), - Usage: "Custom quorum numbers to use for the traffic generator.", - Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "CUSTOM_QUORUM_NUMBERS"), - } DisableTLSFlag = cli.BoolFlag{ Name: common.PrefixFlag(FlagPrefix, "disable-tls"), Usage: "Whether to disable TLS for an insecure connection.", @@ -65,52 +59,6 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"), } - - /* Common Configuration. */ - - InstanceLaunchIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "instance-launch-interva"), - Usage: "Duration between generator instance launches.", - Required: false, - Value: 1 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "INSTANCE_LAUNCH_INTERVAL"), - } - - /* Configuration for the blob writers. */ - NumWriteInstancesFlag = cli.UintFlag{ - Name: common.PrefixFlag(FlagPrefix, "num-write-instances"), - Usage: "Number of writer instances producing traffic to run in parallel.", - Required: false, - Value: 1, - EnvVar: common.PrefixEnvVar(envPrefix, "NUM_WRITE_INSTANCES"), - } - WriteRequestIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "write-request-interval"), - Usage: "Time between write requests.", - Required: false, - Value: 30 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "WRITE_REQUEST_INTERVAL"), - } - DataSizeFlag = cli.Uint64Flag{ - Name: common.PrefixFlag(FlagPrefix, "data-size"), - Usage: "Size of the data blob.", - Required: false, - Value: 1024, - EnvVar: common.PrefixEnvVar(envPrefix, "DATA_SIZE"), - } - RandomizeBlobsFlag = cli.BoolFlag{ - Name: common.PrefixFlag(FlagPrefix, "randomize-blobs"), - Usage: "If set, do not randomize blobs.", - Required: false, - EnvVar: common.PrefixEnvVar(envPrefix, "RANDOMIZE_BLOBS"), - } - WriteTimeoutFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "write-timeout"), - Usage: "Amount of time to wait for a blob to be written.", - Required: false, - Value: 10 * time.Second, - EnvVar: common.PrefixEnvVar(envPrefix, "WRITE_TIMEOUT"), - } NodeClientTimeoutFlag = cli.DurationFlag{ Name: common.PrefixFlag(FlagPrefix, "node-client-timeout"), Usage: "The timeout for the node client.", @@ -118,27 +66,27 @@ var ( Value: 10 * time.Second, EnvVar: common.PrefixEnvVar(envPrefix, "NODE_CLIENT_TIMEOUT"), } + RuntimeConfigPathFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "runtime-config-path"), + Usage: "Path to the runtime configuration file that defines writer groups.", + Required: true, + EnvVar: common.PrefixEnvVar(envPrefix, "RUNTIME_CONFIG_PATH"), + } ) var requiredFlags = []cli.Flag{ HostnameFlag, GrpcPortFlag, + RuntimeConfigPathFlag, } var optionalFlags = []cli.Flag{ TimeoutFlag, - RandomizeBlobsFlag, - InstanceLaunchIntervalFlag, UseSecureGrpcFlag, SignerPrivateKeyFlag, - CustomQuorumNumbersFlag, - NumWriteInstancesFlag, - WriteRequestIntervalFlag, - DataSizeFlag, DisableTLSFlag, MetricsHTTPPortFlag, NodeClientTimeoutFlag, - WriteTimeoutFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/tools/traffic/config/runtime_config.go b/tools/traffic/config/runtime_config.go new file mode 100644 index 0000000000..759976b04d --- /dev/null +++ b/tools/traffic/config/runtime_config.go @@ -0,0 +1,155 @@ +package config + +import ( + "context" + "fmt" + "os" + "sync" + "time" + + "gopkg.in/yaml.v2" +) + +// RuntimeConfig represents the configuration that can be modified while the traffic generator is running +type RuntimeConfig struct { + // WriterGroups defines different groups of writers with their own configurations + WriterGroups []WriterGroupConfig `yaml:"writer_groups"` +} + +// WriterGroupConfig represents the configuration for a group of writers with the same settings +type WriterGroupConfig struct { + // Name of the writer group for identification + Name string `yaml:"name"` + + // The number of worker threads that generate write traffic. + NumWriteInstances uint `yaml:"num_write_instances"` + + // The period of the submission rate of new blobs for each write worker thread. + WriteRequestInterval time.Duration `yaml:"write_request_interval"` + + // The Size of each blob dispersed, in bytes. + DataSize uint64 `yaml:"data_size"` + + // If true, then each blob will contain unique random data. If false, the same random data + // will be dispersed for each blob by a particular worker thread. + RandomizeBlobs bool `yaml:"randomize_blobs"` + + // The amount of time to wait for a blob to be written. + WriteTimeout time.Duration `yaml:"write_timeout"` + + // Custom quorum numbers to use for the traffic generator. + CustomQuorums []uint8 `yaml:"custom_quorums"` +} + +// RuntimeConfigManager handles loading and watching of runtime configuration +type RuntimeConfigManager struct { + sync.RWMutex + currentConfig *RuntimeConfig + configPath string + onChange func(*RuntimeConfig) +} + +// NewRuntimeConfigManager creates a new runtime config manager +func NewRuntimeConfigManager(configPath string, onChange func(*RuntimeConfig)) (*RuntimeConfigManager, error) { + manager := &RuntimeConfigManager{ + configPath: configPath, + onChange: onChange, + } + + // Load initial config + if err := manager.loadConfig(); err != nil { + return nil, err + } + + return manager, nil +} + +// GetConfig returns the current runtime configuration +func (m *RuntimeConfigManager) GetConfig() *RuntimeConfig { + m.RLock() + defer m.RUnlock() + return m.currentConfig +} + +// loadConfig loads the configuration from disk +func (m *RuntimeConfigManager) loadConfig() error { + data, err := os.ReadFile(m.configPath) + if err != nil { + return fmt.Errorf("failed to read config file: %w", err) + } + + var config RuntimeConfig + if err := yaml.Unmarshal(data, &config); err != nil { + return fmt.Errorf("failed to parse config file: %w", err) + } + + // Validate each writer group if any exist + for i, group := range config.WriterGroups { + if group.Name == "" { + return fmt.Errorf("writer group at index %d must have a name", i) + } + if group.NumWriteInstances == 0 { + return fmt.Errorf("writer group '%s' must have at least one writer instance", group.Name) + } + if group.WriteRequestInterval == 0 { + return fmt.Errorf("writer group '%s' must have a non-zero write request interval", group.Name) + } + if group.DataSize == 0 { + return fmt.Errorf("writer group '%s' must have a non-zero data size", group.Name) + } + if group.WriteTimeout == 0 { + return fmt.Errorf("writer group '%s' must have a non-zero write timeout", group.Name) + } + if len(group.CustomQuorums) == 0 { + return fmt.Errorf("writer group '%s' must have at least one custom quorum", group.Name) + } + } + + m.Lock() + defer m.Unlock() + + // Check if config has actually changed + if m.currentConfig != nil { + // Convert both configs to YAML for comparison + currentYAML, err := yaml.Marshal(m.currentConfig) + if err != nil { + return fmt.Errorf("failed to marshal current config: %w", err) + } + newYAML, err := yaml.Marshal(&config) + if err != nil { + return fmt.Errorf("failed to marshal new config: %w", err) + } + + if string(currentYAML) == string(newYAML) { + // No changes, skip update + return nil + } + } + + m.currentConfig = &config + + if m.onChange != nil { + m.onChange(&config) + } + + return nil +} + +// StartWatching begins watching the config file for changes +func (m *RuntimeConfigManager) StartWatching(ctx context.Context) { + ticker := time.NewTicker(5 * time.Second) + go func() { + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + if err := m.loadConfig(); err != nil { + // Just log the error and continue + fmt.Printf("Error reloading config: %v\n", err) + } + } + } + }() +} diff --git a/tools/traffic/config/worker_config.go b/tools/traffic/config/worker_config.go deleted file mode 100644 index d912156bec..0000000000 --- a/tools/traffic/config/worker_config.go +++ /dev/null @@ -1 +0,0 @@ -package config diff --git a/tools/traffic/config/writer_config.go b/tools/traffic/config/writer_config.go new file mode 100644 index 0000000000..fde5f21026 --- /dev/null +++ b/tools/traffic/config/writer_config.go @@ -0,0 +1,25 @@ +package config + +import "time" + +// BlobWriterConfig configures the blob writer. +type BlobWriterConfig struct { + // The number of worker threads that generate write traffic. + NumWriteInstances uint + + // The period of the submission rate of new blobs for each write worker thread. + WriteRequestInterval time.Duration + + // The Size of each blob dispersed, in bytes. + DataSize uint64 + + // If true, then each blob will contain unique random data. If false, the same random data + // will be dispersed for each blob by a particular worker thread. + RandomizeBlobs bool + + // The amount of time to wait for a blob to be written. + WriteTimeout time.Duration + + // Custom quorum numbers to use for the traffic generator. + CustomQuorums []uint8 +} diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index 4f24729856..4ae91e7835 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -6,10 +6,11 @@ import ( "sync" "time" - "github.com/Layr-Labs/eigenda/api/clients/v2" + clientsv2 "github.com/Layr-Labs/eigenda/api/clients/v2" "github.com/Layr-Labs/eigenda/common" auth "github.com/Layr-Labs/eigenda/core/auth/v2" "github.com/Layr-Labs/eigenda/tools/traffic/config" + trafficconfig "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" "github.com/Layr-Labs/eigenda/tools/traffic/workers" "github.com/Layr-Labs/eigensdk-go/logging" @@ -31,17 +32,23 @@ import ( // When a writer finishes writing a blob, it sends information about that blob to the statusTracker. // When the statusTracker observes that a blob has been confirmed, it sends information about the blob // to the readers. The readers only attempt to read blobs that have been confirmed by the statusTracker. +type WriterGroup struct { + name string + writers []*workers.BlobWriter + cancels map[*workers.BlobWriter]context.CancelFunc +} + type Generator struct { ctx *context.Context cancel *context.CancelFunc waitGroup *sync.WaitGroup generatorMetrics metrics.Metrics logger logging.Logger - disperserClient clients.DisperserClient - // eigenDAClient *clients.EigenDAClient #TODO: Add this back in when the client is implemented - config *config.Config - - writers []*workers.BlobWriter + disperserClient clientsv2.DisperserClient + config *config.Config + writerGroups map[string]*WriterGroup + configManager *config.RuntimeConfigManager + mu sync.RWMutex } func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { @@ -65,6 +72,10 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { accountId := gethcommon.HexToAddress(signerAccountId) logger.Info("Initializing traffic generator", "accountId", accountId) + if config.RuntimeConfigPath == "" { + return nil, fmt.Errorf("runtime config path is required") + } + ctx, cancel := context.WithCancel(context.Background()) waitGroup := sync.WaitGroup{} @@ -73,25 +84,13 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { logger, ) - disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer, nil, nil) + disperserClient, err := clientsv2.NewDisperserClient(config.DisperserClientConfig, signer, nil, nil) if err != nil { cancel() return nil, fmt.Errorf("new disperser-client: %w", err) } - writers := make([]*workers.BlobWriter, 0) - for i := 0; i < int(config.BlobWriterConfig.NumWriteInstances); i++ { - writer := workers.NewBlobWriter( - &ctx, - &config.BlobWriterConfig, - &waitGroup, - logger, - disperserClient, - generatorMetrics) - writers = append(writers, &writer) - } - - return &Generator{ + generator := &Generator{ ctx: &ctx, cancel: &cancel, waitGroup: &waitGroup, @@ -99,8 +98,105 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { logger: logger, disperserClient: disperserClient, config: config, - writers: writers, - }, nil + writerGroups: make(map[string]*WriterGroup), + } + + // Initialize runtime config manager + configManager, err := trafficconfig.NewRuntimeConfigManager(config.RuntimeConfigPath, generator.handleConfigUpdate) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to initialize runtime config manager: %w", err) + } + generator.configManager = configManager + + return generator, nil +} + +// handleConfigUpdate is called when the runtime configuration changes +func (generator *Generator) handleConfigUpdate(runtimeConfig *trafficconfig.RuntimeConfig) { + generator.mu.Lock() + defer generator.mu.Unlock() + + generator.logger.Info("Received runtime configuration update") + + // Track existing groups to identify which ones to remove + existingGroups := make(map[string]bool) + for name := range generator.writerGroups { + existingGroups[name] = true + } + + // Update or create writer groups + for _, groupConfig := range runtimeConfig.WriterGroups { + delete(existingGroups, groupConfig.Name) + + writerConfig := &trafficconfig.BlobWriterConfig{ + NumWriteInstances: groupConfig.NumWriteInstances, + WriteRequestInterval: groupConfig.WriteRequestInterval, + DataSize: groupConfig.DataSize, + RandomizeBlobs: groupConfig.RandomizeBlobs, + WriteTimeout: groupConfig.WriteTimeout, + CustomQuorums: groupConfig.CustomQuorums, + } + + group, exists := generator.writerGroups[groupConfig.Name] + if !exists { + group = &WriterGroup{ + name: groupConfig.Name, + writers: make([]*workers.BlobWriter, 0), + cancels: make(map[*workers.BlobWriter]context.CancelFunc), + } + generator.writerGroups[groupConfig.Name] = group + } + + // Update writer count + currentWriters := len(group.writers) + targetWriters := int(groupConfig.NumWriteInstances) + + // Scale down if needed + if targetWriters < currentWriters { + for i := targetWriters; i < currentWriters; i++ { + if cancel, exists := group.cancels[group.writers[i]]; exists { + cancel() + delete(group.cancels, group.writers[i]) + } + } + group.writers = group.writers[:targetWriters] + } + + // Scale up if needed + if targetWriters > currentWriters { + for i := currentWriters; i < targetWriters; i++ { + writerCtx, writerCancel := context.WithCancel(*generator.ctx) + writer := workers.NewBlobWriter( + groupConfig.Name, + &writerCtx, + writerConfig, + generator.waitGroup, + generator.logger, + generator.disperserClient, + generator.generatorMetrics) + group.writers = append(group.writers, &writer) + group.cancels[&writer] = writerCancel + writer.Start() + } + } + + // Update configuration for existing writers + for _, writer := range group.writers[:min(currentWriters, targetWriters)] { + writer.UpdateConfig(writerConfig) + } + } + + // Remove any groups that are no longer in the config + for name := range existingGroups { + group := generator.writerGroups[name] + for _, writer := range group.writers { + if cancel, exists := group.cancels[writer]; exists { + cancel() + } + } + delete(generator.writerGroups, name) + } } // Start instantiates goroutines that generate read/write traffic. @@ -108,12 +204,9 @@ func (generator *Generator) Start() error { // Start metrics server generator.generatorMetrics.Start() - // Start writers - generator.logger.Info("Starting writers") - for _, writer := range generator.writers { - generator.logger.Info("Starting writer", "writer", writer) - writer.Start() - time.Sleep(generator.config.InstanceLaunchInterval) + // Start runtime config watcher if configured + if generator.configManager != nil { + generator.configManager.StartWatching(*generator.ctx) } // Wait for context cancellation to keep the process running diff --git a/tools/traffic/metrics/metrics.go b/tools/traffic/metrics/metrics.go index 224932744d..2aa5c81608 100644 --- a/tools/traffic/metrics/metrics.go +++ b/tools/traffic/metrics/metrics.go @@ -78,7 +78,6 @@ func (metrics *metrics) Start() error { } go func() { - metrics.logger.Info("Starting metrics server", "port", metrics.httpPort) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { metrics.logger.Error("Prometheus server failed", "err", err) } diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index 22fd96ebea..029e17483a 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -17,7 +17,10 @@ import ( // BlobWriter sends blobs to a disperser at a configured rate. type BlobWriter struct { - // Config contains the configuration for the generator. + // Name of the writer group this writer belongs to + name string + + // Config contains the configuration for the blob writer. config *config.BlobWriterConfig // The context for the generator. All work should cease when this context is cancelled. @@ -43,10 +46,20 @@ type BlobWriter struct { // writeFailureMetric is used to record the number of failed write requests. writeFailureMetric metrics.CountMetric + + // Mutex to protect configuration updates + configMutex sync.RWMutex + + // Ticker for controlling write intervals + ticker *time.Ticker + + // cancel is used to cancel the context + cancel *context.CancelFunc } // NewBlobWriter creates a new BlobWriter instance. func NewBlobWriter( + name string, ctx *context.Context, config *config.BlobWriterConfig, waitGroup *sync.WaitGroup, @@ -69,6 +82,7 @@ func NewBlobWriter( } return BlobWriter{ + name: name, ctx: ctx, waitGroup: waitGroup, logger: logger, @@ -83,40 +97,75 @@ func NewBlobWriter( // Start begins the blob writer goroutine. func (writer *BlobWriter) Start() { - writer.logger.Info("Starting blob writer") + writer.logger.Info("Starting blob writer", "name", writer.name) writer.waitGroup.Add(1) - ticker := time.NewTicker(writer.config.WriteRequestInterval) + writer.configMutex.Lock() + writer.ticker = time.NewTicker(writer.config.WriteRequestInterval) + writer.configMutex.Unlock() go func() { defer writer.waitGroup.Done() - defer ticker.Stop() + defer writer.ticker.Stop() for { select { case <-(*writer.ctx).Done(): - writer.logger.Info("context cancelled, stopping blob writer") + writer.logger.Info("context cancelled, stopping blob writer", "name", writer.name) return - case <-ticker.C: + case <-writer.ticker.C: if err := writer.writeNextBlob(); err != nil { - writer.logger.Error("failed to write blob", "err", err) + writer.logger.Error("failed to write blob", "name", writer.name, "err", err) } } } }() } +// UpdateConfig updates the writer's configuration +func (writer *BlobWriter) UpdateConfig(config *config.BlobWriterConfig) { + writer.configMutex.Lock() + defer writer.configMutex.Unlock() + + // Update the ticker if the interval changed + if writer.config.WriteRequestInterval != config.WriteRequestInterval { + writer.ticker.Reset(config.WriteRequestInterval) + } + + // Update the fixed random data if needed + if writer.config.RandomizeBlobs != config.RandomizeBlobs || writer.config.DataSize != config.DataSize { + if config.RandomizeBlobs { + writer.fixedRandomData = nil + } else { + writer.fixedRandomData = make([]byte, config.DataSize) + _, err := rand.Read(writer.fixedRandomData) + if err != nil { + writer.logger.Error("failed to generate new fixed random data", "name", writer.name, "err", err) + return + } + writer.fixedRandomData = codec.ConvertByPaddingEmptyByte(writer.fixedRandomData) + } + } + + writer.config = config + writer.logger.Info("Updated blob writer configuration", + "name", writer.name, + "writeInterval", config.WriteRequestInterval, + "dataSize", config.DataSize, + "randomizeBlobs", config.RandomizeBlobs) +} + // writeNextBlob attempts to send a random blob to the disperser. func (writer *BlobWriter) writeNextBlob() error { data, err := writer.getRandomData() if err != nil { - writer.logger.Error("failed to get random data", "err", err) + writer.logger.Error("failed to get random data", "name", writer.name, "err", err) return err } start := time.Now() _, err = writer.sendRequest(data) if err != nil { writer.writeFailureMetric.Increment() - writer.logger.Error("failed to send blob request", "err", err) + writer.logger.Error("failed to send blob request", "name", writer.name, "err", err) return err } @@ -130,6 +179,9 @@ func (writer *BlobWriter) writeNextBlob() error { // getRandomData returns a slice of random data to be used for a blob. func (writer *BlobWriter) getRandomData() ([]byte, error) { + writer.configMutex.RLock() + defer writer.configMutex.RUnlock() + if writer.fixedRandomData != nil { return writer.fixedRandomData, nil } @@ -146,23 +198,28 @@ func (writer *BlobWriter) getRandomData() ([]byte, error) { // sendRequest sends a blob to a disperser. func (writer *BlobWriter) sendRequest(data []byte) (key v2.BlobKey, err error) { - ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writer.config.WriteTimeout) + writer.configMutex.RLock() + writeTimeout := writer.config.WriteTimeout + customQuorums := writer.config.CustomQuorums + writer.configMutex.RUnlock() + + ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writeTimeout) defer cancel() - writer.logger.Info("sending blob request", "size", len(data)) + writer.logger.Info("sending blob request", "name", writer.name, "size", len(data)) status, key, err := writer.disperser.DisperseBlob( ctxTimeout, data, 0, - writer.config.CustomQuorums, + customQuorums, 0, ) if err != nil { - writer.logger.Error("failed to send blob request", "err", err) + writer.logger.Error("failed to send blob request", "name", writer.name, "err", err) return } - writer.logger.Info("blob request sent", "key", key.Hex(), "status", status.String()) + writer.logger.Info("blob request sent", "name", writer.name, "key", key.Hex(), "status", status.String()) return } diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index 6e89f8778d..3f2e2cff21 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -45,6 +45,7 @@ func TestBlobWriter(t *testing.T) { generatorMetrics := metrics.NewMockMetrics() writer := NewBlobWriter( + "test-writer", &ctx, config, &waitGroup, From a8cdba3e21ac9bb5b2c3fa1917bbb20078b73b95 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 9 Jan 2025 23:12:31 -0800 Subject: [PATCH 12/14] fix lint --- tools/traffic/cmd2/main.go | 4 +++- tools/traffic/generator_v2.go | 4 +++- tools/traffic/workers/blob_writer.go | 3 --- tools/traffic/workers/blob_writer_test.go | 7 ++++++- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/tools/traffic/cmd2/main.go b/tools/traffic/cmd2/main.go index 98fe5a9da3..6c9a412ebd 100644 --- a/tools/traffic/cmd2/main.go +++ b/tools/traffic/cmd2/main.go @@ -58,7 +58,9 @@ func trafficGeneratorMain(ctx *cli.Context) error { return err case sig := <-sigChan: fmt.Printf("\nReceived signal %v, shutting down...\n", sig) - generator.Stop() + if err := generator.Stop(); err != nil { + fmt.Printf("Failed to stop generator: %v\n", err) + } return nil } } diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index 4ae91e7835..5772a3502d 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -202,7 +202,9 @@ func (generator *Generator) handleConfigUpdate(runtimeConfig *trafficconfig.Runt // Start instantiates goroutines that generate read/write traffic. func (generator *Generator) Start() error { // Start metrics server - generator.generatorMetrics.Start() + if err := generator.generatorMetrics.Start(); err != nil { + return fmt.Errorf("failed to start metrics server: %w", err) + } // Start runtime config watcher if configured if generator.configManager != nil { diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index 029e17483a..03f759cd77 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -52,9 +52,6 @@ type BlobWriter struct { // Ticker for controlling write intervals ticker *time.Ticker - - // cancel is used to cancel the context - cancel *context.CancelFunc } // NewBlobWriter creates a new BlobWriter instance. diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index 3f2e2cff21..2d0bfab82d 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -82,7 +82,12 @@ func TestBlobWriter(t *testing.T) { ).Return(&status, keyToReturn, errorToReturn) // Simulate the advancement of time (i.e. allow the writer to write the next blob). - writer.writeNextBlob() + err = writer.writeNextBlob() + if errorToReturn != nil { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } disperserClient.mock.AssertNumberOfCalls(t, "DisperseBlob", 1) From 8e629888937117579aa9ea2744ed17483d65072a Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 9 Jan 2025 23:21:18 -0800 Subject: [PATCH 13/14] remove unused import --- tools/traffic/config/config.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go index 14adcc5952..1b21c4db99 100644 --- a/tools/traffic/config/config.go +++ b/tools/traffic/config/config.go @@ -5,7 +5,6 @@ import ( "github.com/Layr-Labs/eigenda/api/clients/v2" "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/core/thegraph" "github.com/urfave/cli" ) @@ -20,9 +19,6 @@ type Config struct { // Signer private key SignerPrivateKey string - // Configuration for the graph. - TheGraphConfig *thegraph.Config - // The port at which the metrics server listens for HTTP requests. MetricsHTTPPort string From 800a3797f599a37b613940bb454de15d0785ad7b Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Mon, 13 Jan 2025 15:00:22 -0800 Subject: [PATCH 14/14] Update go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index b5de6d4e7b..1aab1d2958 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( golang.org/x/sync v0.8.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.64.1 + gopkg.in/yaml.v2 v2.4.0 ) require ( @@ -156,7 +157,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect ) require (