diff --git a/disperser/cmd/controller/config.go b/disperser/cmd/controller/config.go
index 00483ee202..3f146631b5 100644
--- a/disperser/cmd/controller/config.go
+++ b/disperser/cmd/controller/config.go
@@ -63,19 +63,21 @@ func NewConfig(ctx *cli.Context) (Config, error) {
         AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
         LoggerConfig:    *loggerConfig,
         EncodingManagerConfig: controller.EncodingManagerConfig{
-            PullInterval:           ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name),
-            EncodingRequestTimeout: ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name),
-            StoreTimeout:           ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name),
-            NumEncodingRetries:     ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name),
-            NumRelayAssignment:     uint16(numRelayAssignments),
-            AvailableRelays:        relays,
-            EncoderAddress:         ctx.GlobalString(flags.EncoderAddressFlag.Name),
+            PullInterval:            ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name),
+            EncodingRequestTimeout:  ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name),
+            StoreTimeout:            ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name),
+            NumEncodingRetries:      ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name),
+            NumRelayAssignment:      uint16(numRelayAssignments),
+            AvailableRelays:         relays,
+            EncoderAddress:          ctx.GlobalString(flags.EncoderAddressFlag.Name),
+            MaxNumBlobsPerIteration: int32(ctx.GlobalInt(flags.MaxNumBlobsPerIterationFlag.Name)),
         },
         DispatcherConfig: controller.DispatcherConfig{
             PullInterval:           ctx.GlobalDuration(flags.DispatcherPullIntervalFlag.Name),
             FinalizationBlockDelay: ctx.GlobalUint64(flags.FinalizationBlockDelayFlag.Name),
             NodeRequestTimeout:     ctx.GlobalDuration(flags.NodeRequestTimeoutFlag.Name),
             NumRequestRetries:      ctx.GlobalInt(flags.NumRequestRetriesFlag.Name),
+            MaxBatchSize:           int32(ctx.GlobalInt(flags.MaxBatchSizeFlag.Name)),
         },
         NumConcurrentEncodingRequests:  ctx.GlobalInt(flags.NumConcurrentEncodingRequestsFlag.Name),
         NumConcurrentDispersalRequests: ctx.GlobalInt(flags.NumConcurrentDispersalRequestsFlag.Name),
diff --git a/disperser/cmd/controller/flags/flags.go b/disperser/cmd/controller/flags/flags.go
index 4b6c73173d..71654cc4d3 100644
--- a/disperser/cmd/controller/flags/flags.go
+++ b/disperser/cmd/controller/flags/flags.go
@@ -102,6 +102,13 @@
         EnvVar:   common.PrefixEnvVar(envVarPrefix, "NUM_CONCURRENT_ENCODING_REQUESTS"),
         Value:    250,
     }
+    MaxNumBlobsPerIterationFlag = cli.IntFlag{
+        Name:     common.PrefixFlag(FlagPrefix, "max-num-blobs-per-iteration"),
+        Usage:    "Max number of blobs to encode in a single iteration",
+        Required: false,
+        EnvVar:   common.PrefixEnvVar(envVarPrefix, "MAX_NUM_BLOBS_PER_ITERATION"),
+        Value:    128,
+    }
 
     // Dispatcher Flags
     DispatcherPullIntervalFlag = cli.DurationFlag{
@@ -150,6 +157,13 @@
         EnvVar:   common.PrefixEnvVar(envVarPrefix, "NODE_CLIENT_CACHE_NUM_ENTRIES"),
         Value:    400,
     }
+    MaxBatchSizeFlag = cli.IntFlag{
+        Name:     common.PrefixFlag(FlagPrefix, "max-batch-size"),
+        Usage:    "Max number of blobs to disperse in a batch",
+        Required: false,
+        EnvVar:   common.PrefixEnvVar(envVarPrefix, "MAX_BATCH_SIZE"),
+        Value:    128,
+    }
 )
 
 var requiredFlags = []cli.Flag{
@@ -160,6 +174,7 @@
     EncodingPullIntervalFlag,
     AvailableRelaysFlag,
     EncoderAddressFlag,
+    DispatcherPullIntervalFlag,
 
     NodeRequestTimeoutFlag,
     NumConnectionsToNodesFlag,
@@ -172,10 +187,13 @@ var optionalFlags = []cli.Flag{
     NumEncodingRetriesFlag,
     NumRelayAssignmentFlag,
     NumConcurrentEncodingRequestsFlag,
+    MaxNumBlobsPerIterationFlag,
 
+    FinalizationBlockDelayFlag,
     NumRequestRetriesFlag,
     NumConcurrentDispersalRequestsFlag,
     NodeClientCacheNumEntriesFlag,
+    MaxBatchSizeFlag,
 }
 
 var Flags []cli.Flag
diff --git a/disperser/cmd/controller/main.go b/disperser/cmd/controller/main.go
index f3d21bb340..347f0f8c0c 100644
--- a/disperser/cmd/controller/main.go
+++ b/disperser/cmd/controller/main.go
@@ -82,7 +82,7 @@ func RunController(ctx *cli.Context) error {
     }
     encodingPool := workerpool.New(config.NumConcurrentEncodingRequests)
     encodingManager, err := controller.NewEncodingManager(
-        config.EncodingManagerConfig,
+        &config.EncodingManagerConfig,
         blobMetadataStore,
         encodingPool,
         encoderClient,
@@ -131,7 +131,7 @@ func RunController(ctx *cli.Context) error {
         return fmt.Errorf("failed to create node client manager: %v", err)
     }
     dispatcher, err := controller.NewDispatcher(
-        config.DispatcherConfig,
+        &config.DispatcherConfig,
         blobMetadataStore,
         dispatcherPool,
         ics,
diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go
index 0bdec5a1a4..bbde17ccbe 100644
--- a/disperser/common/v2/blobstore/dynamo_metadata_store.go
+++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go
@@ -50,6 +50,11 @@ var (
     ErrInvalidStateTransition = errors.New("invalid state transition")
 )
 
+type StatusIndexCursor struct {
+    BlobKey   *corev2.BlobKey
+    UpdatedAt uint64
+}
+
 // BlobMetadataStore is a blob metadata storage backed by DynamoDB
 type BlobMetadataStore struct {
     dynamoDBClient commondynamodb.Client
@@ -123,6 +128,19 @@ func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey corev2
     return err
 }
 
+func (s *BlobMetadataStore) DeleteBlobMetadata(ctx context.Context, blobKey corev2.BlobKey) error {
+    err := s.dynamoDBClient.DeleteItem(ctx, s.tableName, map[string]types.AttributeValue{
+        "PK": &types.AttributeValueMemberS{
+            Value: blobKeyPrefix + blobKey.Hex(),
+        },
+        "SK": &types.AttributeValueMemberS{
+            Value: blobMetadataSK,
+        },
+    })
+
+    return err
+}
+
 func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey corev2.BlobKey) (*v2.BlobMetadata, error) {
     item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
         "PK": &types.AttributeValueMemberS{
@@ -151,6 +169,7 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey corev2.
 
 // GetBlobMetadataByStatus returns all the metadata with the given status that were updated after lastUpdatedAt
 // Because this function scans the entire index, it should only be used for status with a limited number of items.
+// Results are ordered by UpdatedAt in ascending order.
 func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus, lastUpdatedAt uint64) ([]*v2.BlobMetadata, error) {
     items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND UpdatedAt > :updatedAt", commondynamodb.ExpressionValues{
         ":status": &types.AttributeValueMemberN{
@@ -174,6 +193,110 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
     return metadata, nil
 }
 
+// GetBlobMetadataByStatusPaginated returns up to `limit` blob metadata objects with the given status, starting after the given cursor.
+// It also returns a new cursor (the last evaluated key) to be used for the next page,
+// even when there are no more results or no results at all.
+// This cursor can be used to fetch a new set of results once they become available,
+// so a request made with the exclusive start key returned by a previous response may yield an empty result.
+func (s *BlobMetadataStore) GetBlobMetadataByStatusPaginated(
+    ctx context.Context,
+    status v2.BlobStatus,
+    exclusiveStartKey *StatusIndexCursor,
+    limit int32,
+) ([]*v2.BlobMetadata, *StatusIndexCursor, error) {
+    var cursor map[string]types.AttributeValue
+    if exclusiveStartKey != nil {
+        pk := blobKeyPrefix
+        if exclusiveStartKey.BlobKey != nil && len(exclusiveStartKey.BlobKey) == 32 {
+            pk = blobKeyPrefix + exclusiveStartKey.BlobKey.Hex()
+        }
+        cursor = map[string]types.AttributeValue{
+            "PK": &types.AttributeValueMemberS{
+                Value: pk,
+            },
+            "SK": &types.AttributeValueMemberS{
+                Value: blobMetadataSK,
+            },
+            "UpdatedAt": &types.AttributeValueMemberN{
+                Value: strconv.FormatUint(exclusiveStartKey.UpdatedAt, 10),
+            },
+            "BlobStatus": &types.AttributeValueMemberN{
+                Value: strconv.Itoa(int(status)),
+            },
+        }
+    } else {
+        cursor = map[string]types.AttributeValue{
+            "PK": &types.AttributeValueMemberS{
+                Value: blobKeyPrefix,
+            },
+            "SK": &types.AttributeValueMemberS{
+                Value: blobMetadataSK,
+            },
+            "UpdatedAt": &types.AttributeValueMemberN{
+                Value: "0",
+            },
+            "BlobStatus": &types.AttributeValueMemberN{
+                Value: strconv.Itoa(int(status)),
+            },
+        }
+    }
+    res, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, StatusIndexName, "BlobStatus = :status", commondynamodb.ExpressionValues{
+        ":status": &types.AttributeValueMemberN{
+            Value: strconv.Itoa(int(status)),
+        },
+    }, limit, cursor)
+    if err != nil {
+        return nil, nil, err
+    }
+
+    // No results
+    if len(res.Items) == 0 && res.LastEvaluatedKey == nil {
+        // return the same cursor
+        return nil, exclusiveStartKey, nil
+    }
+
+    metadata := make([]*v2.BlobMetadata, 0, len(res.Items))
+    for _, item := range res.Items {
+        m, err := UnmarshalBlobMetadata(item)
+        // Skip invalid/corrupt items
+        if err != nil {
+            s.logger.Errorf("failed to unmarshal blob metadata: %v", err)
+            continue
+        }
+        metadata = append(metadata, m)
+    }
+
+    lastEvaluatedKey := res.LastEvaluatedKey
+    if lastEvaluatedKey == nil {
+        lastItem := res.Items[len(res.Items)-1]
+        updatedAt, err := strconv.ParseUint(lastItem["UpdatedAt"].(*types.AttributeValueMemberN).Value, 10, 64)
+        if err != nil {
+            return nil, nil, err
+        }
+        bk, err := UnmarshalBlobKey(lastItem)
+        if err != nil {
+            return nil, nil, err
+        }
+        return metadata, &StatusIndexCursor{
+            BlobKey:   &bk,
+            UpdatedAt: updatedAt,
+        }, nil
+    }
+
+    newCursor := StatusIndexCursor{}
+    err = attributevalue.UnmarshalMap(lastEvaluatedKey, &newCursor)
+    if err != nil {
+        return nil, nil, err
+    }
+    bk, err := UnmarshalBlobKey(lastEvaluatedKey)
+    if err != nil {
+        return nil, nil, err
+    }
+    newCursor.BlobKey = &bk
+
+    return metadata, &newCursor, nil
+}
+
 // GetBlobMetadataCountByStatus returns the count of all the metadata with the given status
 // Because this function scans the entire index, it should only be used for status with a limited number of items.
 func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) {
@@ -203,6 +326,19 @@ func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *co
     return err
 }
 
+func (s *BlobMetadataStore) DeleteBlobCertificate(ctx context.Context, blobKey corev2.BlobKey) error {
+    err := s.dynamoDBClient.DeleteItem(ctx, s.tableName, map[string]types.AttributeValue{
+        "PK": &types.AttributeValueMemberS{
+            Value: blobKeyPrefix + blobKey.Hex(),
+        },
+        "SK": &types.AttributeValueMemberS{
+            Value: blobCertSK,
+        },
+    })
+
+    return err
+}
+
 func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey corev2.BlobKey) (*corev2.BlobCertificate, *encoding.FragmentInfo, error) {
     item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
         "PK": &types.AttributeValueMemberS{
@@ -357,6 +493,19 @@ func (s *BlobMetadataStore) PutBatchHeader(ctx context.Context, batchHeader *cor
     return err
 }
 
+func (s *BlobMetadataStore) DeleteBatchHeader(ctx context.Context, batchHeaderHash [32]byte) error {
+    err := s.dynamoDBClient.DeleteItem(ctx, s.tableName, map[string]types.AttributeValue{
+        "PK": &types.AttributeValueMemberS{
+            Value: batchHeaderKeyPrefix + hex.EncodeToString(batchHeaderHash[:]),
+        },
+        "SK": &types.AttributeValueMemberS{
+            Value: batchHeaderSK,
+        },
+    })
+
+    return err
+}
+
 func (s *BlobMetadataStore) GetBatchHeader(ctx context.Context, batchHeaderHash [32]byte) (*corev2.BatchHeader, error) {
     item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
         "PK": &types.AttributeValueMemberS{
diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
index 06f739bfd0..081b1d38d9 100644
--- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
+++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
@@ -2,6 +2,7 @@ package blobstore_test
 
 import (
     "context"
+    "crypto/rand"
     "encoding/hex"
     "errors"
     "math/big"
@@ -20,35 +21,13 @@ import (
     gethcommon "github.com/ethereum/go-ethereum/common"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
+    "github.com/stretchr/testify/require"
 )
 
 func TestBlobMetadataStoreOperations(t *testing.T) {
     ctx := context.Background()
-    blobHeader1 := &corev2.BlobHeader{
-        BlobVersion:     0,
-        QuorumNumbers:   []core.QuorumID{0},
-        BlobCommitments: mockCommitment,
-        PaymentMetadata: core.PaymentMetadata{
-            AccountID:         "0x123",
-            BinIndex:          0,
-            CumulativePayment: big.NewInt(532),
-        },
-    }
-    blobKey1, err := blobHeader1.BlobKey()
-    assert.NoError(t, err)
-    blobHeader2 := &corev2.BlobHeader{
-        BlobVersion:     0,
-        QuorumNumbers:   []core.QuorumID{1},
-        BlobCommitments: mockCommitment,
-        PaymentMetadata: core.PaymentMetadata{
-            AccountID:         "0x456",
-            BinIndex:          2,
-            CumulativePayment: big.NewInt(999),
-        },
-    }
-    blobKey2, err := blobHeader2.BlobKey()
-    assert.NoError(t, err)
-
+    blobKey1, blobHeader1 := newBlob(t)
+    blobKey2, blobHeader2 := newBlob(t)
     now := time.Now()
     metadata1 := &v2.BlobMetadata{
         BlobHeader: blobHeader1,
@@ -64,7 +43,7 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
         NumRetries: 0,
         UpdatedAt:  uint64(now.UnixNano()),
     }
-    err = blobMetadataStore.PutBlobMetadata(ctx, metadata1)
+    err := blobMetadataStore.PutBlobMetadata(ctx, metadata1)
     assert.NoError(t, err)
     err = blobMetadataStore.PutBlobMetadata(ctx, metadata2)
     assert.NoError(t, err)
@@ -110,21 +89,90 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
     })
 }
 
+func TestBlobMetadataStoreGetBlobMetadataByStatusPaginated(t *testing.T) {
+    ctx := context.Background()
+    numBlobs := 103
+    pageSize := 10
+    keys := make([]corev2.BlobKey, numBlobs)
+    headers := make([]*corev2.BlobHeader, numBlobs)
+    metadataList := make([]*v2.BlobMetadata, numBlobs)
+    dynamoKeys := make([]commondynamodb.Key, numBlobs)
+    expectedCursors := make([]*blobstore.StatusIndexCursor, 0)
+    for i := 0; i < numBlobs; i++ {
+        blobKey, blobHeader := newBlob(t)
+        now := time.Now()
+        metadata := &v2.BlobMetadata{
+            BlobHeader: blobHeader,
+            BlobStatus: v2.Encoded,
+            Expiry:     uint64(now.Add(time.Hour).Unix()),
+            NumRetries: 0,
+            UpdatedAt:  uint64(now.UnixNano()),
+        }
+
+        err := blobMetadataStore.PutBlobMetadata(ctx, metadata)
+        require.NoError(t, err)
+        keys[i] = blobKey
+        headers[i] = blobHeader
+        dynamoKeys[i] = commondynamodb.Key{
+            "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey.Hex()},
+            "SK": &types.AttributeValueMemberS{Value: "BlobMetadata"},
+        }
+        metadataList[i] = metadata
+        if (i+1)%pageSize == 0 {
+            expectedCursors = append(expectedCursors, &blobstore.StatusIndexCursor{
+                BlobKey:   &blobKey,
+                UpdatedAt: metadata.UpdatedAt,
+            })
+        }
+    }
+
+    // Querying blobs in Queued status should return 0 results
+    cursor := &blobstore.StatusIndexCursor{
+        BlobKey:   nil,
+        UpdatedAt: 0,
+    }
+    metadata, newCursor, err := blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Queued, cursor, 10)
+    require.NoError(t, err)
+    require.Len(t, metadata, 0)
+    require.Equal(t, cursor, newCursor)
+
+    // Querying blobs in Encoded status should return results
+    cursor = &blobstore.StatusIndexCursor{
+        BlobKey:   nil,
+        UpdatedAt: 0,
+    }
+    i := 0
+    numIterations := (numBlobs + pageSize - 1) / pageSize
+    for i < numIterations {
+        metadata, cursor, err = blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Encoded, cursor, int32(pageSize))
+        require.NoError(t, err)
+        if i < len(expectedCursors) {
+            require.Len(t, metadata, pageSize)
+            require.NotNil(t, cursor)
+            require.Equal(t, cursor.BlobKey, expectedCursors[i].BlobKey)
+            require.Equal(t, cursor.UpdatedAt, expectedCursors[i].UpdatedAt)
+        } else {
+            require.Len(t, metadata, numBlobs%pageSize)
+            require.Equal(t, cursor.BlobKey, &keys[numBlobs-1])
+            require.Equal(t, cursor.UpdatedAt, metadataList[numBlobs-1].UpdatedAt)
+        }
+        i++
+    }
+    lastCursor := cursor
+    metadata, cursor, err = blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Encoded, cursor, int32(pageSize))
+    require.NoError(t, err)
+    require.Len(t, metadata, 0)
+    require.Equal(t, cursor, lastCursor)
+
+    deleteItems(t, dynamoKeys)
+}
+
 func TestBlobMetadataStoreCerts(t *testing.T) {
     ctx := context.Background()
+    blobKey, blobHeader := newBlob(t)
     blobCert := &corev2.BlobCertificate{
-        BlobHeader: &corev2.BlobHeader{
-            BlobVersion:     0,
-            QuorumNumbers:   []core.QuorumID{0},
-            BlobCommitments: mockCommitment,
-            PaymentMetadata: core.PaymentMetadata{
-                AccountID:         "0x123",
-                BinIndex:          0,
-                CumulativePayment: big.NewInt(532),
-            },
-            Signature: []byte("signature"),
-        },
-        RelayKeys: []corev2.RelayKey{0, 2, 4},
+        BlobHeader: blobHeader,
+        RelayKeys:  []corev2.RelayKey{0, 2, 4},
     }
     fragmentInfo := &encoding.FragmentInfo{
         TotalChunkSizeBytes: 100,
@@ -133,8 +181,6 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
     err := blobMetadataStore.PutBlobCertificate(ctx, blobCert, fragmentInfo)
     assert.NoError(t, err)
 
-    blobKey, err := blobCert.BlobHeader.BlobKey()
-    assert.NoError(t, err)
     fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey)
     assert.NoError(t, err)
     assert.Equal(t, blobCert, fetchedCert)
@@ -142,18 +188,8 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
 
     // blob cert with the same key should fail
     blobCert1 := &corev2.BlobCertificate{
-        BlobHeader: &corev2.BlobHeader{
-            BlobVersion:     0,
-            QuorumNumbers:   []core.QuorumID{0},
-            BlobCommitments: mockCommitment,
-            PaymentMetadata: core.PaymentMetadata{
-                AccountID:         "0x123",
-                BinIndex:          0,
-                CumulativePayment: big.NewInt(532),
-            },
-            Signature: []byte("signature"),
-        },
-        RelayKeys: []corev2.RelayKey{0},
+        BlobHeader: blobHeader,
+        RelayKeys:  []corev2.RelayKey{0},
     }
     err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1, fragmentInfo)
     assert.ErrorIs(t, err, common.ErrAlreadyExists)
@@ -207,18 +243,7 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
 
 func TestBlobMetadataStoreUpdateBlobStatus(t *testing.T) {
     ctx := context.Background()
-    blobHeader := &corev2.BlobHeader{
-        BlobVersion:     0,
-        QuorumNumbers:   []core.QuorumID{0},
-        BlobCommitments: mockCommitment,
-        PaymentMetadata: core.PaymentMetadata{
-            AccountID:         "0x123",
-            BinIndex:          0,
-            CumulativePayment: big.NewInt(532),
-        },
-    }
-    blobKey, err := blobHeader.BlobKey()
-    assert.NoError(t, err)
+    blobKey, blobHeader := newBlob(t)
 
     now := time.Now()
     metadata := &v2.BlobMetadata{
@@ -228,7 +253,7 @@ func TestBlobMetadataStoreUpdateBlobStatus(t *testing.T) {
         NumRetries: 0,
         UpdatedAt:  uint64(now.UnixNano()),
     }
-    err = blobMetadataStore.PutBlobMetadata(ctx, metadata)
+    err := blobMetadataStore.PutBlobMetadata(ctx, metadata)
     assert.NoError(t, err)
 
     // Update the blob status to invalid status
@@ -471,3 +496,31 @@ func deleteItems(t *testing.T, keys []commondynamodb.Key) {
     assert.NoError(t, err)
     assert.Len(t, failed, 0)
 }
+
+func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) {
+    accountBytes := make([]byte, 32)
+    _, err := rand.Read(accountBytes)
+    require.NoError(t, err)
+    accountID := hex.EncodeToString(accountBytes)
+    binIndex, err := rand.Int(rand.Reader, big.NewInt(256))
+    require.NoError(t, err)
+    cumulativePayment, err := rand.Int(rand.Reader, big.NewInt(1024))
+    require.NoError(t, err)
+    sig := make([]byte, 32)
+    _, err = rand.Read(sig)
+    require.NoError(t, err)
+    bh := &corev2.BlobHeader{
+        BlobVersion:     0,
+        QuorumNumbers:   []core.QuorumID{0},
+        BlobCommitments: mockCommitment,
+        PaymentMetadata: core.PaymentMetadata{
+            AccountID:         accountID,
+            BinIndex:          uint32(binIndex.Int64()),
+            CumulativePayment: cumulativePayment,
+        },
+        Signature: sig,
+    }
+    bk, err := bh.BlobKey()
+    require.NoError(t, err)
+    return bk, bh
+}
diff --git a/disperser/controller/controller_test.go b/disperser/controller/controller_test.go
index 3c7fc55abd..dd1f0efa86 100644
--- a/disperser/controller/controller_test.go
+++ b/disperser/controller/controller_test.go
@@ -2,6 +2,8 @@ package controller_test
 
 import (
     "context"
+    "crypto/rand"
+    "encoding/hex"
     "fmt"
     "math/big"
     "os"
@@ -11,6 +13,8 @@ import (
     "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
     test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils"
     "github.com/Layr-Labs/eigenda/common/aws/s3"
+    "github.com/Layr-Labs/eigenda/core"
+    corev2 "github.com/Layr-Labs/eigenda/core/v2"
     "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
     "github.com/Layr-Labs/eigenda/encoding"
     "github.com/Layr-Labs/eigenda/inabox/deploy"
@@ -19,6 +23,7 @@ import (
     "github.com/consensys/gnark-crypto/ecc/bn254/fp"
     "github.com/google/uuid"
     "github.com/ory/dockertest/v3"
+    "github.com/stretchr/testify/require"
 )
 
 var (
@@ -148,3 +153,31 @@ func teardown() {
         deploy.PurgeDockertestResources(dockertestPool, dockertestResource)
     }
 }
+
+func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) {
+    accountBytes := make([]byte, 32)
+    _, err := rand.Read(accountBytes)
+    require.NoError(t, err)
+    accountID := hex.EncodeToString(accountBytes)
+    binIndex, err := rand.Int(rand.Reader, big.NewInt(256))
+    require.NoError(t, err)
+    cumulativePayment, err := rand.Int(rand.Reader, big.NewInt(1024))
+    require.NoError(t, err)
+    sig := make([]byte, 32)
+    _, err = rand.Read(sig)
+    require.NoError(t, err)
+    bh := &corev2.BlobHeader{
+        BlobVersion:     0,
+        QuorumNumbers:   []core.QuorumID{0, 1},
+        BlobCommitments: mockCommitment,
+        PaymentMetadata: core.PaymentMetadata{
+            AccountID:         accountID,
+            BinIndex:          uint32(binIndex.Int64()),
+            CumulativePayment: cumulativePayment,
+        },
+        Signature: sig,
+    }
+    bk, err := bh.BlobKey()
+    require.NoError(t, err)
+    return bk, bh
+}
diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go
index 8319b585b5..c5b58ca91b 100644
--- a/disperser/controller/dispatcher.go
+++ b/disperser/controller/dispatcher.go
@@ -25,10 +25,12 @@ type DispatcherConfig struct {
     FinalizationBlockDelay uint64
     NodeRequestTimeout     time.Duration
     NumRequestRetries      int
+    // MaxBatchSize is the maximum number of blobs to dispatch in a batch
+    MaxBatchSize int32
 }
 
 type Dispatcher struct {
-    DispatcherConfig
+    *DispatcherConfig
 
     blobMetadataStore *blobstore.BlobMetadataStore
     pool              common.WorkerPool
@@ -37,7 +39,7 @@ type Dispatcher struct {
     nodeClientManager NodeClientManager
     logger            logging.Logger
 
-    lastUpdatedAt uint64
+    cursor *blobstore.StatusIndexCursor
 }
 
 type batchData struct {
@@ -48,7 +50,7 @@ type batchData struct {
 }
 
 func NewDispatcher(
-    config DispatcherConfig,
+    config *DispatcherConfig,
     blobMetadataStore *blobstore.BlobMetadataStore,
     pool common.WorkerPool,
     chainState core.IndexedChainState,
@@ -56,6 +58,12 @@ func NewDispatcher(
     nodeClientManager NodeClientManager,
     logger logging.Logger,
 ) (*Dispatcher, error) {
+    if config == nil {
+        return nil, errors.New("config is required")
+    }
+    if config.PullInterval == 0 || config.NodeRequestTimeout == 0 || config.MaxBatchSize == 0 {
+        return nil, errors.New("invalid config")
+    }
     return &Dispatcher{
         DispatcherConfig: config,
 
@@ -66,7 +74,7 @@ func NewDispatcher(
         nodeClientManager: nodeClientManager,
         logger:            logger.With("component", "Dispatcher"),
 
-        lastUpdatedAt: 0,
+        cursor: nil,
     }, nil
 }
 
@@ -228,7 +236,7 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
 // NewBatch creates a batch of blobs to dispatch
 // Warning: This function is not thread-safe
 func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) (*batchData, error) {
-    blobMetadatas, err := d.blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Encoded, d.lastUpdatedAt)
+    blobMetadatas, cursor, err := d.blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Encoded, d.cursor, d.MaxBatchSize)
     if err != nil {
         return nil, fmt.Errorf("failed to get blob metadata by status: %w", err)
     }
@@ -243,7 +251,6 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
     }
 
     keys := make([]corev2.BlobKey, len(blobMetadatas))
-    newLastUpdatedAt := d.lastUpdatedAt
     for i, metadata := range blobMetadatas {
         if metadata == nil || metadata.BlobHeader == nil {
             return nil, fmt.Errorf("invalid blob metadata")
@@ -253,9 +260,6 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
             return nil, fmt.Errorf("failed to get blob key: %w", err)
         }
         keys[i] = blobKey
-        if metadata.UpdatedAt > newLastUpdatedAt {
-            newLastUpdatedAt = metadata.UpdatedAt
-        }
     }
 
     certs, _, err := d.blobMetadataStore.GetBlobCertificates(ctx, keys)
@@ -344,7 +348,10 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
         return nil, fmt.Errorf("failed to put blob verification infos: %w", err)
     }
 
-    d.lastUpdatedAt = newLastUpdatedAt
+    if cursor != nil {
+        d.cursor = cursor
+    }
+
     return &batchData{
         Batch: &corev2.Batch{
             BatchHeader: batchHeader,
diff --git a/disperser/controller/dispatcher_test.go b/disperser/controller/dispatcher_test.go
index 8630eda061..8ded2507be 100644
--- a/disperser/controller/dispatcher_test.go
+++ b/disperser/controller/dispatcher_test.go
@@ -2,8 +2,6 @@ package controller_test
 
 import (
     "context"
-    "crypto/rand"
-    "encoding/hex"
     "math/big"
     "testing"
     "time"
@@ -39,6 +37,7 @@ var (
         },
     })
     finalizationBlockDelay = uint64(10)
+    maxBatchSize           = int32(5)
 )
 
 type dispatcherComponents struct {
@@ -111,6 +110,29 @@ func TestDispatcherHandleBatch(t *testing.T) {
     require.Len(t, att.QuorumAPKs, 2)
     require.NotNil(t, att.Sigma)
     require.ElementsMatch(t, att.QuorumNumbers, []core.QuorumID{0, 1})
+
+    deleteBlobs(t, components.BlobMetadataStore, objs.blobKeys, [][32]byte{bhh})
+}
+
+func TestDispatcherMaxBatchSize(t *testing.T) {
+    components := newDispatcherComponents(t)
+    numBlobs := 12
+    objs := setupBlobCerts(t, components.BlobMetadataStore, numBlobs)
+    ctx := context.Background()
+    expectedNumBatches := (numBlobs + int(maxBatchSize) - 1) / int(maxBatchSize)
+    for i := 0; i < expectedNumBatches; i++ {
+        batchData, err := components.Dispatcher.NewBatch(ctx, blockNumber)
+        require.NoError(t, err)
+        if i < expectedNumBatches-1 {
+            require.Len(t, batchData.Batch.BlobCertificates, int(maxBatchSize))
+        } else {
+            require.Len(t, batchData.Batch.BlobCertificates, numBlobs%int(maxBatchSize))
+        }
+    }
+    _, err := components.Dispatcher.NewBatch(ctx, blockNumber)
+    require.ErrorContains(t, err, "no blobs to dispatch")
+
+    deleteBlobs(t, components.BlobMetadataStore, objs.blobKeys, nil)
 }
 
 func TestDispatcherNewBatch(t *testing.T) {
@@ -167,6 +189,8 @@ func TestDispatcherNewBatch(t *testing.T) {
     // Attempt to create a batch with the same blobs
     _, err = components.Dispatcher.NewBatch(ctx, blockNumber)
     require.ErrorContains(t, err, "no blobs to dispatch")
+
+    deleteBlobs(t, components.BlobMetadataStore, objs.blobKeys, [][32]byte{bhh})
 }
 
 func TestDispatcherBuildMerkleTree(t *testing.T) {
@@ -238,25 +262,7 @@ func setupBlobCerts(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore
     metadatas := make([]*v2.BlobMetadata, numObjects)
     certs := make([]*corev2.BlobCertificate, numObjects)
     for i := 0; i < numObjects; i++ {
-        randomBytes := make([]byte, 16)
-        _, err := rand.Read(randomBytes)
-        require.NoError(t, err)
-        randomBinIndex, err := rand.Int(rand.Reader, big.NewInt(1000))
-        require.NoError(t, err)
-        binIndex := uint32(randomBinIndex.Uint64())
-        headers[i] = &corev2.BlobHeader{
-            BlobVersion:     0,
-            QuorumNumbers:   []core.QuorumID{0, 1},
-            BlobCommitments: mockCommitment,
-            PaymentMetadata: core.PaymentMetadata{
-                AccountID:         hex.EncodeToString(randomBytes),
-                BinIndex:          binIndex,
-                CumulativePayment: big.NewInt(532),
-            },
-        }
-        key, err := headers[i].BlobKey()
-        require.NoError(t, err)
-        keys[i] = key
+        keys[i], headers[i] = newBlob(t)
         now := time.Now()
         metadatas[i] = &v2.BlobMetadata{
             BlobHeader: headers[i],
@@ -265,7 +271,7 @@ func setupBlobCerts(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore
             NumRetries: 0,
             UpdatedAt:  uint64(now.UnixNano()) - uint64(i),
         }
-        err = blobMetadataStore.PutBlobMetadata(ctx, metadatas[i])
+        err := blobMetadataStore.PutBlobMetadata(ctx, metadatas[i])
         require.NoError(t, err)
 
         certs[i] = &corev2.BlobCertificate{
@@ -284,6 +290,21 @@ func setupBlobCerts(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore
     }
 }
 
+func deleteBlobs(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore, keys []corev2.BlobKey, batchHeaderHashes [][32]byte) {
+    ctx := context.Background()
+    for _, key := range keys {
+        err := blobMetadataStore.DeleteBlobMetadata(ctx, key)
+        require.NoError(t, err)
+        err = blobMetadataStore.DeleteBlobCertificate(ctx, key)
+        require.NoError(t, err)
+    }
+
+    for _, bhh := range batchHeaderHashes {
+        err := blobMetadataStore.DeleteBatchHeader(ctx, bhh)
+        require.NoError(t, err)
+    }
+}
+
 func newDispatcherComponents(t *testing.T) *dispatcherComponents {
     // logger := logging.NewNoopLogger()
     logger, err := common.NewLogger(common.DefaultLoggerConfig())
@@ -296,11 +317,12 @@ func newDispatcherComponents(t *testing.T) *dispatcherComponents {
     require.NoError(t, err)
     nodeClientManager := &controller.MockClientManager{}
     mockChainState.On("GetCurrentBlockNumber").Return(uint(blockNumber), nil)
-    d, err := controller.NewDispatcher(controller.DispatcherConfig{
+    d, err := controller.NewDispatcher(&controller.DispatcherConfig{
         PullInterval:           1 * time.Second,
         FinalizationBlockDelay: finalizationBlockDelay,
         NodeRequestTimeout:     1 * time.Second,
         NumRequestRetries:      3,
+        MaxBatchSize:           maxBatchSize,
     }, blobMetadataStore, pool, mockChainState, agg, nodeClientManager, logger)
     require.NoError(t, err)
     return &dispatcherComponents{
diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go
index 49b6aca2b4..551f6af14f 100644
--- a/disperser/controller/encoding_manager.go
+++ b/disperser/controller/encoding_manager.go
@@ -34,13 +34,15 @@ type EncodingManagerConfig struct {
     AvailableRelays []corev2.RelayKey
     // EncoderAddress is the address of the encoder
     EncoderAddress string
+    // MaxNumBlobsPerIteration is the maximum number of blobs to encode per iteration
+    MaxNumBlobsPerIteration int32
 }
 
 // EncodingManager is responsible for pulling queued blobs from the blob
 // metadata store periodically and encoding them. It receives the encoder responses
 // and creates BlobCertificates.
 type EncodingManager struct {
-    EncodingManagerConfig
+    *EncodingManagerConfig
 
     // components
     blobMetadataStore *blobstore.BlobMetadataStore
@@ -50,17 +52,22 @@ type EncodingManager struct {
     logger            logging.Logger
 
     // state
-    lastUpdatedAt uint64
+    cursor *blobstore.StatusIndexCursor
 }
 
 func NewEncodingManager(
-    config EncodingManagerConfig,
+    config *EncodingManagerConfig,
     blobMetadataStore *blobstore.BlobMetadataStore,
     pool common.WorkerPool,
     encodingClient disperser.EncoderClientV2,
     chainReader core.Reader,
     logger logging.Logger,
 ) (*EncodingManager, error) {
+    if config.NumRelayAssignment < 1 ||
+        len(config.AvailableRelays) == 0 ||
+        config.MaxNumBlobsPerIteration < 1 {
+        return nil, fmt.Errorf("invalid encoding manager config")
+    }
     if int(config.NumRelayAssignment) > len(config.AvailableRelays) {
         return nil, fmt.Errorf("NumRelayAssignment (%d) cannot be greater than NumRelays (%d)", config.NumRelayAssignment, len(config.AvailableRelays))
     }
@@ -72,7 +79,7 @@ func NewEncodingManager(
         chainReader:       chainReader,
         logger:            logger.With("component", "EncodingManager"),
 
-        lastUpdatedAt: 0,
+        cursor: nil,
     }, nil
 }
 
@@ -102,7 +109,7 @@ func (e *EncodingManager) Start(ctx context.Context) error {
 
 func (e *EncodingManager) HandleBatch(ctx context.Context) error {
     // Get a batch of blobs to encode
-    blobMetadatas, err := e.blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued, e.lastUpdatedAt)
+    blobMetadatas, cursor, err := e.blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Queued, e.cursor, e.MaxNumBlobsPerIteration)
     if err != nil {
         return err
     }
@@ -118,7 +125,6 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
             e.logger.Error("failed to get blob key", "err", err, "requestedAt", blob.RequestedAt, "paymentMetadata", blob.BlobHeader.PaymentMetadata)
             continue
         }
-        e.lastUpdatedAt = blob.UpdatedAt
 
         // Encode the blobs
         e.pool.Submit(func() {
@@ -171,6 +177,9 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
         })
     }
 
+    if cursor != nil {
+        e.cursor = cursor
+    }
     return nil
 }
diff --git a/disperser/controller/encoding_manager_test.go b/disperser/controller/encoding_manager_test.go
index f1e652726f..4724cf410c 100644
--- a/disperser/controller/encoding_manager_test.go
+++ b/disperser/controller/encoding_manager_test.go
@@ -2,12 +2,11 @@ package controller_test
 
 import (
     "context"
-    "math/big"
     "testing"
     "time"
 
     "github.com/Layr-Labs/eigenda/common"
-    "github.com/Layr-Labs/eigenda/core"
+    commonmock "github.com/Layr-Labs/eigenda/common/mock"
     coremock "github.com/Layr-Labs/eigenda/core/mock"
     corev2 "github.com/Layr-Labs/eigenda/core/v2"
     dispcommon "github.com/Layr-Labs/eigenda/disperser/common"
@@ -19,6 +18,7 @@ import (
     "github.com/gammazero/workerpool"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
+    "github.com/stretchr/testify/require"
 )
 
 var (
@@ -30,6 +30,7 @@ type testComponents struct {
     Pool            common.WorkerPool
     EncodingClient  *dispmock.MockEncoderClientV2
     ChainReader     *coremock.MockWriter
+    MockPool        *commonmock.MockWorkerpool
 }
 
 func TestGetRelayKeys(t *testing.T) {
@@ -76,16 +77,16 @@ func TestGetRelayKeys(t *testing.T) {
         t.Run(tt.name, func(t *testing.T) {
             got, err := controller.GetRelayKeys(tt.numRelays, tt.availableRelays)
             if err != nil {
-                assert.Error(t, err)
+                require.Error(t, err)
             } else {
-                assert.NoError(t, tt.err)
-                assert.Len(t, got, int(tt.numRelays))
+                require.NoError(t, tt.err)
+                require.Len(t, got, int(tt.numRelays))
                 seen := make(map[corev2.RelayKey]struct{})
                 for _, relay := range got {
-                    assert.Contains(t, tt.availableRelays, relay)
+                    require.Contains(t, tt.availableRelays, relay)
                     seen[relay] = struct{}{}
                 }
-                assert.Equal(t, len(seen), len(got))
+                require.Equal(t, len(seen), len(got))
             }
         })
     }
@@ -93,18 +94,7 @@ func TestGetRelayKeys(t *testing.T) {
 
 func TestEncodingManagerHandleBatch(t *testing.T) {
     ctx := context.Background()
-    blobHeader1 := &corev2.BlobHeader{
-        BlobVersion:     0,
-        QuorumNumbers:   []core.QuorumID{0},
-        BlobCommitments: mockCommitment,
-        PaymentMetadata: core.PaymentMetadata{
-            AccountID:         "0x1234",
-            BinIndex:          0,
-            CumulativePayment: big.NewInt(532),
-        },
-    }
-    blobKey1, err := blobHeader1.BlobKey()
-    assert.NoError(t, err)
+    blobKey1, blobHeader1 := newBlob(t)
     now := time.Now()
     metadata1 := &commonv2.BlobMetadata{
         BlobHeader: blobHeader1,
@@ -113,55 +103,106 @@ func TestEncodingManagerHandleBatch(t *testing.T) {
         NumRetries: 0,
         UpdatedAt:  uint64(now.UnixNano()),
     }
-    err = blobMetadataStore.PutBlobMetadata(ctx, metadata1)
-    assert.NoError(t, err)
+    err := blobMetadataStore.PutBlobMetadata(ctx, metadata1)
+    require.NoError(t, err)
 
-    c := newTestComponents(t)
+    c := newTestComponents(t, false)
     c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(&encoding.FragmentInfo{
         TotalChunkSizeBytes: 100,
         FragmentSizeBytes:   1024 * 1024 * 4,
     }, nil)
 
     err = c.EncodingManager.HandleBatch(ctx)
-    assert.NoError(t, err)
+    require.NoError(t, err)
     c.Pool.StopWait()
 
     fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
-    assert.NoError(t, err)
-    assert.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus)
-    assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
+    require.NoError(t, err)
+    require.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus)
+    require.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
 
     fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
-    assert.NoError(t, err)
-    assert.Equal(t, fetchedCert.BlobHeader, blobHeader1)
+    require.NoError(t, err)
+    require.Equal(t, fetchedCert.BlobHeader, blobHeader1)
     for _, relayKey := range fetchedCert.RelayKeys {
-        assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
+        require.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
     }
-    assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
-    assert.Equal(t, fetchedFragmentInfo.FragmentSizeBytes, uint32(1024*1024*4))
+    require.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
+    require.Equal(t, fetchedFragmentInfo.FragmentSizeBytes, uint32(1024*1024*4))
+
+    deleteBlobs(t, blobMetadataStore, []corev2.BlobKey{blobKey1}, nil)
+}
+
+func TestEncodingManagerHandleManyBatches(t *testing.T) {
+    ctx := context.Background()
+    numBlobs := 12
+    keys := make([]corev2.BlobKey, numBlobs)
+    headers := make([]*corev2.BlobHeader, numBlobs)
+    metadata := make([]*commonv2.BlobMetadata, numBlobs)
+    for i := 0; i < numBlobs; i++ {
+        keys[i], headers[i] = newBlob(t)
+        now := time.Now()
+        metadata[i] = &commonv2.BlobMetadata{
+            BlobHeader: headers[i],
+            BlobStatus: commonv2.Queued,
+            Expiry:     uint64(now.Add(time.Hour).Unix()),
+            NumRetries: 0,
+            UpdatedAt:  uint64(now.UnixNano()),
+        }
+        err := blobMetadataStore.PutBlobMetadata(ctx, metadata[i])
+        require.NoError(t, err)
+    }
+
+    c := newTestComponents(t, true)
+    c.MockPool.On("Submit", mock.Anything).Return(nil).Times(numBlobs + 1)
+
+    numIterations := (numBlobs + int(c.EncodingManager.MaxNumBlobsPerIteration) - 1) / int(c.EncodingManager.MaxNumBlobsPerIteration)
+    expectedNumTasks := 0
+    for i := 0; i < numIterations; i++ {
+        err := c.EncodingManager.HandleBatch(ctx)
+        require.NoError(t, err)
+        if i < numIterations-1 {
+            expectedNumTasks += int(c.EncodingManager.MaxNumBlobsPerIteration)
+            c.MockPool.AssertNumberOfCalls(t, "Submit", expectedNumTasks)
+        } else {
+            expectedNumTasks += numBlobs % int(c.EncodingManager.MaxNumBlobsPerIteration)
+            c.MockPool.AssertNumberOfCalls(t, "Submit", expectedNumTasks)
+        }
+    }
+    err := c.EncodingManager.HandleBatch(ctx)
+    require.ErrorContains(t, err, "no blobs to encode")
+
+    // new record
+    key, header := newBlob(t)
+    now := time.Now()
+    meta := &commonv2.BlobMetadata{
+        BlobHeader: header,
+        BlobStatus: commonv2.Queued,
+        Expiry:     uint64(now.Add(time.Hour).Unix()),
+        NumRetries: 0,
+        UpdatedAt:  uint64(now.UnixNano()),
+    }
+    err = blobMetadataStore.PutBlobMetadata(ctx, meta)
+    require.NoError(t, err)
+    err = c.EncodingManager.HandleBatch(ctx)
+    require.NoError(t, err)
+    c.MockPool.AssertNumberOfCalls(t, "Submit", expectedNumTasks+1)
+
+    deleteBlobs(t, blobMetadataStore, keys, nil)
+    deleteBlobs(t, blobMetadataStore, []corev2.BlobKey{key}, nil)
 }
 
 func TestEncodingManagerHandleBatchNoBlobs(t *testing.T) {
     ctx := context.Background()
-    c := newTestComponents(t)
+    c := newTestComponents(t, false)
+    c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
     err := c.EncodingManager.HandleBatch(ctx)
-    assert.ErrorContains(t, err, "no blobs to encode")
+    require.ErrorContains(t, err, "no blobs to encode")
 }
 
 func TestEncodingManagerHandleBatchRetrySuccess(t *testing.T) {
     ctx := context.Background()
-    blobHeader1 := &corev2.BlobHeader{
-        BlobVersion:     0,
-        QuorumNumbers:   []core.QuorumID{0},
-        BlobCommitments: mockCommitment,
-        PaymentMetadata: core.PaymentMetadata{
-            AccountID:         "0x12345",
-            BinIndex:          0,
-            CumulativePayment: big.NewInt(532),
-        },
-    }
-    blobKey1, err := blobHeader1.BlobKey()
-    assert.NoError(t, err)
+    blobKey1, blobHeader1 := newBlob(t)
     now := time.Now()
     metadata1 := &commonv2.BlobMetadata{
         BlobHeader: blobHeader1,
@@ -170,10 +211,10 @@ func TestEncodingManagerHandleBatchRetrySuccess(t *testing.T) {
         NumRetries: 0,
         UpdatedAt:  uint64(now.UnixNano()),
     }
-    err = blobMetadataStore.PutBlobMetadata(ctx, metadata1)
-    assert.NoError(t, err)
+    err := blobMetadataStore.PutBlobMetadata(ctx, metadata1)
+    require.NoError(t, err)
 
-    c := newTestComponents(t)
+    c := newTestComponents(t, false)
     c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
     c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(&encoding.FragmentInfo{
         TotalChunkSizeBytes: 100,
@@ -181,39 +222,30 @@
     }, nil)
 
     err = c.EncodingManager.HandleBatch(ctx)
-    assert.NoError(t, err)
+    require.NoError(t, err)
     c.Pool.StopWait()
 
     fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
-    assert.NoError(t, err)
-    assert.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus)
-    assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
+    require.NoError(t, err)
+    require.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus)
+    require.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
 
     fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
-    assert.NoError(t, err)
-    assert.Equal(t, fetchedCert.BlobHeader, blobHeader1)
+    require.NoError(t, err)
+    require.Equal(t, fetchedCert.BlobHeader, blobHeader1)
     for _, relayKey := range fetchedCert.RelayKeys {
-        assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
+        require.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
     }
-    assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
-    assert.Equal(t, fetchedFragmentInfo.FragmentSizeBytes, uint32(1024*1024*4))
+    require.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
+    require.Equal(t, fetchedFragmentInfo.FragmentSizeBytes, uint32(1024*1024*4))
     c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
+
+    deleteBlobs(t, blobMetadataStore, []corev2.BlobKey{blobKey1}, nil)
 }
 
 func TestEncodingManagerHandleBatchRetryFailure(t *testing.T) {
     ctx := context.Background()
-    blobHeader1 := &corev2.BlobHeader{
-        BlobVersion:     0,
-        QuorumNumbers:   []core.QuorumID{0},
-        BlobCommitments: mockCommitment,
-        PaymentMetadata: core.PaymentMetadata{
-            AccountID:         "0x123456",
-            BinIndex:          0,
-            CumulativePayment: big.NewInt(532),
-        },
-    }
-    blobKey1, err := blobHeader1.BlobKey()
-    assert.NoError(t, err)
+    blobKey1, blobHeader1 := newBlob(t)
     now := time.Now()
     metadata1 := &commonv2.BlobMetadata{
         BlobHeader: blobHeader1,
@@ -222,50 +254,61 @@
         NumRetries: 0,
         UpdatedAt:  uint64(now.UnixNano()),
     }
-    err = blobMetadataStore.PutBlobMetadata(ctx, metadata1)
-    assert.NoError(t, err)
+    err := blobMetadataStore.PutBlobMetadata(ctx, metadata1)
+    require.NoError(t, err)
 
-    c := newTestComponents(t)
+    c := newTestComponents(t, false)
     c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(nil, assert.AnError).Twice()
 
     err = c.EncodingManager.HandleBatch(ctx)
-    assert.NoError(t, err)
+    require.NoError(t, err)
     c.Pool.StopWait()
 
     fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
-    assert.NoError(t, err)
+    require.NoError(t, err)
     // marked as failed
-    assert.Equal(t, commonv2.Failed, fetchedMetadata.BlobStatus)
-    assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
+    require.Equal(t, commonv2.Failed, fetchedMetadata.BlobStatus)
+    require.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
 
     fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
-    assert.ErrorIs(t, err, dispcommon.ErrMetadataNotFound)
-    assert.Nil(t, fetchedCert)
-    assert.Nil(t, fetchedFragmentInfo)
+    require.ErrorIs(t, err, dispcommon.ErrMetadataNotFound)
+    require.Nil(t, fetchedCert)
+    require.Nil(t, fetchedFragmentInfo)
     c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
+
+    deleteBlobs(t, blobMetadataStore, []corev2.BlobKey{blobKey1}, nil)
 }
 
-func newTestComponents(t *testing.T) *testComponents {
+func newTestComponents(t *testing.T, mockPool bool) *testComponents {
     logger := logging.NewNoopLogger()
     // logger, err := common.NewLogger(common.DefaultLoggerConfig())
-    // assert.NoError(t, err)
-    pool := workerpool.New(5)
+    // require.NoError(t, err)
+    var pool common.WorkerPool
+    var mockP *commonmock.MockWorkerpool
+    if mockPool {
+        mockP = &commonmock.MockWorkerpool{}
+        pool = mockP
+    } else {
+        pool = workerpool.New(5)
+    }
     encodingClient := dispmock.NewMockEncoderClientV2()
     chainReader := &coremock.MockWriter{}
     chainReader.On("GetCurrentBlockNumber").Return(blockNumber, nil)
-    em, err := controller.NewEncodingManager(controller.EncodingManagerConfig{
-        PullInterval:           1 * time.Second,
-        EncodingRequestTimeout: 5 * time.Second,
-        StoreTimeout:           5 * time.Second,
-        NumEncodingRetries:     1,
-        NumRelayAssignment:     2,
-        AvailableRelays:        []corev2.RelayKey{0, 1, 2, 3},
+    em, err := controller.NewEncodingManager(&controller.EncodingManagerConfig{
+        PullInterval:            1 * time.Second,
+        EncodingRequestTimeout:  5 * time.Second,
+        StoreTimeout:            5 * time.Second,
+        NumEncodingRetries:      1,
+        NumRelayAssignment:      2,
+        AvailableRelays:         []corev2.RelayKey{0, 1, 2, 3},
+        MaxNumBlobsPerIteration: 5,
    }, blobMetadataStore, pool, encodingClient, chainReader, logger)
-    assert.NoError(t, err)
+    require.NoError(t, err)
     return &testComponents{
         EncodingManager: em,
         Pool:            pool,
         EncodingClient:  encodingClient,
         ChainReader:     chainReader,
+        MockPool:        mockP,
     }
 }
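
Reviewer note (not part of the patch): a minimal sketch of how the new cursor-based pagination is meant to be consumed, mirroring what Dispatcher.NewBatch and EncodingManager.HandleBatch do after this change. The store, status, pageSize, and process identifiers are illustrative assumptions, not names from this diff.

    var cursor *blobstore.StatusIndexCursor // nil on the first call: start from the beginning of the status index
    for {
        metadata, newCursor, err := store.GetBlobMetadataByStatusPaginated(ctx, status, cursor, pageSize)
        if err != nil {
            return err
        }
        if newCursor != nil {
            // Persist the cursor even when the page is empty so the next poll
            // resumes where this one left off instead of re-reading old items.
            cursor = newCursor
        }
        if len(metadata) == 0 {
            break // nothing available right now; poll again later with the same cursor
        }
        process(metadata) // hypothetical handler for the fetched page
    }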