Skip to content

Commit

Permalink
chore: backport #705 (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
aljo242 authored Dec 11, 2024
1 parent fcccb1d commit f73a9b0
Show file tree
Hide file tree
Showing 14 changed files with 337 additions and 111 deletions.
9 changes: 9 additions & 0 deletions oracle/config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ type APIConfig struct {

// Name is the name of the provider that corresponds to this config.
Name string `json:"name"`

// MaxBlockHeightAge is the oldest an update from an on-chain data source can be without having its
// block height incremented. In the case where a data source has exceeded this limit and the block
// height is not increasing, price reporting will be skipped until the block height increases.
MaxBlockHeightAge time.Duration `json:"maxBlockHeightAge"`
}

// Endpoint holds all data necessary for an API provider to connect to a given endpoint
Expand Down Expand Up @@ -123,5 +128,9 @@ func (c *APIConfig) ValidateBasic() error {
}
}

if c.MaxBlockHeightAge < 0 {
return fmt.Errorf("max_block_height_age cannot be negative")
}

return nil
}
30 changes: 30 additions & 0 deletions oracle/config/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,36 @@ func TestAPIConfig(t *testing.T) {
},
expectedErr: false,
},
{
name: "good config with max_block_height_age",
config: config.APIConfig{
Enabled: true,
Timeout: time.Second,
Interval: time.Second,
ReconnectTimeout: time.Second,
MaxQueries: 1,
Name: "test",
Endpoints: []config.Endpoint{{URL: "http://test.com"}},
BatchSize: 1,
MaxBlockHeightAge: 10 * time.Second,
},
expectedErr: false,
},
{
name: "bad config with negative max_block_height_age",
config: config.APIConfig{
Enabled: true,
Timeout: time.Second,
Interval: time.Second,
ReconnectTimeout: time.Second,
MaxQueries: 1,
Name: "test",
Endpoints: []config.Endpoint{{URL: "http://test.com"}},
BatchSize: 1,
MaxBlockHeightAge: -10 * time.Second,
},
expectedErr: true,
},
{
name: "bad config with invalid endpoint (no url)",
config: config.APIConfig{
Expand Down
75 changes: 52 additions & 23 deletions providers/apis/defi/ethmulticlient/multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"sync"

"github.com/skip-mev/slinky/providers/apis/defi/types"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
"go.uber.org/zap"
Expand All @@ -23,6 +25,8 @@ type MultiRPCClient struct {

// underlying clients
clients []EVMClient

blockAgeChecker types.BlockAgeChecker
}

// NewMultiRPCClient returns a new MultiRPCClient.
Expand All @@ -32,9 +36,10 @@ func NewMultiRPCClient(
clients []EVMClient,
) EVMClient {
return &MultiRPCClient{
logger: logger,
clients: clients,
api: api,
logger: logger,
clients: clients,
api: api,
blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge),
}
}

Expand Down Expand Up @@ -81,12 +86,20 @@ func NewMultiRPCClientFromEndpoints(
}

return &MultiRPCClient{
logger: logger.With(zap.String("multi_client", api.Name)),
api: api,
clients: clients,
logger: logger.With(zap.String("multi_client", api.Name)),
api: api,
clients: clients,
blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge),
}, nil
}

// define a result struct that go routines will populate and append to a slice when they complete their request.
type result struct {
height uint64
results []rpc.BatchElem
err error
}

// BatchCallContext injects a call to eth_blockNumber, and makes batch calls to the underlying EVMClients.
// It returns the response that has the greatest height from the eth_blockNumber call. An error is returned
// only when no client was able to successfully provide a height or errored when sending the BatchCall.
Expand All @@ -95,15 +108,9 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc.
m.logger.Debug("BatchCallContext called with 0 elems")
return nil
}
// define a result struct that go routines will populate and append to a slice when they complete their request.
type result struct {
height uint64
results []rpc.BatchElem
}

results := make([]result, len(m.clients))

// error slice to capture errors go routines encounter.
errs := make([]error, len(m.clients))
wg := new(sync.WaitGroup)
// this is the index of where we will have an eth_blockNumber call.
blockNumReqIndex := len(batchElems)
Expand All @@ -124,7 +131,8 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc.
// if there was an error, or if the block_num request didn't have result / errored
// we log the error and append to error slice.
if err != nil || req[blockNumReqIndex].Result == "" || req[blockNumReqIndex].Error != nil {
errs[i] = fmt.Errorf("endpoint request failed: %w, %w", err, req[blockNumReqIndex].Error)
resultErr := fmt.Errorf("endpoint request failed: %w, %w", err, req[blockNumReqIndex].Error)
results[i] = result{0, nil, resultErr}
m.logger.Debug(
"endpoint request failed",
zap.Error(err),
Expand All @@ -138,7 +146,8 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc.
// try to get the block number.
r, ok := req[blockNumReqIndex].Result.(*string)
if !ok {
errs[i] = fmt.Errorf("result from eth_blockNumber was not a string")
resultErr := fmt.Errorf("result from eth_blockNumber was not a string")
results[i] = result{0, nil, resultErr}
m.logger.Debug(
"result from eth_blockNumber was not a string",
zap.String("url", url),
Expand All @@ -149,7 +158,8 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc.
// decode the new height
height, err := hexutil.DecodeUint64(*r)
if err != nil { // if we can't decode the height, log an error.
errs[i] = fmt.Errorf("could not decode hex eth height: %w", err)
resultErr := fmt.Errorf("could not decode hex eth height: %w", err)
results[i] = result{0, nil, resultErr}
m.logger.Debug(
"could not decode hex eth height",
zap.String("url", url),
Expand All @@ -163,17 +173,31 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc.
zap.String("url", url),
)
// append the results, minus the appended eth_blockNumber request.
results[i] = result{height, req[:blockNumReqIndex]}
results[i] = result{height, req[:blockNumReqIndex], nil}
}(clientIdx)
}
wg.Wait()

filtered, err := m.filterResponses(results)
if err != nil {
return fmt.Errorf("error filtering responses: %w", err)
}

// copy the results from the results that had the largest height.
copy(batchElems, filtered)
return nil
}

// filterAccountsResponses chooses the rpc response with the highest block number.
func (m *MultiRPCClient) filterResponses(responses []result) ([]rpc.BatchElem, error) {
// see which of the results had the largest height, and store the index of that result.
var (
maxHeight uint64
maxHeightIndex int
errs = make([]error, len(responses))
)
for i, res := range results {
for i, res := range responses {
errs[i] = res.err
if res.height > maxHeight {
maxHeight = res.height
maxHeightIndex = i
Expand All @@ -183,12 +207,17 @@ func (m *MultiRPCClient) BatchCallContext(ctx context.Context, batchElems []rpc.
if maxHeight == 0 {
err := errors.Join(errs...)
if err != nil {
return err
return nil, err
}
// this should never happen... but who knows. maybe something terrible happened.
return errors.New("no errors were encountered, however no go routine was able to report a height")
return nil, errors.New("no errors were encountered, however no go routine was able to report a height")

}
// copy the results from the results that had the largest height.
copy(batchElems, results[maxHeightIndex].results)
return nil

// check the block height
if valid := m.blockAgeChecker.IsHeightValid(maxHeight); !valid {
return nil, fmt.Errorf("height %d is stale and older than %d", maxHeight, m.api.MaxBlockHeightAge)
}

return responses[maxHeightIndex].results, nil
}
82 changes: 55 additions & 27 deletions providers/apis/defi/osmosis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

"go.uber.org/zap"

"github.com/skip-mev/slinky/oracle/config"
"github.com/skip-mev/slinky/pkg/http"
"github.com/skip-mev/slinky/providers/apis/defi/types"
"github.com/skip-mev/slinky/providers/base/api/metrics"
)

const (
headerBlockHeight = "grpc-metadata-x-cosmos-block-height"
)

var (
_ Client = &ClientImpl{}
_ Client = &MultiClientImpl{}
Expand All @@ -28,7 +33,7 @@ type Client interface {
poolID uint64,
baseAsset,
quoteAsset string,
) (SpotPriceResponse, error)
) (WrappedSpotPriceResponse, error)
}

// ClientImpl is an implementation of a client to Osmosis using a
Expand Down Expand Up @@ -74,31 +79,43 @@ func NewClient(
}

// SpotPrice uses the underlying x/poolmanager client to access spot prices.
func (c *ClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (SpotPriceResponse, error) {
func (c *ClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (WrappedSpotPriceResponse, error) {
start := time.Now()
defer func() {
c.apiMetrics.ObserveProviderResponseLatency(c.api.Name, c.redactedURL, time.Since(start))
}()

url, err := CreateURL(c.endpoint.URL, poolID, baseAsset, quoteAsset)
if err != nil {
return SpotPriceResponse{}, err
return WrappedSpotPriceResponse{}, err
}

resp, err := c.httpClient.GetWithContext(ctx, url)
if err != nil {
return SpotPriceResponse{}, err
return WrappedSpotPriceResponse{}, err
}

c.apiMetrics.AddHTTPStatusCode(c.api.Name, resp)

var blockHeight uint64
heightStr := resp.Header.Get(headerBlockHeight)
if heightStr != "" {
blockHeight, err = strconv.ParseUint(heightStr, 10, 64)
if err != nil {
return WrappedSpotPriceResponse{}, fmt.Errorf("failed to parse block height: %w", err)
}
}

var spotPriceResponse SpotPriceResponse
if err := json.NewDecoder(resp.Body).Decode(&spotPriceResponse); err != nil {
return SpotPriceResponse{}, err
return WrappedSpotPriceResponse{}, err
}

c.apiMetrics.AddHTTPStatusCode(c.api.Name, resp)
return spotPriceResponse, nil
return WrappedSpotPriceResponse{
SpotPriceResponse: spotPriceResponse,
BlockHeight: blockHeight,
}, nil
}

// MultiClientImpl is an Osmosis client that wraps a set of multiple Clients.
Expand All @@ -108,6 +125,8 @@ type MultiClientImpl struct {
apiMetrics metrics.APIMetrics

clients []Client

blockAgeChecker types.BlockAgeChecker
}

// NewMultiClient creates a new Client.
Expand All @@ -134,10 +153,11 @@ func NewMultiClient(
}

return &MultiClientImpl{
logger: logger,
api: api,
apiMetrics: apiMetrics,
clients: clients,
logger: logger,
api: api,
apiMetrics: apiMetrics,
clients: clients,
blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge),
}, nil
}

Expand Down Expand Up @@ -174,17 +194,18 @@ func NewMultiClientFromEndpoints(
}

return &MultiClientImpl{
logger: logger,
api: api,
apiMetrics: apiMetrics,
clients: clients,
logger: logger,
api: api,
apiMetrics: apiMetrics,
clients: clients,
blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge),
}, nil
}

// SpotPrice delegates the request to all underlying clients and applies a filter to the
// set of responses.
func (mc *MultiClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (SpotPriceResponse, error) {
resps := make([]SpotPriceResponse, len(mc.clients))
func (mc *MultiClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (WrappedSpotPriceResponse, error) {
resps := make([]WrappedSpotPriceResponse, len(mc.clients))

var wg sync.WaitGroup
wg.Add(len(mc.clients))
Expand All @@ -209,22 +230,29 @@ func (mc *MultiClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAss

wg.Wait()

return filterSpotPriceResponses(resps)
return mc.filterSpotPriceResponses(resps)
}

// filterSpotPriceResponses currently just chooses a random response as there is no way to differentiate.
func filterSpotPriceResponses(responses []SpotPriceResponse) (SpotPriceResponse, error) {
// filterSpotPriceResponses chooses the response with the highest block height.
func (mc *MultiClientImpl) filterSpotPriceResponses(responses []WrappedSpotPriceResponse) (WrappedSpotPriceResponse, error) {
if len(responses) == 0 {
return SpotPriceResponse{}, fmt.Errorf("no responses found")
return WrappedSpotPriceResponse{}, fmt.Errorf("no responses found")
}

perm := rand.Perm(len(responses))
for _, i := range perm {
resp := responses[perm[i]]
if resp.SpotPrice != "" {
return resp, nil
highestHeight := uint64(0)
highestHeightIndex := 0

for i, resp := range responses {
if resp.BlockHeight > highestHeight {
highestHeight = resp.BlockHeight
highestHeightIndex = i
}
}

return SpotPriceResponse{}, fmt.Errorf("no responses found")
// check the block height
if valid := mc.blockAgeChecker.IsHeightValid(highestHeight); !valid {
return WrappedSpotPriceResponse{}, fmt.Errorf("height %d is stale and older than %d", highestHeight, mc.api.MaxBlockHeightAge)
}

return responses[highestHeightIndex], nil
}
Loading

0 comments on commit f73a9b0

Please sign in to comment.