Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/ticker: cache tomls during scraping #4286

Merged
merged 3 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions services/ticker/internal/scraper/asset_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ func decodeTOMLIssuer(tomlData string) (issuer TOMLIssuer, err error) {
return
}

// fetchTOMLData fetches the TOML data for a given hProtocol.AssetStat
func fetchTOMLData(asset hProtocol.AssetStat) (data string, err error) {
tomlURL := asset.Links.Toml.Href

// fetchTOMLData fetches the TOML data from the URL.
func fetchTOMLData(tomlURL string) (data string, err error) {
if tomlURL == "" {
err = errors.New("Asset does not have a TOML URL")
return
Expand Down Expand Up @@ -214,19 +212,30 @@ func makeFinalAsset(
}

// processAsset merges data from an AssetStat with data retrieved from its corresponding TOML file
func processAsset(asset hProtocol.AssetStat, shouldValidateTOML bool) (FinalAsset, error) {
func (c *ScraperConfig) processAsset(asset hProtocol.AssetStat, tomlCache map[string]TOMLIssuer, shouldValidateTOML bool) (FinalAsset, error) {
var errors []error
var issuer TOMLIssuer

if shouldValidateTOML {
tomlData, err := fetchTOMLData(asset)
if err != nil {
errors = append(errors, err)
}
tomlURL := asset.Links.Toml.Href

var ok bool
issuer, ok = tomlCache[tomlURL]
if ok {
c.Logger.Infof("Using cached TOML for asset %s:%s", asset.Asset.Code, asset.Asset.Issuer)
} else {
c.Logger.Infof("Fetching TOML for asset %s:%s", asset.Asset.Code, asset.Asset.Issuer)
tomlData, err := fetchTOMLData(tomlURL)
if err != nil {
errors = append(errors, err)
}

issuer, err = decodeTOMLIssuer(tomlData)
if err != nil {
errors = append(errors, err)
issuer, err = decodeTOMLIssuer(tomlData)
if err != nil {
errors = append(errors, err)
}

tomlCache[tomlURL] = issuer
}
}

Expand Down Expand Up @@ -255,9 +264,16 @@ func (c *ScraperConfig) parallelProcessAssets(assets []hProtocol.AssetStat, para
end = numAssets
}

// Each concurrently running routine keeps its own separate cache of loaded
// TOMLs. A single shared cache would be better, but separate caches are a
// tradeoff for simplicity: a shared map that is mutated alongside HTTP
// lookups would see a significant amount of contention.
tomlCache := map[string]TOMLIssuer{}

for j := start; j < end; j++ {
if !shouldDiscardAsset(assets[j], shouldValidateTOML) {
finalAsset, err := processAsset(assets[j], shouldValidateTOML)
c.Logger.Infof("Processing asset %s:%s", assets[j].Asset.Code, assets[j].Asset.Issuer)
finalAsset, err := c.processAsset(assets[j], tomlCache, shouldValidateTOML)
if err != nil {
mutex.Lock()
numTrash++
Expand All @@ -266,6 +282,7 @@ func (c *ScraperConfig) parallelProcessAssets(assets []hProtocol.AssetStat, para
}
assetQueue <- finalAsset
} else {
c.Logger.Infof("Discarding asset %s:%s", assets[j].Asset.Code, assets[j].Asset.Issuer)
mutex.Lock()
numTrash++
mutex.Unlock()
Expand Down
48 changes: 43 additions & 5 deletions services/ticker/internal/scraper/asset_scraper_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package scraper

import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"

hProtocol "github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/render/hal"
"github.com/stellar/go/support/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestShouldDiscardAsset(t *testing.T) {
Expand Down Expand Up @@ -134,10 +138,7 @@ func TestIsDomainVerified(t *testing.T) {

func TestIgnoreInvalidTOMLUrls(t *testing.T) {
invalidURL := "https:// there is something wrong here.com/stellar.toml"
assetStat := hProtocol.AssetStat{}
assetStat.Links.Toml = hal.Link{Href: invalidURL}

_, err := fetchTOMLData(assetStat)
_, err := fetchTOMLData(invalidURL)

urlErr, ok := errors.Cause(err).(*url.Error)
if !ok {
Expand All @@ -147,3 +148,40 @@ func TestIgnoreInvalidTOMLUrls(t *testing.T) {
assert.Equal(t, "https:// there is something wrong here.com/stellar.toml", urlErr.URL)
assert.EqualError(t, urlErr.Err, `invalid character " " in host name`)
}

// TestProcessAsset_notCached verifies that processAsset fetches and decodes
// the TOML from the asset's TOML URL when the cache holds no entry for that
// URL, and that the URL is recorded in the cache afterwards.
func TestProcessAsset_notCached(t *testing.T) {
	scraper := &ScraperConfig{Logger: log.DefaultLogger}

	// Local HTTP server that answers every request with a minimal TOML body.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, `SIGNING_KEY="not cached signing key"`)
	}))
	// Shut the server down when the test ends so it is not left running.
	defer server.Close()

	asset := hProtocol.AssetStat{
		Amount:      "123901.0129310",
		NumAccounts: 100,
	}
	asset.Code = "SOMETHINGVALID"
	asset.Links.Toml.Href = server.URL

	// Start with an empty cache so the TOML has to be fetched from the server.
	tomlCache := map[string]TOMLIssuer{}
	finalAsset, err := scraper.processAsset(asset, tomlCache, true)
	require.NoError(t, err)
	assert.NotZero(t, finalAsset)
	assert.Equal(t, "not cached signing key", finalAsset.IssuerDetails.SigningKey)

	// A cache miss must leave an entry behind, keyed by the TOML URL.
	_, cached := tomlCache[server.URL]
	assert.True(t, cached, "expected the fetched TOML to be stored in the cache")
}

func TestProcessAsset_cached(t *testing.T) {
scraper := &ScraperConfig{Logger: log.DefaultLogger}
asset := hProtocol.AssetStat{
Amount: "123901.0129310",
NumAccounts: 100,
}
asset.Code = "SOMETHINGVALID"
asset.Links.Toml.Href = "url"
tomlCache := map[string]TOMLIssuer{
"url": {
SigningKey: "signing key",
},
}
finalAsset, err := scraper.processAsset(asset, tomlCache, true)
require.NoError(t, err)
assert.NotZero(t, finalAsset)
assert.Equal(t, "signing key", finalAsset.IssuerDetails.SigningKey)
}