Skip to content

Commit

Permalink
services/ticker: ingest assets optimizations (#4218)
Browse files Browse the repository at this point in the history
* ticker ingest assets optimizations

* Update services/ticker/internal/actions_asset.go

Co-authored-by: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com>

* Update services/ticker/internal/scraper/asset_scraper.go

Co-authored-by: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com>

* Update services/ticker/internal/scraper/main.go

Co-authored-by: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com>

* remove commented code

* remove pointer passing of FinalAsset

Co-authored-by: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com>
  • Loading branch information
stfung77 and leighmcculloch authored Feb 8, 2022
1 parent f087487 commit 995905d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 54 deletions.
54 changes: 32 additions & 22 deletions services/ticker/internal/actions_asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"strings"
"sync"
"time"

horizonclient "github.com/stellar/go/clients/horizonclient"
Expand All @@ -19,29 +20,38 @@ func RefreshAssets(ctx context.Context, s *tickerdb.TickerSession, c *horizoncli
Client: c,
Logger: l,
}
finalAssetList, err := sc.FetchAllAssets(0, 10)
if err != nil {
return
}

for _, finalAsset := range finalAssetList {
dbIssuer := tomlIssuerToDBIssuer(finalAsset.IssuerDetails)
if dbIssuer.PublicKey == "" {
dbIssuer.PublicKey = finalAsset.Issuer
}
issuerID, err := s.InsertOrUpdateIssuer(ctx, &dbIssuer, []string{"public_key"})
if err != nil {
l.Error("Error inserting issuer:", dbIssuer, err)
continue
var wg sync.WaitGroup
parallelism := 20
assetQueue := make(chan scraper.FinalAsset, parallelism)

go sc.ProcessAllAssets(0, parallelism, assetQueue)

wg.Add(1)
go func() {
count := 0
defer wg.Done()
for finalAsset := range assetQueue {
dbIssuer := tomlIssuerToDBIssuer(finalAsset.IssuerDetails)
if dbIssuer.PublicKey == "" {
dbIssuer.PublicKey = finalAsset.Issuer
}
issuerID, err := s.InsertOrUpdateIssuer(ctx, &dbIssuer, []string{"public_key"})
if err != nil {
l.Error("Error inserting issuer:", dbIssuer, err)
continue
}

dbAsset := finalAssetToDBAsset(finalAsset, issuerID)
err = s.InsertOrUpdateAsset(ctx, &dbAsset, []string{"code", "issuer_account", "issuer_id"})
if err != nil {
l.Error("Error inserting asset:", dbAsset, err)
continue
}
count += 1
l.Debugf("Assets added -- count: %d - issuer: %d, asset: %s", count, issuerID, dbAsset.Code)
}

dbAsset := finalAssetToDBAsset(finalAsset, issuerID)
err = s.InsertOrUpdateAsset(ctx, &dbAsset, []string{"code", "issuer_account", "issuer_id"})
if err != nil {
l.Error("Error inserting asset:", dbAsset, err)
}
}

}()
wg.Wait()
return
}

Expand Down
36 changes: 8 additions & 28 deletions services/ticker/internal/scraper/asset_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,23 +234,23 @@ func processAsset(asset hProtocol.AssetStat, shouldValidateTOML bool) (FinalAsse
}

// parallelProcessAssets filters the assets that don't match the shouldDiscardAsset criteria.
// Non-trash assets are sent to assetQueue, which is closed once every chunk has been processed.
// The TOML validation is performed in parallel to improve performance.
func (c *ScraperConfig) parallelProcessAssets(assets []hProtocol.AssetStat, parallelism int) (cleanAssets []FinalAsset, numTrash int) {
queue := make(chan FinalAsset, parallelism)
func (c *ScraperConfig) parallelProcessAssets(assets []hProtocol.AssetStat, parallelism int, assetQueue chan<- FinalAsset) (numNonTrash int, numTrash int) {
shouldValidateTOML := c.Client != horizonclient.DefaultTestNetClient // TOMLs shouldn't be validated on TestNet

var mutex = &sync.Mutex{}
var wg sync.WaitGroup
numAssets := len(assets)
chunkSize := int(math.Ceil(float64(numAssets) / float64(parallelism)))
wg.Add(numAssets)
wg.Add(parallelism)

// The assets are divided into chunks of chunkSize, and each goroutine is responsible
// for cleaning up one chunk
for i := 0; i < parallelism; i++ {
go func(start int) {
end := start + chunkSize
defer wg.Done()

end := start + chunkSize
if end > numAssets {
end = numAssets
}
Expand All @@ -262,42 +262,22 @@ func (c *ScraperConfig) parallelProcessAssets(assets []hProtocol.AssetStat, para
mutex.Lock()
numTrash++
mutex.Unlock()
// Invalid assets are also sent to the queue to preserve
// the WaitGroup count
queue <- FinalAsset{IsTrash: true}
continue
}
queue <- finalAsset
assetQueue <- finalAsset
} else {
mutex.Lock()
numTrash++
mutex.Unlock()
// Discarded assets are also sent to the queue to preserve
// the WaitGroup count
queue <- FinalAsset{IsTrash: true}
}
}
}(i * chunkSize)
}

// Whenever a new asset is sent to the channel, it is appended to the cleanAssets
// slice. This does not preserve the original order, but shouldn't be an issue
// in this case.
go func() {
count := 0
for t := range queue {
count++
if !t.IsTrash {
cleanAssets = append(cleanAssets, t)
}
c.Logger.Debug("Total assets processed:", count)
wg.Done()
}
}()

wg.Wait()
close(queue)
close(assetQueue)

numNonTrash = len(assets) - numTrash
return
}

Expand Down
8 changes: 4 additions & 4 deletions services/ticker/internal/scraper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,20 @@ type OrderbookStats struct {
SpreadMidPoint float64
}

// FetchAllAssets fetches assets from the Horizon public net. If limit = 0, will fetch all assets.
func (c *ScraperConfig) FetchAllAssets(limit int, parallelism int) (assets []FinalAsset, err error) {
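// ProcessAllAssets fetches assets from the Horizon public net and sends the non-trash ones to assetQueue. If limit = 0, it fetches all assets.
// NOTE(review): when retrieveAssets fails, this returns without closing assetQueue, so a consumer ranging over the channel would block — confirm and handle.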
func (c *ScraperConfig) ProcessAllAssets(limit int, parallelism int, assetQueue chan<- FinalAsset) (numNonTrash int, numTrash int) {
dirtyAssets, err := c.retrieveAssets(limit)
if err != nil {
return
}

assets, numTrash := c.parallelProcessAssets(dirtyAssets, parallelism)
numNonTrash, numTrash = c.parallelProcessAssets(dirtyAssets, parallelism, assetQueue)

c.Logger.Infof(
"Scanned %d entries. Trash: %d. Non-trash: %d\n",
len(dirtyAssets),
numTrash,
len(assets),
numNonTrash,
)
return
}
Expand Down

0 comments on commit 995905d

Please sign in to comment.