Skip to content

Commit

Permalink
eth: Create separate pricefeed client unit
Browse files Browse the repository at this point in the history
This will make the code more testable.
  • Loading branch information
victorges committed Mar 14, 2024
1 parent 87d9992 commit ae07a11
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 90 deletions.
107 changes: 107 additions & 0 deletions eth/pricefeed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package eth

import (
"context"
"errors"
"fmt"
"math/big"
"regexp"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/livepeer/go-livepeer/eth/contracts/chainlink"
)

type PriceData struct {
RoundID int64
Price *big.Rat
UpdatedAt time.Time
}

type PriceFeedEthClient interface {
Description() (string, error)
FetchPriceData() (PriceData, error)
}

func NewPriceFeedEthClient(ctx context.Context, rpcUrl, priceFeedAddr string) (PriceFeedEthClient, error) {
client, err := ethclient.DialContext(ctx, rpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to initialize client: %w", err)

Check warning on line 31 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L28-L31

Added lines #L28 - L31 were not covered by tests
}

ok := isContractAddress(priceFeedAddr, client)
if !ok {
return nil, fmt.Errorf("not a contract address: %s", priceFeedAddr)

Check warning on line 36 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L34-L36

Added lines #L34 - L36 were not covered by tests
}

addr := common.HexToAddress(priceFeedAddr)
priceFeed, err := chainlink.NewAggregatorV3Interface(addr, client)
if err != nil {
return nil, fmt.Errorf("failed to create mock aggregator proxy: %w", err)

Check warning on line 42 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L39-L42

Added lines #L39 - L42 were not covered by tests
}

return &priceFeedClient{
client: client,
priceFeed: priceFeed,
}, nil

Check warning on line 48 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L45-L48

Added lines #L45 - L48 were not covered by tests
}

type priceFeedClient struct {
client *ethclient.Client
priceFeed *chainlink.AggregatorV3Interface
}

func (c *priceFeedClient) Description() (string, error) {
return c.priceFeed.Description(&bind.CallOpts{})

Check warning on line 57 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L56-L57

Added lines #L56 - L57 were not covered by tests
}

func (c *priceFeedClient) FetchPriceData() (PriceData, error) {
data, err := c.priceFeed.LatestRoundData(&bind.CallOpts{})
if err != nil {
return PriceData{}, errors.New("failed to get latest round data: " + err.Error())

Check warning on line 63 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L60-L63

Added lines #L60 - L63 were not covered by tests
}

decimals, err := c.priceFeed.Decimals(&bind.CallOpts{})
if err != nil {
return PriceData{}, errors.New("failed to get decimals: " + err.Error())

Check warning on line 68 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L66-L68

Added lines #L66 - L68 were not covered by tests
}

return computePriceData(data.RoundId, data.UpdatedAt, data.Answer, decimals), nil

Check warning on line 71 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L71

Added line #L71 was not covered by tests
}

func computePriceData(roundID, updatedAt, answer *big.Int, decimals uint8) PriceData {

Check warning on line 74 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L74

Added line #L74 was not covered by tests
// Compute a big.int which is 10^decimals.
divisor := new(big.Int).Exp(
big.NewInt(10),
big.NewInt(int64(decimals)),
nil)

Check warning on line 79 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L76-L79

Added lines #L76 - L79 were not covered by tests

return PriceData{
RoundID: roundID.Int64(),
Price: new(big.Rat).SetFrac(answer, divisor),
UpdatedAt: time.Unix(updatedAt.Int64(), 0),

Check warning on line 84 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L81-L84

Added lines #L81 - L84 were not covered by tests
}
}

func isContractAddress(addr string, client *ethclient.Client) bool {
if len(addr) == 0 {
return false

Check warning on line 90 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L88-L90

Added lines #L88 - L90 were not covered by tests
}

// Ensure it is an Ethereum address: 0x followed by 40 hexadecimal characters.
re := regexp.MustCompile("^0x[0-9a-fA-F]{40}$")
if !re.MatchString(addr) {
return false

Check warning on line 96 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L94-L96

Added lines #L94 - L96 were not covered by tests
}

// Ensure it is a contract address.
address := common.HexToAddress(addr)
bytecode, err := client.CodeAt(context.Background(), address, nil) // nil is latest block
if err != nil {
return false

Check warning on line 103 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L100-L103

Added lines #L100 - L103 were not covered by tests
}
isContract := len(bytecode) > 0
return isContract

Check warning on line 106 in eth/pricefeed.go

View check run for this annotation

Codecov / codecov/patch

eth/pricefeed.go#L105-L106

Added lines #L105 - L106 were not covered by tests
}
121 changes: 31 additions & 90 deletions eth/watchers/pricefeedwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@ package watchers
import (
"context"
"fmt"
"math/big"
"regexp"
"strings"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/eth/contracts/chainlink"
"github.com/livepeer/go-livepeer/eth"
)

const (
Expand All @@ -23,41 +18,25 @@ const (
type PriceFeedWatcher struct {
ctx context.Context

client *ethclient.Client
proxy *chainlink.AggregatorV3Interface

updatePeriod time.Duration
priceFeed eth.PriceFeedEthClient
currencyBase, currencyQuote string

current PriceData
priceUpdated chan PriceData
}

type PriceData struct {
RoundId int64
Price *big.Rat
UpdatedAt time.Time
current eth.PriceData
priceUpdated chan eth.PriceData
}

func NewPriceFeedWatcher(ctx context.Context, rpcUrl, proxyAddrStr string) (*PriceFeedWatcher, error) {
// Initialize client instance using the rpcUrl.
client, err := ethclient.DialContext(ctx, rpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to initialize client: %w", err)
func NewPriceFeedWatcher(ctx context.Context, rpcUrl, priceFeedAddr string, updatePeriod time.Duration) (*PriceFeedWatcher, error) {
if updatePeriod <= 0 {
updatePeriod = 1 * time.Hour

Check warning on line 31 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L29-L31

Added lines #L29 - L31 were not covered by tests
}

// Test if it is a contract address.
ok := isContractAddress(proxyAddrStr, client)
if !ok {
return nil, fmt.Errorf("not a contract address: %s", proxyAddrStr)
}

proxyAddr := common.HexToAddress(proxyAddrStr)
proxy, err := chainlink.NewAggregatorV3Interface(proxyAddr, client)
priceFeed, err := eth.NewPriceFeedEthClient(ctx, rpcUrl, priceFeedAddr)
if err != nil {
return nil, fmt.Errorf("failed to create mock aggregator proxy: %w", err)
return nil, fmt.Errorf("failed to create price feed client: %w", err)

Check warning on line 36 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L34-L36

Added lines #L34 - L36 were not covered by tests
}

description, err := proxy.Description(&bind.CallOpts{})
description, err := priceFeed.Description()
if err != nil {
return nil, fmt.Errorf("failed to get description: %w", err)

Check warning on line 41 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L39-L41

Added lines #L39 - L41 were not covered by tests
}
Expand All @@ -69,14 +48,14 @@ func NewPriceFeedWatcher(ctx context.Context, rpcUrl, proxyAddrStr string) (*Pri

w := &PriceFeedWatcher{
ctx: ctx,
client: client,
proxy: proxy,
updatePeriod: updatePeriod,
priceFeed: priceFeed,
currencyBase: currencyFrom,
currencyQuote: currencyTo,
priceUpdated: make(chan PriceData, 1),
priceUpdated: make(chan eth.PriceData, 1),

Check warning on line 55 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L49-L55

Added lines #L49 - L55 were not covered by tests
}

err = w.fetchPrice()
err = w.updatePrice()
if err != nil {
return nil, fmt.Errorf("failed to update price: %w", err)

Check warning on line 60 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L58-L60

Added lines #L58 - L60 were not covered by tests
}
Expand All @@ -91,40 +70,18 @@ func (w *PriceFeedWatcher) Currencies() (base string, quote string) {
return w.currencyBase, w.currencyQuote

Check warning on line 70 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

func (w *PriceFeedWatcher) Current() PriceData {
func (w *PriceFeedWatcher) Current() eth.PriceData {
return w.current

Check warning on line 74 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L73-L74

Added lines #L73 - L74 were not covered by tests
}

func (w *PriceFeedWatcher) PriceUpdated() <-chan PriceData {
func (w *PriceFeedWatcher) PriceUpdated() <-chan eth.PriceData {
return w.priceUpdated

Check warning on line 78 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L77-L78

Added lines #L77 - L78 were not covered by tests
}

func (w *PriceFeedWatcher) fetchPrice() error {
roundData, err := w.proxy.LatestRoundData(&bind.CallOpts{})
func (w *PriceFeedWatcher) updatePrice() error {
newPrice, err := w.priceFeed.FetchPriceData()
if err != nil {
return fmt.Errorf("failed to get latest round data: %w", err)
}

decimals, err := w.proxy.Decimals(&bind.CallOpts{})
if err != nil {
return fmt.Errorf("failed to get decimals: %w", err)
}

w.updatePrice(roundData.Answer, decimals, roundData.RoundId, roundData.UpdatedAt)
return nil
}

func (w *PriceFeedWatcher) updatePrice(current *big.Int, decimals uint8, roundId, updatedAt *big.Int) {
// Compute a big.int which is 10^decimals.
divisor := new(big.Int).Exp(
big.NewInt(10),
big.NewInt(int64(decimals)),
nil)

newPrice := PriceData{
RoundId: roundId.Int64(),
Price: new(big.Rat).SetFrac(current, divisor),
UpdatedAt: time.Unix(updatedAt.Int64(), 0),
return fmt.Errorf("failed to fetch price data: %w", err)

Check warning on line 84 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L81-L84

Added lines #L81 - L84 were not covered by tests
}

if newPrice.UpdatedAt.After(w.current.UpdatedAt) {
Expand All @@ -134,54 +91,38 @@ func (w *PriceFeedWatcher) updatePrice(current *big.Int, decimals uint8, roundId
default:

Check warning on line 91 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L87-L91

Added lines #L87 - L91 were not covered by tests
}
}

return nil

Check warning on line 95 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L95

Added line #L95 was not covered by tests
}

func (w *PriceFeedWatcher) watch() {
ctx, cancel := context.WithCancel(w.ctx)
defer cancel()
ticker := newTruncatedTicker(ctx, 1*time.Hour)
ticker := newTruncatedTicker(ctx, w.updatePeriod)

Check warning on line 101 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L98-L101

Added lines #L98 - L101 were not covered by tests

for {
select {
case <-w.ctx.Done():
return
case <-ticker:
retryDelay := priceUpdateBaseRetryDelay
for attempt := 1; attempt <= priceUpdateMaxRetries; attempt++ {
err := w.fetchPrice()
attempt, retryDelay := 1, priceUpdateBaseRetryDelay
for {
err := w.updatePrice()
if err == nil {
break
} else if attempt >= priceUpdateMaxRetries {
clog.Errorf(ctx, "Failed to fetch updated price from PriceFeed attempts=%d err=%q", attempt, err)
break

Check warning on line 115 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L103-L115

Added lines #L103 - L115 were not covered by tests
}

clog.Warningf(ctx, "Failed to fetch updated price from PriceFeed, retrying after retryDelay=%d attempt=%d err=%v", retryDelay, attempt, err)
clog.Warningf(ctx, "Failed to fetch updated price from PriceFeed, retrying after retryDelay=%d attempt=%d err=%q", retryDelay, attempt, err)
time.Sleep(retryDelay)
retryDelay *= 2
attempt, retryDelay = attempt+1, retryDelay*2

Check warning on line 120 in eth/watchers/pricefeedwatcher.go

View check run for this annotation

Codecov / codecov/patch

eth/watchers/pricefeedwatcher.go#L118-L120

Added lines #L118 - L120 were not covered by tests
}
}
}
}

func isContractAddress(addr string, client *ethclient.Client) bool {
if len(addr) == 0 {
return false
}

// Ensure it is an Ethereum address: 0x followed by 40 hexadecimal characters.
re := regexp.MustCompile("^0x[0-9a-fA-F]{40}$")
if !re.MatchString(addr) {
return false
}

// Ensure it is a contract address.
address := common.HexToAddress(addr)
bytecode, err := client.CodeAt(context.Background(), address, nil) // nil is latest block
if err != nil {
return false
}
isContract := len(bytecode) > 0
return isContract
}

func parseCurrencies(description string) (currencyBase string, currencyQuote string, err error) {
currencies := strings.Split(description, "/")
if len(currencies) != 2 {
Expand Down

0 comments on commit ae07a11

Please sign in to comment.