diff --git a/api/grpc/node/node_grpc.pb.go b/api/grpc/node/node_grpc.pb.go index 28a516cec7..6c44467fac 100644 --- a/api/grpc/node/node_grpc.pb.go +++ b/api/grpc/node/node_grpc.pb.go @@ -39,9 +39,11 @@ type DispersalClient interface { // StoreBlobs is simiar to StoreChunks, but it stores the blobs using a different storage schema // so that the stored blobs can later be aggregated by AttestBatch method to a bigger batch. // StoreBlobs + AttestBatch will eventually replace and deprecate StoreChunks method. + // DEPRECATED: StoreBlobs method is not used StoreBlobs(ctx context.Context, in *StoreBlobsRequest, opts ...grpc.CallOption) (*StoreBlobsReply, error) // AttestBatch is used to aggregate the batches stored by StoreBlobs method to a bigger batch. // It will return a signature at the end to attest to the aggregated batch. + // DEPRECATED: AttestBatch method is not used AttestBatch(ctx context.Context, in *AttestBatchRequest, opts ...grpc.CallOption) (*AttestBatchReply, error) // Retrieve node info metadata NodeInfo(ctx context.Context, in *NodeInfoRequest, opts ...grpc.CallOption) (*NodeInfoReply, error) @@ -105,9 +107,11 @@ type DispersalServer interface { // StoreBlobs is simiar to StoreChunks, but it stores the blobs using a different storage schema // so that the stored blobs can later be aggregated by AttestBatch method to a bigger batch. // StoreBlobs + AttestBatch will eventually replace and deprecate StoreChunks method. + // DEPRECATED: StoreBlobs method is not used StoreBlobs(context.Context, *StoreBlobsRequest) (*StoreBlobsReply, error) // AttestBatch is used to aggregate the batches stored by StoreBlobs method to a bigger batch. // It will return a signature at the end to attest to the aggregated batch. + // DEPRECATED: AttestBatch method is not used AttestBatch(context.Context, *AttestBatchRequest) (*AttestBatchReply, error) // Retrieve node info metadata NodeInfo(context.Context, *NodeInfoRequest) (*NodeInfoReply, error) diff --git a/api/proto/node/node.proto b/api/proto/node/node.proto index f32fea64a8..d8f52310e7 100644 --- a/api/proto/node/node.proto +++ b/api/proto/node/node.proto @@ -18,9 +18,11 @@ service Dispersal { // StoreBlobs is simiar to StoreChunks, but it stores the blobs using a different storage schema // so that the stored blobs can later be aggregated by AttestBatch method to a bigger batch. // StoreBlobs + AttestBatch will eventually replace and deprecate StoreChunks method. + // DEPRECATED: StoreBlobs method is not used rpc StoreBlobs(StoreBlobsRequest) returns (StoreBlobsReply) {} // AttestBatch is used to aggregate the batches stored by StoreBlobs method to a bigger batch. // It will return a signature at the end to attest to the aggregated batch. + // DEPRECATED: AttestBatch method is not used rpc AttestBatch(AttestBatchRequest) returns (AttestBatchReply) {} // Retrieve node info metadata rpc NodeInfo(NodeInfoRequest) returns (NodeInfoReply) {} diff --git a/node/grpc/server.go b/node/grpc/server.go index 8394951f6f..f6fa598411 100644 --- a/node/grpc/server.go +++ b/node/grpc/server.go @@ -17,7 +17,6 @@ import ( "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/golang/protobuf/ptypes/wrappers" "github.com/shirou/gopsutil/mem" "github.com/wealdtech/go-merkletree/v2" "github.com/wealdtech/go-merkletree/v2/keccak256" @@ -25,7 +24,6 @@ import ( _ "go.uber.org/automaxprocs" "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/wrapperspb" ) // Server implements the Node proto APIs. @@ -170,125 +168,12 @@ func (s *Server) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*p return reply, err } -func (s *Server) validateStoreBlobsRequest(in *pb.StoreBlobsRequest) error { - if in.GetReferenceBlockNumber() == 0 { - return api.NewErrorInvalidArg("missing reference_block_number in request") - } - - if len(in.GetBlobs()) == 0 { - return api.NewErrorInvalidArg("missing blobs in request") - } - for _, blob := range in.Blobs { - if blob.GetHeader() == nil { - return api.NewErrorInvalidArg("missing blob header in request") - } - if node.ValidatePointsFromBlobHeader(blob.GetHeader()) != nil { - return api.NewErrorInvalidArg("invalid points contained in the blob header in request") - } - if len(blob.GetHeader().GetQuorumHeaders()) == 0 { - return api.NewErrorInvalidArg("missing quorum headers in request") - } - if len(blob.GetHeader().GetQuorumHeaders()) != len(blob.GetBundles()) { - return api.NewErrorInvalidArg("the number of quorums must be the same as the number of bundles") - } - for _, q := range blob.GetHeader().GetQuorumHeaders() { - if q.GetQuorumId() > core.MaxQuorumID { - return api.NewErrorInvalidArg(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, q.GetQuorumId())) - } - if err := core.ValidateSecurityParam(q.GetConfirmationThreshold(), q.GetAdversaryThreshold()); err != nil { - return err - } - } - if in.GetReferenceBlockNumber() != blob.GetHeader().GetReferenceBlockNumber() { - return api.NewErrorInvalidArg("reference_block_number must be the same for all blobs") - } - } - return nil -} - func (s *Server) StoreBlobs(ctx context.Context, in *pb.StoreBlobsRequest) (*pb.StoreBlobsReply, error) { - start := time.Now() - - err := s.validateStoreBlobsRequest(in) - if err != nil { - return nil, err - } - - blobHeadersSize := 0 - bundleSize := 0 - for _, blob := range in.Blobs { - blobHeadersSize += proto.Size(blob.GetHeader()) - for _, bundle := range blob.GetBundles() { - bundleSize += proto.Size(bundle) - } - } - s.node.Logger.Info("StoreBlobs RPC request received", "numBlobs", len(in.Blobs), "reqMsgSize", proto.Size(in), "blobHeadersSize", blobHeadersSize, "bundleSize", bundleSize, "referenceBlockNumber", in.GetReferenceBlockNumber()) - - // Process the request - blobs, err := node.GetBlobMessages(in.GetBlobs(), s.node.Config.NumBatchDeserializationWorkers) - if err != nil { - return nil, err - } - - s.node.Metrics.ObserveLatency("StoreBlobs", "deserialization", float64(time.Since(start).Milliseconds())) - s.node.Logger.Info("StoreBlobsRequest deserialized", "duration", time.Since(start)) - - signatures, err := s.node.ProcessBlobs(ctx, blobs, in.GetBlobs()) - if err != nil { - return nil, err - } - - signaturesBytes := make([]*wrappers.BytesValue, len(signatures)) - for i, sig := range signatures { - if sig == nil { - signaturesBytes[i] = nil - continue - } - signaturesBytes[i] = wrapperspb.Bytes(sig.Serialize()) - } - - return &pb.StoreBlobsReply{Signatures: signaturesBytes}, nil + return &pb.StoreBlobsReply{}, api.NewErrorUnimplemented() } func (s *Server) AttestBatch(ctx context.Context, in *pb.AttestBatchRequest) (*pb.AttestBatchReply, error) { - start := time.Now() - - // Validate the batch root - blobHeaderHashes := make([][32]byte, len(in.GetBlobHeaderHashes())) - for i, hash := range in.GetBlobHeaderHashes() { - if len(hash) != 32 { - return nil, api.NewErrorInvalidArg("invalid blob header hash") - } - var h [32]byte - copy(h[:], hash) - blobHeaderHashes[i] = h - } - batchHeader, err := node.GetBatchHeader(in.GetBatchHeader()) - if err != nil { - return nil, fmt.Errorf("failed to get the batch header: %w", err) - } - err = s.node.ValidateBatchContents(ctx, blobHeaderHashes, batchHeader) - if err != nil { - return nil, fmt.Errorf("failed to validate the batch header root: %w", err) - } - - // Store the mapping from batch header + blob index to blob header hashes - err = s.node.Store.StoreBatchBlobMapping(ctx, batchHeader, blobHeaderHashes) - if err != nil { - return nil, fmt.Errorf("failed to store the batch blob mapping: %w", err) - } - - // Sign the batch header - batchHeaderHash, err := batchHeader.GetBatchHeaderHash() - if err != nil { - return nil, fmt.Errorf("failed to get the batch header hash: %w", err) - } - sig := s.node.KeyPair.SignMessage(batchHeaderHash) - - s.node.Logger.Info("AttestBatch complete", "duration", time.Since(start)) - return &pb.AttestBatchReply{ - Signature: sig.Serialize(), - }, nil + return &pb.AttestBatchReply{}, api.NewErrorUnimplemented() } func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksRequest) (*pb.RetrieveChunksReply, error) { diff --git a/node/grpc/server_test.go b/node/grpc/server_test.go index 852a19c2e9..27819f59e0 100644 --- a/node/grpc/server_test.go +++ b/node/grpc/server_test.go @@ -381,187 +381,6 @@ func TestStoreChunksRequestValidation(t *testing.T) { assert.True(t, strings.Contains(err.Error(), "adversary threshold equals 0")) } -func TestStoreBlobs(t *testing.T) { - server := newTestServer(t, true) - - reqToCopy, _, _, blobHeaders, _ := makeStoreChunksRequest(t, 66, 33) - reqToCopy.BatchHeader = nil - req := &pb.StoreBlobsRequest{ - Blobs: reqToCopy.Blobs, - ReferenceBlockNumber: 1, - } - reply, err := server.StoreBlobs(context.Background(), req) - assert.NoError(t, err) - assert.NotNil(t, reply.GetSignatures()) - assert.Len(t, reply.GetSignatures(), len(blobHeaders)) - for i, sig := range reply.GetSignatures() { - assert.NotNil(t, sig) - assert.NotNil(t, sig.Value) - batchHeader := &core.BatchHeader{ - ReferenceBlockNumber: 1, - BatchRoot: [32]byte{}, - } - _, err := batchHeader.SetBatchRoot([]*core.BlobHeader{blobHeaders[i]}) - assert.NoError(t, err) - batchHeaderHash, err := batchHeader.GetBatchHeaderHash() - assert.NoError(t, err) - point, err := new(core.Signature).Deserialize(sig.Value) - assert.NoError(t, err) - s := &core.Signature{G1Point: point} - ok := s.Verify(keyPair.GetPubKeyG2(), batchHeaderHash) - assert.True(t, ok) - } -} - -func TestMinibatchDispersalAndRetrieval(t *testing.T) { - server := newTestServer(t, true) - - reqToCopy, _, _, blobHeaders, _ := makeStoreChunksRequest(t, 66, 33) - reqToCopy.BatchHeader = nil - req := &pb.StoreBlobsRequest{ - Blobs: reqToCopy.Blobs, - ReferenceBlockNumber: 1, - } - ctx := context.Background() - reply, err := server.StoreBlobs(ctx, req) - assert.NoError(t, err) - assert.NotNil(t, reply.GetSignatures()) - - assert.Len(t, blobHeaders, 2) - bhh0, err := blobHeaders[0].GetBlobHeaderHash() - assert.NoError(t, err) - bhh1, err := blobHeaders[1].GetBlobHeaderHash() - assert.NoError(t, err) - batchHeader := &core.BatchHeader{ - ReferenceBlockNumber: 1, - BatchRoot: [32]byte{}, - } - _, err = batchHeader.SetBatchRoot([]*core.BlobHeader{blobHeaders[0], blobHeaders[1]}) - assert.NoError(t, err) - attestReq := &pb.AttestBatchRequest{ - BatchHeader: &pb.BatchHeader{ - BatchRoot: batchHeader.BatchRoot[:], - ReferenceBlockNumber: 1, - }, - BlobHeaderHashes: [][]byte{bhh0[:], bhh1[:]}, - } - attestReply, err := server.AttestBatch(ctx, attestReq) - assert.NotNil(t, reply) - assert.NoError(t, err) - sig := attestReply.GetSignature() - assert.NotNil(t, sig) - batchHeaderHash, err := batchHeader.GetBatchHeaderHash() - assert.NoError(t, err) - point, err := new(core.Signature).Deserialize(sig) - assert.NoError(t, err) - s := &core.Signature{G1Point: point} - ok := s.Verify(keyPair.GetPubKeyG2(), batchHeaderHash) - assert.True(t, ok) - - // Get blob headers - blobHeaderReply, err := server.GetBlobHeader(ctx, &pb.GetBlobHeaderRequest{ - BatchHeaderHash: batchHeaderHash[:], - BlobIndex: 0, - QuorumId: 0, - }) - assert.NoError(t, err) - assert.NotNil(t, blobHeaderReply) - blobHeader, err := node.GetBlobHeaderFromProto(blobHeaderReply.GetBlobHeader()) - assert.NoError(t, err) - assert.Equal(t, blobHeader, blobHeaders[0]) - proof := &merkletree.Proof{ - Hashes: blobHeaderReply.GetProof().GetHashes(), - Index: uint64(blobHeaderReply.GetProof().GetIndex()), - } - ok, err = merkletree.VerifyProofUsing(bhh0[:], false, proof, [][]byte{batchHeader.BatchRoot[:]}, keccak256.New()) - assert.NoError(t, err) - assert.True(t, ok) - - blobHeaderReply, err = server.GetBlobHeader(ctx, &pb.GetBlobHeaderRequest{ - BatchHeaderHash: batchHeaderHash[:], - BlobIndex: 1, - QuorumId: 0, - }) - assert.NoError(t, err) - assert.NotNil(t, blobHeaderReply) - blobHeader, err = node.GetBlobHeaderFromProto(blobHeaderReply.GetBlobHeader()) - assert.NoError(t, err) - assert.Equal(t, blobHeader, blobHeaders[1]) - proof = &merkletree.Proof{ - Hashes: blobHeaderReply.GetProof().GetHashes(), - Index: uint64(blobHeaderReply.GetProof().GetIndex()), - } - ok, err = merkletree.VerifyProofUsing(bhh1[:], false, proof, [][]byte{batchHeader.BatchRoot[:]}, keccak256.New()) - assert.NoError(t, err) - assert.True(t, ok) - - // non-existent blob index - _, err = server.GetBlobHeader(ctx, &pb.GetBlobHeaderRequest{ - BatchHeaderHash: batchHeaderHash[:], - BlobIndex: 2, - QuorumId: 0, - }) - assert.Error(t, err) - assert.True(t, strings.Contains(err.Error(), "commit not found in db")) - - // Test GetChunks - p := &peer.Peer{ - Addr: &net.TCPAddr{ - IP: net.ParseIP("0.0.0.0"), - Port: 3000, - }, - } - ctx = peer.NewContext(context.Background(), p) - retrieveChunksReply, err := server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{ - BatchHeaderHash: batchHeaderHash[:], - BlobIndex: 0, - QuorumId: 0, - }) - assert.NoError(t, err) - assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat) - assert.Len(t, retrieveChunksReply.GetChunks(), 1) - recovered, err := new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0]) - assert.NoError(t, err) - chunk, err := new(encoding.Frame).Deserialize(encodedChunk) - assert.NoError(t, err) - assert.Equal(t, recovered, chunk) - - retrieveChunksReply, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{ - BatchHeaderHash: batchHeaderHash[:], - BlobIndex: 1, - QuorumId: 0, - }) - assert.NoError(t, err) - assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat) - assert.Len(t, retrieveChunksReply.GetChunks(), 1) - recovered, err = new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0]) - assert.NoError(t, err) - chunk, err = new(encoding.Frame).Deserialize(encodedChunk) - assert.NoError(t, err) - assert.Equal(t, recovered, chunk) - - retrieveChunksReply, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{ - BatchHeaderHash: batchHeaderHash[:], - BlobIndex: 0, - QuorumId: 0, - }) - assert.NoError(t, err) - assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat) - assert.Len(t, retrieveChunksReply.GetChunks(), 1) - recovered, err = new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0]) - assert.NoError(t, err) - chunk, err = new(encoding.Frame).Deserialize(encodedChunk) - assert.NoError(t, err) - assert.Equal(t, recovered, chunk) - - _, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{ - BatchHeaderHash: batchHeaderHash[:], - BlobIndex: 1, - QuorumId: 1, - }) - assert.ErrorContains(t, err, "quorum ID 1 not found in blob header") -} - func TestRetrieveChunks(t *testing.T) { server := newTestServer(t, true) batchHeaderHash, _, _, _ := storeChunks(t, server, false) diff --git a/node/node.go b/node/node.go index 8da4030e51..a45097f246 100644 --- a/node/node.go +++ b/node/node.go @@ -392,126 +392,6 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs return sig, nil } -// ProcessBlobs validates the blobs are correct, stores data into the node's Store, and then returns a signature for each blob. -// This method is similar to ProcessBatch method except that it doesn't require a batch. -// -// Notes: -// - If the blob is stored already, it's no-op to store it more than once -// - If the blob is stored, but the processing fails after that, these data items will not be rollback -// - These data items will be garbage collected eventually when they become stale. -func (n *Node) ProcessBlobs(ctx context.Context, blobs []*core.BlobMessage, rawBlobs []*node.Blob) ([]*core.Signature, error) { - start := time.Now() - log := n.Logger - - if len(blobs) == 0 { - return nil, errors.New("number of blobs must be greater than zero") - } - - if len(blobs) != len(rawBlobs) { - return nil, errors.New("number of parsed blobs must be the same as number of blobs from protobuf request") - } - - // Measure num batches received and its size in bytes - batchSize := uint64(0) - for _, blob := range blobs { - for quorumID, bundle := range blob.Bundles { - n.Metrics.AcceptBlobs(quorumID, bundle.Size()) - } - batchSize += blob.Bundles.Size() - } - n.Metrics.AcceptBatches("received", batchSize) - - log.Debug("Start processing blobs", "batchSizeBytes", batchSize, "numBlobs", len(blobs)) - - // Store the blobs - // Run this in a goroutine so we can parallelize the blob storing and blob verifaction work. - // This should be able to improve latency without needing more CPUs, because blob - // storing is an IO operation. - type storeResult struct { - // Whether StoreBatch failed. - err error - - // The keys that are stored to database for a single batch. - // Defined only if the batch not already exists and gets stored to database successfully. - keys *[][]byte - - // Latency to store the batch. - // Defined only if the batch not already exists and gets stored to database successfully. - latency time.Duration - } - storeChan := make(chan storeResult) - go func(n *Node) { - start := time.Now() - keys, err := n.Store.StoreBlobs(ctx, blobs, rawBlobs) - if err != nil { - // If batch already exists, we don't store it again, but we should not - // error out in such case. - if errors.Is(err, ErrBatchAlreadyExist) { - storeChan <- storeResult{err: nil, keys: nil, latency: 0} - } else { - storeChan <- storeResult{err: fmt.Errorf("failed to store batch: %w", err), keys: nil, latency: 0} - } - return - } - storeChan <- storeResult{err: nil, keys: keys, latency: time.Since(start)} - }(n) - - // Validate batch - // Assumes that all blobs have been validated to have the same reference block number. - stageTimer := time.Now() - referenceBlockNumber := uint(0) - for _, blob := range rawBlobs { - blobRefBlock := blob.GetHeader().GetReferenceBlockNumber() - if referenceBlockNumber == 0 && blobRefBlock != 0 { - referenceBlockNumber = uint(blobRefBlock) - break - } - } - if referenceBlockNumber == 0 { - return nil, errors.New("reference block number is not set") - } - - err := n.ValidateBlobs(ctx, blobs, referenceBlockNumber) - if err != nil { - // If we have already stored the batch into database, but it's not valid, we - // revert all the keys for that batch. - result := <-storeChan - if result.keys != nil { - log.Debug("Batch validation failed, rolling back the key/value entries stored in database", "number of entires", len(*result.keys), "referenceBlockNumber", referenceBlockNumber) - if deleteKeysErr := n.Store.DeleteKeys(ctx, result.keys); deleteKeysErr != nil { - log.Error("Failed to delete the invalid batch that should be rolled back", "err", deleteKeysErr) - } - } - return nil, err - } - n.Metrics.RecordStoreChunksStage("validated", batchSize, time.Since(stageTimer)) - log.Debug("Validate blobs took", "duration:", time.Since(stageTimer)) - - // Before we sign the blobs, we should first complete the batch storing successfully. - result := <-storeChan - if result.err != nil { - return nil, fmt.Errorf("failed to store blobs: %w", result.err) - } - if result.keys != nil { - n.Metrics.RecordStoreChunksStage("stored", batchSize, result.latency) - n.Logger.Debug("StoreBlobs succeeded", "duration:", result.latency) - } else { - n.Logger.Warn("StoreBlobs skipped because the batch already exists in the store") - } - - // Sign all blobs if all validation checks pass and data items are written to database. - stageTimer = time.Now() - signatures, err := n.SignBlobs(blobs, referenceBlockNumber) - if err != nil { - return nil, fmt.Errorf("failed to sign blobs: %w", err) - } - n.Metrics.RecordStoreChunksStage("signed", batchSize, time.Since(stageTimer)) - log.Debug("SignBlobs succeeded", "pubkey", hexutil.Encode(n.KeyPair.GetPubKeyG2().Serialize()), "duration", time.Since(stageTimer)) - - log.Debug("Exiting ProcessBlobs", "duration", time.Since(start)) - return signatures, nil -} - func (n *Node) ValidateBatch(ctx context.Context, header *core.BatchHeader, blobs []*core.BlobMessage) error { start := time.Now() operatorState, err := n.ChainState.GetOperatorStateByOperator(ctx, header.ReferenceBlockNumber, n.Config.ID) @@ -538,61 +418,6 @@ func (n *Node) ValidateBatch(ctx context.Context, header *core.BatchHeader, blob return nil } -// ValidateBlobs validates the blob commitments are correct -func (n *Node) ValidateBlobs(ctx context.Context, blobs []*core.BlobMessage, referenceBlockNumber uint) error { - start := time.Now() - operatorState, err := n.ChainState.GetOperatorStateByOperator(ctx, referenceBlockNumber, n.Config.ID) - if err != nil { - return err - } - getStateDuration := time.Since(start) - - pool := workerpool.New(n.Config.NumBatchValidators) - err = n.Validator.ValidateBlobs(blobs, operatorState, pool) - if err != nil { - h, hashErr := operatorState.Hash() - if hashErr != nil { - n.Logger.Error("failed to get operator state hash", "err", hashErr) - } - - hStr := make([]string, 0, len(h)) - for q, hash := range h { - hStr = append(hStr, fmt.Sprintf("%d: %x", q, hash)) - } - return fmt.Errorf("failed to validate batch with operator state %x: %w", strings.Join(hStr, ","), err) - } - n.Logger.Debug("ValidateBlob completed", "get operator state duration", getStateDuration, "total duration", time.Since(start)) - return nil -} - -func (n *Node) SignBlobs(blobs []*core.BlobMessage, referenceBlockNumber uint) ([]*core.Signature, error) { - start := time.Now() - signatures := make([]*core.Signature, len(blobs)) - for i, blob := range blobs { - if blob == nil || blob.BlobHeader == nil { - signatures[i] = nil - continue - } - batchHeader := &core.BatchHeader{ - ReferenceBlockNumber: referenceBlockNumber, - BatchRoot: [32]byte{}, - } - _, err := batchHeader.SetBatchRoot([]*core.BlobHeader{blob.BlobHeader}) - if err != nil { - return nil, fmt.Errorf("failed to set batch root: %w", err) - } - batchHeaderHash, err := batchHeader.GetBatchHeaderHash() - if err != nil { - return nil, fmt.Errorf("failed to get batch header hash: %w", err) - } - sig := n.KeyPair.SignMessage(batchHeaderHash) - signatures[i] = sig - } - - n.Logger.Debug("SignBlobs completed", "duration", time.Since(start)) - return signatures, nil -} - // ValidateBlobHeadersRoot validates the blob headers root hash // by comparing it with the merkle tree root hash of the blob headers. // It also checks if all blob headers have the same reference block number