Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Variety of fixes in disperser #907

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,10 @@ func SerializeMerkleProof(proof *merkletree.Proof) []byte {
return proofBytes
}

func DeserializeMerkleProof(data []byte) (*merkletree.Proof, error) {
proof := &merkletree.Proof{}
func DeserializeMerkleProof(data []byte, index uint64) (*merkletree.Proof, error) {
proof := &merkletree.Proof{
Index: index,
}
if len(data)%32 != 0 {
return nil, fmt.Errorf("invalid proof length")
}
Expand Down
25 changes: 23 additions & 2 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type BlobHeader struct {
Signature []byte
}

func NewBlobHeader(proto *commonpb.BlobHeader) (*BlobHeader, error) {
func BlobHeaderFromProtobuf(proto *commonpb.BlobHeader) (*BlobHeader, error) {
commitment, err := new(encoding.G1Commitment).Deserialize(proto.GetCommitment().GetCommitment())
if err != nil {
return nil, err
Expand Down Expand Up @@ -202,6 +202,27 @@ func (c *BlobCertificate) ToProtobuf() (*commonpb.BlobCertificate, error) {
}, nil
}

func BlobCertificateFromProtobuf(proto *commonpb.BlobCertificate) (*BlobCertificate, error) {
if proto.GetBlobHeader() == nil {
return nil, errors.New("missing blob header in blob certificate")
}

blobHeader, err := BlobHeaderFromProtobuf(proto.GetBlobHeader())
if err != nil {
return nil, fmt.Errorf("failed to create blob header: %v", err)
}

relayKeys := make([]RelayKey, len(proto.GetRelays()))
for i, r := range proto.GetRelays() {
relayKeys[i] = RelayKey(r)
}

return &BlobCertificate{
BlobHeader: blobHeader,
RelayKeys: relayKeys,
}, nil
}

type BatchHeader struct {
// BatchRoot is the root of a Merkle tree whose leaves are the keys of the blobs in the batch
BatchRoot [32]byte
Expand Down Expand Up @@ -272,7 +293,7 @@ func BatchFromProtobuf(proto *commonpb.Batch) (*Batch, error) {

blobCerts := make([]*BlobCertificate, len(proto.GetBlobCertificates()))
for i, cert := range proto.GetBlobCertificates() {
blobHeader, err := NewBlobHeader(cert.GetBlobHeader())
blobHeader, err := BlobHeaderFromProtobuf(cert.GetBlobHeader())
if err != nil {
return nil, fmt.Errorf("failed to create blob header: %v", err)
}
Expand Down
61 changes: 61 additions & 0 deletions core/v2/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,64 @@ func TestConvertBatchToFromProtobuf(t *testing.T) {

assert.Equal(t, batch, newBatch)
}

func TestConvertBlobHeaderToFromProtobuf(t *testing.T) {
data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
commitments, err := p.GetCommitments(data)
if err != nil {
t.Fatal(err)
}

bh := &v2.BlobHeader{
BlobVersion: 0,
BlobCommitments: commitments,
QuorumNumbers: []core.QuorumID{0, 1},
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x123",
BinIndex: 5,
CumulativePayment: big.NewInt(100),
},
Signature: []byte{1, 2, 3},
}

pb, err := bh.ToProtobuf()
assert.NoError(t, err)

newBH, err := v2.BlobHeaderFromProtobuf(pb)
assert.NoError(t, err)

assert.Equal(t, bh, newBH)
}

func TestConvertBlobCertToFromProtobuf(t *testing.T) {
data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
commitments, err := p.GetCommitments(data)
if err != nil {
t.Fatal(err)
}

bh := &v2.BlobHeader{
BlobVersion: 0,
BlobCommitments: commitments,
QuorumNumbers: []core.QuorumID{0, 1},
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x123",
BinIndex: 5,
CumulativePayment: big.NewInt(100),
},
Signature: []byte{1, 2, 3},
}

blobCert := &v2.BlobCertificate{
BlobHeader: bh,
RelayKeys: []v2.RelayKey{0, 1},
}

pb, err := blobCert.ToProtobuf()
assert.NoError(t, err)

newBlobCert, err := v2.BlobCertificateFromProtobuf(pb)
assert.NoError(t, err)

assert.Equal(t, blobCert, newBlobCert)
}
4 changes: 2 additions & 2 deletions disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
}

data := req.GetData()
blobHeader, err := corev2.NewBlobHeader(req.GetBlobHeader())
blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader())
if err != nil {
return nil, api.NewErrorInternal(err.Error())
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func (s *DispersalServerV2) validateDispersalRequest(req *pb.DisperseBlobRequest
return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob version %d; valid blob versions are: %v", blobHeaderProto.GetVersion(), validVersions))
}

blobHeader, err := corev2.NewBlobHeader(blobHeaderProto)
blobHeader, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto)
if err != nil {
return api.NewErrorInvalidArg(fmt.Sprintf("invalid blob header: %s", err.Error()))
}
Expand Down
2 changes: 1 addition & 1 deletion disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestV2DisperseBlob(t *testing.T) {
CumulativePayment: big.NewInt(100).Bytes(),
},
}
blobHeader, err := corev2.NewBlobHeader(blobHeaderProto)
blobHeader, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto)
assert.NoError(t, err)
signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
sig, err := signer.SignBlobRequest(blobHeader)
Expand Down
21 changes: 20 additions & 1 deletion disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,25 @@ func NewConfig(ctx *cli.Context) (Config, error) {
return Config{}, err
}

encodingConfig := kzg.ReadCLIConfig(ctx)
if version == uint(V2) {
if encodingConfig.G1Path == "" {
return Config{}, fmt.Errorf("G1Path must be specified for disperser version 2")
}
if encodingConfig.G2Path == "" {
return Config{}, fmt.Errorf("G2Path must be specified for disperser version 2")
}
if encodingConfig.CacheDir == "" {
return Config{}, fmt.Errorf("CacheDir must be specified for disperser version 2")
}
if encodingConfig.SRSOrder <= 0 {
return Config{}, fmt.Errorf("SRSOrder must be specified for disperser version 2")
}
if encodingConfig.SRSNumberToLoad <= 0 {
return Config{}, fmt.Errorf("SRSNumberToLoad must be specified for disperser version 2")
}
}

config := Config{
DisperserVersion: DisperserVersion(version),
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
Expand All @@ -90,7 +109,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
},
RatelimiterConfig: ratelimiterConfig,
RateConfig: rateConfig,
EncodingConfig: kzg.ReadCLIConfig(ctx),
EncodingConfig: encodingConfig,
EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name),
EnablePaymentMeterer: ctx.GlobalBool(flags.EnablePaymentMeterer.Name),
ReservationsTableName: ctx.GlobalString(flags.ReservationsTableName.Name),
Expand Down
6 changes: 4 additions & 2 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
}

keys := make([]corev2.BlobKey, len(blobMetadatas))
newLastUpdatedAt := d.lastUpdatedAt
for i, metadata := range blobMetadatas {
if metadata == nil || metadata.BlobHeader == nil {
return nil, fmt.Errorf("invalid blob metadata")
Expand All @@ -252,8 +253,8 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to get blob key: %w", err)
}
keys[i] = blobKey
if metadata.UpdatedAt > d.lastUpdatedAt {
d.lastUpdatedAt = metadata.UpdatedAt
if metadata.UpdatedAt > newLastUpdatedAt {
newLastUpdatedAt = metadata.UpdatedAt
}
}

Expand Down Expand Up @@ -343,6 +344,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to put blob verification infos: %w", err)
}

d.lastUpdatedAt = newLastUpdatedAt
return &batchData{
Batch: &corev2.Batch{
BatchHeader: batchHeader,
Expand Down
2 changes: 1 addition & 1 deletion disperser/controller/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestDispatcherNewBatch(t *testing.T) {
require.Equal(t, bh, vi0.BatchHeader)
certHash, err := cert.Hash()
require.NoError(t, err)
proof, err := core.DeserializeMerkleProof(vi0.InclusionProof)
proof, err := core.DeserializeMerkleProof(vi0.InclusionProof, uint64(vi0.BlobIndex))
require.NoError(t, err)
verified, err := merkletree.VerifyProofUsing(certHash[:], false, proof, [][]byte{vi0.BatchRoot[:]}, keccak256.New())
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
}

for _, blob := range blobMetadatas {
blob := blob
blobKey, err := blob.BlobHeader.BlobKey()
if err != nil {
e.logger.Error("failed to get blob key", "err", err, "requestedAt", blob.RequestedAt, "paymentMetadata", blob.BlobHeader.PaymentMetadata)
Expand Down
6 changes: 6 additions & 0 deletions disperser/encoder/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (s *EncoderServerV2) handleEncodingToChunkStore(ctx context.Context, req *p
encodingStart := time.Now()
frames, err := s.prover.GetFrames(data, encodingParams)
if err != nil {
s.logger.Error("failed to encode frames", "error", err)
return nil, status.Errorf(codes.Internal, "encoding failed: %v", err)
}
s.logger.Info("encoding frames", "duration", time.Since(encodingStart))
Expand Down Expand Up @@ -204,6 +205,11 @@ func (s *EncoderServerV2) validateAndParseRequest(req *pb.EncodeBlobRequest) (co
NumChunks: req.EncodingParams.NumChunks,
}

err = encoding.ValidateEncodingParams(params, s.prover.GetSRSOrder())
if err != nil {
return blobKey, params, status.Errorf(codes.InvalidArgument, "invalid encoding parameters: %v", err)
}

return blobKey, params, nil
}

Expand Down
2 changes: 2 additions & 0 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Prover interface {
GetFrames(data []byte, params EncodingParams) ([]*Frame, error)

GetMultiFrameProofs(data []byte, params EncodingParams) ([]Proof, error)

GetSRSOrder() uint64
}

type Verifier interface {
Expand Down
4 changes: 4 additions & 0 deletions encoding/kzg/prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ func (g *Prover) GetKzgEncoder(params encoding.EncodingParams) (*ParametrizedPro
return enc, err
}

func (g *Prover) GetSRSOrder() uint64 {
return g.SRSOrder
}

// Detect the precomputed table from the specified directory
// the file name follow the name convention of
//
Expand Down
5 changes: 5 additions & 0 deletions encoding/mock/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (e *MockEncoder) GetMultiFrameProofs(data []byte, params encoding.EncodingP
return args.Get(0).([]encoding.Proof), args.Error(1)
}

func (e *MockEncoder) GetSRSOrder() uint64 {
args := e.Called()
return args.Get(0).(uint64)
}

func (e *MockEncoder) VerifyFrames(chunks []*encoding.Frame, indices []encoding.ChunkNumber, commitments encoding.BlobCommitments, params encoding.EncodingParams) error {
args := e.Called(chunks, indices, commitments, params)
time.Sleep(e.Delay)
Expand Down
19 changes: 10 additions & 9 deletions relay/relay_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ package relay
import (
"context"
"fmt"
"log"
"math/big"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"

pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common"
pbcommonv2 "github.com/Layr-Labs/eigenda/api/grpc/common/v2"
"github.com/Layr-Labs/eigenda/common"
Expand All @@ -24,14 +33,6 @@ import (
"github.com/google/uuid"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
"log"
"math/big"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
)

var (
Expand Down Expand Up @@ -196,7 +197,7 @@ func randomBlob(t *testing.T) (*v2.BlobHeader, []byte) {
CumulativePayment: big.NewInt(100).Bytes(),
},
}
blobHeader, err := v2.NewBlobHeader(blobHeaderProto)
blobHeader, err := v2.BlobHeaderFromProtobuf(blobHeaderProto)
require.NoError(t, err)

return blobHeader, data
Expand Down
Loading