diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index ba1f2cfc65..06374ce292 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -3,6 +3,7 @@ package meterer import ( "context" "fmt" + "log" "slices" "time" @@ -71,6 +72,7 @@ func (m *Meterer) Start(ctx context.Context) { // MeterRequest validates a blob header and adds it to the meterer's state // TODO: return error if there's a rejection (with reasoning) or internal error (should be very rare) func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, numSymbols uint, quorumNumbers []uint8) error { + log.Println(m) // Validate against the payment method if header.CumulativePayment.Sign() == 0 { reservation, err := m.ChainPaymentState.GetActiveReservationByAccount(ctx, header.AccountID) diff --git a/disperser/apiserver/disperse_blob_v2.go b/disperser/apiserver/disperse_blob_v2.go index a07548c3f5..44e0b8c50f 100644 --- a/disperser/apiserver/disperse_blob_v2.go +++ b/disperser/apiserver/disperse_blob_v2.go @@ -31,9 +31,10 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl if req.GetBlobHeader().GetPaymentHeader() != nil { binIndex := req.GetBlobHeader().GetPaymentHeader().GetBinIndex() cumulativePayment := new(big.Int).SetBytes(req.GetBlobHeader().GetPaymentHeader().GetCumulativePayment()) + accountID := req.GetBlobHeader().GetPaymentHeader().GetAccountId() paymentHeader := core.PaymentMetadata{ - AccountID: req.GetBlobHeader().GetPaymentHeader().GetAccountId(), + AccountID: accountID, BinIndex: binIndex, CumulativePayment: cumulativePayment, } diff --git a/disperser/apiserver/server_v2.go b/disperser/apiserver/server_v2.go index 9b6cb56998..76b46415b9 100644 --- a/disperser/apiserver/server_v2.go +++ b/disperser/apiserver/server_v2.go @@ -57,6 +57,7 @@ func NewDispersalServerV2( blobMetadataStore *blobstore.BlobMetadataStore, chainReader core.Reader, ratelimiter common.RateLimiter, + meterer *meterer.Meterer, authenticator corev2.BlobRequestAuthenticator, prover encoding.Prover, maxNumSymbolsPerBlob uint64, @@ -72,6 +73,7 @@ func NewDispersalServerV2( chainReader: chainReader, authenticator: authenticator, + meterer: meterer, prover: prover, logger: logger, diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index 5ef2fe8dac..2d83c5c21d 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -14,6 +14,7 @@ import ( "github.com/Layr-Labs/eigenda/common/aws/s3" "github.com/Layr-Labs/eigenda/core" auth "github.com/Layr-Labs/eigenda/core/auth/v2" + "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/core/mock" corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser/apiserver" @@ -430,6 +431,52 @@ func newTestServerV2(t *testing.T) *testComponents { chainReader := &mock.MockWriter{} rateConfig := apiserver.RateConfig{} + // append test name to each table name for an unique store + mockState := &mock.MockOnchainPaymentState{} + mockState.On("RefreshOnchainPaymentState", tmock.Anything).Return(nil).Maybe() + mockState.On("GetReservationWindow", tmock.Anything).Return(uint32(1), nil) + mockState.On("GetPricePerSymbol", tmock.Anything).Return(uint32(2), nil) + mockState.On("GetGlobalSymbolsPerSecond", tmock.Anything).Return(uint64(1009), nil) + mockState.On("GetMinNumSymbols", tmock.Anything).Return(uint32(3), nil) + + now := uint64(time.Now().Unix()) + mockState.On("GetActiveReservationByAccount", tmock.Anything, tmock.Anything).Return(core.ActiveReservation{SymbolsPerSec: 100, StartTimestamp: now + 1200, EndTimestamp: now + 1800, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}}, nil) + mockState.On("GetOnDemandPaymentByAccount", tmock.Anything, tmock.Anything).Return(core.OnDemandPayment{CumulativePayment: big.NewInt(3864)}, nil) + mockState.On("GetOnDemandQuorumNumbers", tmock.Anything).Return([]uint8{0, 1}, nil) + + if err := mockState.RefreshOnchainPaymentState(context.Background(), nil); err != nil { + panic("failed to make initial query to the on-chain state") + } + table_names := []string{"reservations_server_" + t.Name(), "ondemand_server_" + t.Name(), "global_server_" + t.Name()} + err = meterer.CreateReservationTable(awsConfig, table_names[0]) + if err != nil { + teardown() + panic("failed to create reservation table") + } + err = meterer.CreateOnDemandTable(awsConfig, table_names[1]) + if err != nil { + teardown() + panic("failed to create ondemand table") + } + err = meterer.CreateGlobalReservationTable(awsConfig, table_names[2]) + if err != nil { + teardown() + panic("failed to create global reservation table") + } + + store, err := meterer.NewOffchainStore( + awsConfig, + table_names[0], + table_names[1], + table_names[2], + logger, + ) + if err != nil { + teardown() + panic("failed to create offchain store") + } + meterer := meterer.NewMeterer(meterer.Config{}, mockState, store, logger) + chainReader.On("GetCurrentBlockNumber").Return(uint32(100), nil) chainReader.On("GetQuorumCount").Return(uint8(2), nil) chainReader.On("GetRequiredQuorumNumbers", tmock.Anything).Return([]uint8{0, 1}, nil) @@ -439,7 +486,7 @@ func newTestServerV2(t *testing.T) *testComponents { s := apiserver.NewDispersalServerV2(disperser.ServerConfig{ GrpcPort: "51002", GrpcTimeout: 1 * time.Second, - }, rateConfig, blobStore, blobMetadataStore, chainReader, nil, auth.NewAuthenticator(), prover, 10, time.Hour, logger) + }, rateConfig, blobStore, blobMetadataStore, chainReader, nil, meterer, auth.NewAuthenticator(), prover, 100, time.Hour, logger) err = s.RefreshOnchainState(context.Background()) assert.NoError(t, err) diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 0be93b7ab6..07ff7936cc 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -176,6 +176,7 @@ func RunDisperserServer(ctx *cli.Context) error { blobMetadataStore, transactor, ratelimiter, + meterer, authv2.NewAuthenticator(), prover, uint64(config.MaxNumSymbolsPerBlob),