From 38e41299f09d8ca21958c5dd073f8e09d5ea7d2b Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Mon, 18 Nov 2024 09:42:08 -0800 Subject: [PATCH] variety of fixes in disperser --- core/serialization.go | 6 ++- core/v2/types.go | 25 +++++++++- core/v2/types_test.go | 61 ++++++++++++++++++++++++ disperser/apiserver/disperse_blob_v2.go | 4 +- disperser/apiserver/server_v2_test.go | 2 +- disperser/cmd/apiserver/config.go | 21 +++++++- disperser/controller/dispatcher.go | 6 ++- disperser/controller/dispatcher_test.go | 2 +- disperser/controller/encoding_manager.go | 1 + disperser/encoder/server_v2.go | 6 +++ encoding/encoding.go | 2 + encoding/kzg/prover/prover.go | 4 ++ encoding/mock/encoder.go | 5 ++ relay/relay_test_utils.go | 19 ++++---- 14 files changed, 144 insertions(+), 20 deletions(-) diff --git a/core/serialization.go b/core/serialization.go index 00ae61f3d4..c9e7f7a1be 100644 --- a/core/serialization.go +++ b/core/serialization.go @@ -493,8 +493,10 @@ func SerializeMerkleProof(proof *merkletree.Proof) []byte { return proofBytes } -func DeserializeMerkleProof(data []byte) (*merkletree.Proof, error) { - proof := &merkletree.Proof{} +func DeserializeMerkleProof(data []byte, index uint64) (*merkletree.Proof, error) { + proof := &merkletree.Proof{ + Index: index, + } if len(data)%32 != 0 { return nil, fmt.Errorf("invalid proof length") } diff --git a/core/v2/types.go b/core/v2/types.go index df4f364cb9..aeb3c34fc5 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -84,7 +84,7 @@ type BlobHeader struct { Signature []byte } -func NewBlobHeader(proto *commonpb.BlobHeader) (*BlobHeader, error) { +func BlobHeaderFromProtobuf(proto *commonpb.BlobHeader) (*BlobHeader, error) { commitment, err := new(encoding.G1Commitment).Deserialize(proto.GetCommitment().GetCommitment()) if err != nil { return nil, err @@ -202,6 +202,27 @@ func (c *BlobCertificate) ToProtobuf() (*commonpb.BlobCertificate, error) { }, nil } +func BlobCertificateFromProtobuf(proto *commonpb.BlobCertificate) (*BlobCertificate, error) { + if proto.GetBlobHeader() == nil { + return nil, errors.New("missing blob header in blob certificate") + } + + blobHeader, err := BlobHeaderFromProtobuf(proto.GetBlobHeader()) + if err != nil { + return nil, fmt.Errorf("failed to create blob header: %v", err) + } + + relayKeys := make([]RelayKey, len(proto.GetRelays())) + for i, r := range proto.GetRelays() { + relayKeys[i] = RelayKey(r) + } + + return &BlobCertificate{ + BlobHeader: blobHeader, + RelayKeys: relayKeys, + }, nil +} + type BatchHeader struct { // BatchRoot is the root of a Merkle tree whose leaves are the keys of the blobs in the batch BatchRoot [32]byte @@ -272,7 +293,7 @@ func BatchFromProtobuf(proto *commonpb.Batch) (*Batch, error) { blobCerts := make([]*BlobCertificate, len(proto.GetBlobCertificates())) for i, cert := range proto.GetBlobCertificates() { - blobHeader, err := NewBlobHeader(cert.GetBlobHeader()) + blobHeader, err := BlobHeaderFromProtobuf(cert.GetBlobHeader()) if err != nil { return nil, fmt.Errorf("failed to create blob header: %v", err) } diff --git a/core/v2/types_test.go b/core/v2/types_test.go index 38425d73d6..89ea5256ad 100644 --- a/core/v2/types_test.go +++ b/core/v2/types_test.go @@ -65,3 +65,64 @@ func TestConvertBatchToFromProtobuf(t *testing.T) { assert.Equal(t, batch, newBatch) } + +func TestConvertBlobHeaderToFromProtobuf(t *testing.T) { + data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES) + commitments, err := p.GetCommitments(data) + if err != nil { + t.Fatal(err) + } + + bh := &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + }, + Signature: []byte{1, 2, 3}, + } + + pb, err := bh.ToProtobuf() + assert.NoError(t, err) + + newBH, err := v2.BlobHeaderFromProtobuf(pb) + assert.NoError(t, err) + + assert.Equal(t, bh, newBH) +} + +func TestConvertBlobCertToFromProtobuf(t *testing.T) { + data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES) + commitments, err := p.GetCommitments(data) + if err != nil { + t.Fatal(err) + } + + bh := &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + }, + Signature: []byte{1, 2, 3}, + } + + blobCert := &v2.BlobCertificate{ + BlobHeader: bh, + RelayKeys: []v2.RelayKey{0, 1}, + } + + pb, err := blobCert.ToProtobuf() + assert.NoError(t, err) + + newBlobCert, err := v2.BlobCertificateFromProtobuf(pb) + assert.NoError(t, err) + + assert.Equal(t, blobCert, newBlobCert) +} diff --git a/disperser/apiserver/disperse_blob_v2.go b/disperser/apiserver/disperse_blob_v2.go index 72411164bf..d39d75f2cd 100644 --- a/disperser/apiserver/disperse_blob_v2.go +++ b/disperser/apiserver/disperse_blob_v2.go @@ -25,7 +25,7 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl } data := req.GetData() - blobHeader, err := corev2.NewBlobHeader(req.GetBlobHeader()) + blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader()) if err != nil { return nil, api.NewErrorInternal(err.Error()) } @@ -105,7 +105,7 @@ func (s *DispersalServerV2) validateDispersalRequest(req *pb.DisperseBlobRequest return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob version %d; valid blob versions are: %v", blobHeaderProto.GetVersion(), validVersions)) } - blobHeader, err := corev2.NewBlobHeader(blobHeaderProto) + blobHeader, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto) if err != nil { return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob header: %s", err.Error())) } diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index d262f8812c..0c2697366f 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -67,7 +67,7 @@ func TestV2DisperseBlob(t *testing.T) { CumulativePayment: big.NewInt(100).Bytes(), }, } - blobHeader, err := corev2.NewBlobHeader(blobHeaderProto) + blobHeader, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto) assert.NoError(t, err) signer := auth.NewLocalBlobRequestSigner(privateKeyHex) sig, err := signer.SignBlobRequest(blobHeader) diff --git a/disperser/cmd/apiserver/config.go b/disperser/cmd/apiserver/config.go index b1259020dc..6e25b8af65 100644 --- a/disperser/cmd/apiserver/config.go +++ b/disperser/cmd/apiserver/config.go @@ -72,6 +72,25 @@ func NewConfig(ctx *cli.Context) (Config, error) { return Config{}, err } + encodingConfig := kzg.ReadCLIConfig(ctx) + if version == uint(V2) { + if encodingConfig.G1Path == "" { + return Config{}, fmt.Errorf("G1Path must be specified for disperser version 2") + } + if encodingConfig.G2Path == "" { + return Config{}, fmt.Errorf("G2Path must be specified for disperser version 2") + } + if encodingConfig.CacheDir == "" { + return Config{}, fmt.Errorf("CacheDir must be specified for disperser version 2") + } + if encodingConfig.SRSOrder <= 0 { + return Config{}, fmt.Errorf("SRSOrder must be specified for disperser version 2") + } + if encodingConfig.SRSNumberToLoad <= 0 { + return Config{}, fmt.Errorf("SRSNumberToLoad must be specified for disperser version 2") + } + } + config := Config{ DisperserVersion: DisperserVersion(version), AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix), @@ -90,7 +109,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { }, RatelimiterConfig: ratelimiterConfig, RateConfig: rateConfig, - EncodingConfig: kzg.ReadCLIConfig(ctx), + EncodingConfig: encodingConfig, EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name), EnablePaymentMeterer: ctx.GlobalBool(flags.EnablePaymentMeterer.Name), ReservationsTableName: ctx.GlobalString(flags.ReservationsTableName.Name), diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index 584e4a4390..8319b585b5 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -243,6 +243,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) } keys := make([]corev2.BlobKey, len(blobMetadatas)) + newLastUpdatedAt := d.lastUpdatedAt for i, metadata := range blobMetadatas { if metadata == nil || metadata.BlobHeader == nil { return nil, fmt.Errorf("invalid blob metadata") @@ -252,8 +253,8 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) return nil, fmt.Errorf("failed to get blob key: %w", err) } keys[i] = blobKey - if metadata.UpdatedAt > d.lastUpdatedAt { - d.lastUpdatedAt = metadata.UpdatedAt + if metadata.UpdatedAt > newLastUpdatedAt { + newLastUpdatedAt = metadata.UpdatedAt } } @@ -343,6 +344,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) return nil, fmt.Errorf("failed to put blob verification infos: %w", err) } + d.lastUpdatedAt = newLastUpdatedAt return &batchData{ Batch: &corev2.Batch{ BatchHeader: batchHeader, diff --git a/disperser/controller/dispatcher_test.go b/disperser/controller/dispatcher_test.go index 718384fc6a..8630eda061 100644 --- a/disperser/controller/dispatcher_test.go +++ b/disperser/controller/dispatcher_test.go @@ -158,7 +158,7 @@ func TestDispatcherNewBatch(t *testing.T) { require.Equal(t, bh, vi0.BatchHeader) certHash, err := cert.Hash() require.NoError(t, err) - proof, err := core.DeserializeMerkleProof(vi0.InclusionProof) + proof, err := core.DeserializeMerkleProof(vi0.InclusionProof, uint64(vi0.BlobIndex)) require.NoError(t, err) verified, err := merkletree.VerifyProofUsing(certHash[:], false, proof, [][]byte{vi0.BatchRoot[:]}, keccak256.New()) require.NoError(t, err) diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index 43a49531e1..49b6aca2b4 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -112,6 +112,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { } for _, blob := range blobMetadatas { + blob := blob blobKey, err := blob.BlobHeader.BlobKey() if err != nil { e.logger.Error("failed to get blob key", "err", err, "requestedAt", blob.RequestedAt, "paymentMetadata", blob.BlobHeader.PaymentMetadata) diff --git a/disperser/encoder/server_v2.go b/disperser/encoder/server_v2.go index 45890ac857..498955594c 100644 --- a/disperser/encoder/server_v2.go +++ b/disperser/encoder/server_v2.go @@ -152,6 +152,7 @@ func (s *EncoderServerV2) handleEncodingToChunkStore(ctx context.Context, req *p encodingStart := time.Now() frames, err := s.prover.GetFrames(data, encodingParams) if err != nil { + s.logger.Error("failed to encode frames", "error", err) return nil, status.Errorf(codes.Internal, "encoding failed: %v", err) } s.logger.Info("encoding frames", "duration", time.Since(encodingStart)) @@ -204,6 +205,11 @@ func (s *EncoderServerV2) validateAndParseRequest(req *pb.EncodeBlobRequest) (co NumChunks: req.EncodingParams.NumChunks, } + err = encoding.ValidateEncodingParams(params, s.prover.GetSRSOrder()) + if err != nil { + return blobKey, params, status.Errorf(codes.InvalidArgument, "invalid encoding parameters: %v", err) + } + return blobKey, params, nil } diff --git a/encoding/encoding.go b/encoding/encoding.go index b7c9263ef7..221e851ad8 100644 --- a/encoding/encoding.go +++ b/encoding/encoding.go @@ -17,6 +17,8 @@ type Prover interface { GetFrames(data []byte, params EncodingParams) ([]*Frame, error) GetMultiFrameProofs(data []byte, params EncodingParams) ([]Proof, error) + + GetSRSOrder() uint64 } type Verifier interface { diff --git a/encoding/kzg/prover/prover.go b/encoding/kzg/prover/prover.go index d81cec4360..d315c04b8d 100644 --- a/encoding/kzg/prover/prover.go +++ b/encoding/kzg/prover/prover.go @@ -265,6 +265,10 @@ func (g *Prover) GetKzgEncoder(params encoding.EncodingParams) (*ParametrizedPro return enc, err } +func (g *Prover) GetSRSOrder() uint64 { + return g.SRSOrder +} + // Detect the precomputed table from the specified directory // the file name follow the name convention of // diff --git a/encoding/mock/encoder.go b/encoding/mock/encoder.go index 0cae8bdde7..87ed90d026 100644 --- a/encoding/mock/encoder.go +++ b/encoding/mock/encoder.go @@ -41,6 +41,11 @@ func (e *MockEncoder) GetMultiFrameProofs(data []byte, params encoding.EncodingP return args.Get(0).([]encoding.Proof), args.Error(1) } +func (e *MockEncoder) GetSRSOrder() uint64 { + args := e.Called() + return args.Get(0).(uint64) +} + func (e *MockEncoder) VerifyFrames(chunks []*encoding.Frame, indices []encoding.ChunkNumber, commitments encoding.BlobCommitments, params encoding.EncodingParams) error { args := e.Called(chunks, indices, commitments, params) time.Sleep(e.Delay) diff --git a/relay/relay_test_utils.go b/relay/relay_test_utils.go index 3c6d297b9e..0f5fdf1cf9 100644 --- a/relay/relay_test_utils.go +++ b/relay/relay_test_utils.go @@ -3,6 +3,15 @@ package relay import ( "context" "fmt" + "log" + "math/big" + "os" + "path/filepath" + "runtime" + "strings" + "testing" + "time" + pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common" pbcommonv2 "github.com/Layr-Labs/eigenda/api/grpc/common/v2" "github.com/Layr-Labs/eigenda/common" @@ -24,14 +33,6 @@ import ( "github.com/google/uuid" "github.com/ory/dockertest/v3" "github.com/stretchr/testify/require" - "log" - "math/big" - "os" - "path/filepath" - "runtime" - "strings" - "testing" - "time" ) var ( @@ -196,7 +197,7 @@ func randomBlob(t *testing.T) (*v2.BlobHeader, []byte) { CumulativePayment: big.NewInt(100).Bytes(), }, } - blobHeader, err := v2.NewBlobHeader(blobHeaderProto) + blobHeader, err := v2.BlobHeaderFromProtobuf(blobHeaderProto) require.NoError(t, err) return blobHeader, data