Skip to content

Commit

Permalink
store fragmentInfo in blobCert
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Oct 31, 2024
1 parent 1d25e75 commit e76e3a6
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 92 deletions.
97 changes: 27 additions & 70 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,64 +71,7 @@ func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v
return err
}

func (s *BlobMetadataStore) MarkBlobEncoded(ctx context.Context, blobKey core.BlobKey, fragmentInfo *encoding.FragmentInfo) error {
validStatuses := statusUpdatePrecondition[v2.Encoded]
if len(validStatuses) == 0 {
return fmt.Errorf("%w: invalid status transition to Encoded", ErrInvalidStateTransition)
}

expValues := make([]expression.OperandBuilder, len(validStatuses))
for i, validStatus := range validStatuses {
expValues[i] = expression.Value(int(validStatus))
}
condition := expression.Name("BlobStatus").In(expValues[0], expValues[1:]...)
_, err := s.dynamoDBClient.UpdateItemWithCondition(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: blobKeyPrefix + blobKey.Hex(),
},
"SK": &types.AttributeValueMemberS{
Value: blobMetadataSK,
},
}, map[string]types.AttributeValue{
"BlobStatus": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(v2.Encoded)),
},
"UpdatedAt": &types.AttributeValueMemberN{
Value: strconv.FormatInt(time.Now().UnixNano(), 10),
},
"TotalChunkSizeBytes": &types.AttributeValueMemberN{
Value: strconv.Itoa(fragmentInfo.TotalChunkSizeBytes),
},
"NumFragments": &types.AttributeValueMemberN{
Value: strconv.Itoa(fragmentInfo.NumFragments),
},
}, condition)

if errors.Is(err, commondynamodb.ErrConditionFailed) {
blob, err := s.GetBlobMetadata(ctx, blobKey)
if err != nil {
s.logger.Errorf("failed to get blob metadata for key %s: %v", blobKey.Hex(), err)
}

if blob.BlobStatus == v2.Encoded {
return fmt.Errorf("%w: blob already in Encoded status", common.ErrAlreadyExists)
}

return fmt.Errorf("%w: invalid status transition to Encoded status", ErrInvalidStateTransition)
}

return err
}

func (s *BlobMetadataStore) MarkBlobCertified(ctx context.Context, blobKey core.BlobKey) error {
return s.updateBlobStatus(ctx, blobKey, v2.Certified)
}

func (s *BlobMetadataStore) MarkBlobFailed(ctx context.Context, blobKey core.BlobKey) error {
return s.updateBlobStatus(ctx, blobKey, v2.Failed)
}

func (s *BlobMetadataStore) updateBlobStatus(ctx context.Context, blobKey core.BlobKey, status v2.BlobStatus) error {
func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey core.BlobKey, status v2.BlobStatus) error {
validStatuses := statusUpdatePrecondition[status]
if len(validStatuses) == 0 {
return fmt.Errorf("%w: invalid status transition to %s", ErrInvalidStateTransition, status.String())
Expand Down Expand Up @@ -237,8 +180,8 @@ func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, st
return count, nil
}

func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *core.BlobCertificate) error {
item, err := MarshalBlobCertificate(blobCert)
func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *core.BlobCertificate, fragmentInfo *encoding.FragmentInfo) error {
item, err := MarshalBlobCertificate(blobCert, fragmentInfo)
if err != nil {
return err
}
Expand All @@ -251,7 +194,7 @@ func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *co
return err
}

func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core.BlobKey) (*core.BlobCertificate, error) {
func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core.BlobKey) (*core.BlobCertificate, *encoding.FragmentInfo, error) {
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: blobKeyPrefix + blobKey.Hex(),
Expand All @@ -262,19 +205,19 @@ func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey core
})

if err != nil {
return nil, err
return nil, nil, err
}

if item == nil {
return nil, fmt.Errorf("%w: certificate not found for key %s", common.ErrMetadataNotFound, blobKey.Hex())
return nil, nil, fmt.Errorf("%w: certificate not found for key %s", common.ErrMetadataNotFound, blobKey.Hex())
}

cert, err := UnmarshalBlobCertificate(item)
cert, fragmentInfo, err := UnmarshalBlobCertificate(item)
if err != nil {
return nil, err
return nil, nil, err
}

return cert, nil
return cert, fragmentInfo, nil
}

func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput {
Expand Down Expand Up @@ -432,12 +375,21 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) {
return &metadata, nil
}

func MarshalBlobCertificate(blobCert *core.BlobCertificate) (commondynamodb.Item, error) {
func MarshalBlobCertificate(blobCert *core.BlobCertificate, fragmentInfo *encoding.FragmentInfo) (commondynamodb.Item, error) {
fields, err := attributevalue.MarshalMap(blobCert)
if err != nil {
return nil, fmt.Errorf("failed to marshal blob certificate: %w", err)
}

// merge fragment info
fragmentInfoFields, err := attributevalue.MarshalMap(fragmentInfo)
if err != nil {
return nil, fmt.Errorf("failed to marshal fragment info: %w", err)
}
for k, v := range fragmentInfoFields {
fields[k] = v
}

// Add PK and SK fields
blobKey, err := blobCert.BlobHeader.BlobKey()
if err != nil {
Expand All @@ -449,11 +401,16 @@ func MarshalBlobCertificate(blobCert *core.BlobCertificate) (commondynamodb.Item
return fields, nil
}

func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, error) {
func UnmarshalBlobCertificate(item commondynamodb.Item) (*core.BlobCertificate, *encoding.FragmentInfo, error) {
cert := core.BlobCertificate{}
err := attributevalue.UnmarshalMap(item, &cert)
if err != nil {
return nil, err
return nil, nil, fmt.Errorf("failed to unmarshal blob certificate: %w", err)
}
fragmentInfo := encoding.FragmentInfo{}
err = attributevalue.UnmarshalMap(item, &fragmentInfo)
if err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal fragment info: %w", err)
}
return &cert, nil
return &cert, &fragmentInfo, nil
}
33 changes: 19 additions & 14 deletions disperser/common/v2/blobstore/dynamo_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,19 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
ReferenceBlockNumber: uint64(100),
RelayKeys: []corev2.RelayKey{0, 2, 4},
}
err := blobMetadataStore.PutBlobCertificate(ctx, blobCert)
fragmentInfo := &encoding.FragmentInfo{
TotalChunkSizeBytes: 100,
NumFragments: 10,
}
err := blobMetadataStore.PutBlobCertificate(ctx, blobCert, fragmentInfo)
assert.NoError(t, err)

blobKey, err := blobCert.BlobHeader.BlobKey()
assert.NoError(t, err)
fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey)
fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, blobCert, fetchedCert)
assert.Equal(t, fragmentInfo, fetchedFragmentInfo)

// blob cert with the same key should fail
blobCert1 := &corev2.BlobCertificate{
Expand All @@ -142,7 +147,7 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
ReferenceBlockNumber: uint64(1234),
RelayKeys: []corev2.RelayKey{0},
}
err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1)
err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1, fragmentInfo)
assert.ErrorIs(t, err, common.ErrAlreadyExists)

deleteItems(t, []commondynamodb.Key{
Expand Down Expand Up @@ -180,29 +185,29 @@ func TestBlobMetadataStoreUpdateBlobStatus(t *testing.T) {
assert.NoError(t, err)

// Update the blob status to invalid status
err = blobMetadataStore.MarkBlobCertified(ctx, blobKey)
err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Certified)
assert.ErrorIs(t, err, blobstore.ErrInvalidStateTransition)

// Update the blob status to a valid status
err = blobMetadataStore.MarkBlobEncoded(ctx, blobKey, &encoding.FragmentInfo{
TotalChunkSizeBytes: 100,
NumFragments: 10,
})
err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Encoded)
assert.NoError(t, err)

// Update the blob status to same status
err = blobMetadataStore.MarkBlobEncoded(ctx, blobKey, &encoding.FragmentInfo{
TotalChunkSizeBytes: 200,
NumFragments: 20,
})
err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Encoded)
assert.ErrorIs(t, err, common.ErrAlreadyExists)

fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, fetchedMetadata.BlobStatus, v2.Encoded)
assert.Greater(t, fetchedMetadata.UpdatedAt, metadata.UpdatedAt)
assert.Equal(t, fetchedMetadata.TotalChunkSizeBytes, 100)
assert.Equal(t, fetchedMetadata.NumFragments, 10)

// Update the blob status to a valid status
err = blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Failed)
assert.NoError(t, err)

fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, fetchedMetadata.BlobStatus, v2.Failed)

deleteItems(t, []commondynamodb.Key{
{
Expand Down
6 changes: 3 additions & 3 deletions disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
}

storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout)
err = e.blobMetadataStore.PutBlobCertificate(storeCtx, cert)
err = e.blobMetadataStore.PutBlobCertificate(storeCtx, cert, fragmentInfo)
cancel()
if err != nil && !errors.Is(err, dispcommon.ErrAlreadyExists) {
e.logger.Error("failed to put blob certificate", "err", err)
continue
}

storeCtx, cancel = context.WithTimeout(ctx, e.StoreTimeout)
err = e.blobMetadataStore.MarkBlobEncoded(storeCtx, blobKey, fragmentInfo)
err = e.blobMetadataStore.UpdateBlobStatus(storeCtx, blobKey, v2.Encoded)
cancel()
if err == nil || errors.Is(err, dispcommon.ErrAlreadyExists) {
// Successfully updated the status to Encoded
Expand All @@ -165,7 +165,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
}

storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout)
err = e.blobMetadataStore.MarkBlobFailed(storeCtx, blobKey)
err = e.blobMetadataStore.UpdateBlobStatus(storeCtx, blobKey, v2.Failed)
cancel()
if err != nil {
e.logger.Error("failed to update blob status to Failed", "blobKey", blobKey.Hex(), "err", err)
Expand Down
11 changes: 8 additions & 3 deletions disperser/controller/encoding_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,15 @@ func TestHandleBatch(t *testing.T) {
assert.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus)
assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)

fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
assert.NoError(t, err)
assert.Equal(t, fetchedCert.BlobHeader, blobHeader1)
assert.Equal(t, uint32(fetchedCert.ReferenceBlockNumber), blockNumber)
for _, relayKey := range fetchedCert.RelayKeys {
assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
}
assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
assert.Equal(t, fetchedFragmentInfo.NumFragments, uint32(5))
}

func TestHandleBatchNoBlobs(t *testing.T) {
Expand Down Expand Up @@ -188,13 +190,15 @@ func TestHandleBatchRetrySuccess(t *testing.T) {
assert.Equal(t, commonv2.Encoded, fetchedMetadata.BlobStatus)
assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)

fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
assert.NoError(t, err)
assert.Equal(t, fetchedCert.BlobHeader, blobHeader1)
assert.Equal(t, uint32(fetchedCert.ReferenceBlockNumber), blockNumber)
for _, relayKey := range fetchedCert.RelayKeys {
assert.Contains(t, c.EncodingManager.AvailableRelays, relayKey)
}
assert.Equal(t, fetchedFragmentInfo.TotalChunkSizeBytes, uint32(100))
assert.Equal(t, fetchedFragmentInfo.NumFragments, uint32(5))
c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
}

Expand Down Expand Up @@ -236,9 +240,10 @@ func TestHandleBatchRetryFailure(t *testing.T) {
assert.Equal(t, commonv2.Failed, fetchedMetadata.BlobStatus)
assert.Greater(t, fetchedMetadata.UpdatedAt, metadata1.UpdatedAt)

fetchedCert, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
fetchedCert, fetchedFragmentInfo, err := blobMetadataStore.GetBlobCertificate(ctx, blobKey1)
assert.ErrorIs(t, err, dispcommon.ErrMetadataNotFound)
assert.Nil(t, fetchedCert)
assert.Nil(t, fetchedFragmentInfo)
c.EncodingClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
}

Expand Down
4 changes: 2 additions & 2 deletions encoding/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,6 @@ type SubBatch struct {
type ChunkNumber = uint

type FragmentInfo struct {
TotalChunkSizeBytes int
NumFragments int
TotalChunkSizeBytes uint32
NumFragments uint32
}

0 comments on commit e76e3a6

Please sign in to comment.