Skip to content

Commit

Permalink
fix: rm v1 payment api, add paymentState to v2
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Nov 15, 2024
1 parent 89cc3d0 commit 4cf7d2c
Show file tree
Hide file tree
Showing 11 changed files with 883 additions and 1,162 deletions.
12 changes: 6 additions & 6 deletions api/clients/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
disperser_v2_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/meterer"
)
Expand All @@ -19,8 +19,8 @@ var requiredQuorums = []uint8{0, 1}

type Accountant interface {
AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, []byte, error)
AuthenticatePaymentStateRequest() (*disperser_rpc.GetPaymentStateRequest, error)
SetPaymentState(paymentState *disperser_rpc.GetPaymentStateReply)
AuthenticatePaymentStateRequest() (*disperser_v2_rpc.GetPaymentStateRequest, error)
SetPaymentState(paymentState *disperser_v2_rpc.GetPaymentStateReply)
}

var _ Accountant = &accountant{}
Expand Down Expand Up @@ -182,7 +182,7 @@ func QuorumCheck(quorumNumbers []uint8, allowedNumbers []uint8) error {
return nil
}

func (a *accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentStateReply) {
func (a *accountant) SetPaymentState(paymentState *disperser_v2_rpc.GetPaymentStateReply) {
quorumNumbers := make([]uint8, len(paymentState.Reservation.QuorumNumbers))
for i, quorum := range paymentState.Reservation.QuorumNumbers {
quorumNumbers[i] = uint8(quorum)
Expand Down Expand Up @@ -213,15 +213,15 @@ func (a *accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentState
a.reservation.EndTimestamp = uint64(paymentState.Reservation.EndTimestamp)
}

func (a *accountant) AuthenticatePaymentStateRequest() (*disperser_rpc.GetPaymentStateRequest, error) {
func (a *accountant) AuthenticatePaymentStateRequest() (*disperser_v2_rpc.GetPaymentStateRequest, error) {
accountID := a.paymentSigner.GetAccountID()

signature, err := a.paymentSigner.SignAccountID(accountID)
if err != nil {
return nil, err
}

request := &disperser_rpc.GetPaymentStateRequest{
request := &disperser_v2_rpc.GetPaymentStateRequest{
AccountId: accountID,
Signature: signature,
}
Expand Down
104 changes: 0 additions & 104 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package clients
import (
"context"
"crypto/tls"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -55,10 +54,8 @@ type DisperserClient interface {
// DisperseBlobAuthenticated disperses a blob with an authenticated request.
// The BlobStatus returned will always be PROCESSSING if error is nil.
DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
DispersePaidBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error)
RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error)
InitializePaymentState(ctx context.Context) error
}

// See the NewDisperserClient constructor's documentation for details and usage examples.
Expand Down Expand Up @@ -190,59 +187,6 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums
return blobStatus, reply.GetRequestId(), nil
}

// DispersePaidBlob disperses a blob with a payment header and signature. Similar to DisperseBlob but with signed payment header.
func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
if c.accountant == nil {
return nil, nil, api.NewErrorInternal("not implemented")
}

err := c.initOnceGrpcConnection()
if err != nil {
return nil, nil, fmt.Errorf("error initializing connection: %w", err)
}

ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout)
defer cancel()

quorumNumbers := make([]uint32, len(quorums))
for i, q := range quorums {
quorumNumbers[i] = uint32(q)
}

// check every 32 bytes of data are within the valid range for a bn254 field element
_, err = rs.ToFrArray(data)
if err != nil {
return nil, nil, fmt.Errorf("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617 %w", err)
}

header, signature, err := c.accountant.AccountBlob(ctx, uint64(encoding.GetBlobLength(uint(len(data)))), quorums)
if header == nil {
return nil, nil, errors.New("accountant returned nil pointer to header")
}
if err != nil {
return nil, nil, err
}

request := &disperser_rpc.DispersePaidBlobRequest{
Data: data,
QuorumNumbers: quorumNumbers,
PaymentHeader: header,
PaymentSignature: signature,
}

reply, err := c.client.DispersePaidBlob(ctxTimeout, request)
if err != nil {
return nil, nil, err
}

blobStatus, err := disperser.FromBlobStatusProto(reply.GetResult())
if err != nil {
return nil, nil, err
}

return blobStatus, reply.GetRequestId(), nil
}

func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
err := c.initOnceGrpcConnection()
if err != nil {
Expand Down Expand Up @@ -384,54 +328,6 @@ func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []by
return reply.Data, nil
}

func (c *disperserClient) getPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, fmt.Errorf("error initializing connection: %w", err)
}

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()

request, err := c.accountant.AuthenticatePaymentStateRequest()
if err != nil {
return nil, err
}
reply, err := c.client.GetPaymentState(ctxTimeout, request)
if err != nil {
return nil, err
}

return reply, nil
}

func (c *disperserClient) InitializePaymentState(ctx context.Context) error {
paymentState, err := c.getPaymentState(ctx)
if err != nil {
return fmt.Errorf("error getting payment state from disperser: %w", err)
}
c.accountant.SetPaymentState(paymentState)
// c.accountant.binUsages = []uint64{uint64(paymentState.CurrentBinUsage), uint64(paymentState.NextBinUsage), uint64(paymentState.OverflowBinUsage)}
// c.accountant.cumulativePayment = new(big.Int).SetBytes(paymentState.CumulativePayment)
// quorumNumbers := make([]uint8, len(paymentState.Reservation.QuorumNumbers))
// for i, q := range paymentState.Reservation.QuorumNumbers {
// quorumNumbers[i] = uint8(q)
// }
// c.accountant.reservation = core.ActiveReservation{
// StartTimestamp: uint64(paymentState.Reservation.StartTimestamp),
// EndTimestamp: uint64(paymentState.Reservation.EndTimestamp),
// SymbolsPerSec: paymentState.Reservation.SymbolsPerSecond,
// QuorumNumbers: quorumNumbers,
// }
// c.accountant.onDemand = core.OnDemandPayment{
// CumulativePayment: new(big.Int).SetBytes(paymentState.OnChainCumulativePayment),
// }
// c.accountant.reservationWindow = paymentState.PaymentGlobalParams.ReservationWindow
// c.accountant.pricePerSymbol = paymentState.PaymentGlobalParams.PricePerSymbol
// c.accountant.minNumSymbols = paymentState.PaymentGlobalParams.MinNumSymbols
return nil
}

// initOnceGrpcConnection initializes the grpc connection and client if they are not already initialized.
// If initialization fails, it caches the error and will return it on every subsequent call.
func (c *disperserClient) initOnceGrpcConnection() error {
Expand Down
9 changes: 5 additions & 4 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,11 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
if err != nil {
return nil, fmt.Errorf("new disperser-client: %w", err)
}
err = disperserClient.InitializePaymentState(context.Background())
if err != nil {
return nil, fmt.Errorf("error setting payment state: %w", err)
}
// TODO: uncomment this when we are using disperser client v2
// err = disperserClient.InitializePaymentState(context.Background())
// if err != nil {
// return nil, fmt.Errorf("error setting payment state: %w", err)
// }

lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.PutBlobEncodingVersion)
if err != nil {
Expand Down
Loading

0 comments on commit 4cf7d2c

Please sign in to comment.