Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: backport https://github.com/skip-mev/connect/pull/705 #864

Merged
merged 4 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions oracle/config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ type APIConfig struct {

// Name is the name of the provider that corresponds to this config.
Name string `json:"name"`

// MaxBlockHeightAge is the oldest an update from an on-chain data source can be without having its
// block height incremented. In the case where a data source has exceeded this limit and the block
// height is not increasing, price reporting will be skipped until the block height increases.
MaxBlockHeightAge time.Duration `json:"maxBlockHeightAge"`
}

// Endpoint holds all data necessary for an API provider to connect to a given endpoint
Expand Down Expand Up @@ -123,5 +128,9 @@ func (c *APIConfig) ValidateBasic() error {
}
}

if c.MaxBlockHeightAge < 0 {
return fmt.Errorf("max_block_height_age cannot be negative")
}

return nil
}
30 changes: 30 additions & 0 deletions oracle/config/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,36 @@ func TestAPIConfig(t *testing.T) {
},
expectedErr: false,
},
{
name: "good config with max_block_height_age",
config: config.APIConfig{
Enabled: true,
Timeout: time.Second,
Interval: time.Second,
ReconnectTimeout: time.Second,
MaxQueries: 1,
Name: "test",
Endpoints: []config.Endpoint{{URL: "http://test.com"}},
BatchSize: 1,
MaxBlockHeightAge: 10 * time.Second,
},
expectedErr: false,
},
{
name: "bad config with negative max_block_height_age",
config: config.APIConfig{
Enabled: true,
Timeout: time.Second,
Interval: time.Second,
ReconnectTimeout: time.Second,
MaxQueries: 1,
Name: "test",
Endpoints: []config.Endpoint{{URL: "http://test.com"}},
BatchSize: 1,
MaxBlockHeightAge: -10 * time.Second,
},
expectedErr: true,
},
{
name: "bad config with invalid endpoint (no url)",
config: config.APIConfig{
Expand Down
75 changes: 52 additions & 23 deletions providers/apis/defi/ethmulticlient/multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"fmt"
"sync"

"github.com/skip-mev/slinky/providers/apis/defi/types"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
"go.uber.org/zap"
Expand All @@ -23,6 +25,8 @@

// underlying clients
clients []EVMClient

blockAgeChecker types.BlockAgeChecker
}

// NewMultiRPCClient returns a new MultiRPCClient.
Expand All @@ -32,9 +36,10 @@
clients []EVMClient,
) EVMClient {
return &MultiRPCClient{
logger: logger,
clients: clients,
api: api,
logger: logger,
clients: clients,
api: api,
blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge),
}
}

Expand Down Expand Up @@ -81,12 +86,20 @@
}

return &MultiRPCClient{
logger: logger.With(zap.String("multi_client", api.Name)),
api: api,
clients: clients,
logger: logger.With(zap.String("multi_client", api.Name)),
api: api,
clients: clients,
blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge),

Check warning on line 92 in providers/apis/defi/ethmulticlient/multi_client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/ethmulticlient/multi_client.go#L89-L92

Added lines #L89 - L92 were not covered by tests
}, nil
}

// define a result struct that go routines will populate and append to a slice when they complete their request.
type result struct {
height uint64
results []rpc.BatchElem
err error
}

// BatchCallContext injects a call to eth_blockNumber, and makes batch calls to the underlying EVMClients.
// It returns the response that has the greatest height from the eth_blockNumber call. An error is returned
// only when no client was able to successfully provide a height or errored when sending the BatchCall.
Expand All @@ -95,15 +108,9 @@
m.logger.Debug("BatchCallContext called with 0 elems")
return nil
}
// define a result struct that go routines will populate and append to a slice when they complete their request.
type result struct {
height uint64
results []rpc.BatchElem
}

results := make([]result, len(m.clients))

// error slice to capture errors go routines encounter.
errs := make([]error, len(m.clients))
wg := new(sync.WaitGroup)
// this is the index of where we will have an eth_blockNumber call.
blockNumReqIndex := len(batchElems)
Expand All @@ -124,7 +131,8 @@
// if there was an error, or if the block_num request didn't have result / errored
// we log the error and append to error slice.
if err != nil || req[blockNumReqIndex].Result == "" || req[blockNumReqIndex].Error != nil {
errs[i] = fmt.Errorf("endpoint request failed: %w, %w", err, req[blockNumReqIndex].Error)
resultErr := fmt.Errorf("endpoint request failed: %w, %w", err, req[blockNumReqIndex].Error)
results[i] = result{0, nil, resultErr}
m.logger.Debug(
"endpoint request failed",
zap.Error(err),
Expand All @@ -138,7 +146,8 @@
// try to get the block number.
r, ok := req[blockNumReqIndex].Result.(*string)
if !ok {
errs[i] = fmt.Errorf("result from eth_blockNumber was not a string")
resultErr := fmt.Errorf("result from eth_blockNumber was not a string")
results[i] = result{0, nil, resultErr}

Check warning on line 150 in providers/apis/defi/ethmulticlient/multi_client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/ethmulticlient/multi_client.go#L149-L150

Added lines #L149 - L150 were not covered by tests
m.logger.Debug(
"result from eth_blockNumber was not a string",
zap.String("url", url),
Expand All @@ -149,7 +158,8 @@
// decode the new height
height, err := hexutil.DecodeUint64(*r)
if err != nil { // if we can't decode the height, log an error.
errs[i] = fmt.Errorf("could not decode hex eth height: %w", err)
resultErr := fmt.Errorf("could not decode hex eth height: %w", err)
results[i] = result{0, nil, resultErr}
m.logger.Debug(
"could not decode hex eth height",
zap.String("url", url),
Expand All @@ -163,17 +173,31 @@
zap.String("url", url),
)
// append the results, minus the appended eth_blockNumber request.
results[i] = result{height, req[:blockNumReqIndex]}
results[i] = result{height, req[:blockNumReqIndex], nil}
}(clientIdx)
}
wg.Wait()

filtered, err := m.filterResponses(results)
if err != nil {
return fmt.Errorf("error filtering responses: %w", err)
}

// copy the results from the results that had the largest height.
copy(batchElems, filtered)
return nil
}

// filterAccountsResponses chooses the rpc response with the highest block number.
func (m *MultiRPCClient) filterResponses(responses []result) ([]rpc.BatchElem, error) {
// see which of the results had the largest height, and store the index of that result.
var (
maxHeight uint64
maxHeightIndex int
errs = make([]error, len(responses))
)
for i, res := range results {
for i, res := range responses {
errs[i] = res.err
if res.height > maxHeight {
maxHeight = res.height
maxHeightIndex = i
Expand All @@ -183,12 +207,17 @@
if maxHeight == 0 {
err := errors.Join(errs...)
if err != nil {
return err
return nil, err
}
// this should never happen... but who knows. maybe something terrible happened.
return errors.New("no errors were encountered, however no go routine was able to report a height")
return nil, errors.New("no errors were encountered, however no go routine was able to report a height")

}
// copy the results from the results that had the largest height.
copy(batchElems, results[maxHeightIndex].results)
return nil

// check the block height
if valid := m.blockAgeChecker.IsHeightValid(maxHeight); !valid {
return nil, fmt.Errorf("height %d is stale and older than %d", maxHeight, m.api.MaxBlockHeightAge)
}

Check warning on line 220 in providers/apis/defi/ethmulticlient/multi_client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/ethmulticlient/multi_client.go#L219-L220

Added lines #L219 - L220 were not covered by tests

return responses[maxHeightIndex].results, nil
}
82 changes: 55 additions & 27 deletions providers/apis/defi/osmosis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
"context"
"encoding/json"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

"go.uber.org/zap"

"github.com/skip-mev/slinky/oracle/config"
"github.com/skip-mev/slinky/pkg/http"
"github.com/skip-mev/slinky/providers/apis/defi/types"
"github.com/skip-mev/slinky/providers/base/api/metrics"
)

const (
headerBlockHeight = "grpc-metadata-x-cosmos-block-height"
)

var (
_ Client = &ClientImpl{}
_ Client = &MultiClientImpl{}
Expand All @@ -28,7 +33,7 @@
poolID uint64,
baseAsset,
quoteAsset string,
) (SpotPriceResponse, error)
) (WrappedSpotPriceResponse, error)
}

// ClientImpl is an implementation of a client to Osmosis using a
Expand Down Expand Up @@ -74,31 +79,43 @@
}

// SpotPrice uses the underlying x/poolmanager client to access spot prices.
func (c *ClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (SpotPriceResponse, error) {
func (c *ClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (WrappedSpotPriceResponse, error) {

Check warning on line 82 in providers/apis/defi/osmosis/client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/osmosis/client.go#L82

Added line #L82 was not covered by tests
start := time.Now()
defer func() {
c.apiMetrics.ObserveProviderResponseLatency(c.api.Name, c.redactedURL, time.Since(start))
}()

url, err := CreateURL(c.endpoint.URL, poolID, baseAsset, quoteAsset)
if err != nil {
return SpotPriceResponse{}, err
return WrappedSpotPriceResponse{}, err

Check warning on line 90 in providers/apis/defi/osmosis/client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/osmosis/client.go#L90

Added line #L90 was not covered by tests
}

resp, err := c.httpClient.GetWithContext(ctx, url)
if err != nil {
return SpotPriceResponse{}, err
return WrappedSpotPriceResponse{}, err

Check warning on line 95 in providers/apis/defi/osmosis/client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/osmosis/client.go#L95

Added line #L95 was not covered by tests
}

c.apiMetrics.AddHTTPStatusCode(c.api.Name, resp)

var blockHeight uint64
heightStr := resp.Header.Get(headerBlockHeight)
if heightStr != "" {
blockHeight, err = strconv.ParseUint(heightStr, 10, 64)
if err != nil {
return WrappedSpotPriceResponse{}, fmt.Errorf("failed to parse block height: %w", err)
}

Check warning on line 106 in providers/apis/defi/osmosis/client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/osmosis/client.go#L100-L106

Added lines #L100 - L106 were not covered by tests
}

var spotPriceResponse SpotPriceResponse
if err := json.NewDecoder(resp.Body).Decode(&spotPriceResponse); err != nil {
return SpotPriceResponse{}, err
return WrappedSpotPriceResponse{}, err

Check warning on line 111 in providers/apis/defi/osmosis/client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/osmosis/client.go#L111

Added line #L111 was not covered by tests
}

c.apiMetrics.AddHTTPStatusCode(c.api.Name, resp)
return spotPriceResponse, nil
return WrappedSpotPriceResponse{
SpotPriceResponse: spotPriceResponse,
BlockHeight: blockHeight,
}, nil

Check warning on line 118 in providers/apis/defi/osmosis/client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/osmosis/client.go#L115-L118

Added lines #L115 - L118 were not covered by tests
}

// MultiClientImpl is an Osmosis client that wraps a set of multiple Clients.
Expand All @@ -108,6 +125,8 @@
apiMetrics metrics.APIMetrics

clients []Client

blockAgeChecker types.BlockAgeChecker
}

// NewMultiClient creates a new Client.
Expand All @@ -134,10 +153,11 @@
}

return &MultiClientImpl{
logger: logger,
api: api,
apiMetrics: apiMetrics,
clients: clients,
logger: logger,
api: api,
apiMetrics: apiMetrics,
clients: clients,
blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge),
}, nil
}

Expand Down Expand Up @@ -174,17 +194,18 @@
}

return &MultiClientImpl{
logger: logger,
api: api,
apiMetrics: apiMetrics,
clients: clients,
logger: logger,
api: api,
apiMetrics: apiMetrics,
clients: clients,
blockAgeChecker: types.NewBlockAgeChecker(api.MaxBlockHeightAge),
}, nil
}

// SpotPrice delegates the request to all underlying clients and applies a filter to the
// set of responses.
func (mc *MultiClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (SpotPriceResponse, error) {
resps := make([]SpotPriceResponse, len(mc.clients))
func (mc *MultiClientImpl) SpotPrice(ctx context.Context, poolID uint64, baseAsset, quoteAsset string) (WrappedSpotPriceResponse, error) {
resps := make([]WrappedSpotPriceResponse, len(mc.clients))

var wg sync.WaitGroup
wg.Add(len(mc.clients))
Expand All @@ -209,22 +230,29 @@

wg.Wait()

return filterSpotPriceResponses(resps)
return mc.filterSpotPriceResponses(resps)
}

// filterSpotPriceResponses currently just chooses a random response as there is no way to differentiate.
func filterSpotPriceResponses(responses []SpotPriceResponse) (SpotPriceResponse, error) {
// filterSpotPriceResponses chooses the response with the highest block height.
func (mc *MultiClientImpl) filterSpotPriceResponses(responses []WrappedSpotPriceResponse) (WrappedSpotPriceResponse, error) {
if len(responses) == 0 {
return SpotPriceResponse{}, fmt.Errorf("no responses found")
return WrappedSpotPriceResponse{}, fmt.Errorf("no responses found")

Check warning on line 239 in providers/apis/defi/osmosis/client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/osmosis/client.go#L239

Added line #L239 was not covered by tests
}

perm := rand.Perm(len(responses))
for _, i := range perm {
resp := responses[perm[i]]
if resp.SpotPrice != "" {
return resp, nil
highestHeight := uint64(0)
highestHeightIndex := 0

for i, resp := range responses {
if resp.BlockHeight > highestHeight {
highestHeight = resp.BlockHeight
highestHeightIndex = i

Check warning on line 248 in providers/apis/defi/osmosis/client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/osmosis/client.go#L247-L248

Added lines #L247 - L248 were not covered by tests
}
}

return SpotPriceResponse{}, fmt.Errorf("no responses found")
// check the block height
if valid := mc.blockAgeChecker.IsHeightValid(highestHeight); !valid {
return WrappedSpotPriceResponse{}, fmt.Errorf("height %d is stale and older than %d", highestHeight, mc.api.MaxBlockHeightAge)
}

Check warning on line 255 in providers/apis/defi/osmosis/client.go

View check run for this annotation

Codecov / codecov/patch

providers/apis/defi/osmosis/client.go#L254-L255

Added lines #L254 - L255 were not covered by tests

return responses[highestHeightIndex], nil
}
Loading
Loading