diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store.go b/disperser/common/v2/blobstore/dynamo_metadata_store.go index be272b30aa..f039be6044 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store.go @@ -71,64 +71,7 @@ func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v return err } -func (s *BlobMetadataStore) MarkBlobEncoded(ctx context.Context, blobKey core.BlobKey, fragmentInfo *encoding.FragmentInfo) error { - validStatuses := statusUpdatePrecondition[v2.Encoded] - if len(validStatuses) == 0 { - return fmt.Errorf("%w: invalid status transition to Encoded", ErrInvalidStateTransition) - } - - expValues := make([]expression.OperandBuilder, len(validStatuses)) - for i, validStatus := range validStatuses { - expValues[i] = expression.Value(int(validStatus)) - } - condition := expression.Name("BlobStatus").In(expValues[0], expValues[1:]...) - _, err := s.dynamoDBClient.UpdateItemWithCondition(ctx, s.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: blobKeyPrefix + blobKey.Hex(), - }, - "SK": &types.AttributeValueMemberS{ - Value: blobMetadataSK, - }, - }, map[string]types.AttributeValue{ - "BlobStatus": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(v2.Encoded)), - }, - "UpdatedAt": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().UnixNano(), 10), - }, - "TotalChunkSizeBytes": &types.AttributeValueMemberN{ - Value: strconv.Itoa(fragmentInfo.TotalChunkSizeBytes), - }, - "NumFragments": &types.AttributeValueMemberN{ - Value: strconv.Itoa(fragmentInfo.NumFragments), - }, - }, condition) - - if errors.Is(err, commondynamodb.ErrConditionFailed) { - blob, err := s.GetBlobMetadata(ctx, blobKey) - if err != nil { - s.logger.Errorf("failed to get blob metadata for key %s: %v", blobKey.Hex(), err) - } - - if blob.BlobStatus == v2.Encoded { - return fmt.Errorf("%w: blob already in Encoded status", common.ErrAlreadyExists) - } - - return fmt.Errorf("%w: invalid status transition to Encoded status", ErrInvalidStateTransition) - } - - return err -} - -func (s *BlobMetadataStore) MarkBlobCertified(ctx context.Context, blobKey core.BlobKey) error { - return s.updateBlobStatus(ctx, blobKey, v2.Certified) -} - -func (s *BlobMetadataStore) MarkBlobFailed(ctx context.Context, blobKey core.BlobKey) error { - return s.updateBlobStatus(ctx, blobKey, v2.Failed) -} - -func (s *BlobMetadataStore) updateBlobStatus(ctx context.Context, blobKey core.BlobKey, status v2.BlobStatus) error { +func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey core.BlobKey, status v2.BlobStatus) error { validStatuses := statusUpdatePrecondition[status] if len(validStatuses) == 0 { return fmt.Errorf("%w: invalid status transition to %s", ErrInvalidStateTransition, status.String()) @@ -237,8 +180,8 @@ func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, st return count, nil } -func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *core.BlobCertificate) error { - item, err := MarshalBlobCertificate(blobCert) +func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *core.BlobCertificate, fragmentInfo *encoding.FragmentInfo) error { + item, err := MarshalBlobCertificate(blobCert, fragmentInfo) if err != nil { return err } @@ -251,7 +194,7 @@ func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *co return err } -func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core.BlobKey) (*core.BlobCertificate, error) { +func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core.BlobKey) (*core.BlobCertificate, *encoding.FragmentInfo, error) { item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ "PK": &types.AttributeValueMemberS{ Value: blobKeyPrefix + blobKey.Hex(), @@ -262,19 +205,19 @@ func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core }) if err != nil { - return nil, err + return nil, nil, err } if item == nil { - return nil, fmt.Errorf("%w: certificate not found for key %s", common.ErrMetadataNotFound, blobKey.Hex()) + return nil, nil, fmt.Errorf("%w: certificate not found for key %s", common.ErrMetadataNotFound, blobKey.Hex()) } - cert, err := UnmarshalBlobCertificate(item) + cert, fragmentInfo, err := UnmarshalBlobCertificate(item) if err != nil { - return nil, err + return nil, nil, err } - return cert, nil + return cert, fragmentInfo, nil } func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { @@ -432,12 +375,21 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { return &metadata, nil } -func MarshalBlobCertificate(blobCert *core.BlobCertificate) (commondynamodb.Item, error) { +func MarshalBlobCertificate(blobCert *core.BlobCertificate, fragmentInfo *encoding.FragmentInfo) (commondynamodb.Item, error) { fields, err := attributevalue.MarshalMap(blobCert) if err != nil { return nil, fmt.Errorf("failed to marshal blob certificate: %w", err) } + // merge fragment info + fragmentInfoFields, err := attributevalue.MarshalMap(fragmentInfo) + if err != nil { + return nil, fmt.Errorf("failed to marshal fragment info: %w", err) + } + for k, v := range fragmentInfoFields { + fields[k] = v + } + // Add PK and SK fields blobKey, err := blobCert.BlobHeader.BlobKey() if err != nil { @@ -449,11 +401,16 @@ func MarshalBlobCertificate(blobCert *core.BlobCertificate) (commondynamodb.Item return fields, nil } -func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, error) { +func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, *encoding.FragmentInfo, error) { cert := core.BlobCertificate{} err := attributevalue.UnmarshalMap(item, &cert) if err != nil { - return nil, err + return nil, nil, fmt.Errorf("failed to unmarshal blob certificate: %w", err) + } + fragmentInfo := encoding.FragmentInfo{} + err = attributevalue.UnmarshalMap(item, &fragmentInfo) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal fragment info: %w", err) } - return &cert, nil + return &cert, &fragmentInfo, nil } diff --git a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go index f11b0702f5..ae18404cba 100644 --- a/disperser/common/v2/blobstore/dynamo_metadata_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_metadata_store_test.go @@ -117,14 +117,19 @@ func TestBlobMetadataStoreCerts(t *testing.T) { ReferenceBlockNumber: uint64(100), RelayKeys: []corev2.RelayKey{0, 2, 4}, } - err := blobMetadataStore.PutBlobCertificate(ctx, blobCert) + fragmentInfo := &encoding.FragmentInfo{ + TotalChunkSizeBytes: 100, + NumFragments: 10, + } + err := blobMetadataStore.PutBlobCertificate(ctx, blobCert, fragmentInfo) assert.NoError(t, err) blobKey, err := blobCert.BlobHeader.BlobKey() assert.NoError(t, err) - fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey) + fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey) assert.NoError(t, err) assert.Equal(t, blobCert, fetchedCert) + assert.Equal(t, fragmentInfo, fetchedFragmentInfo) // blob cert with the same key should fail blobCert1 := &corev2.BlobCertificate{ @@ -142,7 +147,7 @@ func TestBlobMetadataStoreCerts(t *testing.T) { ReferenceBlockNumber: uint64(1234), RelayKeys: []corev2.RelayKey{0}, } - err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1) + err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1, fragmentInfo) assert.ErrorIs(t, err, common.ErrAlreadyExists) deleteItems(t, []commondynamodb.Key{ @@ -180,29 +185,29 @@ func TestBlobMetadataStoreUpdateBlobStatus(t *testing.T) { assert.NoError(t, err) // Update the blob status to invalid status - err = blobMetadataStore.MarkBlobCertified(ctx, blobKey) + err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Certified) assert.ErrorIs(t, err, blobstore.ErrInvalidStateTransition) // Update the blob status to a valid status - err = blobMetadataStore.MarkBlobEncoded(ctx, blobKey, &encoding.FragmentInfo{ - TotalChunkSizeBytes: 100, - NumFragments: 10, - }) + err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Encoded) assert.NoError(t, err) // Update the blob status to same status - err = blobMetadataStore.MarkBlobEncoded(ctx, blobKey, &encoding.FragmentInfo{ - TotalChunkSizeBytes: 200, - NumFragments: 20, - }) + err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Encoded) assert.ErrorIs(t, err, common.ErrAlreadyExists) fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey) assert.NoError(t, err) assert.Equal(t, fetchedMetadata.BlobStatus, v2.Encoded) assert.Greater(t, fetchedMetadata.UpdatedAt, metadata.UpdatedAt) - assert.Equal(t, fetchedMetadata.TotalChunkSizeBytes, 100) - assert.Equal(t, fetchedMetadata.NumFragments, 10) + + // Update the blob status to a valid status + err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Failed) + assert.NoError(t, err) + + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey) + assert.NoError(t, err) + assert.Equal(t, fetchedMetadata.BlobStatus, v2.Failed) deleteItems(t, []commondynamodb.Key{ { diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index 867d633b57..3527033671 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -145,7 +145,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { } storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout) - err = e.blobMetadataStore.PutBlobCertificate(storeCtx, cert) + err = e.blobMetadataStore.PutBlobCertificate(storeCtx, cert, fragmentInfo) cancel() if err != nil && !errors.Is(err, dispcommon.ErrAlreadyExists) { e.logger.Error("failed to put blob certificate", "err", err) @@ -153,7 +153,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { } storeCtx, cancel = context.WithTimeout(ctx, e.StoreTimeout) - err = e.blobMetadataStore.MarkBlobEncoded(storeCtx, blobKey, fragmentInfo) + err = e.blobMetadataStore.UpdateBlobStatus(storeCtx, blobKey, v2.Encoded) cancel() if err == nil || errors.Is(err, dispcommon.ErrAlreadyExists) { // Successfully updated the status to Encoded @@ -165,7 +165,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { } storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout) - err = e.blobMetadataStore.MarkBlobFailed(storeCtx, blobKey) + err = e.blobMetadataStore.UpdateBlobStatus(storeCtx, blobKey, v2.Failed) cancel() if err != nil { e.logger.Error("failed to update blob status to Failed", "blobKey", blobKey.Hex(), "err", err) diff --git a/disperser/controller/encoding_manager_test.go b/disperser/controller/encoding_manager_test.go index f956cefd75..916490d5e4 100644 --- a/disperser/controller/encoding_manager_test.go +++ b/disperser/controller/encoding_manager_test.go @@ -131,13 +131,15 @@ func TestHandleBatch(t *testing.T) { assert.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus) assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt) - fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1) + fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1) assert.NoError(t, err) assert.Equal(t, fetchedCert.BlobHeader, blobHeader1) assert.Equal(t, uint32(fetchedCert.ReferenceBlockNumber), blockNumber) for _, relayKey := range fetchedCert.RelayKeys { assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey) } + assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100)) + assert.Equal(t, fetchedFragmentInfo.NumFragments, uint32(5)) } func TestHandleBatchNoBlobs(t *testing.T) { @@ -188,13 +190,15 @@ func TestHandleBatchRetrySuccess(t *testing.T) { assert.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus) assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt) - fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1) + fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1) assert.NoError(t, err) assert.Equal(t, fetchedCert.BlobHeader, blobHeader1) assert.Equal(t, uint32(fetchedCert.ReferenceBlockNumber), blockNumber) for _, relayKey := range fetchedCert.RelayKeys { assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey) } + assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100)) + assert.Equal(t, fetchedFragmentInfo.NumFragments, uint32(5)) c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2) } @@ -236,9 +240,10 @@ func TestHandleBatchRetryFailure(t *testing.T) { assert.Equal(t, commonv2.Failed, fetchedMetadata.BlobStatus) assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt) - fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1) + fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1) assert.ErrorIs(t, err, dispcommon.ErrMetadataNotFound) assert.Nil(t, fetchedCert) + assert.Nil(t, fetchedFragmentInfo) c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2) } diff --git a/encoding/data.go b/encoding/data.go index b26c4c95d0..f8c208cc4d 100644 --- a/encoding/data.go +++ b/encoding/data.go @@ -63,6 +63,6 @@ type SubBatch struct { type ChunkNumber = uint type FragmentInfo struct { - TotalChunkSizeBytes int - NumFragments int + TotalChunkSizeBytes uint32 + NumFragments uint32 }