Skip to content

Commit

Permalink
cmd/livepeer: Use price feed watcher for dynamic pricePerPixel (#2981)
Browse files Browse the repository at this point in the history
* eth/watchers: Create PriceFeed watcher

Makefile: Use mockgen binary from tool dependencies

eth/contracts: Add chainlink interfaces source

Makefile: Generate Chainlink contracts ABI

tools: Add abigen tool to repo

eth/contracts: Generate chainlink bindings

Makefile: Fix abigen bindings generation

Revert everything abigen

Turns out there's already bindings exported from the Chainlink lib.

go.mod: Add chainlink library

eth/watchers: Add pricefeed watcher

eth/watchers: Clean-up event watching code

eth/watchers: Improve price tracking

Revert "go.mod: Add chainlink library"

This reverts commit ac415bd.

Revert "Revert everything abigen"

This reverts commit b7c40b1.

eth/contracts: Gen bindings for proxy iface

eth/watchers: Use local bindings for contracts

eth/watchers: Simplify event subs logic

eth/watchers: Simplify&optimize truncated ticker

eth/watchers: Update decimals on fetch

eth/watchers: Improve handling of decimals

eth/watchers: Fix price rat creation

eth/watchers: Make sure we use UTC on truncated timer

eth/contracts/chainlink: Generate only V3 contract bindings

eth/watchers: Watch PriceFeed only with polling

eth/watchers: Add a retry logic on price update

eth/watchers: Use clog instead of fmt.Printf

* eth: Create separate pricefeed client unit

This will make the code more testable.

* eth: Add tests for pricefeed client

* eth/watchers: Add tests to the truncated ticker

Gosh that was much harder than I thought

* eth/watchers: Add tests for pricefeedwatcher

* eth: Add comments to the new components

* go fmt

* cmd: make pricePerUnit flags strings

* cmd: Allow price per unit to be speficied with a currency

Currently ignoring the currency value.

* cmd: Add logic to start price update loop

* cmd: Add flag for specifying price feed address

* cmd: Add a lil test to priceDataToWei

* TODO: Reminder for something I noticed is missing

* cmd/starter: Support currencies for custom broadcaster prices

* eth: Address minor review comments

* eth,eth/watchers: Improve pricefeed watcher interface

* eth/watchers: Fix pricefeed watcher after merge

* cmd,core,server: Support dynamic updates to price in USD

* eth/watchers: Remove truncated ticker tests

* eth/watchers: Finalize pricefeedwatcher docs/tests

* cmd: Address review comment

* core: Create tests for autoconvertedprice

* cmd,core: Move wei default to AutoConvertedPrice

* Address review comments

* cmd: Fix the e2e flow for setting/updating configs

* CHANGELOG

* cmd: Make sure pricePerPixel can be specified with e notation

Parse it directlty as a big.Rat from a raw string, like I was
doing for pricePerUnit in some places.

* Fix tests

Turns out tests were not running on my branch due to base branch

* go fmt

* core: Fix typo in comment

* cmd,server: Use 3 decimal points when logging PPP

Found out that's officially supported precision on the
discovery logic, so let's reflect that here.
  • Loading branch information
victorges authored Mar 27, 2024
1 parent ef5d789 commit 706ec33
Show file tree
Hide file tree
Showing 21 changed files with 976 additions and 264 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#### General

- [#2981](https://github.com/livepeer/go-livepeer/pull/2981) Add support for prices in custom currencies like USD (@victorges)

#### Broadcaster

#### Orchestrator
Expand Down
11 changes: 6 additions & 5 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.SelectPriceExpFactor = flag.Float64("selectPriceExpFactor", *cfg.SelectPriceExpFactor, "Expresses how significant a small change of price is for the selection algorithm; default 100")
cfg.OrchPerfStatsURL = flag.String("orchPerfStatsUrl", *cfg.OrchPerfStatsURL, "URL of Orchestrator Performance Stream Tester")
cfg.Region = flag.String("region", *cfg.Region, "Region in which a broadcaster is deployed; used to select the region while using the orchestrator's performance stats")
cfg.MaxPricePerUnit = flag.Int("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price")
cfg.MaxPricePerUnit = flag.String("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price. Can be specified in wei or a custom currency in the format <price><currency> (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr")
cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept")

// Transcoding:
Expand Down Expand Up @@ -171,11 +171,12 @@ func parseLivepeerConfig() starter.LivepeerConfig {
// Broadcaster deposit multiplier to determine max acceptable ticket faceValue
cfg.DepositMultiplier = flag.Int("depositMultiplier", *cfg.DepositMultiplier, "The deposit multiplier used to determine max acceptable faceValue for PM tickets")
// Orchestrator base pricing info
cfg.PricePerUnit = flag.Int("pricePerUnit", 0, "The price per 'pixelsPerUnit' amount pixels")
// Unit of pixels for both O's basePriceInfo and B's MaxBroadcastPrice
cfg.PixelsPerUnit = flag.Int("pixelsPerUnit", *cfg.PixelsPerUnit, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel")
cfg.PricePerUnit = flag.String("pricePerUnit", "0", "The price per 'pixelsPerUnit' amount pixels. Can be specified in wei or a custom currency in the format <price><currency> (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr")
// Unit of pixels for both O's pricePerUnit and B's maxPricePerUnit
cfg.PixelsPerUnit = flag.String("pixelsPerUnit", *cfg.PixelsPerUnit, "Amount of pixels per unit. Set to '> 1' to have smaller price granularity than 1 wei / pixel")
cfg.PriceFeedAddr = flag.String("priceFeedAddr", *cfg.PriceFeedAddr, "ETH address of the Chainlink price feed contract. Used for custom currencies conversion on -pricePerUnit or -maxPricePerUnit")
cfg.AutoAdjustPrice = flag.Bool("autoAdjustPrice", *cfg.AutoAdjustPrice, "Enable/disable automatic price adjustments based on the overhead for redeeming tickets")
cfg.PricePerBroadcaster = flag.String("pricePerBroadcaster", *cfg.PricePerBroadcaster, `json list of price per broadcaster or path to json config file. Example: {"broadcasters":[{"ethaddress":"address1","priceperunit":1000,"pixelsperunit":1},{"ethaddress":"address2","priceperunit":1200,"pixelsperunit":1}]}`)
cfg.PricePerBroadcaster = flag.String("pricePerBroadcaster", *cfg.PricePerBroadcaster, `json list of price per broadcaster or path to json config file. Example: {"broadcasters":[{"ethaddress":"address1","priceperunit":0.5,"currency":"USD","pixelsperunit":1000000000000},{"ethaddress":"address2","priceperunit":0.3,"currency":"USD","pixelsperunit":1000000000000}]}`)
// Interval to poll for blocks
cfg.BlockPollingInterval = flag.Int("blockPollingInterval", *cfg.BlockPollingInterval, "Interval in seconds at which different blockchain event services poll for blocks")
// Redemption service
Expand Down
163 changes: 128 additions & 35 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"os"
"os/user"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/livepeer/go-livepeer/eth"
"github.com/livepeer/go-livepeer/eth/blockwatch"
"github.com/livepeer/go-livepeer/eth/watchers"
"github.com/livepeer/go-livepeer/monitor"
lpmon "github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-livepeer/server"
Expand Down Expand Up @@ -95,7 +97,7 @@ type LivepeerConfig struct {
SelectPriceExpFactor *float64
OrchPerfStatsURL *string
Region *string
MaxPricePerUnit *int
MaxPricePerUnit *string
MinPerfScore *float64
MaxSessions *string
CurrentManifest *bool
Expand All @@ -118,8 +120,9 @@ type LivepeerConfig struct {
MaxTicketEV *string
MaxTotalEV *string
DepositMultiplier *int
PricePerUnit *int
PixelsPerUnit *int
PricePerUnit *string
PixelsPerUnit *string
PriceFeedAddr *string
AutoAdjustPrice *bool
PricePerBroadcaster *string
BlockPollingInterval *int
Expand Down Expand Up @@ -192,8 +195,9 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultMaxTicketEV := "3000000000000"
defaultMaxTotalEV := "20000000000000"
defaultDepositMultiplier := 1
defaultMaxPricePerUnit := 0
defaultPixelsPerUnit := 1
defaultMaxPricePerUnit := "0"
defaultPixelsPerUnit := "1"
defaultPriceFeedAddr := "0x639Fe6ab55C921f74e7fac1ee960C0B6293ba612" // ETH / USD price feed address on Arbitrum Mainnet
defaultAutoAdjustPrice := true
defaultPricePerBroadcaster := ""
defaultBlockPollingInterval := 5
Expand Down Expand Up @@ -278,6 +282,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
DepositMultiplier: &defaultDepositMultiplier,
MaxPricePerUnit: &defaultMaxPricePerUnit,
PixelsPerUnit: &defaultPixelsPerUnit,
PriceFeedAddr: &defaultPriceFeedAddr,
AutoAdjustPrice: &defaultAutoAdjustPrice,
PricePerBroadcaster: &defaultPricePerBroadcaster,
BlockPollingInterval: &defaultBlockPollingInterval,
Expand Down Expand Up @@ -712,6 +717,13 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
go serviceRegistryWatcher.Watch()
defer serviceRegistryWatcher.Stop()

core.PriceFeedWatcher, err = watchers.NewPriceFeedWatcher(backend, *cfg.PriceFeedAddr)
// The price feed watch loop is started on demand on first subscribe.
if err != nil {
glog.Errorf("Failed to set up price feed watcher: %v", err)
return
}

n.Balances = core.NewAddressBalances(cleanupInterval)
defer n.Balances.StopCleanup()

Expand All @@ -733,27 +745,44 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {

if *cfg.Orchestrator {
// Set price per pixel base info
if *cfg.PixelsPerUnit <= 0 {
pixelsPerUnit, ok := new(big.Rat).SetString(*cfg.PixelsPerUnit)
if !ok || !pixelsPerUnit.IsInt() {
panic(fmt.Errorf("-pixelsPerUnit must be a valid integer, provided %v", *cfg.PixelsPerUnit))
}
if pixelsPerUnit.Sign() <= 0 {
// Can't divide by 0
panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %d", *cfg.PixelsPerUnit))
panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %v", *cfg.PixelsPerUnit))
}
if cfg.PricePerUnit == nil {
// Prevent orchestrators from unknowingly providing free transcoding
panic(fmt.Errorf("-pricePerUnit must be set"))
}
if *cfg.PricePerUnit < 0 {
panic(fmt.Errorf("-pricePerUnit must be >= 0, provided %d", *cfg.PricePerUnit))
pricePerUnit, currency, err := parsePricePerUnit(*cfg.PricePerUnit)
if err != nil {
panic(fmt.Errorf("-pricePerUnit must be a valid integer with an optional currency, provided %v", *cfg.PricePerUnit))
} else if pricePerUnit.Sign() < 0 {
panic(fmt.Errorf("-pricePerUnit must be >= 0, provided %s", pricePerUnit))
}
pricePerPixel := new(big.Rat).Quo(pricePerUnit, pixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
glog.Infof("Price: %v wei per pixel\n ", price.FloatString(3))
})
if err != nil {
panic(fmt.Errorf("Error converting price: %v", err))
}
n.SetBasePrice("default", big.NewRat(int64(*cfg.PricePerUnit), int64(*cfg.PixelsPerUnit)))
glog.Infof("Price: %d wei for %d pixels\n ", *cfg.PricePerUnit, *cfg.PixelsPerUnit)

if *cfg.PricePerBroadcaster != "" {
ppb := getBroadcasterPrices(*cfg.PricePerBroadcaster)
for _, p := range ppb {
price := big.NewRat(p.PricePerUnit, p.PixelsPerUnit)
n.SetBasePrice(p.EthAddress, price)
glog.Infof("Price: %v set for broadcaster %v", price.RatString(), p.EthAddress)
n.SetBasePrice("default", autoPrice)

broadcasterPrices := getBroadcasterPrices(*cfg.PricePerBroadcaster)
for _, p := range broadcasterPrices {
p := p
pricePerPixel := new(big.Rat).Quo(p.PricePerUnit, p.PixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(p.Currency, pricePerPixel, func(price *big.Rat) {
glog.Infof("Price: %v wei per pixel for broadcaster %v", price.FloatString(3), p.EthAddress)
})
if err != nil {
panic(fmt.Errorf("Error converting price for broadcaster %s: %v", p.EthAddress, err))
}
n.SetBasePrice(p.EthAddress, autoPrice)
}

n.AutoSessionLimit = *cfg.MaxSessions == "auto"
Expand Down Expand Up @@ -850,12 +879,30 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {

n.Sender = pm.NewSender(n.Eth, timeWatcher, senderWatcher, maxEV, maxTotalEV, *cfg.DepositMultiplier)

if *cfg.PixelsPerUnit <= 0 {
pixelsPerUnit, ok := new(big.Rat).SetString(*cfg.PixelsPerUnit)
if !ok || !pixelsPerUnit.IsInt() {
panic(fmt.Errorf("-pixelsPerUnit must be a valid integer, provided %v", *cfg.PixelsPerUnit))
}
if pixelsPerUnit.Sign() <= 0 {
// Can't divide by 0
panic(fmt.Errorf("The amount of pixels per unit must be greater than 0, provided %d instead\n", *cfg.PixelsPerUnit))
panic(fmt.Errorf("-pixelsPerUnit must be > 0, provided %v", *cfg.PixelsPerUnit))
}
maxPricePerUnit, currency, err := parsePricePerUnit(*cfg.MaxPricePerUnit)
if err != nil {
panic(fmt.Errorf("The maximum price per unit must be a valid integer with an optional currency, provided %v instead\n", *cfg.MaxPricePerUnit))
}
if *cfg.MaxPricePerUnit > 0 {
server.BroadcastCfg.SetMaxPrice(big.NewRat(int64(*cfg.MaxPricePerUnit), int64(*cfg.PixelsPerUnit)))
if maxPricePerUnit.Sign() > 0 {
pricePerPixel := new(big.Rat).Quo(maxPricePerUnit, pixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
if monitor.Enabled {
monitor.MaxTranscodingPrice(price)
}
glog.Infof("Maximum transcoding price: %v wei per pixel\n ", price.FloatString(3))
})
if err != nil {
panic(fmt.Errorf("Error converting price: %v", err))
}
server.BroadcastCfg.SetMaxPrice(autoPrice)
} else {
glog.Infof("Maximum transcoding price per pixel is not greater than 0: %v, broadcaster is currently set to accept ANY price.\n", *cfg.MaxPricePerUnit)
glog.Infoln("To update the broadcaster's maximum acceptable transcoding price per pixel, use the CLI or restart the broadcaster with the appropriate 'maxPricePerUnit' and 'pixelsPerUnit' values")
Expand Down Expand Up @@ -1420,29 +1467,59 @@ func checkOrStoreChainID(dbh *common.DB, chainID *big.Int) error {
return nil
}

// Format of broadcasterPrices json
// {"broadcasters":[{"ethaddress":"address1","priceperunit":1000,"pixelsperunit":1}, {"ethaddress":"address2","priceperunit":2000,"pixelsperunit":3}]}
type BroadcasterPrices struct {
Prices []BroadcasterPrice `json:"broadcasters"`
}

type BroadcasterPrice struct {
EthAddress string `json:"ethaddress"`
PricePerUnit int64 `json:"priceperunit"`
PixelsPerUnit int64 `json:"pixelsperunit"`
EthAddress string
PricePerUnit *big.Rat
Currency string
PixelsPerUnit *big.Rat
}

func getBroadcasterPrices(broadcasterPrices string) []BroadcasterPrice {
var pricesSet BroadcasterPrices
prices, _ := common.ReadFromFile(broadcasterPrices)
if broadcasterPrices == "" {
return nil
}

// Format of broadcasterPrices json
// {"broadcasters":[{"ethaddress":"address1","priceperunit":0.5,"currency":"USD","pixelsperunit":1}, {"ethaddress":"address2","priceperunit":0.3,"currency":"USD","pixelsperunit":3}]}
var pricesSet struct {
Broadcasters []struct {
EthAddress string `json:"ethaddress"`
// The fields below are specified as a number in the JSON, but we don't want to lose precision so we store the raw characters here and parse as a big.Rat.
// This also allows support for exponential notation for numbers, which is helpful for pricePerUnit which could be a value like 1e12.
PixelsPerUnit json.RawMessage `json:"pixelsperunit"`
PricePerUnit json.RawMessage `json:"priceperunit"`
Currency string `json:"currency"`
} `json:"broadcasters"`
}
pricesFileContent, _ := common.ReadFromFile(broadcasterPrices)

err := json.Unmarshal([]byte(prices), &pricesSet)
err := json.Unmarshal([]byte(pricesFileContent), &pricesSet)
if err != nil {
glog.Errorf("broadcaster prices could not be parsed: %s", err)
return nil
}

return pricesSet.Prices
prices := make([]BroadcasterPrice, len(pricesSet.Broadcasters))
for i, p := range pricesSet.Broadcasters {
pixelsPerUnit, ok := new(big.Rat).SetString(string(p.PixelsPerUnit))
if !ok {
glog.Errorf("Pixels per unit could not be parsed for broadcaster %v. must be a valid number, provided %s", p.EthAddress, p.PixelsPerUnit)
continue
}
pricePerUnit, ok := new(big.Rat).SetString(string(p.PricePerUnit))
if !ok {
glog.Errorf("Price per unit could not be parsed for broadcaster %v. must be a valid number, provided %s", p.EthAddress, p.PricePerUnit)
continue
}
prices[i] = BroadcasterPrice{
EthAddress: p.EthAddress,
Currency: p.Currency,
PricePerUnit: pricePerUnit,
PixelsPerUnit: pixelsPerUnit,
}
}

return prices
}

func createSelectionAlgorithm(cfg LivepeerConfig) (common.SelectionAlgorithm, error) {
Expand Down Expand Up @@ -1494,6 +1571,22 @@ func parseEthKeystorePath(ethKeystorePath string) (keystorePath, error) {
return keystore, nil
}

func parsePricePerUnit(pricePerUnitStr string) (*big.Rat, string, error) {
pricePerUnitRex := regexp.MustCompile(`^(\d+(\.\d+)?)([A-z][A-z0-9]*)?$`)
match := pricePerUnitRex.FindStringSubmatch(pricePerUnitStr)
if match == nil {
return nil, "", fmt.Errorf("price must be in the format of <price><currency>, provided %v", pricePerUnitStr)
}
price, currency := match[1], match[3]

pricePerUnit, ok := new(big.Rat).SetString(price)
if !ok {
return nil, "", fmt.Errorf("price must be a valid number, provided %v", match[1])
}

return pricePerUnit, currency, nil
}

func refreshOrchPerfScoreLoop(ctx context.Context, region string, orchPerfScoreURL string, score *common.PerfScore) {
for {
refreshOrchPerfScore(region, orchPerfScoreURL, score)
Expand Down
92 changes: 90 additions & 2 deletions cmd/livepeer/starter/starter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func TestParseGetBroadcasterPrices(t *testing.T) {
assert.NotNil(prices)
assert.Equal(2, len(prices))

price1 := big.NewRat(prices[0].PricePerUnit, prices[0].PixelsPerUnit)
price2 := big.NewRat(prices[1].PricePerUnit, prices[1].PixelsPerUnit)
price1 := new(big.Rat).Quo(prices[0].PricePerUnit, prices[0].PixelsPerUnit)
price2 := new(big.Rat).Quo(prices[1].PricePerUnit, prices[1].PixelsPerUnit)
assert.Equal(big.NewRat(1000, 1), price1)
assert.Equal(big.NewRat(2000, 3), price2)
}
Expand Down Expand Up @@ -295,3 +295,91 @@ func TestUpdatePerfScore(t *testing.T) {
}
require.Equal(t, expScores, scores.Scores)
}

func TestParsePricePerUnit(t *testing.T) {
tests := []struct {
name string
pricePerUnitStr string
expectedPrice *big.Rat
expectedCurrency string
expectError bool
}{
{
name: "Valid input with integer price",
pricePerUnitStr: "100USD",
expectedPrice: big.NewRat(100, 1),
expectedCurrency: "USD",
expectError: false,
},
{
name: "Valid input with fractional price",
pricePerUnitStr: "0.13USD",
expectedPrice: big.NewRat(13, 100),
expectedCurrency: "USD",
expectError: false,
},
{
name: "Valid input with decimal price",
pricePerUnitStr: "99.99EUR",
expectedPrice: big.NewRat(9999, 100),
expectedCurrency: "EUR",
expectError: false,
},
{
name: "Lower case currency",
pricePerUnitStr: "99.99eur",
expectedPrice: big.NewRat(9999, 100),
expectedCurrency: "eur",
expectError: false,
},
{
name: "Currency with numbers",
pricePerUnitStr: "420DOG3",
expectedPrice: big.NewRat(420, 1),
expectedCurrency: "DOG3",
expectError: false,
},
{
name: "No specified currency, empty currency",
pricePerUnitStr: "100",
expectedPrice: big.NewRat(100, 1),
expectedCurrency: "",
expectError: false,
},
{
name: "Explicit wei currency",
pricePerUnitStr: "100wei",
expectedPrice: big.NewRat(100, 1),
expectedCurrency: "wei",
expectError: false,
},
{
name: "Invalid number",
pricePerUnitStr: "abcUSD",
expectedPrice: nil,
expectedCurrency: "",
expectError: true,
},
{
name: "Negative price",
pricePerUnitStr: "-100USD",
expectedPrice: nil,
expectedCurrency: "",
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
price, currency, err := parsePricePerUnit(tt.pricePerUnitStr)

if tt.expectError {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.True(t, tt.expectedPrice.Cmp(price) == 0)
assert.Equal(t, tt.expectedCurrency, currency)
}
})
}
}
Loading

0 comments on commit 706ec33

Please sign in to comment.