From 13e31d064f54f0d800ae5bb6b19358d8a86acd7f Mon Sep 17 00:00:00 2001
From: Ian Shim <shim@fastmail.com>
Date: Mon, 21 Oct 2024 16:30:13 -0700
Subject: [PATCH 1/2] v2 encoding manager

---
 common/aws/dynamodb/client.go                 |  43 +++
 common/aws/dynamodb/client_test.go            |  18 ++
 disperser/apiserver/server.go                 |   5 +-
 .../common/blobstore/blob_metadata_store.go   |   5 +-
 disperser/{ => common}/errors.go              |   2 +-
 disperser/common/inmem/store.go               |  25 +-
 disperser/common/v2/blob.go                   |   3 +
 .../v2/blobstore/dynamo_metadata_store.go     | 124 +++++++-
 .../blobstore/dynamo_metadata_store_test.go   |  67 ++++-
 .../common/v2/blobstore/s3_blob_store.go      |   7 +
 disperser/controller/controller_test.go       | 150 ++++++++++
 disperser/controller/encoding_manager.go      | 200 +++++++++++++
 disperser/controller/encoding_manager_test.go | 268 ++++++++++++++++++
 disperser/encoder_client_v2.go                |  12 +
 disperser/mock/encoder_v2.go                  |  29 ++
 encoding/data.go                              |   5 +
 16 files changed, 937 insertions(+), 26 deletions(-)
 rename disperser/{ => common}/errors.go (91%)
 create mode 100644 disperser/controller/controller_test.go
 create mode 100644 disperser/controller/encoding_manager.go
 create mode 100644 disperser/controller/encoding_manager_test.go
 create mode 100644 disperser/encoder_client_v2.go
 create mode 100644 disperser/mock/encoder_v2.go

diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go
index e50c4afbf0..fac3042b98 100644
--- a/common/aws/dynamodb/client.go
+++ b/common/aws/dynamodb/client.go
@@ -170,6 +170,49 @@ func (c *Client) UpdateItem(ctx context.Context, tableName string, key Key, item
 	return resp.Attributes, err
 }
 
+func (c *Client) UpdateItemWithCondition(
+	ctx context.Context,
+	tableName string,
+	key Key,
+	item Item,
+	condition expression.ConditionBuilder,
+) (Item, error) {
+	update := expression.UpdateBuilder{}
+	for itemKey, itemValue := range item {
+		// Ignore primary key updates
+		if _, ok := key[itemKey]; ok {
+			continue
+		}
+		update = update.Set(expression.Name(itemKey), expression.Value(itemValue))
+	}
+
+	expr, err := expression.NewBuilder().WithUpdate(update).WithCondition(condition).Build()
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := c.dynamoClient.UpdateItem(ctx, &dynamodb.UpdateItemInput{
+		TableName:                 aws.String(tableName),
+		Key:                       key,
+		ConditionExpression:       expr.Condition(),
+		ExpressionAttributeNames:  expr.Names(),
+		ExpressionAttributeValues: expr.Values(),
+		UpdateExpression:          expr.Update(),
+		ReturnValues:              types.ReturnValueUpdatedNew,
+	})
+
+	var ccfe *types.ConditionalCheckFailedException
+	if errors.As(err, &ccfe) {
+		return nil, ErrConditionFailed
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	return resp.Attributes, err
+}
+
 // IncrementBy increments the attribute by the value for item that matches with the key
 func (c *Client) IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error) {
 	// ADD numeric values
diff --git a/common/aws/dynamodb/client_test.go b/common/aws/dynamodb/client_test.go
index 4089fa9880..3c21886526 100644
--- a/common/aws/dynamodb/client_test.go
+++ b/common/aws/dynamodb/client_test.go
@@ -14,6 +14,7 @@ import (
 	test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils"
 	"github.com/Layr-Labs/eigenda/inabox/deploy"
 	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
 	"github.com/ory/dockertest/v3"
@@ -217,6 +218,22 @@ func TestBasicOperations(t *testing.T) {
 	})
 	assert.NoError(t, err)
 
+	// Attempt to update the item with invalid condition
+	_, err = dynamoClient.UpdateItemWithCondition(ctx, tableName, commondynamodb.Key{
+		"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
+	}, commondynamodb.Item{
+		"RequestedAt": &types.AttributeValueMemberN{Value: "456"},
+	}, expression.Name("Status").In(expression.Value("Dispersing")))
+	assert.Error(t, err)
+
+	// Attempt to update the item with valid condition
+	_, err = dynamoClient.UpdateItemWithCondition(ctx, tableName, commondynamodb.Key{
+		"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
+	}, commondynamodb.Item{
+		"RequestedAt": &types.AttributeValueMemberN{Value: "456"},
+	}, expression.Name("Status").In(expression.Value("Confirmed")))
+	assert.NoError(t, err)
+
 	_, err = dynamoClient.IncrementBy(ctx, tableName, commondynamodb.Key{
 		"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
 	}, "BlobSize", 1000)
@@ -231,6 +248,7 @@ func TestBasicOperations(t *testing.T) {
 	assert.Equal(t, "0x123", fetchedItem["BatchHeaderHash"].(*types.AttributeValueMemberS).Value)
 	assert.Equal(t, "0", fetchedItem["BlobIndex"].(*types.AttributeValueMemberN).Value)
 	assert.Equal(t, "1123", fetchedItem["BlobSize"].(*types.AttributeValueMemberN).Value)
+	assert.Equal(t, "456", fetchedItem["RequestedAt"].(*types.AttributeValueMemberN).Value)
 
 	err = dynamoClient.DeleteTable(ctx, tableName)
 	assert.NoError(t, err)
diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go
index cf9b1e8538..2f8ed8007f 100644
--- a/disperser/apiserver/server.go
+++ b/disperser/apiserver/server.go
@@ -22,6 +22,7 @@ import (
 	"github.com/Layr-Labs/eigenda/core/auth"
 	"github.com/Layr-Labs/eigenda/core/meterer"
 	"github.com/Layr-Labs/eigenda/disperser"
+	dispcommon "github.com/Layr-Labs/eigenda/disperser/common"
 	"github.com/Layr-Labs/eigenda/encoding"
 	"github.com/Layr-Labs/eigenda/encoding/rs"
 	"github.com/Layr-Labs/eigensdk-go/logging"
@@ -635,7 +636,7 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
 	s.logger.Debug("metadataKey", "metadataKey", metadataKey.String())
 	metadata, err := s.blobStore.GetBlobMetadata(ctx, metadataKey)
 	if err != nil {
-		if errors.Is(err, disperser.ErrMetadataNotFound) {
+		if errors.Is(err, dispcommon.ErrMetadataNotFound) {
 			s.metrics.HandleNotFoundRpcRequest("GetBlobStatus")
 			s.metrics.HandleNotFoundRequest("GetBlobStatus")
 			return nil, api.NewErrorNotFound("no metadata found for the requestID")
@@ -778,7 +779,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
 	blobMetadata, err := s.blobStore.GetMetadataInBatch(ctx, batchHeaderHash32, blobIndex)
 	if err != nil {
 		s.logger.Error("Failed to retrieve blob metadata", "err", err)
-		if errors.Is(err, disperser.ErrMetadataNotFound) {
+		if errors.Is(err, dispcommon.ErrMetadataNotFound) {
 			s.metrics.HandleNotFoundRpcRequest("RetrieveBlob")
 			s.metrics.HandleNotFoundRequest("RetrieveBlob")
 			return nil, api.NewErrorNotFound("no metadata found for the given batch header hash and blob index")
diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go
index 3712e7a2c4..3a4b272e55 100644
--- a/disperser/common/blobstore/blob_metadata_store.go
+++ b/disperser/common/blobstore/blob_metadata_store.go
@@ -8,6 +8,7 @@ import (
 
 	commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
 	"github.com/Layr-Labs/eigenda/disperser"
+	"github.com/Layr-Labs/eigenda/disperser/common"
 	"github.com/Layr-Labs/eigensdk-go/logging"
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
@@ -65,7 +66,7 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey dispers
 	})
 
 	if item == nil {
-		return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey)
+		return nil, fmt.Errorf("%w: metadata not found for key %s", common.ErrMetadataNotFound, blobKey)
 	}
 
 	if err != nil {
@@ -312,7 +313,7 @@ func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHea
 	}
 
 	if len(items) == 0 {
-		return nil, fmt.Errorf("%w: there is no metadata for batch %s and blob index %d", disperser.ErrMetadataNotFound, hexutil.Encode(batchHeaderHash[:]), blobIndex)
+		return nil, fmt.Errorf("%w: there is no metadata for batch %s and blob index %d", common.ErrMetadataNotFound, hexutil.Encode(batchHeaderHash[:]), blobIndex)
 	}
 
 	if len(items) > 1 {
diff --git a/disperser/errors.go b/disperser/common/errors.go
similarity index 91%
rename from disperser/errors.go
rename to disperser/common/errors.go
index 06f5a43c03..ec3eb469e8 100644
--- a/disperser/errors.go
+++ b/disperser/common/errors.go
@@ -1,4 +1,4 @@
-package disperser
+package common
 
 import "errors"
 
diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go
index 791f76f75f..8814975805 100644
--- a/disperser/common/inmem/store.go
+++ b/disperser/common/inmem/store.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/Layr-Labs/eigenda/core"
 	"github.com/Layr-Labs/eigenda/disperser"
+	"github.com/Layr-Labs/eigenda/disperser/common"
 )
 
 // BlobStore is an in-memory implementation of the BlobStore interface
@@ -75,7 +76,7 @@ func (q *BlobStore) GetBlobContent(ctx context.Context, blobHash disperser.BlobH
 	if holder, ok := q.Blobs[blobHash]; ok {
 		return holder.Data, nil
 	} else {
-		return nil, disperser.ErrBlobNotFound
+		return nil, common.ErrBlobNotFound
 	}
 }
 
@@ -93,7 +94,7 @@ func (q *BlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadata *dis
 	}
 	blobKey := existingMetadata.GetBlobKey()
 	if _, ok := q.Metadata[blobKey]; !ok {
-		return nil, disperser.ErrBlobNotFound
+		return nil, common.ErrBlobNotFound
 	}
 	newMetadata := *existingMetadata
 	newMetadata.BlobStatus = disperser.Confirmed
@@ -106,7 +107,7 @@ func (q *BlobStore) MarkBlobDispersing(ctx context.Context, blobKey disperser.Bl
 	q.mu.Lock()
 	defer q.mu.Unlock()
 	if _, ok := q.Metadata[blobKey]; !ok {
-		return disperser.ErrBlobNotFound
+		return common.ErrBlobNotFound
 	}
 	q.Metadata[blobKey].BlobStatus = disperser.Dispersing
 	return nil
@@ -117,7 +118,7 @@ func (q *BlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existing
 	defer q.mu.Unlock()
 	blobKey := existingMetadata.GetBlobKey()
 	if _, ok := q.Metadata[blobKey]; !ok {
-		return nil, disperser.ErrBlobNotFound
+		return nil, common.ErrBlobNotFound
 	}
 	newMetadata := *existingMetadata
 	newMetadata.BlobStatus = disperser.InsufficientSignatures
@@ -130,7 +131,7 @@ func (q *BlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.Blo
 	q.mu.Lock()
 	defer q.mu.Unlock()
 	if _, ok := q.Metadata[blobKey]; !ok {
-		return disperser.ErrBlobNotFound
+		return common.ErrBlobNotFound
 	}
 
 	q.Metadata[blobKey].BlobStatus = disperser.Finalized
@@ -141,7 +142,7 @@ func (q *BlobStore) MarkBlobProcessing(ctx context.Context, blobKey disperser.Bl
 	q.mu.Lock()
 	defer q.mu.Unlock()
 	if _, ok := q.Metadata[blobKey]; !ok {
-		return disperser.ErrBlobNotFound
+		return common.ErrBlobNotFound
 	}
 
 	q.Metadata[blobKey].BlobStatus = disperser.Processing
@@ -152,7 +153,7 @@ func (q *BlobStore) MarkBlobFailed(ctx context.Context, blobKey disperser.BlobKe
 	q.mu.Lock()
 	defer q.mu.Unlock()
 	if _, ok := q.Metadata[blobKey]; !ok {
-		return disperser.ErrBlobNotFound
+		return common.ErrBlobNotFound
 	}
 
 	q.Metadata[blobKey].BlobStatus = disperser.Failed
@@ -163,7 +164,7 @@ func (q *BlobStore) IncrementBlobRetryCount(ctx context.Context, existingMetadat
 	q.mu.Lock()
 	defer q.mu.Unlock()
 	if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok {
-		return disperser.ErrBlobNotFound
+		return common.ErrBlobNotFound
 	}
 
 	q.Metadata[existingMetadata.GetBlobKey()].NumRetries++
@@ -174,7 +175,7 @@ func (q *BlobStore) UpdateConfirmationBlockNumber(ctx context.Context, existingM
 	q.mu.Lock()
 	defer q.mu.Unlock()
 	if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok {
-		return disperser.ErrBlobNotFound
+		return common.ErrBlobNotFound
 	}
 
 	if q.Metadata[existingMetadata.GetBlobKey()].ConfirmationInfo == nil {
@@ -196,7 +197,7 @@ func (q *BlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperse
 				Data:          holder.Data,
 			}
 		} else {
-			return nil, disperser.ErrBlobNotFound
+			return nil, common.ErrBlobNotFound
 		}
 	}
 	return blobs, nil
@@ -266,7 +267,7 @@ func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]
 		}
 	}
 
-	return nil, disperser.ErrBlobNotFound
+	return nil, common.ErrBlobNotFound
 }
 
 func (q *BlobStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) {
@@ -327,7 +328,7 @@ func (q *BlobStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobK
 	if meta, ok := q.Metadata[blobKey]; ok {
 		return meta, nil
 	}
-	return nil, disperser.ErrBlobNotFound
+	return nil, common.ErrBlobNotFound
 }
 
 func (q *BlobStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*disperser.BlobMetadata, error) {
diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go
index 9b4f16da38..804f5f6fb7 100644
--- a/disperser/common/v2/blob.go
+++ b/disperser/common/v2/blob.go
@@ -3,6 +3,7 @@ package v2
 import (
 	pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
 	core "github.com/Layr-Labs/eigenda/core/v2"
+	"github.com/Layr-Labs/eigenda/encoding"
 )
 
 type BlobStatus uint
@@ -65,4 +66,6 @@ type BlobMetadata struct {
 	RequestedAt uint64
 	// UpdatedAt is the Unix timestamp of when the blob was last updated in _nanoseconds_
 	UpdatedAt uint64
+
+	*encoding.FragmentInfo
 }
diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go
index 2d745ee715..be272b30aa 100644
--- a/disperser/common/v2/blobstore/dynamo_metadata_store.go
+++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go
@@ -10,11 +10,13 @@ import (
 
 	commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
 	core "github.com/Layr-Labs/eigenda/core/v2"
-	"github.com/Layr-Labs/eigenda/disperser"
+	"github.com/Layr-Labs/eigenda/disperser/common"
 	v2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
+	"github.com/Layr-Labs/eigenda/encoding"
 	"github.com/Layr-Labs/eigensdk-go/logging"
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
+	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
 )
@@ -29,6 +31,16 @@ const (
 	blobCertSK     = "BlobCertificate"
 )
 
+var (
+	statusUpdatePrecondition = map[v2.BlobStatus][]v2.BlobStatus{
+		v2.Queued:    {},
+		v2.Encoded:   {v2.Queued},
+		v2.Certified: {v2.Encoded},
+		v2.Failed:    {v2.Queued, v2.Encoded},
+	}
+	ErrInvalidStateTransition = errors.New("invalid state transition")
+)
+
 // BlobMetadataStore is a blob metadata storage backed by DynamoDB
 type BlobMetadataStore struct {
 	dynamoDBClient *commondynamodb.Client
@@ -53,7 +65,107 @@ func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v
 
 	err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil)
 	if errors.Is(err, commondynamodb.ErrConditionFailed) {
-		return disperser.ErrAlreadyExists
+		return common.ErrAlreadyExists
+	}
+
+	return err
+}
+
+func (s *BlobMetadataStore) MarkBlobEncoded(ctx context.Context, blobKey core.BlobKey, fragmentInfo *encoding.FragmentInfo) error {
+	validStatuses := statusUpdatePrecondition[v2.Encoded]
+	if len(validStatuses) == 0 {
+		return fmt.Errorf("%w: invalid status transition to Encoded", ErrInvalidStateTransition)
+	}
+
+	expValues := make([]expression.OperandBuilder, len(validStatuses))
+	for i, validStatus := range validStatuses {
+		expValues[i] = expression.Value(int(validStatus))
+	}
+	condition := expression.Name("BlobStatus").In(expValues[0], expValues[1:]...)
+	_, err := s.dynamoDBClient.UpdateItemWithCondition(ctx, s.tableName, map[string]types.AttributeValue{
+		"PK": &types.AttributeValueMemberS{
+			Value: blobKeyPrefix + blobKey.Hex(),
+		},
+		"SK": &types.AttributeValueMemberS{
+			Value: blobMetadataSK,
+		},
+	}, map[string]types.AttributeValue{
+		"BlobStatus": &types.AttributeValueMemberN{
+			Value: strconv.Itoa(int(v2.Encoded)),
+		},
+		"UpdatedAt": &types.AttributeValueMemberN{
+			Value: strconv.FormatInt(time.Now().UnixNano(), 10),
+		},
+		"TotalChunkSizeBytes": &types.AttributeValueMemberN{
+			Value: strconv.Itoa(fragmentInfo.TotalChunkSizeBytes),
+		},
+		"NumFragments": &types.AttributeValueMemberN{
+			Value: strconv.Itoa(fragmentInfo.NumFragments),
+		},
+	}, condition)
+
+	if errors.Is(err, commondynamodb.ErrConditionFailed) {
+		blob, err := s.GetBlobMetadata(ctx, blobKey)
+		if err != nil {
+			s.logger.Errorf("failed to get blob metadata for key %s: %v", blobKey.Hex(), err)
+		}
+
+		if blob.BlobStatus == v2.Encoded {
+			return fmt.Errorf("%w: blob already in Encoded status", common.ErrAlreadyExists)
+		}
+
+		return fmt.Errorf("%w: invalid status transition to Encoded status", ErrInvalidStateTransition)
+	}
+
+	return err
+}
+
+func (s *BlobMetadataStore) MarkBlobCertified(ctx context.Context, blobKey core.BlobKey) error {
+	return s.updateBlobStatus(ctx, blobKey, v2.Certified)
+}
+
+func (s *BlobMetadataStore) MarkBlobFailed(ctx context.Context, blobKey core.BlobKey) error {
+	return s.updateBlobStatus(ctx, blobKey, v2.Failed)
+}
+
+func (s *BlobMetadataStore) updateBlobStatus(ctx context.Context, blobKey core.BlobKey, status v2.BlobStatus) error {
+	validStatuses := statusUpdatePrecondition[status]
+	if len(validStatuses) == 0 {
+		return fmt.Errorf("%w: invalid status transition to %s", ErrInvalidStateTransition, status.String())
+	}
+
+	expValues := make([]expression.OperandBuilder, len(validStatuses))
+	for i, validStatus := range validStatuses {
+		expValues[i] = expression.Value(int(validStatus))
+	}
+	condition := expression.Name("BlobStatus").In(expValues[0], expValues[1:]...)
+	_, err := s.dynamoDBClient.UpdateItemWithCondition(ctx, s.tableName, map[string]types.AttributeValue{
+		"PK": &types.AttributeValueMemberS{
+			Value: blobKeyPrefix + blobKey.Hex(),
+		},
+		"SK": &types.AttributeValueMemberS{
+			Value: blobMetadataSK,
+		},
+	}, map[string]types.AttributeValue{
+		"BlobStatus": &types.AttributeValueMemberN{
+			Value: strconv.Itoa(int(status)),
+		},
+		"UpdatedAt": &types.AttributeValueMemberN{
+			Value: strconv.FormatInt(time.Now().UnixNano(), 10),
+		},
+	}, condition)
+
+	if errors.Is(err, commondynamodb.ErrConditionFailed) {
+		blob, err := s.GetBlobMetadata(ctx, blobKey)
+		if err != nil {
+			s.logger.Errorf("failed to get blob metadata for key %s: %v", blobKey.Hex(), err)
+		}
+
+		if blob.BlobStatus == status {
+			return fmt.Errorf("%w: blob already in status %s", common.ErrAlreadyExists, status.String())
+		}
+
+		return fmt.Errorf("%w: invalid status transition to %s", ErrInvalidStateTransition, status.String())
 	}
 
 	return err
@@ -70,7 +182,7 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.Bl
 	})
 
 	if item == nil {
-		return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey.Hex())
+		return nil, fmt.Errorf("%w: metadata not found for key %s", common.ErrMetadataNotFound, blobKey.Hex())
 	}
 
 	if err != nil {
@@ -85,7 +197,7 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.Bl
 	return metadata, nil
 }
 
-// GetBlobMetadataByStatus returns all the metadata with the given status
+// GetBlobMetadataByStatus returns all the metadata with the given status that were updated after lastUpdatedAt
 // Because this function scans the entire index, it should only be used for status with a limited number of items.
 func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus, lastUpdatedAt uint64) ([]*v2.BlobMetadata, error) {
 	items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND UpdatedAt > :updatedAt", commondynamodb.ExpressionValues{
@@ -133,7 +245,7 @@ func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *co
 
 	err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil)
 	if errors.Is(err, commondynamodb.ErrConditionFailed) {
-		return disperser.ErrAlreadyExists
+		return common.ErrAlreadyExists
 	}
 
 	return err
@@ -154,7 +266,7 @@ func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core
 	}
 
 	if item == nil {
-		return nil, fmt.Errorf("%w: certificate not found for key %s", disperser.ErrMetadataNotFound, blobKey.Hex())
+		return nil, fmt.Errorf("%w: certificate not found for key %s", common.ErrMetadataNotFound, blobKey.Hex())
 	}
 
 	cert, err := UnmarshalBlobCertificate(item)
diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
index 04abf3451b..f11b0702f5 100644
--- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
+++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
@@ -9,8 +9,10 @@ import (
 	commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
 	core "github.com/Layr-Labs/eigenda/core"
 	corev2 "github.com/Layr-Labs/eigenda/core/v2"
-	"github.com/Layr-Labs/eigenda/disperser"
+	"github.com/Layr-Labs/eigenda/disperser/common"
 	v2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
+	"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
+	"github.com/Layr-Labs/eigenda/encoding"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
 	"github.com/stretchr/testify/assert"
 )
@@ -84,7 +86,7 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
 
 	// attempt to put metadata with the same key should fail
 	err = blobMetadataStore.PutBlobMetadata(ctx, metadata1)
-	assert.ErrorIs(t, err, disperser.ErrAlreadyExists)
+	assert.ErrorIs(t, err, common.ErrAlreadyExists)
 
 	deleteItems(t, []commondynamodb.Key{
 		{
@@ -141,7 +143,7 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
 		RelayKeys:            []corev2.RelayKey{0},
 	}
 	err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1)
-	assert.ErrorIs(t, err, disperser.ErrAlreadyExists)
+	assert.ErrorIs(t, err, common.ErrAlreadyExists)
 
 	deleteItems(t, []commondynamodb.Key{
 		{
@@ -151,6 +153,65 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
 	})
 }
 
+func TestBlobMetadataStoreUpdateBlobStatus(t *testing.T) {
+	ctx := context.Background()
+	blobHeader := &corev2.BlobHeader{
+		BlobVersion:     0,
+		QuorumNumbers:   []core.QuorumID{0},
+		BlobCommitments: mockCommitment,
+		PaymentMetadata: core.PaymentMetadata{
+			AccountID:         "0x123",
+			BinIndex:          0,
+			CumulativePayment: big.NewInt(532),
+		},
+	}
+	blobKey, err := blobHeader.BlobKey()
+	assert.NoError(t, err)
+
+	now := time.Now()
+	metadata := &v2.BlobMetadata{
+		BlobHeader: blobHeader,
+		BlobStatus: v2.Queued,
+		Expiry:     uint64(now.Add(time.Hour).Unix()),
+		NumRetries: 0,
+		UpdatedAt:  uint64(now.UnixNano()),
+	}
+	err = blobMetadataStore.PutBlobMetadata(ctx, metadata)
+	assert.NoError(t, err)
+
+	// Update the blob status to invalid status
+	err = blobMetadataStore.MarkBlobCertified(ctx, blobKey)
+	assert.ErrorIs(t, err, blobstore.ErrInvalidStateTransition)
+
+	// Update the blob status to a valid status
+	err = blobMetadataStore.MarkBlobEncoded(ctx, blobKey, &encoding.FragmentInfo{
+		TotalChunkSizeBytes: 100,
+		NumFragments:        10,
+	})
+	assert.NoError(t, err)
+
+	// Update the blob status to same status
+	err = blobMetadataStore.MarkBlobEncoded(ctx, blobKey, &encoding.FragmentInfo{
+		TotalChunkSizeBytes: 200,
+		NumFragments:        20,
+	})
+	assert.ErrorIs(t, err, common.ErrAlreadyExists)
+
+	fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey)
+	assert.NoError(t, err)
+	assert.Equal(t, fetchedMetadata.BlobStatus, v2.Encoded)
+	assert.Greater(t, fetchedMetadata.UpdatedAt, metadata.UpdatedAt)
+	assert.Equal(t, fetchedMetadata.TotalChunkSizeBytes, 100)
+	assert.Equal(t, fetchedMetadata.NumFragments, 10)
+
+	deleteItems(t, []commondynamodb.Key{
+		{
+			"PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey.Hex()},
+			"SK": &types.AttributeValueMemberS{Value: "BlobMetadata"},
+		},
+	})
+}
+
 func deleteItems(t *testing.T, keys []commondynamodb.Key) {
 	failed, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys)
 	assert.NoError(t, err)
diff --git a/disperser/common/v2/blobstore/s3_blob_store.go b/disperser/common/v2/blobstore/s3_blob_store.go
index ac1da3da63..c7575c4a99 100644
--- a/disperser/common/v2/blobstore/s3_blob_store.go
+++ b/disperser/common/v2/blobstore/s3_blob_store.go
@@ -4,7 +4,9 @@ import (
 	"context"
 
 	"github.com/Layr-Labs/eigenda/common/aws/s3"
+	"github.com/Layr-Labs/eigenda/disperser/common"
 	"github.com/Layr-Labs/eigensdk-go/logging"
+	"github.com/pkg/errors"
 )
 
 type BlobStore struct {
@@ -34,6 +36,11 @@ func (b *BlobStore) StoreBlob(ctx context.Context, blobKey string, data []byte)
 // GetBlob retrieves a blob from the blob store
 func (b *BlobStore) GetBlob(ctx context.Context, blobKey string) ([]byte, error) {
 	data, err := b.s3Client.DownloadObject(ctx, b.bucketName, blobKey)
+	if errors.Is(err, s3.ErrObjectNotFound) {
+		b.logger.Warnf("blob not found in bucket %s: %s", b.bucketName, blobKey)
+		return nil, common.ErrBlobNotFound
+	}
+
 	if err != nil {
 		b.logger.Errorf("failed to download blob from bucket %s: %v", b.bucketName, err)
 		return nil, err
diff --git a/disperser/controller/controller_test.go b/disperser/controller/controller_test.go
new file mode 100644
index 0000000000..82300f436e
--- /dev/null
+++ b/disperser/controller/controller_test.go
@@ -0,0 +1,150 @@
+package controller_test
+
+import (
+	"context"
+	"fmt"
+	"math/big"
+	"os"
+	"testing"
+
+	"github.com/Layr-Labs/eigenda/common/aws"
+	"github.com/Layr-Labs/eigenda/common/aws/dynamodb"
+	test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils"
+	"github.com/Layr-Labs/eigenda/common/aws/s3"
+	"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
+	"github.com/Layr-Labs/eigenda/encoding"
+	"github.com/Layr-Labs/eigenda/inabox/deploy"
+	"github.com/Layr-Labs/eigensdk-go/logging"
+	"github.com/consensys/gnark-crypto/ecc/bn254"
+	"github.com/consensys/gnark-crypto/ecc/bn254/fp"
+	"github.com/google/uuid"
+	"github.com/ory/dockertest/v3"
+)
+
+var (
+	logger = logging.NewNoopLogger()
+
+	dockertestPool     *dockertest.Pool
+	dockertestResource *dockertest.Resource
+
+	deployLocalStack bool
+	localStackPort   = "4571"
+
+	s3Client          s3.Client
+	dynamoClient      *dynamodb.Client
+	blobStore         *blobstore.BlobStore
+	blobMetadataStore *blobstore.BlobMetadataStore
+
+	UUID              = uuid.New()
+	s3BucketName      = "test-eigenda-blobstore"
+	metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID)
+
+	mockCommitment = encoding.BlobCommitments{}
+)
+
+func TestMain(m *testing.M) {
+	setup(m)
+	code := m.Run()
+	teardown()
+	os.Exit(code)
+}
+
+func setup(m *testing.M) {
+
+	deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false")
+	if !deployLocalStack {
+		localStackPort = os.Getenv("LOCALSTACK_PORT")
+	}
+
+	if deployLocalStack {
+		var err error
+		dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort)
+		if err != nil {
+			teardown()
+			panic("failed to start localstack container")
+		}
+	}
+
+	cfg := aws.ClientConfig{
+		Region:          "us-east-1",
+		AccessKey:       "localstack",
+		SecretAccessKey: "localstack",
+		EndpointURL:     fmt.Sprintf("http://0.0.0.0:%s", localStackPort),
+	}
+
+	_, err := test_utils.CreateTable(context.Background(), cfg, metadataTableName, blobstore.GenerateTableSchema(metadataTableName, 10, 10))
+	if err != nil {
+		teardown()
+		panic("failed to create dynamodb table: " + err.Error())
+	}
+
+	dynamoClient, err = dynamodb.NewClient(cfg, logger)
+	if err != nil {
+		teardown()
+		panic("failed to create dynamodb client: " + err.Error())
+	}
+
+	blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName)
+
+	s3Client, err = s3.NewClient(context.Background(), cfg, logger)
+	if err != nil {
+		teardown()
+		panic("failed to create s3 client: " + err.Error())
+	}
+	err = s3Client.CreateBucket(context.Background(), s3BucketName)
+	if err != nil {
+		teardown()
+		panic("failed to create s3 bucket: " + err.Error())
+	}
+	blobStore = blobstore.NewBlobStore(s3BucketName, s3Client, logger)
+
+	var X1, Y1 fp.Element
+	X1 = *X1.SetBigInt(big.NewInt(1))
+	Y1 = *Y1.SetBigInt(big.NewInt(2))
+
+	var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element
+	_, err = lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781")
+	if err != nil {
+		teardown()
+		panic("failed to create mock commitment: " + err.Error())
+	}
+	_, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634")
+	if err != nil {
+		teardown()
+		panic("failed to create mock commitment: " + err.Error())
+	}
+	_, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930")
+	if err != nil {
+		teardown()
+		panic("failed to create mock commitment: " + err.Error())
+	}
+	_, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531")
+	if err != nil {
+		teardown()
+		panic("failed to create mock commitment: " + err.Error())
+	}
+
+	var lengthProof, lengthCommitment bn254.G2Affine
+	lengthProof.X.A0 = lengthXA0
+	lengthProof.X.A1 = lengthXA1
+	lengthProof.Y.A0 = lengthYA0
+	lengthProof.Y.A1 = lengthYA1
+
+	lengthCommitment = lengthProof
+
+	mockCommitment = encoding.BlobCommitments{
+		Commitment: &encoding.G1Commitment{
+			X: X1,
+			Y: Y1,
+		},
+		LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment),
+		LengthProof:      (*encoding.G2Commitment)(&lengthProof),
+		Length:           16,
+	}
+}
+
+func teardown() {
+	if deployLocalStack {
+		deploy.PurgeDockertestResources(dockertestPool, dockertestResource)
+	}
+}
diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go
new file mode 100644
index 0000000000..867d633b57
--- /dev/null
+++ b/disperser/controller/encoding_manager.go
@@ -0,0 +1,200 @@
+package controller
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math"
+	"math/rand"
+	"time"
+
+	"github.com/Layr-Labs/eigenda/common"
+	"github.com/Layr-Labs/eigenda/core"
+	corev2 "github.com/Layr-Labs/eigenda/core/v2"
+	"github.com/Layr-Labs/eigenda/disperser"
+	dispcommon "github.com/Layr-Labs/eigenda/disperser/common"
+	v2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
+	"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
+	"github.com/Layr-Labs/eigenda/encoding"
+	"github.com/Layr-Labs/eigensdk-go/logging"
+)
+
+var errNoBlobsToEncode = errors.New("no blobs to encode")
+
+type EncodingManagerConfig struct {
+	PullInterval time.Duration
+
+	EncodingRequestTimeout time.Duration
+	StoreTimeout           time.Duration
+	// NumEncodingRetries defines how many times the encoding will be retried
+	NumEncodingRetries int
+	// NumRelayAssignment defines how many relays will be assigned to a blob
+	NumRelayAssignment uint16
+	// AvailableRelays is a list of available relays
+	AvailableRelays []corev2.RelayKey
+}
+
+// EncodingManager is responsible for pulling queued blobs from the blob
+// metadata store periodically and encoding them. It receives the encoder responses
+// and creates BlobCertificates.
+type EncodingManager struct {
+	EncodingManagerConfig
+
+	// components
+	blobMetadataStore *blobstore.BlobMetadataStore
+	pool              common.WorkerPool
+	encodingClient    disperser.EncoderClientV2
+	chainReader       core.Reader
+	logger            logging.Logger
+
+	// state
+	lastUpdatedAt uint64
+}
+
+func NewEncodingManager(
+	config EncodingManagerConfig,
+	blobMetadataStore *blobstore.BlobMetadataStore,
+	pool common.WorkerPool,
+	encodingClient disperser.EncoderClientV2,
+	chainReader core.Reader,
+	logger logging.Logger,
+) (*EncodingManager, error) {
+	if int(config.NumRelayAssignment) > len(config.AvailableRelays) {
+		return nil, fmt.Errorf("NumRelayAssignment (%d) cannot be greater than NumRelays (%d)", config.NumRelayAssignment, len(config.AvailableRelays))
+	}
+	return &EncodingManager{
+		EncodingManagerConfig: config,
+		blobMetadataStore:     blobMetadataStore,
+		pool:                  pool,
+		encodingClient:        encodingClient,
+		chainReader:           chainReader,
+		logger:                logger.With("component", "EncodingManager"),
+
+		lastUpdatedAt: 0,
+	}, nil
+}
+
+func (e *EncodingManager) Start(ctx context.Context) error {
+	go func() {
+		ticker := time.NewTicker(e.PullInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				err := e.HandleBatch(ctx)
+				if err != nil {
+					if errors.Is(err, errNoBlobsToEncode) {
+						e.logger.Warn("no blobs to encode")
+					} else {
+						e.logger.Error("failed to process a batch", "err", err)
+					}
+				}
+			}
+		}
+	}()
+
+	return nil
+}
+
+func (e *EncodingManager) HandleBatch(ctx context.Context) error {
+	// Get a batch of blobs to encode
+	blobMetadatas, err := e.blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued, e.lastUpdatedAt)
+	if err != nil {
+		return err
+	}
+
+	if len(blobMetadatas) == 0 {
+		return errNoBlobsToEncode
+	}
+
+	currentBlockNumber, err := e.chainReader.GetCurrentBlockNumber(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to get current block number: %w", err)
+	}
+
+	for _, blob := range blobMetadatas {
+		blobKey, err := blob.BlobHeader.BlobKey()
+		if err != nil {
+			e.logger.Error("failed to get blob key", "err", err, "requestedAt", blob.RequestedAt, "paymentMetadata", blob.BlobHeader.PaymentMetadata)
+			continue
+		}
+		e.lastUpdatedAt = blob.UpdatedAt
+
+		// Encode the blobs
+		e.pool.Submit(func() {
+			for i := 0; i < e.NumEncodingRetries+1; i++ {
+				encodingCtx, cancel := context.WithTimeout(ctx, e.EncodingRequestTimeout)
+				fragmentInfo, err := e.encodeBlob(encodingCtx, blobKey, blob)
+				cancel()
+				if err != nil {
+					e.logger.Error("failed to encode blob", "blobKey", blobKey.Hex(), "err", err)
+					continue
+				}
+				relayKeys, err := GetRelayKeys(e.NumRelayAssignment, e.AvailableRelays)
+				if err != nil {
+					e.logger.Error("failed to get relay keys", "err", err)
+					// Stop retrying
+					break
+				}
+				cert := &corev2.BlobCertificate{
+					BlobHeader:           blob.BlobHeader,
+					ReferenceBlockNumber: uint64(currentBlockNumber),
+					RelayKeys:            relayKeys,
+				}
+
+				storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout)
+				err = e.blobMetadataStore.PutBlobCertificate(storeCtx, cert)
+				cancel()
+				if err != nil && !errors.Is(err, dispcommon.ErrAlreadyExists) {
+					e.logger.Error("failed to put blob certificate", "err", err)
+					continue
+				}
+
+				storeCtx, cancel = context.WithTimeout(ctx, e.StoreTimeout)
+				err = e.blobMetadataStore.MarkBlobEncoded(storeCtx, blobKey, fragmentInfo)
+				cancel()
+				if err == nil || errors.Is(err, dispcommon.ErrAlreadyExists) {
+					// Successfully updated the status to Encoded
+					return
+				}
+
+				e.logger.Error("failed to update blob status to Encoded", "blobKey", blobKey.Hex(), "err", err)
+				time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying
+			}
+
+			storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout)
+			err = e.blobMetadataStore.MarkBlobFailed(storeCtx, blobKey)
+			cancel()
+			if err != nil {
+				e.logger.Error("failed to update blob status to Failed", "blobKey", blobKey.Hex(), "err", err)
+				return
+			}
+		})
+	}
+
+	return nil
+}
+
+func (e *EncodingManager) encodeBlob(ctx context.Context, blobKey corev2.BlobKey, blob *v2.BlobMetadata) (*encoding.FragmentInfo, error) {
+	encodingParams, err := blob.BlobHeader.GetEncodingParams()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get encoding params: %w", err)
+	}
+	return e.encodingClient.EncodeBlob(ctx, blobKey, encodingParams)
+}
+
+func GetRelayKeys(numAssignment uint16, availableRelays []corev2.RelayKey) ([]corev2.RelayKey, error) {
+	if int(numAssignment) > len(availableRelays) {
+		return nil, fmt.Errorf("numAssignment (%d) cannot be greater than numRelays (%d)", numAssignment, len(availableRelays))
+	}
+	relayKeys := availableRelays
+	// shuffle relay keys
+	for i := len(relayKeys) - 1; i > 0; i-- {
+		j := rand.Intn(i + 1)
+		relayKeys[i], relayKeys[j] = relayKeys[j], relayKeys[i]
+	}
+
+	return relayKeys[:numAssignment], nil
+}
diff --git a/disperser/controller/encoding_manager_test.go b/disperser/controller/encoding_manager_test.go
new file mode 100644
index 0000000000..f956cefd75
--- /dev/null
+++ b/disperser/controller/encoding_manager_test.go
@@ -0,0 +1,268 @@
+package controller_test
+
+import (
+	"context"
+	"math/big"
+	"testing"
+	"time"
+
+	"github.com/Layr-Labs/eigenda/common"
+	"github.com/Layr-Labs/eigenda/core"
+	coremock "github.com/Layr-Labs/eigenda/core/mock"
+	v2 "github.com/Layr-Labs/eigenda/core/v2"
+	dispcommon "github.com/Layr-Labs/eigenda/disperser/common"
+	commonv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
+	"github.com/Layr-Labs/eigenda/disperser/controller"
+	dispmock "github.com/Layr-Labs/eigenda/disperser/mock"
+	"github.com/Layr-Labs/eigenda/encoding"
+	"github.com/Layr-Labs/eigensdk-go/logging"
+	"github.com/gammazero/workerpool"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+var (
+	blockNumber = uint32(100)
+)
+
+type testComponents struct {
+	EncodingManager *controller.EncodingManager
+	Pool            common.WorkerPool
+	EncodingClient  *dispmock.MockEncoderClientV2
+	ChainReader     *coremock.MockWriter
+}
+
+func TestGetRelayKeys(t *testing.T) {
+	// Test cases for GetRelayKeys function
+	tests := []struct {
+		name            string
+		numRelays       uint16
+		availableRelays []v2.RelayKey
+		err             error
+	}{
+		{
+			name:            "Single relay",
+			numRelays:       1,
+			availableRelays: []v2.RelayKey{0},
+			err:             nil,
+		},
+		{
+			name:            "Choose more than whats available",
+			numRelays:       2,
+			availableRelays: []v2.RelayKey{0},
+			err:             nil,
+		},
+		{
+			name:            "Choose 1 from multiple relays",
+			numRelays:       3,
+			availableRelays: []v2.RelayKey{0, 1, 2, 3},
+			err:             nil,
+		},
+		{
+			name:            "Choose 2 from multiple relays",
+			numRelays:       2,
+			availableRelays: []v2.RelayKey{0, 1, 2, 3},
+			err:             nil,
+		},
+		{
+			name:            "No relays",
+			numRelays:       0,
+			availableRelays: []v2.RelayKey{},
+			err:             nil,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := controller.GetRelayKeys(tt.numRelays, tt.availableRelays)
+			if err != nil {
+				assert.Error(t, err)
+			} else {
+				assert.NoError(t, tt.err)
+				assert.Len(t, got, int(tt.numRelays))
+				seen := make(map[v2.RelayKey]struct{})
+				for _, relay := range got {
+					assert.Contains(t, tt.availableRelays, relay)
+					seen[relay] = struct{}{}
+				}
+				assert.Equal(t, len(seen), len(got))
+			}
+		})
+	}
+}
+
+func TestHandleBatch(t *testing.T) {
+	ctx := context.Background()
+	blobHeader1 := &v2.BlobHeader{
+		BlobVersion:     0,
+		QuorumNumbers:   []core.QuorumID{0},
+		BlobCommitments: mockCommitment,
+		PaymentMetadata: core.PaymentMetadata{
+			AccountID:         "0x1234",
+			BinIndex:          0,
+			CumulativePayment: big.NewInt(532),
+		},
+	}
+	blobKey1, err := blobHeader1.BlobKey()
+	assert.NoError(t, err)
+	now := time.Now()
+	metadata1 := &commonv2.BlobMetadata{
+		BlobHeader: blobHeader1,
+		BlobStatus: commonv2.Queued,
+		Expiry:     uint64(now.Add(time.Hour).Unix()),
+		NumRetries: 0,
+		UpdatedAt:  uint64(now.UnixNano()),
+	}
+	err = blobMetadataStore.PutBlobMetadata(ctx, metadata1)
+	assert.NoError(t, err)
+
+	c := newTestComponents(t)
+	c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(&encoding.FragmentInfo{
+		TotalChunkSizeBytes: 100,
+		NumFragments:        5,
+	}, nil)
+
+	err = c.EncodingManager.HandleBatch(ctx)
+	assert.NoError(t, err)
+	c.Pool.StopWait()
+
+	fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
+	assert.NoError(t, err)
+	assert.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus)
+	assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
+
+	fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
+	assert.NoError(t, err)
+	assert.Equal(t, fetchedCert.BlobHeader, blobHeader1)
+	assert.Equal(t, uint32(fetchedCert.ReferenceBlockNumber), blockNumber)
+	for _, relayKey := range fetchedCert.RelayKeys {
+		assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
+	}
+}
+
+func TestHandleBatchNoBlobs(t *testing.T) {
+	ctx := context.Background()
+	c := newTestComponents(t)
+	err := c.EncodingManager.HandleBatch(ctx)
+	assert.ErrorContains(t, err, "no blobs to encode")
+}
+
+func TestHandleBatchRetrySuccess(t *testing.T) {
+	ctx := context.Background()
+	blobHeader1 := &v2.BlobHeader{
+		BlobVersion:     0,
+		QuorumNumbers:   []core.QuorumID{0},
+		BlobCommitments: mockCommitment,
+		PaymentMetadata: core.PaymentMetadata{
+			AccountID:         "0x12345",
+			BinIndex:          0,
+			CumulativePayment: big.NewInt(532),
+		},
+	}
+	blobKey1, err := blobHeader1.BlobKey()
+	assert.NoError(t, err)
+	now := time.Now()
+	metadata1 := &commonv2.BlobMetadata{
+		BlobHeader: blobHeader1,
+		BlobStatus: commonv2.Queued,
+		Expiry:     uint64(now.Add(time.Hour).Unix()),
+		NumRetries: 0,
+		UpdatedAt:  uint64(now.UnixNano()),
+	}
+	err = blobMetadataStore.PutBlobMetadata(ctx, metadata1)
+	assert.NoError(t, err)
+
+	c := newTestComponents(t)
+	c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
+	c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(&encoding.FragmentInfo{
+		TotalChunkSizeBytes: 100,
+		NumFragments:        5,
+	}, nil)
+
+	err = c.EncodingManager.HandleBatch(ctx)
+	assert.NoError(t, err)
+	c.Pool.StopWait()
+
+	fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
+	assert.NoError(t, err)
+	assert.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus)
+	assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
+
+	fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
+	assert.NoError(t, err)
+	assert.Equal(t, fetchedCert.BlobHeader, blobHeader1)
+	assert.Equal(t, uint32(fetchedCert.ReferenceBlockNumber), blockNumber)
+	for _, relayKey := range fetchedCert.RelayKeys {
+		assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
+	}
+	c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
+}
+
+func TestHandleBatchRetryFailure(t *testing.T) {
+	ctx := context.Background()
+	blobHeader1 := &v2.BlobHeader{
+		BlobVersion:     0,
+		QuorumNumbers:   []core.QuorumID{0},
+		BlobCommitments: mockCommitment,
+		PaymentMetadata: core.PaymentMetadata{
+			AccountID:         "0x123456",
+			BinIndex:          0,
+			CumulativePayment: big.NewInt(532),
+		},
+	}
+	blobKey1, err := blobHeader1.BlobKey()
+	assert.NoError(t, err)
+	now := time.Now()
+	metadata1 := &commonv2.BlobMetadata{
+		BlobHeader: blobHeader1,
+		BlobStatus: commonv2.Queued,
+		Expiry:     uint64(now.Add(time.Hour).Unix()),
+		NumRetries: 0,
+		UpdatedAt:  uint64(now.UnixNano()),
+	}
+	err = blobMetadataStore.PutBlobMetadata(ctx, metadata1)
+	assert.NoError(t, err)
+
+	c := newTestComponents(t)
+	c.EncodingClient.On("EncodeBlob", mock.Anything, mock.Anything, mock.Anything).Return(nil, assert.AnError).Twice()
+
+	err = c.EncodingManager.HandleBatch(ctx)
+	assert.NoError(t, err)
+	c.Pool.StopWait()
+
+	fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
+	assert.NoError(t, err)
+	// marked as failed
+	assert.Equal(t, commonv2.Failed, fetchedMetadata.BlobStatus)
+	assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
+
+	fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
+	assert.ErrorIs(t, err, dispcommon.ErrMetadataNotFound)
+	assert.Nil(t, fetchedCert)
+	c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
+}
+
+func newTestComponents(t *testing.T) *testComponents {
+	logger := logging.NewNoopLogger()
+	// logger, err := common.NewLogger(common.DefaultLoggerConfig())
+	// assert.NoError(t, err)
+	pool := workerpool.New(5)
+	encodingClient := dispmock.NewMockEncoderClientV2()
+	chainReader := &coremock.MockWriter{}
+	chainReader.On("GetCurrentBlockNumber").Return(blockNumber, nil)
+	em, err := controller.NewEncodingManager(controller.EncodingManagerConfig{
+		PullInterval:           1 * time.Second,
+		EncodingRequestTimeout: 5 * time.Second,
+		StoreTimeout:           5 * time.Second,
+		NumEncodingRetries:     1,
+		NumRelayAssignment:     2,
+		AvailableRelays:        []v2.RelayKey{0, 1, 2, 3},
+	}, blobMetadataStore, pool, encodingClient, chainReader, logger)
+	assert.NoError(t, err)
+	return &testComponents{
+		EncodingManager: em,
+		Pool:            pool,
+		EncodingClient:  encodingClient,
+		ChainReader:     chainReader,
+	}
+}
diff --git a/disperser/encoder_client_v2.go b/disperser/encoder_client_v2.go
new file mode 100644
index 0000000000..5a81d3188c
--- /dev/null
+++ b/disperser/encoder_client_v2.go
@@ -0,0 +1,12 @@
+package disperser
+
+import (
+	"context"
+
+	v2 "github.com/Layr-Labs/eigenda/core/v2"
+	"github.com/Layr-Labs/eigenda/encoding"
+)
+
+type EncoderClientV2 interface {
+	EncodeBlob(ctx context.Context, blobKey v2.BlobKey, encodingParams encoding.EncodingParams) (*encoding.FragmentInfo, error)
+}
diff --git a/disperser/mock/encoder_v2.go b/disperser/mock/encoder_v2.go
new file mode 100644
index 0000000000..c7544574fa
--- /dev/null
+++ b/disperser/mock/encoder_v2.go
@@ -0,0 +1,29 @@
+package mock
+
+import (
+	"context"
+
+	v2 "github.com/Layr-Labs/eigenda/core/v2"
+	"github.com/Layr-Labs/eigenda/disperser"
+	"github.com/Layr-Labs/eigenda/encoding"
+	"github.com/stretchr/testify/mock"
+)
+
+type MockEncoderClientV2 struct {
+	mock.Mock
+}
+
+var _ disperser.EncoderClientV2 = (*MockEncoderClientV2)(nil)
+
+func NewMockEncoderClientV2() *MockEncoderClientV2 {
+	return &MockEncoderClientV2{}
+}
+
+func (m *MockEncoderClientV2) EncodeBlob(ctx context.Context, blobKey v2.BlobKey, encodingParams encoding.EncodingParams) (*encoding.FragmentInfo, error) {
+	args := m.Called()
+	var fragmentInfo *encoding.FragmentInfo
+	if args.Get(0) != nil {
+		fragmentInfo = args.Get(0).(*encoding.FragmentInfo)
+	}
+	return fragmentInfo, args.Error(1)
+}
diff --git a/encoding/data.go b/encoding/data.go
index 48ad18a156..9909874f98 100644
--- a/encoding/data.go
+++ b/encoding/data.go
@@ -87,3 +87,8 @@ type SubBatch struct {
 }
 
 type ChunkNumber = uint
+
+type FragmentInfo struct {
+	TotalChunkSizeBytes int
+	NumFragments        int
+}

From f1bd032c58a4dba2bee403978087cb83e97173c4 Mon Sep 17 00:00:00 2001
From: Ian Shim <shim@fastmail.com>
Date: Wed, 30 Oct 2024 20:56:39 -0700
Subject: [PATCH 2/2] store fragmentInfo in blobCert

---
 .../v2/blobstore/dynamo_metadata_store.go     | 97 ++++++-------------
 .../blobstore/dynamo_metadata_store_test.go   | 33 ++++---
 disperser/controller/encoding_manager.go      |  6 +-
 disperser/controller/encoding_manager_test.go | 11 ++-
 encoding/data.go                              |  4 +-
 5 files changed, 59 insertions(+), 92 deletions(-)

diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go
index be272b30aa..f039be6044 100644
--- a/disperser/common/v2/blobstore/dynamo_metadata_store.go
+++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go
@@ -71,64 +71,7 @@ func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v
 	return err
 }
 
-func (s *BlobMetadataStore) MarkBlobEncoded(ctx context.Context, blobKey core.BlobKey, fragmentInfo *encoding.FragmentInfo) error {
-	validStatuses := statusUpdatePrecondition[v2.Encoded]
-	if len(validStatuses) == 0 {
-		return fmt.Errorf("%w: invalid status transition to Encoded", ErrInvalidStateTransition)
-	}
-
-	expValues := make([]expression.OperandBuilder, len(validStatuses))
-	for i, validStatus := range validStatuses {
-		expValues[i] = expression.Value(int(validStatus))
-	}
-	condition := expression.Name("BlobStatus").In(expValues[0], expValues[1:]...)
-	_, err := s.dynamoDBClient.UpdateItemWithCondition(ctx, s.tableName, map[string]types.AttributeValue{
-		"PK": &types.AttributeValueMemberS{
-			Value: blobKeyPrefix + blobKey.Hex(),
-		},
-		"SK": &types.AttributeValueMemberS{
-			Value: blobMetadataSK,
-		},
-	}, map[string]types.AttributeValue{
-		"BlobStatus": &types.AttributeValueMemberN{
-			Value: strconv.Itoa(int(v2.Encoded)),
-		},
-		"UpdatedAt": &types.AttributeValueMemberN{
-			Value: strconv.FormatInt(time.Now().UnixNano(), 10),
-		},
-		"TotalChunkSizeBytes": &types.AttributeValueMemberN{
-			Value: strconv.Itoa(fragmentInfo.TotalChunkSizeBytes),
-		},
-		"NumFragments": &types.AttributeValueMemberN{
-			Value: strconv.Itoa(fragmentInfo.NumFragments),
-		},
-	}, condition)
-
-	if errors.Is(err, commondynamodb.ErrConditionFailed) {
-		blob, err := s.GetBlobMetadata(ctx, blobKey)
-		if err != nil {
-			s.logger.Errorf("failed to get blob metadata for key %s: %v", blobKey.Hex(), err)
-		}
-
-		if blob.BlobStatus == v2.Encoded {
-			return fmt.Errorf("%w: blob already in Encoded status", common.ErrAlreadyExists)
-		}
-
-		return fmt.Errorf("%w: invalid status transition to Encoded status", ErrInvalidStateTransition)
-	}
-
-	return err
-}
-
-func (s *BlobMetadataStore) MarkBlobCertified(ctx context.Context, blobKey core.BlobKey) error {
-	return s.updateBlobStatus(ctx, blobKey, v2.Certified)
-}
-
-func (s *BlobMetadataStore) MarkBlobFailed(ctx context.Context, blobKey core.BlobKey) error {
-	return s.updateBlobStatus(ctx, blobKey, v2.Failed)
-}
-
-func (s *BlobMetadataStore) updateBlobStatus(ctx context.Context, blobKey core.BlobKey, status v2.BlobStatus) error {
+func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey core.BlobKey, status v2.BlobStatus) error {
 	validStatuses := statusUpdatePrecondition[status]
 	if len(validStatuses) == 0 {
 		return fmt.Errorf("%w: invalid status transition to %s", ErrInvalidStateTransition, status.String())
@@ -237,8 +180,8 @@ func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, st
 	return count, nil
 }
 
-func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *core.BlobCertificate) error {
-	item, err := MarshalBlobCertificate(blobCert)
+func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *core.BlobCertificate, fragmentInfo *encoding.FragmentInfo) error {
+	item, err := MarshalBlobCertificate(blobCert, fragmentInfo)
 	if err != nil {
 		return err
 	}
@@ -251,7 +194,7 @@ func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *co
 	return err
 }
 
-func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core.BlobKey) (*core.BlobCertificate, error) {
+func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core.BlobKey) (*core.BlobCertificate, *encoding.FragmentInfo, error) {
 	item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
 		"PK": &types.AttributeValueMemberS{
 			Value: blobKeyPrefix + blobKey.Hex(),
@@ -262,19 +205,19 @@ func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core
 	})
 
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	if item == nil {
-		return nil, fmt.Errorf("%w: certificate not found for key %s", common.ErrMetadataNotFound, blobKey.Hex())
+		return nil, nil, fmt.Errorf("%w: certificate not found for key %s", common.ErrMetadataNotFound, blobKey.Hex())
 	}
 
-	cert, err := UnmarshalBlobCertificate(item)
+	cert, fragmentInfo, err := UnmarshalBlobCertificate(item)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
-	return cert, nil
+	return cert, fragmentInfo, nil
 }
 
 func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput {
@@ -432,12 +375,21 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) {
 	return &metadata, nil
 }
 
-func MarshalBlobCertificate(blobCert *core.BlobCertificate) (commondynamodb.Item, error) {
+func MarshalBlobCertificate(blobCert *core.BlobCertificate, fragmentInfo *encoding.FragmentInfo) (commondynamodb.Item, error) {
 	fields, err := attributevalue.MarshalMap(blobCert)
 	if err != nil {
 		return nil, fmt.Errorf("failed to marshal blob certificate: %w", err)
 	}
 
+	// merge fragment info
+	fragmentInfoFields, err := attributevalue.MarshalMap(fragmentInfo)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal fragment info: %w", err)
+	}
+	for k, v := range fragmentInfoFields {
+		fields[k] = v
+	}
+
 	// Add PK and SK fields
 	blobKey, err := blobCert.BlobHeader.BlobKey()
 	if err != nil {
@@ -449,11 +401,16 @@ func MarshalBlobCertificate(blobCert *core.BlobCertificate) (commondynamodb.Item
 	return fields, nil
 }
 
-func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, error) {
+func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, *encoding.FragmentInfo, error) {
 	cert := core.BlobCertificate{}
 	err := attributevalue.UnmarshalMap(item, &cert)
 	if err != nil {
-		return nil, err
+		return nil, nil, fmt.Errorf("failed to unmarshal blob certificate: %w", err)
+	}
+	fragmentInfo := encoding.FragmentInfo{}
+	err = attributevalue.UnmarshalMap(item, &fragmentInfo)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to unmarshal fragment info: %w", err)
 	}
-	return &cert, nil
+	return &cert, &fragmentInfo, nil
 }
diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
index f11b0702f5..ae18404cba 100644
--- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
+++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go
@@ -117,14 +117,19 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
 		ReferenceBlockNumber: uint64(100),
 		RelayKeys:            []corev2.RelayKey{0, 2, 4},
 	}
-	err := blobMetadataStore.PutBlobCertificate(ctx, blobCert)
+	fragmentInfo := &encoding.FragmentInfo{
+		TotalChunkSizeBytes: 100,
+		NumFragments:        10,
+	}
+	err := blobMetadataStore.PutBlobCertificate(ctx, blobCert, fragmentInfo)
 	assert.NoError(t, err)
 
 	blobKey, err := blobCert.BlobHeader.BlobKey()
 	assert.NoError(t, err)
-	fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey)
+	fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey)
 	assert.NoError(t, err)
 	assert.Equal(t, blobCert, fetchedCert)
+	assert.Equal(t, fragmentInfo, fetchedFragmentInfo)
 
 	// blob cert with the same key should fail
 	blobCert1 := &corev2.BlobCertificate{
@@ -142,7 +147,7 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
 		ReferenceBlockNumber: uint64(1234),
 		RelayKeys:            []corev2.RelayKey{0},
 	}
-	err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1)
+	err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1, fragmentInfo)
 	assert.ErrorIs(t, err, common.ErrAlreadyExists)
 
 	deleteItems(t, []commondynamodb.Key{
@@ -180,29 +185,29 @@ func TestBlobMetadataStoreUpdateBlobStatus(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Update the blob status to invalid status
-	err = blobMetadataStore.MarkBlobCertified(ctx, blobKey)
+	err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Certified)
 	assert.ErrorIs(t, err, blobstore.ErrInvalidStateTransition)
 
 	// Update the blob status to a valid status
-	err = blobMetadataStore.MarkBlobEncoded(ctx, blobKey, &encoding.FragmentInfo{
-		TotalChunkSizeBytes: 100,
-		NumFragments:        10,
-	})
+	err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Encoded)
 	assert.NoError(t, err)
 
 	// Update the blob status to same status
-	err = blobMetadataStore.MarkBlobEncoded(ctx, blobKey, &encoding.FragmentInfo{
-		TotalChunkSizeBytes: 200,
-		NumFragments:        20,
-	})
+	err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Encoded)
 	assert.ErrorIs(t, err, common.ErrAlreadyExists)
 
 	fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey)
 	assert.NoError(t, err)
 	assert.Equal(t, fetchedMetadata.BlobStatus, v2.Encoded)
 	assert.Greater(t, fetchedMetadata.UpdatedAt, metadata.UpdatedAt)
-	assert.Equal(t, fetchedMetadata.TotalChunkSizeBytes, 100)
-	assert.Equal(t, fetchedMetadata.NumFragments, 10)
+
+	// Update the blob status to a valid status
+	err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Failed)
+	assert.NoError(t, err)
+
+	fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey)
+	assert.NoError(t, err)
+	assert.Equal(t, fetchedMetadata.BlobStatus, v2.Failed)
 
 	deleteItems(t, []commondynamodb.Key{
 		{
diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go
index 867d633b57..3527033671 100644
--- a/disperser/controller/encoding_manager.go
+++ b/disperser/controller/encoding_manager.go
@@ -145,7 +145,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
 				}
 
 				storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout)
-				err = e.blobMetadataStore.PutBlobCertificate(storeCtx, cert)
+				err = e.blobMetadataStore.PutBlobCertificate(storeCtx, cert, fragmentInfo)
 				cancel()
 				if err != nil && !errors.Is(err, dispcommon.ErrAlreadyExists) {
 					e.logger.Error("failed to put blob certificate", "err", err)
@@ -153,7 +153,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
 				}
 
 				storeCtx, cancel = context.WithTimeout(ctx, e.StoreTimeout)
-				err = e.blobMetadataStore.MarkBlobEncoded(storeCtx, blobKey, fragmentInfo)
+				err = e.blobMetadataStore.UpdateBlobStatus(storeCtx, blobKey, v2.Encoded)
 				cancel()
 				if err == nil || errors.Is(err, dispcommon.ErrAlreadyExists) {
 					// Successfully updated the status to Encoded
@@ -165,7 +165,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
 			}
 
 			storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout)
-			err = e.blobMetadataStore.MarkBlobFailed(storeCtx, blobKey)
+			err = e.blobMetadataStore.UpdateBlobStatus(storeCtx, blobKey, v2.Failed)
 			cancel()
 			if err != nil {
 				e.logger.Error("failed to update blob status to Failed", "blobKey", blobKey.Hex(), "err", err)
diff --git a/disperser/controller/encoding_manager_test.go b/disperser/controller/encoding_manager_test.go
index f956cefd75..916490d5e4 100644
--- a/disperser/controller/encoding_manager_test.go
+++ b/disperser/controller/encoding_manager_test.go
@@ -131,13 +131,15 @@ func TestHandleBatch(t *testing.T) {
 	assert.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus)
 	assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
 
-	fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
+	fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
 	assert.NoError(t, err)
 	assert.Equal(t, fetchedCert.BlobHeader, blobHeader1)
 	assert.Equal(t, uint32(fetchedCert.ReferenceBlockNumber), blockNumber)
 	for _, relayKey := range fetchedCert.RelayKeys {
 		assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
 	}
+	assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
+	assert.Equal(t, fetchedFragmentInfo.NumFragments, uint32(5))
 }
 
 func TestHandleBatchNoBlobs(t *testing.T) {
@@ -188,13 +190,15 @@ func TestHandleBatchRetrySuccess(t *testing.T) {
 	assert.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus)
 	assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
 
-	fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
+	fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
 	assert.NoError(t, err)
 	assert.Equal(t, fetchedCert.BlobHeader, blobHeader1)
 	assert.Equal(t, uint32(fetchedCert.ReferenceBlockNumber), blockNumber)
 	for _, relayKey := range fetchedCert.RelayKeys {
 		assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
 	}
+	assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
+	assert.Equal(t, fetchedFragmentInfo.NumFragments, uint32(5))
 	c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
 }
 
@@ -236,9 +240,10 @@ func TestHandleBatchRetryFailure(t *testing.T) {
 	assert.Equal(t, commonv2.Failed, fetchedMetadata.BlobStatus)
 	assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)
 
-	fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
+	fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
 	assert.ErrorIs(t, err, dispcommon.ErrMetadataNotFound)
 	assert.Nil(t, fetchedCert)
+	assert.Nil(t, fetchedFragmentInfo)
 	c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
 }
 
diff --git a/encoding/data.go b/encoding/data.go
index 9909874f98..2c43561456 100644
--- a/encoding/data.go
+++ b/encoding/data.go
@@ -89,6 +89,6 @@ type SubBatch struct {
 type ChunkNumber = uint
 
 type FragmentInfo struct {
-	TotalChunkSizeBytes int
-	NumFragments        int
+	TotalChunkSizeBytes uint32
+	NumFragments        uint32
 }