diff --git a/api/clients/accountant.go b/api/clients/accountant.go new file mode 100644 index 0000000000..122ec3eeae --- /dev/null +++ b/api/clients/accountant.go @@ -0,0 +1,180 @@ +package clients + +import ( + "context" + "fmt" + "math/big" + "slices" + "sync" + "time" + + commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/meterer" +) + +var minNumBins uint32 = 3 +var requiredQuorums = []uint8{0, 1} + +type Accountant interface { + AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, []byte, error) +} + +var _ Accountant = &accountant{} + +type accountant struct { + // on-chain states + reservation *core.ActiveReservation + onDemand *core.OnDemandPayment + reservationWindow uint32 + pricePerSymbol uint32 + minNumSymbols uint32 + + // local accounting + // contains 3 bins; circular wrapping of indices + binRecords []BinRecord + usageLock sync.Mutex + cumulativePayment *big.Int + + paymentSigner core.PaymentSigner + numBins uint32 +} + +type BinRecord struct { + Index uint32 + Usage uint64 +} + +func NewAccountant(reservation *core.ActiveReservation, onDemand *core.OnDemandPayment, reservationWindow uint32, pricePerSymbol uint32, minNumSymbols uint32, paymentSigner core.PaymentSigner, numBins uint32) *accountant { + //TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense + // Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic + // There's a subsequent PR that handles populating the accountant with on-chain state from the disperser + binRecords := make([]BinRecord, numBins) + for i := range binRecords { + binRecords[i] = BinRecord{Index: uint32(i), Usage: 0} + } + a := accountant{ + reservation: reservation, + onDemand: onDemand, + reservationWindow: reservationWindow, + pricePerSymbol: pricePerSymbol, + minNumSymbols: minNumSymbols, + binRecords: binRecords, + cumulativePayment: big.NewInt(0), + paymentSigner: paymentSigner, + numBins: max(numBins, minNumBins), + } + // TODO: add a routine to refresh the on-chain state occasionally? + return &a +} + +// BlobPaymentInfo calculates and records payment information. The accountant +// will attempt to use the active reservation first and check for quorum settings, +// then on-demand if the reservation is not available. The returned values are +// bin index for reservation payments and cumulative payment for on-demand payments, +// and both fields are used to create the payment header and signature +func (a *accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint64, quorumNumbers []uint8) (uint32, *big.Int, error) { + now := time.Now().Unix() + currentBinIndex := meterer.GetBinIndex(uint64(now), a.reservationWindow) + + a.usageLock.Lock() + defer a.usageLock.Unlock() + relativeBinRecord := a.GetRelativeBinRecord(currentBinIndex) + relativeBinRecord.Usage += numSymbols + + // first attempt to use the active reservation + binLimit := a.reservation.SymbolsPerSec * uint64(a.reservationWindow) + if relativeBinRecord.Usage <= binLimit { + if err := QuorumCheck(quorumNumbers, a.reservation.QuorumNumbers); err != nil { + return 0, big.NewInt(0), err + } + return currentBinIndex, big.NewInt(0), nil + } + + overflowBinRecord := a.GetRelativeBinRecord(currentBinIndex + 2) + // Allow one overflow when the overflow bin is empty, the current usage and new length are both less than the limit + if overflowBinRecord.Usage == 0 && relativeBinRecord.Usage-numSymbols < binLimit && numSymbols <= binLimit { + overflowBinRecord.Usage += relativeBinRecord.Usage - binLimit + if err := QuorumCheck(quorumNumbers, a.reservation.QuorumNumbers); err != nil { + return 0, big.NewInt(0), err + } + return currentBinIndex, big.NewInt(0), nil + } + + // reservation not available, attempt on-demand + //todo: rollback later if disperser respond with some type of rejection? + relativeBinRecord.Usage -= numSymbols + incrementRequired := big.NewInt(int64(a.PaymentCharged(uint(numSymbols)))) + a.cumulativePayment.Add(a.cumulativePayment, incrementRequired) + if a.cumulativePayment.Cmp(a.onDemand.CumulativePayment) <= 0 { + if err := QuorumCheck(quorumNumbers, requiredQuorums); err != nil { + return 0, big.NewInt(0), err + } + return 0, a.cumulativePayment, nil + } + return 0, big.NewInt(0), fmt.Errorf("neither reservation nor on-demand payment is available") +} + +// AccountBlob accountant provides and records payment information +func (a *accountant) AccountBlob(ctx context.Context, numSymbols uint64, quorums []uint8) (*commonpb.PaymentHeader, []byte, error) { + binIndex, cumulativePayment, err := a.BlobPaymentInfo(ctx, numSymbols, quorums) + if err != nil { + return nil, nil, err + } + + accountID := a.paymentSigner.GetAccountID() + pm := &core.PaymentMetadata{ + AccountID: accountID, + BinIndex: binIndex, + CumulativePayment: cumulativePayment, + } + protoPaymentHeader := pm.ConvertToProtoPaymentHeader() + + signature, err := a.paymentSigner.SignBlobPayment(pm) + if err != nil { + return nil, nil, err + } + + return protoPaymentHeader, signature, nil +} + +// TODO: PaymentCharged and SymbolsCharged copied from meterer, should be refactored +// PaymentCharged returns the chargeable price for a given data length +func (a *accountant) PaymentCharged(numSymbols uint) uint64 { + return uint64(a.SymbolsCharged(numSymbols)) * uint64(a.pricePerSymbol) +} + +// SymbolsCharged returns the number of symbols charged for a given data length +// being at least MinNumSymbols or the nearest rounded-up multiple of MinNumSymbols. +func (a *accountant) SymbolsCharged(numSymbols uint) uint32 { + if numSymbols <= uint(a.minNumSymbols) { + return a.minNumSymbols + } + // Round up to the nearest multiple of MinNumSymbols + return uint32(core.RoundUpDivide(uint(numSymbols), uint(a.minNumSymbols))) * a.minNumSymbols +} + +func (a *accountant) GetRelativeBinRecord(index uint32) *BinRecord { + relativeIndex := index % a.numBins + if a.binRecords[relativeIndex].Index != uint32(index) { + a.binRecords[relativeIndex] = BinRecord{ + Index: uint32(index), + Usage: 0, + } + } + + return &a.binRecords[relativeIndex] +} + +// QuorumCheck eagerly returns error if the check finds a quorum number not an element of the allowed quorum numbers +func QuorumCheck(quorumNumbers []uint8, allowedNumbers []uint8) error { + if len(quorumNumbers) == 0 { + return fmt.Errorf("no quorum numbers provided") + } + for _, quorum := range quorumNumbers { + if !slices.Contains(allowedNumbers, quorum) { + return fmt.Errorf("provided quorum number %v not allowed", quorum) + } + } + return nil +} diff --git a/api/clients/accountant_test.go b/api/clients/accountant_test.go new file mode 100644 index 0000000000..d40f75550f --- /dev/null +++ b/api/clients/accountant_test.go @@ -0,0 +1,478 @@ +package clients + +import ( + "context" + "encoding/hex" + "math/big" + "sync" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/core/meterer" + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/assert" +) + +const numBins = uint32(3) + +func TestNewAccountant(t *testing.T) { + reservation := &core.ActiveReservation{ + SymbolsPerSec: 100, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := &core.OnDemandPayment{ + CumulativePayment: big.NewInt(500), + } + reservationWindow := uint32(6) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + + assert.NotNil(t, accountant) + assert.Equal(t, reservation, accountant.reservation) + assert.Equal(t, onDemand, accountant.onDemand) + assert.Equal(t, reservationWindow, accountant.reservationWindow) + assert.Equal(t, pricePerSymbol, accountant.pricePerSymbol) + assert.Equal(t, minNumSymbols, accountant.minNumSymbols) + assert.Equal(t, []BinRecord{{Index: 0, Usage: 0}, {Index: 1, Usage: 0}, {Index: 2, Usage: 0}}, accountant.binRecords) + assert.Equal(t, big.NewInt(0), accountant.cumulativePayment) +} + +func TestAccountBlob_Reservation(t *testing.T) { + reservation := &core.ActiveReservation{ + SymbolsPerSec: 200, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := &core.OnDemandPayment{ + CumulativePayment: big.NewInt(500), + } + reservationWindow := uint32(5) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + + ctx := context.Background() + symbolLength := uint64(500) + quorums := []uint8{0, 1} + + header, _, err := accountant.AccountBlob(ctx, symbolLength, quorums) + metadata := core.ConvertPaymentHeader(header) + + assert.NoError(t, err) + assert.Equal(t, meterer.GetBinIndex(uint64(time.Now().Unix()), reservationWindow), header.BinIndex) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + assert.Equal(t, isRotation([]uint64{500, 0, 0}, mapRecordUsage(accountant.binRecords)), true) + + symbolLength = uint64(700) + + header, _, err = accountant.AccountBlob(ctx, symbolLength, quorums) + metadata = core.ConvertPaymentHeader(header) + + assert.NoError(t, err) + assert.NotEqual(t, 0, header.BinIndex) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + assert.Equal(t, isRotation([]uint64{1200, 0, 200}, mapRecordUsage(accountant.binRecords)), true) + + // Second call should use on-demand payment + header, _, err = accountant.AccountBlob(ctx, 300, quorums) + metadata = core.ConvertPaymentHeader(header) + + assert.NoError(t, err) + assert.Equal(t, uint32(0), header.BinIndex) + assert.Equal(t, big.NewInt(300), metadata.CumulativePayment) +} + +func TestAccountBlob_OnDemand(t *testing.T) { + reservation := &core.ActiveReservation{ + SymbolsPerSec: 200, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := &core.OnDemandPayment{ + CumulativePayment: big.NewInt(1500), + } + reservationWindow := uint32(5) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + + ctx := context.Background() + numSymbols := uint64(1500) + quorums := []uint8{0, 1} + + header, _, err := accountant.AccountBlob(ctx, numSymbols, quorums) + assert.NoError(t, err) + + metadata := core.ConvertPaymentHeader(header) + expectedPayment := big.NewInt(int64(numSymbols * uint64(pricePerSymbol))) + assert.Equal(t, uint32(0), header.BinIndex) + assert.Equal(t, expectedPayment, metadata.CumulativePayment) + assert.Equal(t, isRotation([]uint64{0, 0, 0}, mapRecordUsage(accountant.binRecords)), true) + assert.Equal(t, expectedPayment, accountant.cumulativePayment) +} + +func TestAccountBlob_InsufficientOnDemand(t *testing.T) { + reservation := &core.ActiveReservation{} + onDemand := &core.OnDemandPayment{ + CumulativePayment: big.NewInt(500), + } + reservationWindow := uint32(60) + pricePerSymbol := uint32(100) + minNumSymbols := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + + ctx := context.Background() + numSymbols := uint64(2000) + quorums := []uint8{0, 1} + + _, _, err = accountant.AccountBlob(ctx, numSymbols, quorums) + assert.Contains(t, err.Error(), "neither reservation nor on-demand payment is available") +} + +func TestAccountBlobCallSeries(t *testing.T) { + reservation := &core.ActiveReservation{ + SymbolsPerSec: 200, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := &core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(5) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + + ctx := context.Background() + quorums := []uint8{0, 1} + now := time.Now().Unix() + + // First call: Use reservation + header, _, err := accountant.AccountBlob(ctx, 800, quorums) + metadata := core.ConvertPaymentHeader(header) + assert.NoError(t, err) + assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + + // Second call: Use remaining reservation + overflow + header, _, err = accountant.AccountBlob(ctx, 300, quorums) + metadata = core.ConvertPaymentHeader(header) + assert.NoError(t, err) + assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + + // Third call: Use on-demand + header, _, err = accountant.AccountBlob(ctx, 500, quorums) + metadata = core.ConvertPaymentHeader(header) + assert.NoError(t, err) + assert.Equal(t, uint32(0), header.BinIndex) + assert.Equal(t, big.NewInt(500), metadata.CumulativePayment) + + // Fourth call: Insufficient on-demand + _, _, err = accountant.AccountBlob(ctx, 600, quorums) + assert.Error(t, err) + assert.Contains(t, err.Error(), "neither reservation nor on-demand payment is available") +} + +func TestAccountBlob_BinRotation(t *testing.T) { + reservation := &core.ActiveReservation{ + SymbolsPerSec: 1000, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := &core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(1) // Set to 1 second for testing + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + + ctx := context.Background() + quorums := []uint8{0, 1} + + // First call + _, _, err = accountant.AccountBlob(ctx, 800, quorums) + assert.NoError(t, err) + assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.binRecords)), true) + + // next reservation duration + time.Sleep(1000 * time.Millisecond) + + // Second call + _, _, err = accountant.AccountBlob(ctx, 300, quorums) + assert.NoError(t, err) + assert.Equal(t, isRotation([]uint64{800, 300, 0}, mapRecordUsage(accountant.binRecords)), true) + + // Third call + _, _, err = accountant.AccountBlob(ctx, 500, quorums) + assert.NoError(t, err) + assert.Equal(t, isRotation([]uint64{800, 800, 0}, mapRecordUsage(accountant.binRecords)), true) +} + +func TestConcurrentBinRotationAndAccountBlob(t *testing.T) { + reservation := &core.ActiveReservation{ + SymbolsPerSec: 1000, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := &core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(1) // Set to 1 second for testing + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + + ctx := context.Background() + quorums := []uint8{0, 1} + + // Start concurrent AccountBlob calls + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + // for j := 0; j < 5; j++ { + // fmt.Println("request ", i) + _, _, err := accountant.AccountBlob(ctx, 100, quorums) + assert.NoError(t, err) + time.Sleep(500 * time.Millisecond) + // } + }() + } + + // Wait for all goroutines to finish + wg.Wait() + + // Check final state + usages := mapRecordUsage(accountant.binRecords) + assert.Equal(t, uint64(1000), usages[0]+usages[1]+usages[2]) +} + +func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) { + reservation := &core.ActiveReservation{ + SymbolsPerSec: 200, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := &core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(5) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + ctx := context.Background() + quorums := []uint8{0, 1} + now := time.Now().Unix() + + // Okay reservation + header, _, err := accountant.AccountBlob(ctx, 800, quorums) + assert.NoError(t, err) + assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex) + metadata := core.ConvertPaymentHeader(header) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.binRecords)), true) + + // Second call: Allow one overflow + header, _, err = accountant.AccountBlob(ctx, 500, quorums) + assert.NoError(t, err) + metadata = core.ConvertPaymentHeader(header) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true) + + // Third call: Should use on-demand payment + header, _, err = accountant.AccountBlob(ctx, 200, quorums) + assert.NoError(t, err) + assert.Equal(t, uint32(0), header.BinIndex) + metadata = core.ConvertPaymentHeader(header) + assert.Equal(t, big.NewInt(200), metadata.CumulativePayment) + assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.binRecords)), true) +} + +func TestAccountBlob_ReservationOverflowReset(t *testing.T) { + reservation := &core.ActiveReservation{ + SymbolsPerSec: 1000, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := &core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(1) // Set to 1 second for testing + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner, err := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + assert.NoError(t, err) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner, numBins) + + ctx := context.Background() + quorums := []uint8{0, 1} + + // full reservation + _, _, err = accountant.AccountBlob(ctx, 1000, quorums) + assert.NoError(t, err) + assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.binRecords)), true) + + // no overflow + header, _, err := accountant.AccountBlob(ctx, 500, quorums) + assert.NoError(t, err) + assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.binRecords)), true) + metadata := core.ConvertPaymentHeader(header) + assert.Equal(t, big.NewInt(500), metadata.CumulativePayment) + + // Wait for next reservation duration + time.Sleep(time.Duration(reservationWindow) * time.Second) + + // Third call: Should use new bin and allow overflow again + _, _, err = accountant.AccountBlob(ctx, 500, quorums) + assert.NoError(t, err) + assert.Equal(t, isRotation([]uint64{1000, 500, 0}, mapRecordUsage(accountant.binRecords)), true) +} + +func TestQuorumCheck(t *testing.T) { + tests := []struct { + name string + quorumNumbers []uint8 + allowedNumbers []uint8 + expectError bool + errorMessage string + }{ + { + name: "valid quorum numbers", + quorumNumbers: []uint8{0, 1}, + allowedNumbers: []uint8{0, 1, 2}, + expectError: false, + }, + { + name: "empty quorum numbers", + quorumNumbers: []uint8{}, + allowedNumbers: []uint8{0, 1}, + expectError: true, + errorMessage: "no quorum numbers provided", + }, + { + name: "invalid quorum number", + quorumNumbers: []uint8{0, 2}, + allowedNumbers: []uint8{0, 1}, + expectError: true, + errorMessage: "provided quorum number 2 not allowed", + }, + { + name: "empty allowed numbers", + quorumNumbers: []uint8{0}, + allowedNumbers: []uint8{}, + expectError: true, + errorMessage: "provided quorum number 0 not allowed", + }, + { + name: "multiple invalid quorums", + quorumNumbers: []uint8{2, 3, 4}, + allowedNumbers: []uint8{0, 1}, + expectError: true, + errorMessage: "provided quorum number 2 not allowed", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := QuorumCheck(tt.quorumNumbers, tt.allowedNumbers) + if tt.expectError { + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.errorMessage) + } else { + assert.NoError(t, err) + } + }) + } +} + +func mapRecordUsage(records []BinRecord) []uint64 { + return []uint64{records[0].Usage, records[1].Usage, records[2].Usage} +} + +func isRotation(arrA, arrB []uint64) bool { + n := len(arrA) + if n != len(arrB) { + return false + } + + doubleArrA := append(arrA, arrA...) + // Check if arrB exists in doubleArrA as a subarray + for i := 0; i < n; i++ { + match := true + for j := 0; j < n; j++ { + if doubleArrA[i+j] != arrB[j] { + match = false + break + } + } + if match { + return true + } + } + return false +} diff --git a/api/clients/disperser_client.go b/api/clients/disperser_client.go index c2422ea00c..87101547aa 100644 --- a/api/clients/disperser_client.go +++ b/api/clients/disperser_client.go @@ -9,6 +9,7 @@ import ( "github.com/Layr-Labs/eigenda/api" disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" @@ -53,7 +54,6 @@ type DisperserClient interface { // DisperseBlobAuthenticated disperses a blob with an authenticated request. // The BlobStatus returned will always be PROCESSSING if error is nil. DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) - DispersePaidBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) } @@ -183,11 +183,6 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums return blobStatus, reply.GetRequestId(), nil } -// TODO: implemented in subsequent PR -func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { - return nil, nil, api.NewErrorInternal("not implemented") -} - func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { err := c.initOnceGrpcConnection() if err != nil { diff --git a/api/clients/eigenda_client.go b/api/clients/eigenda_client.go index 81d7716807..3231f29541 100644 --- a/api/clients/eigenda_client.go +++ b/api/clients/eigenda_client.go @@ -34,7 +34,8 @@ type IEigenDAClient interface { // See the NewEigenDAClient constructor's documentation for details and usage examples. // TODO: Refactor this struct and interface above to use same naming convention as disperser client. -// Also need to make the fields private and use the constructor in the tests. +// +// Also need to make the fields private and use the constructor in the tests. type EigenDAClient struct { // TODO: all of these should be private, to prevent users from using them directly, // which breaks encapsulation and makes it hard for us to do refactors or changes @@ -108,6 +109,7 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien } disperserConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS) + disperserClient, err := NewDisperserClient(disperserConfig, signer) if err != nil { return nil, fmt.Errorf("new disperser-client: %w", err) @@ -237,6 +239,7 @@ func (m *EigenDAClient) putBlob(ctxFinality context.Context, rawData []byte, res } // disperse blob // TODO: would be nice to add a trace-id key to the context, to be able to follow requests from batcher->proxy->eigenda + // clients with a payment signer setting can disperse paid blobs _, requestID, err := m.Client.DisperseBlobAuthenticated(ctxFinality, data, customQuorumNumbers) if err != nil { // DisperserClient returned error is already a grpc error which can be a 400 (eg rate limited) or 500, diff --git a/api/clients/mock/disperser_client.go b/api/clients/mock/disperser_client.go index c7ab9627f0..3763e81304 100644 --- a/api/clients/mock/disperser_client.go +++ b/api/clients/mock/disperser_client.go @@ -81,11 +81,6 @@ func (c *MockDisperserClient) DisperseBlob(ctx context.Context, data []byte, quo return status, key, err } -// TODO: implement in the subsequent PR -func (c *MockDisperserClient) DispersePaidBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { - return nil, nil, nil -} - func (c *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { args := c.Called(key) var reply *disperser_rpc.BlobStatusReply diff --git a/api/clients/retrieval_client.go b/api/clients/retrieval_client.go index 8bc034cd28..e8782a5e6b 100644 --- a/api/clients/retrieval_client.go +++ b/api/clients/retrieval_client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" diff --git a/api/grpc/disperser/disperser.pb.go b/api/grpc/disperser/disperser.pb.go index 3b29214a8e..d10d4d7366 100644 --- a/api/grpc/disperser/disperser.pb.go +++ b/api/grpc/disperser/disperser.pb.go @@ -1432,38 +1432,33 @@ var file_disperser_disperser_proto_rawDesc = []byte{ 0x03, 0x12, 0x0d, 0x0a, 0x09, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x04, 0x12, 0x1b, 0x0a, 0x17, 0x49, 0x4e, 0x53, 0x55, 0x46, 0x46, 0x49, 0x43, 0x49, 0x45, 0x4e, 0x54, 0x5f, 0x53, 0x49, 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x53, 0x10, 0x05, 0x12, 0x0e, 0x0a, - 0x0a, 0x44, 0x49, 0x53, 0x50, 0x45, 0x52, 0x53, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x32, 0xb1, 0x03, + 0x0a, 0x44, 0x49, 0x53, 0x50, 0x45, 0x52, 0x53, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x32, 0xd9, 0x02, 0x0a, 0x09, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x12, 0x4e, 0x0a, 0x0c, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, - 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x10, 0x44, - 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x50, 0x61, 0x69, 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x12, - 0x22, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x44, 0x69, 0x73, 0x70, - 0x65, 0x72, 0x73, 0x65, 0x50, 0x61, 0x69, 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, - 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, - 0x79, 0x22, 0x00, 0x12, 0x5f, 0x0a, 0x19, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, - 0x6c, 0x6f, 0x62, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, - 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x41, 0x75, 0x74, - 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x41, 0x75, - 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, - 0x28, 0x01, 0x30, 0x01, 0x12, 0x4b, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, - 0x72, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, - 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, - 0x00, 0x12, 0x4e, 0x0a, 0x0c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, - 0x62, 0x12, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x52, 0x65, - 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x52, 0x65, - 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, - 0x00, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, - 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x65, - 0x72, 0x73, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x5f, 0x0a, 0x19, 0x44, + 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x41, 0x75, 0x74, 0x68, 0x65, + 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, + 0x72, 0x73, 0x65, 0x72, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, + 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, + 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, + 0x74, 0x65, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4b, 0x0a, 0x0d, + 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1c, 0x2e, + 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x69, + 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x0c, 0x52, 0x65, 0x74, + 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x70, + 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, + 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, + 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, + 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, + 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, + 0x70, 0x63, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1518,17 +1513,15 @@ var file_disperser_disperser_proto_depIdxs = []int32{ 16, // 12: disperser.BlobVerificationProof.batch_metadata:type_name -> disperser.BatchMetadata 17, // 13: disperser.BatchMetadata.batch_header:type_name -> disperser.BatchHeader 5, // 14: disperser.Disperser.DisperseBlob:input_type -> disperser.DisperseBlobRequest - 6, // 15: disperser.Disperser.DispersePaidBlob:input_type -> disperser.DispersePaidBlobRequest - 1, // 16: disperser.Disperser.DisperseBlobAuthenticated:input_type -> disperser.AuthenticatedRequest - 8, // 17: disperser.Disperser.GetBlobStatus:input_type -> disperser.BlobStatusRequest - 10, // 18: disperser.Disperser.RetrieveBlob:input_type -> disperser.RetrieveBlobRequest - 7, // 19: disperser.Disperser.DisperseBlob:output_type -> disperser.DisperseBlobReply - 7, // 20: disperser.Disperser.DispersePaidBlob:output_type -> disperser.DisperseBlobReply - 2, // 21: disperser.Disperser.DisperseBlobAuthenticated:output_type -> disperser.AuthenticatedReply - 9, // 22: disperser.Disperser.GetBlobStatus:output_type -> disperser.BlobStatusReply - 11, // 23: disperser.Disperser.RetrieveBlob:output_type -> disperser.RetrieveBlobReply - 19, // [19:24] is the sub-list for method output_type - 14, // [14:19] is the sub-list for method input_type + 1, // 15: disperser.Disperser.DisperseBlobAuthenticated:input_type -> disperser.AuthenticatedRequest + 8, // 16: disperser.Disperser.GetBlobStatus:input_type -> disperser.BlobStatusRequest + 10, // 17: disperser.Disperser.RetrieveBlob:input_type -> disperser.RetrieveBlobRequest + 7, // 18: disperser.Disperser.DisperseBlob:output_type -> disperser.DisperseBlobReply + 2, // 19: disperser.Disperser.DisperseBlobAuthenticated:output_type -> disperser.AuthenticatedReply + 9, // 20: disperser.Disperser.GetBlobStatus:output_type -> disperser.BlobStatusReply + 11, // 21: disperser.Disperser.RetrieveBlob:output_type -> disperser.RetrieveBlobReply + 18, // [18:22] is the sub-list for method output_type + 14, // [14:18] is the sub-list for method input_type 14, // [14:14] is the sub-list for extension type_name 14, // [14:14] is the sub-list for extension extendee 0, // [0:14] is the sub-list for field type_name diff --git a/api/grpc/disperser/disperser_grpc.pb.go b/api/grpc/disperser/disperser_grpc.pb.go index aac01c73b2..c6bb719a2b 100644 --- a/api/grpc/disperser/disperser_grpc.pb.go +++ b/api/grpc/disperser/disperser_grpc.pb.go @@ -20,7 +20,6 @@ const _ = grpc.SupportPackageIsVersion7 const ( Disperser_DisperseBlob_FullMethodName = "/disperser.Disperser/DisperseBlob" - Disperser_DispersePaidBlob_FullMethodName = "/disperser.Disperser/DispersePaidBlob" Disperser_DisperseBlobAuthenticated_FullMethodName = "/disperser.Disperser/DisperseBlobAuthenticated" Disperser_GetBlobStatus_FullMethodName = "/disperser.Disperser/GetBlobStatus" Disperser_RetrieveBlob_FullMethodName = "/disperser.Disperser/RetrieveBlob" @@ -43,11 +42,6 @@ type DisperserClient interface { // // INTERNAL (500): serious error, user should NOT retry. DisperseBlob(ctx context.Context, in *DisperseBlobRequest, opts ...grpc.CallOption) (*DisperseBlobReply, error) - // This API require valid payments to accept blob to disperse from clients. - // This executes the dispersal async, i.e. it returns once the request - // is accepted. The client could use GetBlobStatus() API to poll the the - // processing status of the blob. - DispersePaidBlob(ctx context.Context, in *DispersePaidBlobRequest, opts ...grpc.CallOption) (*DisperseBlobReply, error) // DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the // client to authenticate itself via the AuthenticationData message. The protoco is as follows: // 1. The client sends a DisperseBlobAuthenticated request with the DisperseBlobRequest message @@ -85,15 +79,6 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, in *DisperseBlobRequ return out, nil } -func (c *disperserClient) DispersePaidBlob(ctx context.Context, in *DispersePaidBlobRequest, opts ...grpc.CallOption) (*DisperseBlobReply, error) { - out := new(DisperseBlobReply) - err := c.cc.Invoke(ctx, Disperser_DispersePaidBlob_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, opts ...grpc.CallOption) (Disperser_DisperseBlobAuthenticatedClient, error) { stream, err := c.cc.NewStream(ctx, &Disperser_ServiceDesc.Streams[0], Disperser_DisperseBlobAuthenticated_FullMethodName, opts...) if err != nil { @@ -160,11 +145,6 @@ type DisperserServer interface { // // INTERNAL (500): serious error, user should NOT retry. DisperseBlob(context.Context, *DisperseBlobRequest) (*DisperseBlobReply, error) - // This API require valid payments to accept blob to disperse from clients. - // This executes the dispersal async, i.e. it returns once the request - // is accepted. The client could use GetBlobStatus() API to poll the the - // processing status of the blob. - DispersePaidBlob(context.Context, *DispersePaidBlobRequest) (*DisperseBlobReply, error) // DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the // client to authenticate itself via the AuthenticationData message. The protoco is as follows: // 1. The client sends a DisperseBlobAuthenticated request with the DisperseBlobRequest message @@ -193,9 +173,6 @@ type UnimplementedDisperserServer struct { func (UnimplementedDisperserServer) DisperseBlob(context.Context, *DisperseBlobRequest) (*DisperseBlobReply, error) { return nil, status.Errorf(codes.Unimplemented, "method DisperseBlob not implemented") } -func (UnimplementedDisperserServer) DispersePaidBlob(context.Context, *DispersePaidBlobRequest) (*DisperseBlobReply, error) { - return nil, status.Errorf(codes.Unimplemented, "method DispersePaidBlob not implemented") -} func (UnimplementedDisperserServer) DisperseBlobAuthenticated(Disperser_DisperseBlobAuthenticatedServer) error { return status.Errorf(codes.Unimplemented, "method DisperseBlobAuthenticated not implemented") } @@ -236,24 +213,6 @@ func _Disperser_DisperseBlob_Handler(srv interface{}, ctx context.Context, dec f return interceptor(ctx, in, info, handler) } -func _Disperser_DispersePaidBlob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(DispersePaidBlobRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DisperserServer).DispersePaidBlob(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Disperser_DispersePaidBlob_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DisperserServer).DispersePaidBlob(ctx, req.(*DispersePaidBlobRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _Disperser_DisperseBlobAuthenticated_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(DisperserServer).DisperseBlobAuthenticated(&disperserDisperseBlobAuthenticatedServer{stream}) } @@ -327,10 +286,6 @@ var Disperser_ServiceDesc = grpc.ServiceDesc{ MethodName: "DisperseBlob", Handler: _Disperser_DisperseBlob_Handler, }, - { - MethodName: "DispersePaidBlob", - Handler: _Disperser_DispersePaidBlob_Handler, - }, { MethodName: "GetBlobStatus", Handler: _Disperser_GetBlobStatus_Handler, diff --git a/api/proto/disperser/disperser.proto b/api/proto/disperser/disperser.proto index 7d21ac3f18..f4bfa1384c 100644 --- a/api/proto/disperser/disperser.proto +++ b/api/proto/disperser/disperser.proto @@ -17,13 +17,6 @@ service Disperser { // INTERNAL (500): serious error, user should NOT retry. rpc DisperseBlob(DisperseBlobRequest) returns (DisperseBlobReply) {} - // This API require valid payments to accept blob to disperse from clients. - // This executes the dispersal async, i.e. it returns once the request - // is accepted. The client could use GetBlobStatus() API to poll the the - // processing status of the blob. - rpc DispersePaidBlob(DispersePaidBlobRequest) returns (DisperseBlobReply) {} - - // DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the // client to authenticate itself via the AuthenticationData message. The protoco is as follows: // 1. The client sends a DisperseBlobAuthenticated request with the DisperseBlobRequest message diff --git a/core/auth.go b/core/auth.go index 349f5b1aff..7e7669aa21 100644 --- a/core/auth.go +++ b/core/auth.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" geth "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" @@ -51,6 +50,6 @@ func VerifySignature(message []byte, accountAddr geth.Address, sig []byte) error } type PaymentSigner interface { - SignBlobPayment(header *commonpb.PaymentHeader) ([]byte, error) + SignBlobPayment(header *PaymentMetadata) ([]byte, error) GetAccountID() string } diff --git a/core/auth/payment_signer.go b/core/auth/payment_signer.go index 2afd16ea6e..b222830406 100644 --- a/core/auth/payment_signer.go +++ b/core/auth/payment_signer.go @@ -4,7 +4,6 @@ import ( "crypto/ecdsa" "fmt" - commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" "github.com/Layr-Labs/eigenda/core" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -32,9 +31,7 @@ func NewPaymentSigner(privateKeyHex string) (*paymentSigner, error) { } // SignBlobPayment signs the payment header and returns the signature -func (s *paymentSigner) SignBlobPayment(header *commonpb.PaymentHeader) ([]byte, error) { - header.AccountId = s.GetAccountID() - pm := core.ConvertPaymentHeader(header) +func (s *paymentSigner) SignBlobPayment(pm *core.PaymentMetadata) ([]byte, error) { hash, err := pm.Hash() if err != nil { return nil, fmt.Errorf("failed to hash payment header: %w", err) @@ -54,7 +51,7 @@ func NewNoopPaymentSigner() *NoopPaymentSigner { return &NoopPaymentSigner{} } -func (s *NoopPaymentSigner) SignBlobPayment(header *commonpb.PaymentHeader) ([]byte, error) { +func (s *NoopPaymentSigner) SignBlobPayment(header *core.PaymentMetadata) ([]byte, error) { return nil, fmt.Errorf("noop signer cannot sign blob payment header") } @@ -63,9 +60,8 @@ func (s *NoopPaymentSigner) GetAccountID() string { } // VerifyPaymentSignature verifies the signature against the payment metadata -func VerifyPaymentSignature(paymentHeader *commonpb.PaymentHeader, paymentSignature []byte) error { - pm := core.ConvertPaymentHeader(paymentHeader) - hash, err := pm.Hash() +func VerifyPaymentSignature(paymentHeader *core.PaymentMetadata, paymentSignature []byte) error { + hash, err := paymentHeader.Hash() if err != nil { return fmt.Errorf("failed to hash payment header: %w", err) } @@ -76,7 +72,7 @@ func VerifyPaymentSignature(paymentHeader *commonpb.PaymentHeader, paymentSignat } recoveredAddress := crypto.PubkeyToAddress(*recoveredPubKey) - accountId := common.HexToAddress(paymentHeader.AccountId) + accountId := common.HexToAddress(paymentHeader.AccountID) if recoveredAddress != accountId { return fmt.Errorf("signature address %s does not match account id %s", recoveredAddress.Hex(), accountId.Hex()) } diff --git a/core/auth/payment_signer_test.go b/core/auth/payment_signer_test.go index ecd9ec692d..e2c1a172ad 100644 --- a/core/auth/payment_signer_test.go +++ b/core/auth/payment_signer_test.go @@ -2,9 +2,10 @@ package auth_test import ( "encoding/hex" + "math/big" "testing" - commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/auth" "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/assert" @@ -20,10 +21,10 @@ func TestPaymentSigner(t *testing.T) { require.NoError(t, err) t.Run("SignBlobPayment", func(t *testing.T) { - header := &commonpb.PaymentHeader{ + header := &core.PaymentMetadata{ + AccountID: signer.GetAccountID(), BinIndex: 1, - CumulativePayment: []byte{0x01, 0x02, 0x03}, - AccountId: "", + CumulativePayment: big.NewInt(1), } signature, err := signer.SignBlobPayment(header) @@ -36,10 +37,10 @@ func TestPaymentSigner(t *testing.T) { }) t.Run("VerifyPaymentSignature_InvalidSignature", func(t *testing.T) { - header := &commonpb.PaymentHeader{ + header := &core.PaymentMetadata{ BinIndex: 1, - CumulativePayment: []byte{0x01, 0x02, 0x03}, - AccountId: "", + CumulativePayment: big.NewInt(1), + AccountID: signer.GetAccountID(), } // Create an invalid signature @@ -49,10 +50,10 @@ func TestPaymentSigner(t *testing.T) { }) t.Run("VerifyPaymentSignature_ModifiedHeader", func(t *testing.T) { - header := &commonpb.PaymentHeader{ + header := &core.PaymentMetadata{ BinIndex: 1, - CumulativePayment: []byte{0x01, 0x02, 0x03}, - AccountId: "", + CumulativePayment: big.NewInt(1), + AccountID: signer.GetAccountID(), } signature, err := signer.SignBlobPayment(header) diff --git a/core/data.go b/core/data.go index 0742f5aa0b..e4e990e620 100644 --- a/core/data.go +++ b/core/data.go @@ -572,6 +572,24 @@ func ConvertPaymentHeader(header *commonpb.PaymentHeader) *PaymentMetadata { } } +// ConvertToProtoPaymentHeader converts a PaymentMetadata to a protobuf payment header +func (pm *PaymentMetadata) ConvertToProtoPaymentHeader() *commonpb.PaymentHeader { + return &commonpb.PaymentHeader{ + AccountId: pm.AccountID, + BinIndex: pm.BinIndex, + CumulativePayment: pm.CumulativePayment.Bytes(), + } +} + +// ConvertToProtoPaymentHeader converts a PaymentMetadata to a protobuf payment header +func ConvertToPaymentMetadata(ph *commonpb.PaymentHeader) *PaymentMetadata { + return &PaymentMetadata{ + AccountID: ph.AccountId, + BinIndex: ph.BinIndex, + CumulativePayment: new(big.Int).SetBytes(ph.CumulativePayment), + } +} + // OperatorInfo contains information about an operator which is stored on the blockchain state, // corresponding to a particular quorum type ActiveReservation struct { diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 53ac0848af..ba1f2cfc65 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -70,15 +70,14 @@ func (m *Meterer) Start(ctx context.Context) { // MeterRequest validates a blob header and adds it to the meterer's state // TODO: return error if there's a rejection (with reasoning) or internal error (should be very rare) -func (m *Meterer) MeterRequest(ctx context.Context, blob core.Blob, header core.PaymentMetadata) error { - headerQuorums := blob.GetQuorumNumbers() +func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, numSymbols uint, quorumNumbers []uint8) error { // Validate against the payment method if header.CumulativePayment.Sign() == 0 { reservation, err := m.ChainPaymentState.GetActiveReservationByAccount(ctx, header.AccountID) if err != nil { return fmt.Errorf("failed to get active reservation by account: %w", err) } - if err := m.ServeReservationRequest(ctx, header, &reservation, blob.RequestHeader.BlobAuthHeader.Length, headerQuorums); err != nil { + if err := m.ServeReservationRequest(ctx, header, &reservation, numSymbols, quorumNumbers); err != nil { return fmt.Errorf("invalid reservation: %w", err) } } else { @@ -86,7 +85,7 @@ func (m *Meterer) MeterRequest(ctx context.Context, blob core.Blob, header core. if err != nil { return fmt.Errorf("failed to get on-demand payment by account: %w", err) } - if err := m.ServeOnDemandRequest(ctx, header, &onDemandPayment, blob.RequestHeader.BlobAuthHeader.Length, headerQuorums); err != nil { + if err := m.ServeOnDemandRequest(ctx, header, &onDemandPayment, numSymbols, quorumNumbers); err != nil { return fmt.Errorf("invalid on-demand request: %w", err) } } @@ -95,7 +94,7 @@ func (m *Meterer) MeterRequest(ctx context.Context, blob core.Blob, header core. } // ServeReservationRequest handles the rate limiting logic for incoming requests -func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.PaymentMetadata, reservation *core.ActiveReservation, blobLength uint, quorumNumbers []uint8) error { +func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.PaymentMetadata, reservation *core.ActiveReservation, numSymbols uint, quorumNumbers []uint8) error { if err := m.ValidateQuorum(quorumNumbers, reservation.QuorumNumbers); err != nil { return fmt.Errorf("invalid quorum for reservation: %w", err) } @@ -104,7 +103,7 @@ func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.Payme } // Update bin usage atomically and check against reservation's data rate as the bin limit - if err := m.IncrementBinUsage(ctx, header, reservation, blobLength); err != nil { + if err := m.IncrementBinUsage(ctx, header, reservation, numSymbols); err != nil { return fmt.Errorf("bin overflows: %w", err) } @@ -143,9 +142,9 @@ func (m *Meterer) ValidateBinIndex(header core.PaymentMetadata, reservation *cor } // IncrementBinUsage increments the bin usage atomically and checks for overflow -func (m *Meterer) IncrementBinUsage(ctx context.Context, header core.PaymentMetadata, reservation *core.ActiveReservation, blobLength uint) error { - numSymbols := m.SymbolsCharged(blobLength) - newUsage, err := m.OffchainStore.UpdateReservationBin(ctx, header.AccountID, uint64(header.BinIndex), uint64(numSymbols)) +func (m *Meterer) IncrementBinUsage(ctx context.Context, header core.PaymentMetadata, reservation *core.ActiveReservation, numSymbols uint) error { + symbolsCharged := m.SymbolsCharged(numSymbols) + newUsage, err := m.OffchainStore.UpdateReservationBin(ctx, header.AccountID, uint64(header.BinIndex), uint64(symbolsCharged)) if err != nil { return fmt.Errorf("failed to increment bin usage: %w", err) } @@ -177,7 +176,7 @@ func GetBinIndex(timestamp uint64, binInterval uint32) uint32 { // ServeOnDemandRequest handles the rate limiting logic for incoming requests // On-demand requests doesn't have additional quorum settings and should only be // allowed by ETH and EIGEN quorums -func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, blobLength uint, headerQuorums []uint8) error { +func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, numSymbols uint, headerQuorums []uint8) error { quorumNumbers, err := m.ChainPaymentState.GetOnDemandQuorumNumbers(ctx) if err != nil { return fmt.Errorf("failed to get on-demand quorum numbers: %w", err) @@ -187,13 +186,13 @@ func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentM return fmt.Errorf("invalid quorum for On-Demand Request: %w", err) } // update blob header to use the miniumum chargeable size - symbolsCharged := m.SymbolsCharged(blobLength) + symbolsCharged := m.SymbolsCharged(numSymbols) err = m.OffchainStore.AddOnDemandPayment(ctx, header, symbolsCharged) if err != nil { return fmt.Errorf("failed to update cumulative payment: %w", err) } // Validate payments attached - err = m.ValidatePayment(ctx, header, onDemandPayment, blobLength) + err = m.ValidatePayment(ctx, header, onDemandPayment, numSymbols) if err != nil { // No tolerance for incorrect payment amounts; no rollbacks return fmt.Errorf("invalid on-demand payment: %w", err) @@ -215,25 +214,25 @@ func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentM // ValidatePayment checks if the provided payment header is valid against the local accounting // prevPmt is the largest cumulative payment strictly less than PaymentMetadata.cumulativePayment if exists // nextPmt is the smallest cumulative payment strictly greater than PaymentMetadata.cumulativePayment if exists -// nextPmtDataLength is the dataLength of corresponding to nextPmt if exists -// prevPmt + PaymentMetadata.DataLength * m.FixedFeePerByte +// nextPmtnumSymbols is the numSymbols of corresponding to nextPmt if exists +// prevPmt + PaymentMetadata.numSymbols * m.FixedFeePerByte // <= PaymentMetadata.CumulativePayment -// <= nextPmt - nextPmtDataLength * m.FixedFeePerByte > nextPmt -func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, blobLength uint) error { +// <= nextPmt - nextPmtnumSymbols * m.FixedFeePerByte > nextPmt +func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetadata, onDemandPayment *core.OnDemandPayment, numSymbols uint) error { if header.CumulativePayment.Cmp(onDemandPayment.CumulativePayment) > 0 { return fmt.Errorf("request claims a cumulative payment greater than the on-chain deposit") } - prevPmt, nextPmt, nextPmtDataLength, err := m.OffchainStore.GetRelevantOnDemandRecords(ctx, header.AccountID, header.CumulativePayment) // zero if DNE + prevPmt, nextPmt, nextPmtnumSymbols, err := m.OffchainStore.GetRelevantOnDemandRecords(ctx, header.AccountID, header.CumulativePayment) // zero if DNE if err != nil { return fmt.Errorf("failed to get relevant on-demand records: %w", err) } // the current request must increment cumulative payment by a magnitude sufficient to cover the blob size - if prevPmt+m.PaymentCharged(blobLength) > header.CumulativePayment.Uint64() { + if prevPmt+m.PaymentCharged(numSymbols) > header.CumulativePayment.Uint64() { return fmt.Errorf("insufficient cumulative payment increment") } // the current request must not break the payment magnitude for the next payment if the two requests were delivered out-of-order - if nextPmt != 0 && header.CumulativePayment.Uint64()+m.PaymentCharged(uint(nextPmtDataLength)) > nextPmt { + if nextPmt != 0 && header.CumulativePayment.Uint64()+m.PaymentCharged(uint(nextPmtnumSymbols)) > nextPmt { return fmt.Errorf("breaking cumulative payment invariants") } // check passed: blob can be safely inserted into the set of payments @@ -241,19 +240,18 @@ func (m *Meterer) ValidatePayment(ctx context.Context, header core.PaymentMetada } // PaymentCharged returns the chargeable price for a given data length -func (m *Meterer) PaymentCharged(dataLength uint) uint64 { - return uint64(m.SymbolsCharged(dataLength)) * uint64(m.ChainPaymentState.GetPricePerSymbol()) +func (m *Meterer) PaymentCharged(numSymbols uint) uint64 { + return uint64(m.SymbolsCharged(numSymbols)) * uint64(m.ChainPaymentState.GetPricePerSymbol()) } // SymbolsCharged returns the number of symbols charged for a given data length // being at least MinNumSymbols or the nearest rounded-up multiple of MinNumSymbols. -func (m *Meterer) SymbolsCharged(dataLength uint) uint32 { - if dataLength <= uint(m.ChainPaymentState.GetMinNumSymbols()) { +func (m *Meterer) SymbolsCharged(numSymbols uint) uint32 { + if numSymbols <= uint(m.ChainPaymentState.GetMinNumSymbols()) { return m.ChainPaymentState.GetMinNumSymbols() } // Round up to the nearest multiple of MinNumSymbols - fmt.Println("return ", uint32(core.RoundUpDivide(uint(dataLength), uint(m.ChainPaymentState.GetMinNumSymbols())))*m.ChainPaymentState.GetMinNumSymbols()) - return uint32(core.RoundUpDivide(uint(dataLength), uint(m.ChainPaymentState.GetMinNumSymbols()))) * m.ChainPaymentState.GetMinNumSymbols() + return uint32(core.RoundUpDivide(uint(numSymbols), uint(m.ChainPaymentState.GetMinNumSymbols()))) * m.ChainPaymentState.GetMinNumSymbols() } // ValidateBinIndex checks if the provided bin index is valid diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index 8fb2d1b9df..aa2537ef34 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -15,7 +15,6 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/core/mock" - "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/inabox/deploy" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/ethereum/go-ethereum/crypto" @@ -185,17 +184,17 @@ func TestMetererReservations(t *testing.T) { paymentChainState.On("GetActiveReservationByAccount", testifymock.Anything, testifymock.Anything).Return(core.ActiveReservation{}, fmt.Errorf("reservation not found")) // test invalid quorom ID - blob, header := createMetererInput(1, 0, 1000, []uint8{0, 1, 2}, accountID1) - err := mt.MeterRequest(ctx, *blob, *header) + header := createPaymentHeader(1, 0, accountID1) + err := mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2}) assert.ErrorContains(t, err, "quorum number mismatch") // overwhelming bin overflow for empty bins - blob, header = createMetererInput(binIndex-1, 0, 10, quoromNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex-1, 0, accountID2) + err = mt.MeterRequest(ctx, *header, 10, quoromNumbers) assert.NoError(t, err) // overwhelming bin overflow for empty bins - blob, header = createMetererInput(binIndex-1, 0, 1000, quoromNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex-1, 0, accountID2) + err = mt.MeterRequest(ctx, *header, 1000, quoromNumbers) assert.ErrorContains(t, err, "overflow usage exceeds bin limit") // test non-existent account @@ -203,22 +202,22 @@ func TestMetererReservations(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - blob, header = createMetererInput(1, 0, 1000, []uint8{0, 1, 2}, crypto.PubkeyToAddress(unregisteredUser.PublicKey).Hex()) + header = createPaymentHeader(1, 0, crypto.PubkeyToAddress(unregisteredUser.PublicKey).Hex()) assert.NoError(t, err) - err = mt.MeterRequest(ctx, *blob, *header) + err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2}) assert.ErrorContains(t, err, "failed to get active reservation by account: reservation not found") // test invalid bin index - blob, header = createMetererInput(binIndex, 0, 2000, quoromNumbers, accountID1) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, 0, accountID1) + err = mt.MeterRequest(ctx, *header, 2000, quoromNumbers) assert.ErrorContains(t, err, "invalid bin index for reservation") // test bin usage metering - dataLength := uint(20) + symbolLength := uint(20) requiredLength := uint(21) // 21 should be charged for length of 20 since minNumSymbols is 3 for i := 0; i < 9; i++ { - blob, header = createMetererInput(binIndex, 0, dataLength, quoromNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, 0, accountID2) + err = mt.MeterRequest(ctx, *header, symbolLength, quoromNumbers) assert.NoError(t, err) item, err := dynamoClient.GetItem(ctx, reservationTableName, commondynamodb.Key{ "AccountID": &types.AttributeValueMemberS{Value: accountID2}, @@ -231,9 +230,9 @@ func TestMetererReservations(t *testing.T) { } // first over flow is allowed - blob, header = createMetererInput(binIndex, 0, 25, quoromNumbers, accountID2) + header = createPaymentHeader(binIndex, 0, accountID2) assert.NoError(t, err) - err = mt.MeterRequest(ctx, *blob, *header) + err = mt.MeterRequest(ctx, *header, 25, quoromNumbers) assert.NoError(t, err) overflowedBinIndex := binIndex + 2 item, err := dynamoClient.GetItem(ctx, reservationTableName, commondynamodb.Key{ @@ -247,9 +246,9 @@ func TestMetererReservations(t *testing.T) { assert.Equal(t, strconv.Itoa(int(16)), item["BinUsage"].(*types.AttributeValueMemberN).Value) // second over flow - blob, header = createMetererInput(binIndex, 0, 1, quoromNumbers, accountID2) + header = createPaymentHeader(binIndex, 0, accountID2) assert.NoError(t, err) - err = mt.MeterRequest(ctx, *blob, *header) + err = mt.MeterRequest(ctx, *header, 1, quoromNumbers) assert.ErrorContains(t, err, "bin has already been filled") } @@ -274,19 +273,19 @@ func TestMetererOnDemand(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - blob, header := createMetererInput(binIndex, 2, 1000, quorumNumbers, crypto.PubkeyToAddress(unregisteredUser.PublicKey).Hex()) + header := createPaymentHeader(binIndex, 2, crypto.PubkeyToAddress(unregisteredUser.PublicKey).Hex()) assert.NoError(t, err) - err = mt.MeterRequest(ctx, *blob, *header) + err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers) assert.ErrorContains(t, err, "failed to get on-demand payment by account: payment not found") // test invalid quorom ID - blob, header = createMetererInput(binIndex, 1, 1000, []uint8{0, 1, 2}, accountID1) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, 1, accountID1) + err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2}) assert.ErrorContains(t, err, "invalid quorum for On-Demand Request") // test insufficient cumulative payment - blob, header = createMetererInput(binIndex, 1, 2000, quorumNumbers, accountID1) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, 1, accountID1) + err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers) assert.ErrorContains(t, err, "insufficient cumulative payment increment") // No rollback after meter request result, err := dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{ @@ -297,40 +296,40 @@ func TestMetererOnDemand(t *testing.T) { assert.Equal(t, 1, len(result)) // test duplicated cumulative payments - dataLength := uint(100) - priceCharged := mt.PaymentCharged(dataLength) + symbolLength := uint(100) + priceCharged := mt.PaymentCharged(symbolLength) assert.Equal(t, uint64(102*mt.ChainPaymentState.GetPricePerSymbol()), priceCharged) - blob, header = createMetererInput(binIndex, priceCharged, dataLength, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, priceCharged, accountID2) + err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers) assert.NoError(t, err) - blob, header = createMetererInput(binIndex, priceCharged, dataLength, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, priceCharged, accountID2) + err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers) assert.ErrorContains(t, err, "exact payment already exists") // test valid payments for i := 1; i < 9; i++ { - blob, header = createMetererInput(binIndex, uint64(priceCharged)*uint64(i+1), dataLength, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, uint64(priceCharged)*uint64(i+1), accountID2) + err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers) assert.NoError(t, err) } // test cumulative payment on-chain constraint - blob, header = createMetererInput(binIndex, 2023, 1, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, 2023, accountID2) + err = mt.MeterRequest(ctx, *header, 1, quorumNumbers) assert.ErrorContains(t, err, "invalid on-demand payment: request claims a cumulative payment greater than the on-chain deposit") // test insufficient increment in cumulative payment previousCumulativePayment := uint64(priceCharged) * uint64(9) - dataLength = uint(2) - priceCharged = mt.PaymentCharged(dataLength) - blob, header = createMetererInput(binIndex, previousCumulativePayment+priceCharged-1, dataLength, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + symbolLength = uint(2) + priceCharged = mt.PaymentCharged(symbolLength) + header = createPaymentHeader(binIndex, previousCumulativePayment+priceCharged-1, accountID2) + err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers) assert.ErrorContains(t, err, "invalid on-demand payment: insufficient cumulative payment increment") previousCumulativePayment = previousCumulativePayment + priceCharged // test cannot insert cumulative payment in out of order - blob, header = createMetererInput(binIndex, mt.PaymentCharged(50), 50, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, mt.PaymentCharged(50), accountID2) + err = mt.MeterRequest(ctx, *header, 50, quorumNumbers) assert.ErrorContains(t, err, "invalid on-demand payment: breaking cumulative payment invariants") numPrevRecords := 12 @@ -341,9 +340,8 @@ func TestMetererOnDemand(t *testing.T) { assert.NoError(t, err) assert.Equal(t, numPrevRecords, len(result)) // test failed global rate limit (previously payment recorded: 2, global limit: 1009) - fmt.Println("need ", previousCumulativePayment+mt.PaymentCharged(1010)) - blob, header = createMetererInput(binIndex, previousCumulativePayment+mt.PaymentCharged(1010), 1010, quorumNumbers, accountID1) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, previousCumulativePayment+mt.PaymentCharged(1010), accountID1) + err = mt.MeterRequest(ctx, *header, 1010, quorumNumbers) assert.ErrorContains(t, err, "failed global rate limiting") // Correct rollback result, err = dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{ @@ -357,42 +355,42 @@ func TestMetererOnDemand(t *testing.T) { func TestMeterer_paymentCharged(t *testing.T) { tests := []struct { name string - dataLength uint + symbolLength uint pricePerSymbol uint32 minNumSymbols uint32 expected uint64 }{ { name: "Data length equal to min chargeable size", - dataLength: 1024, + symbolLength: 1024, pricePerSymbol: 1, minNumSymbols: 1024, expected: 1024, }, { name: "Data length less than min chargeable size", - dataLength: 512, + symbolLength: 512, pricePerSymbol: 1, minNumSymbols: 1024, expected: 1024, }, { name: "Data length greater than min chargeable size", - dataLength: 2048, + symbolLength: 2048, pricePerSymbol: 1, minNumSymbols: 1024, expected: 2048, }, { name: "Large data length", - dataLength: 1 << 20, // 1 MB + symbolLength: 1 << 20, // 1 MB pricePerSymbol: 1, minNumSymbols: 1024, expected: 1 << 20, }, { name: "Price not evenly divisible by min chargeable size", - dataLength: 1536, + symbolLength: 1536, pricePerSymbol: 1, minNumSymbols: 1024, expected: 2048, @@ -407,7 +405,7 @@ func TestMeterer_paymentCharged(t *testing.T) { m := &meterer.Meterer{ ChainPaymentState: paymentChainState, } - result := m.PaymentCharged(tt.dataLength) + result := m.PaymentCharged(tt.symbolLength) assert.Equal(t, tt.expected, result) }) } @@ -416,37 +414,37 @@ func TestMeterer_paymentCharged(t *testing.T) { func TestMeterer_symbolsCharged(t *testing.T) { tests := []struct { name string - dataLength uint + symbolLength uint minNumSymbols uint32 expected uint32 }{ { name: "Data length equal to min number of symobols", - dataLength: 1024, + symbolLength: 1024, minNumSymbols: 1024, expected: 1024, }, { name: "Data length less than min number of symbols", - dataLength: 512, + symbolLength: 512, minNumSymbols: 1024, expected: 1024, }, { name: "Data length greater than min number of symbols", - dataLength: 2048, + symbolLength: 2048, minNumSymbols: 1024, expected: 2048, }, { name: "Large data length", - dataLength: 1 << 20, // 1 MB + symbolLength: 1 << 20, // 1 MB minNumSymbols: 1024, expected: 1 << 20, }, { name: "Very small data length", - dataLength: 16, + symbolLength: 16, minNumSymbols: 1024, expected: 1024, }, @@ -459,34 +457,16 @@ func TestMeterer_symbolsCharged(t *testing.T) { m := &meterer.Meterer{ ChainPaymentState: paymentChainState, } - result := m.SymbolsCharged(tt.dataLength) + result := m.SymbolsCharged(tt.symbolLength) assert.Equal(t, tt.expected, result) }) } } -func createMetererInput(binIndex uint32, cumulativePayment uint64, dataLength uint, quorumNumbers []uint8, accountID string) (blob *core.Blob, header *core.PaymentMetadata) { - sp := make([]*core.SecurityParam, len(quorumNumbers)) - for i, quorumID := range quorumNumbers { - sp[i] = &core.SecurityParam{ - QuorumID: quorumID, - } - } - blob = &core.Blob{ - RequestHeader: core.BlobRequestHeader{ - BlobAuthHeader: core.BlobAuthHeader{ - AccountID: accountID2, - BlobCommitments: encoding.BlobCommitments{ - Length: dataLength, - }, - }, - SecurityParams: sp, - }, - } - header = &core.PaymentMetadata{ +func createPaymentHeader(binIndex uint32, cumulativePayment uint64, accountID string) *core.PaymentMetadata { + return &core.PaymentMetadata{ AccountID: accountID, BinIndex: binIndex, CumulativePayment: big.NewInt(int64(cumulativePayment)), } - return blob, header } diff --git a/core/meterer/onchain_state.go b/core/meterer/onchain_state.go index 7db5a1bd55..2725ef765e 100644 --- a/core/meterer/onchain_state.go +++ b/core/meterer/onchain_state.go @@ -105,6 +105,7 @@ func (pcs *OnchainPaymentState) RefreshOnchainPaymentState(ctx context.Context, if err != nil { return err } + // These parameters should be rarely updated, but we refresh them anyway pcs.PaymentVaultParams = paymentVaultParams return nil } diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index cf9b1e8538..cc76549d43 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -6,7 +6,6 @@ import ( "encoding/binary" "errors" "fmt" - "math/big" "net" "slices" "strings" @@ -260,7 +259,7 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob // to track the error again. func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, authenticatedAddress string, apiMethodName string, paymentHeader *core.PaymentMetadata) (*pb.DisperseBlobReply, error) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { - s.metrics.ObserveLatency("DisperseBlob", f*1000) // make milliseconds + s.metrics.ObserveLatency(apiMethodName, f*1000) // make milliseconds })) defer timer.ObserveDuration() @@ -285,7 +284,8 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut // If paymentHeader is not empty, we use the meterer, otherwise we use the ratelimiter if the ratelimiter is available if paymentHeader != nil { - err := s.meterer.MeterRequest(ctx, *blob, *paymentHeader) + blobLength := encoding.GetBlobLength(uint(blobSize)) + err := s.meterer.MeterRequest(ctx, *paymentHeader, blobLength, blob.GetQuorumNumbers()) if err != nil { return nil, api.NewErrorResourceExhausted(err.Error()) } @@ -318,44 +318,6 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut }, nil } -func (s *DispersalServer) DispersePaidBlob(ctx context.Context, req *pb.DispersePaidBlobRequest) (*pb.DisperseBlobReply, error) { - // If EnablePaymentMeter is false, meterer gets set to nil at start - // In that case, the function should not continue. (checking ) - if s.meterer == nil { - return nil, api.NewErrorInternal("payment feature is not enabled") - } - blob, err := s.validatePaidRequestAndGetBlob(ctx, req) - binIndex := req.PaymentHeader.BinIndex - cumulativePayment := new(big.Int).SetBytes(req.PaymentHeader.CumulativePayment) - //todo: before disperse blob, validate the signature - signature := req.PaymentSignature - if err != nil { - for _, quorumID := range req.QuorumNumbers { - s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(req.GetData()), "DispersePaidBlob") - } - s.metrics.HandleInvalidArgRpcRequest("DispersePaidBlob") - return nil, api.NewErrorInvalidArg(err.Error()) - } - - if err = auth.VerifyPaymentSignature(req.GetPaymentHeader(), signature); err != nil { - return nil, api.NewErrorInvalidArg("payment signature is invalid") - } - - paymentHeader := core.PaymentMetadata{ - AccountID: blob.RequestHeader.AccountID, - BinIndex: binIndex, - CumulativePayment: cumulativePayment, - } - reply, err := s.disperseBlob(ctx, blob, "", "DispersePaidBlob", &paymentHeader) - if err != nil { - // Note the DispersePaidBlob already updated metrics for this error. - s.logger.Info("failed to disperse blob", "err", err) - } else { - s.metrics.HandleSuccessfulRpcRequest("DispersePaidBlob") - } - return reply, err -} - func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, quorumID core.QuorumID) (*PerUserRateInfo, string, error) { unauthRates, ok := s.rateConfig.QuorumRateInfos[quorumID] if !ok { @@ -1078,97 +1040,3 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb return blob, nil } - -// TODO: refactor checks with validateRequestAndGetBlob; most checks are the same, but paid requests have different quorum requirements -func (s *DispersalServer) validatePaidRequestAndGetBlob(ctx context.Context, req *pb.DispersePaidBlobRequest) (*core.Blob, error) { - - data := req.GetData() - blobSize := len(data) - // The blob size in bytes must be in range [1, maxBlobSize]. - if blobSize > s.maxBlobSize { - return nil, fmt.Errorf("blob size cannot exceed %v Bytes", s.maxBlobSize) - } - if blobSize == 0 { - return nil, fmt.Errorf("blob size must be greater than 0") - } - - if len(req.GetQuorumNumbers()) > 256 { - return nil, errors.New("number of custom_quorum_numbers must not exceed 256") - } - - // validate every 32 bytes is a valid field element - _, err := rs.ToFrArray(data) - if err != nil { - s.logger.Error("failed to convert a 32bytes as a field element", "err", err) - return nil, api.NewErrorInvalidArg(fmt.Sprintf("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617: %v", err)) - } - - quorumConfig, err := s.updateQuorumConfig(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get quorum config: %w", err) - } - - if len(req.GetQuorumNumbers()) > int(quorumConfig.QuorumCount) { - return nil, errors.New("number of custom_quorum_numbers must not exceed number of quorums") - } - - seenQuorums := make(map[uint8]struct{}) - - // TODO: validate payment signature against payment metadata - if err = auth.VerifyPaymentSignature(req.GetPaymentHeader(), req.GetPaymentSignature()); err != nil { - return nil, fmt.Errorf("payment signature is invalid: %w", err) - } - // Unlike regular blob dispersal request validation, there's no check with required quorums - // Because Reservation has their specific quorum requirements, and on-demand is only allowed and paid to the required quorums. - // Payment specific validations are done within the meterer library. - for i := range req.GetQuorumNumbers() { - - if req.GetQuorumNumbers()[i] > core.MaxQuorumID { - return nil, fmt.Errorf("custom_quorum_numbers must be in range [0, 254], but found %d", req.GetQuorumNumbers()[i]) - } - - quorumID := uint8(req.GetQuorumNumbers()[i]) - if quorumID >= quorumConfig.QuorumCount { - return nil, fmt.Errorf("custom_quorum_numbers must be in range [0, %d], but found %d", s.quorumConfig.QuorumCount-1, quorumID) - } - - if _, ok := seenQuorums[quorumID]; ok { - return nil, fmt.Errorf("custom_quorum_numbers must not contain duplicates") - } - seenQuorums[quorumID] = struct{}{} - - } - - if len(seenQuorums) == 0 { - return nil, fmt.Errorf("the blob must be sent to at least one quorum") - } - - params := make([]*core.SecurityParam, len(seenQuorums)) - i := 0 - for quorumID := range seenQuorums { - params[i] = &core.SecurityParam{ - QuorumID: core.QuorumID(quorumID), - AdversaryThreshold: quorumConfig.SecurityParams[quorumID].AdversaryThreshold, - ConfirmationThreshold: quorumConfig.SecurityParams[quorumID].ConfirmationThreshold, - } - err = params[i].Validate() - if err != nil { - return nil, fmt.Errorf("invalid request: %w", err) - } - i++ - } - - header := core.BlobRequestHeader{ - BlobAuthHeader: core.BlobAuthHeader{ - AccountID: req.PaymentHeader.AccountId, - }, - SecurityParams: params, - } - - blob := &core.Blob{ - RequestHeader: header, - Data: data, - } - - return blob, nil -} diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 4f6535769a..ce5bce89cc 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -5,6 +5,8 @@ import ( "crypto/rand" "flag" "fmt" + "math" + "math/big" "net" "os" "runtime" @@ -701,6 +703,22 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal panic("failed to make initial query to the on-chain state") } + mockState.On("GetPricePerSymbol").Return(uint32(encoding.BYTES_PER_SYMBOL), nil) + mockState.On("GetMinNumSymbols").Return(uint32(1), nil) + mockState.On("GetGlobalSymbolsPerSecond").Return(uint64(4096), nil) + mockState.On("GetRequiredQuorumNumbers").Return([]uint8{0, 1}, nil) + mockState.On("GetOnDemandQuorumNumbers").Return([]uint8{0, 1}, nil) + mockState.On("GetReservationWindow").Return(uint32(1), nil) + mockState.On("GetOnDemandPaymentByAccount", tmock.Anything, tmock.Anything).Return(core.OnDemandPayment{ + CumulativePayment: big.NewInt(3000), + }, nil) + mockState.On("GetActiveReservationByAccount", tmock.Anything, tmock.Anything).Return(core.ActiveReservation{ + SymbolsPerSec: 2048, + StartTimestamp: 0, + EndTimestamp: math.MaxUint32, + QuorumNumbers: []uint8{0, 1}, + QuorumSplit: []byte{50, 50}, + }, nil) // append test name to each table name for an unique store table_names := []string{"reservations_server_" + testName, "ondemand_server_" + testName, "global_server_" + testName} err = meterer.CreateReservationTable(awsConfig, table_names[0]) @@ -730,7 +748,11 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal teardown() panic("failed to create offchain store") } - meterer := meterer.NewMeterer(meterer.Config{}, mockState, store, logger) + mt := meterer.NewMeterer(meterer.Config{}, mockState, store, logger) + err = mt.ChainPaymentState.RefreshOnchainPaymentState(context.Background(), nil) + if err != nil { + panic("failed to make initial query to the on-chain state") + } ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger) rateConfig := apiserver.RateConfig{ @@ -787,7 +809,7 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal return apiserver.NewDispersalServer(disperser.ServerConfig{ GrpcPort: "51001", GrpcTimeout: 1 * time.Second, - }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), meterer, ratelimiter, rateConfig, testMaxBlobSize) + }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), mt, ratelimiter, rateConfig, testMaxBlobSize) } func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) { diff --git a/test/integration_test.go b/test/integration_test.go index 7dde36695b..738dd60a92 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -210,18 +210,16 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser tx.On("GetCurrentBlockNumber").Return(uint64(100), nil) tx.On("GetQuorumCount").Return(1, nil) - minimumNumSymbols := uint32(128) - pricePerSymbol := uint32(1) - reservationLimit := uint64(1024) - paymentLimit := big.NewInt(512) - // this is disperser client's private key used in tests privateKey, err := crypto.HexToECDSA("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded") // Remove "0x" prefix if err != nil { panic("failed to convert hex to ECDSA") } publicKey := crypto.PubkeyToAddress(privateKey.PublicKey).Hex() + mockState := &coremock.MockOnchainPaymentState{} + reservationLimit := uint64(1024) + paymentLimit := big.NewInt(512) mockState.On("GetActiveReservationByAccount", mock.Anything, mock.MatchedBy(func(account string) bool { return account == publicKey })).Return(core.ActiveReservation{SymbolsPerSec: reservationLimit, StartTimestamp: 0, EndTimestamp: math.MaxUint32, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}}, nil) @@ -232,11 +230,11 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser })).Return(core.OnDemandPayment{CumulativePayment: paymentLimit}, nil) mockState.On("GetOnDemandPaymentByAccount", mock.Anything, mock.Anything).Return(core.OnDemandPayment{}, errors.New("payment not found")) mockState.On("GetOnDemandQuorumNumbers", mock.Anything).Return([]uint8{0, 1}, nil) - mockState.On("GetMinNumSymbols", mock.Anything).Return(minimumNumSymbols, nil) - mockState.On("GetPricePerSymbol", mock.Anything).Return(pricePerSymbol, nil) - mockState.On("GetReservationWindow", mock.Anything).Return(uint32(60), nil) mockState.On("GetGlobalSymbolsPerSecond", mock.Anything).Return(uint64(1024), nil) - mockState.On("GetOnDemandQuorumNumbers", mock.Anything).Return([]uint8{0, 1}, nil) + mockState.On("GetPricePerSymbol", mock.Anything).Return(uint32(1), nil) + mockState.On("GetMinNumSymbols", mock.Anything).Return(uint32(128), nil) + mockState.On("GetReservationWindow", mock.Anything).Return(uint32(60), nil) + mockState.On("RefreshOnchainPaymentState", mock.Anything).Return(nil).Maybe() deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") if !deployLocalStack { @@ -293,8 +291,8 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser panic("failed to make initial query to the on-chain state") } - meterer := meterer.NewMeterer(meterer.Config{}, mockState, offchainStore, logger) - server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, meterer, ratelimiter, rateConfig, testMaxBlobSize) + mt := meterer.NewMeterer(meterer.Config{}, mockState, offchainStore, logger) + server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, mt, ratelimiter, rateConfig, testMaxBlobSize) return TestDisperser{ batcher: batcher, diff --git a/tools/traffic/workers/mock_disperser.go b/tools/traffic/workers/mock_disperser.go index 22e960f4fb..43d9880ce2 100644 --- a/tools/traffic/workers/mock_disperser.go +++ b/tools/traffic/workers/mock_disperser.go @@ -33,15 +33,6 @@ func (m *MockDisperserClient) DisperseBlobAuthenticated( return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) } -func (m *MockDisperserClient) DispersePaidBlob( - ctx context.Context, - data []byte, - customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { - - args := m.mock.Called(data, customQuorums) - return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) -} - func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { args := m.mock.Called(key) return args.Get(0).(*disperser_rpc.BlobStatusReply), args.Error(1)