Skip to content

Commit

Permalink
[v2] disperser client payments api (#928)
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen authored Dec 6, 2024
1 parent 9f68f74 commit 100eb3e
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 58 deletions.
80 changes: 63 additions & 17 deletions api/clients/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,14 @@ import (
"sync"
"time"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/meterer"
)

var requiredQuorums = []uint8{0, 1}

type Accountant interface {
AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, error)
}

var _ Accountant = &accountant{}

type accountant struct {
type Accountant struct {
// on-chain states
accountID string
reservation *core.ActiveReservation
Expand All @@ -45,15 +39,15 @@ type BinRecord struct {
Usage uint64
}

func NewAccountant(accountID string, reservation *core.ActiveReservation, onDemand *core.OnDemandPayment, reservationWindow uint32, pricePerSymbol uint32, minNumSymbols uint32, numBins uint32) *accountant {
func NewAccountant(accountID string, reservation *core.ActiveReservation, onDemand *core.OnDemandPayment, reservationWindow uint32, pricePerSymbol uint32, minNumSymbols uint32, numBins uint32) *Accountant {
//TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense
// Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic
// There's a subsequent PR that handles populating the accountant with on-chain state from the disperser
binRecords := make([]BinRecord, numBins)
for i := range binRecords {
binRecords[i] = BinRecord{Index: uint32(i), Usage: 0}
}
a := accountant{
a := Accountant{
accountID: accountID,
reservation: reservation,
onDemand: onDemand,
Expand All @@ -73,7 +67,7 @@ func NewAccountant(accountID string, reservation *core.ActiveReservation, onDema
// then on-demand if the reservation is not available. The returned values are
// bin index for reservation payments and cumulative payment for on-demand payments,
// and both fields are used to create the payment header and signature
func (a *accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quorumNumbers []uint8) (uint32, *big.Int, error) {
func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quorumNumbers []uint8) (uint32, *big.Int, error) {
now := time.Now().Unix()
currentBinIndex := meterer.GetBinIndex(uint64(now), a.reservationWindow)

Expand Down Expand Up @@ -116,7 +110,7 @@ func (a *accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quo
}

// AccountBlob accountant provides and records payment information
func (a *accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, error) {
func (a *Accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*core.PaymentMetadata, error) {
binIndex, cumulativePayment, err := a.BlobPaymentInfo(ctx, numSymbols, quorums)
if err != nil {
return nil, err
Expand All @@ -127,28 +121,27 @@ func (a *accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums
BinIndex: binIndex,
CumulativePayment: cumulativePayment,
}
protoPaymentHeader := pm.ConvertToProtoPaymentHeader()

return protoPaymentHeader, nil
return pm, nil
}

// TODO: PaymentCharged and SymbolsCharged copied from meterer, should be refactored
// PaymentCharged returns the chargeable price for a given data length
func (a *accountant) PaymentCharged(numSymbols uint) uint64 {
func (a *Accountant) PaymentCharged(numSymbols uint) uint64 {
return uint64(a.SymbolsCharged(numSymbols)) * uint64(a.pricePerSymbol)
}

// SymbolsCharged returns the number of symbols charged for a given data length
// being at least MinNumSymbols or the nearest rounded-up multiple of MinNumSymbols.
func (a *accountant) SymbolsCharged(numSymbols uint) uint32 {
func (a *Accountant) SymbolsCharged(numSymbols uint) uint32 {
if numSymbols <= uint(a.minNumSymbols) {
return a.minNumSymbols
}
// Round up to the nearest multiple of MinNumSymbols
return uint32(core.RoundUpDivide(uint(numSymbols), uint(a.minNumSymbols))) * a.minNumSymbols
}

func (a *accountant) GetRelativeBinRecord(index uint32) *BinRecord {
func (a *Accountant) GetRelativeBinRecord(index uint32) *BinRecord {
relativeIndex := index % a.numBins
if a.binRecords[relativeIndex].Index != uint32(index) {
a.binRecords[relativeIndex] = BinRecord{
Expand All @@ -160,6 +153,59 @@ func (a *accountant) GetRelativeBinRecord(index uint32) *BinRecord {
return &a.binRecords[relativeIndex]
}

func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentStateReply) error {
if paymentState == nil {
return fmt.Errorf("payment state cannot be nil")
} else if paymentState.GetPaymentGlobalParams() == nil {
return fmt.Errorf("payment global params cannot be nil")
} else if paymentState.GetOnchainCumulativePayment() == nil {
return fmt.Errorf("onchain cumulative payment cannot be nil")
} else if paymentState.GetCumulativePayment() == nil {
return fmt.Errorf("cumulative payment cannot be nil")
} else if paymentState.GetReservation() == nil {
return fmt.Errorf("reservation cannot be nil")
} else if paymentState.GetReservation().GetQuorumNumbers() == nil {
return fmt.Errorf("reservation quorum numbers cannot be nil")
} else if paymentState.GetReservation().GetQuorumSplit() == nil {
return fmt.Errorf("reservation quorum split cannot be nil")
} else if paymentState.GetBinRecords() == nil {
return fmt.Errorf("bin records cannot be nil")
}

a.minNumSymbols = uint32(paymentState.PaymentGlobalParams.MinNumSymbols)
a.onDemand.CumulativePayment = new(big.Int).SetBytes(paymentState.OnchainCumulativePayment)
a.cumulativePayment = new(big.Int).SetBytes(paymentState.CumulativePayment)
a.pricePerSymbol = uint32(paymentState.PaymentGlobalParams.PricePerSymbol)

a.reservation.SymbolsPerSec = uint64(paymentState.PaymentGlobalParams.GlobalSymbolsPerSecond)
a.reservation.StartTimestamp = uint64(paymentState.Reservation.StartTimestamp)
a.reservation.EndTimestamp = uint64(paymentState.Reservation.EndTimestamp)
a.reservationWindow = uint32(paymentState.PaymentGlobalParams.ReservationWindow)

quorumNumbers := make([]uint8, len(paymentState.Reservation.QuorumNumbers))
for i, quorum := range paymentState.Reservation.QuorumNumbers {
quorumNumbers[i] = uint8(quorum)
}
a.reservation.QuorumNumbers = quorumNumbers

quorumSplit := make([]uint8, len(paymentState.Reservation.QuorumSplit))
for i, quorum := range paymentState.Reservation.QuorumSplit {
quorumSplit[i] = uint8(quorum)
}
a.reservation.QuorumSplit = quorumSplit

binRecords := make([]BinRecord, len(paymentState.BinRecords))
for i, record := range paymentState.BinRecords {
binRecords[i] = BinRecord{
Index: record.Index,
Usage: record.Usage,
}
}
a.binRecords = binRecords

return nil
}

// QuorumCheck eagerly returns error if the check finds a quorum number not an element of the allowed quorum numbers
func QuorumCheck(quorumNumbers []uint8, allowedNumbers []uint8) error {
if len(quorumNumbers) == 0 {
Expand Down
33 changes: 11 additions & 22 deletions api/clients/accountant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,27 @@ func TestAccountBlob_Reservation(t *testing.T) {
quorums := []uint8{0, 1}

header, err := accountant.AccountBlob(ctx, symbolLength, quorums)
metadata := core.ConvertPaymentHeader(header)

assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(time.Now().Unix()), reservationWindow), header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{500, 0, 0}, mapRecordUsage(accountant.binRecords)), true)

symbolLength = uint64(700)

header, err = accountant.AccountBlob(ctx, symbolLength, quorums)
metadata = core.ConvertPaymentHeader(header)

assert.NoError(t, err)
assert.NotEqual(t, 0, header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1200, 0, 200}, mapRecordUsage(accountant.binRecords)), true)

// Second call should use on-demand payment
header, err = accountant.AccountBlob(ctx, 300, quorums)
metadata = core.ConvertPaymentHeader(header)

assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, big.NewInt(300), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(300), header.CumulativePayment)
}

func TestAccountBlob_OnDemand(t *testing.T) {
Expand Down Expand Up @@ -124,10 +121,9 @@ func TestAccountBlob_OnDemand(t *testing.T) {
header, err := accountant.AccountBlob(ctx, numSymbols, quorums)
assert.NoError(t, err)

metadata := core.ConvertPaymentHeader(header)
expectedPayment := big.NewInt(int64(numSymbols * uint64(pricePerSymbol)))
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, expectedPayment, metadata.CumulativePayment)
assert.Equal(t, expectedPayment, header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{0, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
assert.Equal(t, expectedPayment, accountant.cumulativePayment)
}
Expand Down Expand Up @@ -180,24 +176,21 @@ func TestAccountBlobCallSeries(t *testing.T) {

// First call: Use reservation
header, err := accountant.AccountBlob(ctx, 800, quorums)
metadata := core.ConvertPaymentHeader(header)
assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)

// Second call: Use remaining reservation + overflow
header, err = accountant.AccountBlob(ctx, 300, quorums)
metadata = core.ConvertPaymentHeader(header)
assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)

// Third call: Use on-demand
header, err = accountant.AccountBlob(ctx, 500, quorums)
metadata = core.ConvertPaymentHeader(header)
assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, big.NewInt(500), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(500), header.CumulativePayment)

// Fourth call: Insufficient on-demand
_, err = accountant.AccountBlob(ctx, 600, quorums)
Expand Down Expand Up @@ -321,23 +314,20 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) {
header, err := accountant.AccountBlob(ctx, 800, quorums)
assert.NoError(t, err)
assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex)
metadata := core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.binRecords)), true)

// Second call: Allow one overflow
header, err = accountant.AccountBlob(ctx, 500, quorums)
assert.NoError(t, err)
metadata = core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(0), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true)

// Third call: Should use on-demand payment
header, err = accountant.AccountBlob(ctx, 200, quorums)
assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
metadata = core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(200), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(200), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true)
}

Expand Down Expand Up @@ -373,8 +363,7 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
header, err := accountant.AccountBlob(ctx, 500, quorums)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.binRecords)), true)
metadata := core.ConvertPaymentHeader(header)
assert.Equal(t, big.NewInt(500), metadata.CumulativePayment)
assert.Equal(t, big.NewInt(500), header.CumulativePayment)

// Wait for next reservation duration
time.Sleep(time.Duration(reservationWindow) * time.Second)
Expand Down
76 changes: 57 additions & 19 deletions api/clients/disperser_client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package clients
import (
"context"
"fmt"
"math/big"
"sync"

"github.com/Layr-Labs/eigenda/api"
Expand All @@ -30,12 +29,13 @@ type DisperserClientV2 interface {
}

type disperserClientV2 struct {
config *DisperserClientV2Config
signer corev2.BlobRequestSigner
initOnce sync.Once
conn *grpc.ClientConn
client disperser_rpc.DisperserClient
prover encoding.Prover
config *DisperserClientV2Config
signer corev2.BlobRequestSigner
initOnce sync.Once
conn *grpc.ClientConn
client disperser_rpc.DisperserClient
prover encoding.Prover
accountant *Accountant
}

var _ DisperserClientV2 = &disperserClientV2{}
Expand All @@ -60,7 +60,7 @@ var _ DisperserClientV2 = &disperserClientV2{}
//
// // Subsequent calls will use the existing connection
// status2, blobKey2, err := client.DisperseBlob(ctx, data, blobHeader)
func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobRequestSigner, prover encoding.Prover) (*disperserClientV2, error) {
func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobRequestSigner, prover encoding.Prover, accountant *Accountant) (*disperserClientV2, error) {
if config == nil {
return nil, api.NewErrorInvalidArg("config must be provided")
}
Expand All @@ -75,13 +75,28 @@ func NewDisperserClientV2(config *DisperserClientV2Config, signer corev2.BlobReq
}

return &disperserClientV2{
config: config,
signer: signer,
prover: prover,
config: config,
signer: signer,
prover: prover,
accountant: accountant,
// conn and client are initialized lazily
}, nil
}

// PopulateAccountant populates the accountant with the payment state from the disperser.
func (c *disperserClientV2) PopulateAccountant(ctx context.Context) error {
paymentState, err := c.GetPaymentState(ctx)
if err != nil {
return fmt.Errorf("error getting payment state for initializing accountant: %w", err)
}

err = c.accountant.SetPaymentState(paymentState)
if err != nil {
return fmt.Errorf("error setting payment state for accountant: %w", err)
}
return nil
}

// Close closes the grpc connection to the disperser server.
// It is thread safe and can be called multiple times.
func (c *disperserClientV2) Close() error {
Expand All @@ -108,16 +123,15 @@ func (c *disperserClientV2) DisperseBlob(
if c.signer == nil {
return nil, [32]byte{}, api.NewErrorInternal("uninitialized signer for authenticated dispersal")
}
if c.accountant == nil {
return nil, [32]byte{}, api.NewErrorInternal("uninitialized accountant for paid dispersal; make sure to call PopulateAccountant after creating the client")
}

var payment core.PaymentMetadata
accountId, err := c.signer.GetAccountID()
symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
payment, err := c.accountant.AccountBlob(ctx, uint64(symbolLength), quorums)
if err != nil {
return nil, [32]byte{}, api.NewErrorInvalidArg(fmt.Sprintf("please configure signer key if you want to use authenticated endpoint %v", err))
return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
}
payment.AccountID = accountId
// TODO: add payment metadata
payment.BinIndex = 0
payment.CumulativePayment = big.NewInt(0)

if len(quorums) == 0 {
return nil, [32]byte{}, api.NewErrorInvalidArg("quorum numbers must be provided")
Expand Down Expand Up @@ -160,7 +174,7 @@ func (c *disperserClientV2) DisperseBlob(
BlobVersion: blobVersion,
BlobCommitments: blobCommitments,
QuorumNumbers: quorums,
PaymentMetadata: payment,
PaymentMetadata: *payment,
}
sig, err := c.signer.SignBlobRequest(blobHeader)
if err != nil {
Expand Down Expand Up @@ -202,6 +216,30 @@ func (c *disperserClientV2) GetBlobStatus(ctx context.Context, blobKey corev2.Bl
return c.client.GetBlobStatus(ctx, request)
}

// GetPaymentState returns the payment state of the disperser client
func (c *disperserClientV2) GetPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error) {
err := c.initOnceGrpcConnection()
if err != nil {
return nil, api.NewErrorInternal(err.Error())
}

accountID, err := c.signer.GetAccountID()
if err != nil {
return nil, fmt.Errorf("error getting signer's account ID: %w", err)
}

signature, err := c.signer.SignPaymentStateRequest()
if err != nil {
return nil, fmt.Errorf("error signing payment state request: %w", err)
}

request := &disperser_rpc.GetPaymentStateRequest{
AccountId: accountID,
Signature: signature,
}
return c.client.GetPaymentState(ctx, request)
}

// GetBlobCommitment is a utility method that calculates commitment for a blob payload.
// While the blob commitment can be calculated by anyone, it requires SRS points to
// be loaded. For service that does not have access to SRS points, this method can be
Expand Down
Loading

0 comments on commit 100eb3e

Please sign in to comment.