From 995905d81c64db8574ac7ee782f6ebb2c0b135aa Mon Sep 17 00:00:00 2001 From: stfung77 Date: Tue, 8 Feb 2022 13:53:54 -0500 Subject: [PATCH] services/ticker: ingest assets optimizations (#4218) * ticker ingest assets optimizations * Update services/ticker/internal/actions_asset.go Co-authored-by: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> * Update services/ticker/internal/scraper/asset_scraper.go Co-authored-by: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> * Update services/ticker/internal/scraper/main.go Co-authored-by: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> * remove commented code * remove pointer passing of FinalAsset Co-authored-by: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> --- services/ticker/internal/actions_asset.go | 54 +++++++++++-------- .../ticker/internal/scraper/asset_scraper.go | 36 +++---------- services/ticker/internal/scraper/main.go | 8 +-- 3 files changed, 44 insertions(+), 54 deletions(-) diff --git a/services/ticker/internal/actions_asset.go b/services/ticker/internal/actions_asset.go index d333c87087..037e7b1f2d 100644 --- a/services/ticker/internal/actions_asset.go +++ b/services/ticker/internal/actions_asset.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "strings" + "sync" "time" horizonclient "github.com/stellar/go/clients/horizonclient" @@ -19,29 +20,38 @@ func RefreshAssets(ctx context.Context, s *tickerdb.TickerSession, c *horizoncli Client: c, Logger: l, } - finalAssetList, err := sc.FetchAllAssets(0, 10) - if err != nil { - return - } - - for _, finalAsset := range finalAssetList { - dbIssuer := tomlIssuerToDBIssuer(finalAsset.IssuerDetails) - if dbIssuer.PublicKey == "" { - dbIssuer.PublicKey = finalAsset.Issuer - } - issuerID, err := s.InsertOrUpdateIssuer(ctx, &dbIssuer, []string{"public_key"}) - if err != nil { - l.Error("Error inserting issuer:", dbIssuer, err) - continue + var wg sync.WaitGroup + parallelism := 20 + assetQueue := make(chan scraper.FinalAsset, parallelism) + + go sc.ProcessAllAssets(0, parallelism, assetQueue) + + wg.Add(1) + go func() { + count := 0 + defer wg.Done() + for finalAsset := range assetQueue { + dbIssuer := tomlIssuerToDBIssuer(finalAsset.IssuerDetails) + if dbIssuer.PublicKey == "" { + dbIssuer.PublicKey = finalAsset.Issuer + } + issuerID, err := s.InsertOrUpdateIssuer(ctx, &dbIssuer, []string{"public_key"}) + if err != nil { + l.Error("Error inserting issuer:", dbIssuer, err) + continue + } + + dbAsset := finalAssetToDBAsset(finalAsset, issuerID) + err = s.InsertOrUpdateAsset(ctx, &dbAsset, []string{"code", "issuer_account", "issuer_id"}) + if err != nil { + l.Error("Error inserting asset:", dbAsset, err) + continue + } + count += 1 + l.Debugf("Assets added -- count: %d - issuer: %d, asset: %s", count, issuerID, dbAsset.Code) } - - dbAsset := finalAssetToDBAsset(finalAsset, issuerID) - err = s.InsertOrUpdateAsset(ctx, &dbAsset, []string{"code", "issuer_account", "issuer_id"}) - if err != nil { - l.Error("Error inserting asset:", dbAsset, err) - } - } - + }() + wg.Wait() return } diff --git a/services/ticker/internal/scraper/asset_scraper.go b/services/ticker/internal/scraper/asset_scraper.go index 6fc1c942d3..6c60ffd535 100644 --- a/services/ticker/internal/scraper/asset_scraper.go +++ b/services/ticker/internal/scraper/asset_scraper.go @@ -234,23 +234,23 @@ func processAsset(asset hProtocol.AssetStat, shouldValidateTOML bool) (FinalAsse } // parallelProcessAssets filters the assets that don't match the shouldDiscardAsset criteria. +// non-trash assets are sent to the assetQueue. // The TOML validation is performed in parallel to improve performance. -func (c *ScraperConfig) parallelProcessAssets(assets []hProtocol.AssetStat, parallelism int) (cleanAssets []FinalAsset, numTrash int) { - queue := make(chan FinalAsset, parallelism) +func (c *ScraperConfig) parallelProcessAssets(assets []hProtocol.AssetStat, parallelism int, assetQueue chan<- FinalAsset) (numNonTrash int, numTrash int) { shouldValidateTOML := c.Client != horizonclient.DefaultTestNetClient // TOMLs shouldn't be validated on TestNet - var mutex = &sync.Mutex{} var wg sync.WaitGroup numAssets := len(assets) chunkSize := int(math.Ceil(float64(numAssets) / float64(parallelism))) - wg.Add(numAssets) + wg.Add(parallelism) // The assets are divided into chunks of chunkSize, and each goroutine is responsible // for cleaning up one chunk for i := 0; i < parallelism; i++ { go func(start int) { - end := start + chunkSize + defer wg.Done() + end := start + chunkSize if end > numAssets { end = numAssets } @@ -262,42 +262,22 @@ func (c *ScraperConfig) parallelProcessAssets(assets []hProtocol.AssetStat, para mutex.Lock() numTrash++ mutex.Unlock() - // Invalid assets are also sent to the queue to preserve - // the WaitGroup count - queue <- FinalAsset{IsTrash: true} continue } - queue <- finalAsset + assetQueue <- finalAsset } else { mutex.Lock() numTrash++ mutex.Unlock() - // Discarded assets are also sent to the queue to preserve - // the WaitGroup count - queue <- FinalAsset{IsTrash: true} } } }(i * chunkSize) } - // Whenever a new asset is sent to the channel, it is appended to the cleanAssets - // slice. This does not preserve the original order, but shouldn't be an issue - // in this case. - go func() { - count := 0 - for t := range queue { - count++ - if !t.IsTrash { - cleanAssets = append(cleanAssets, t) - } - c.Logger.Debug("Total assets processed:", count) - wg.Done() - } - }() - wg.Wait() - close(queue) + close(assetQueue) + numNonTrash = len(assets) - numTrash return } diff --git a/services/ticker/internal/scraper/main.go b/services/ticker/internal/scraper/main.go index ce61439bb8..bd8a0ba3d2 100644 --- a/services/ticker/internal/scraper/main.go +++ b/services/ticker/internal/scraper/main.go @@ -110,20 +110,20 @@ type OrderbookStats struct { SpreadMidPoint float64 } -// FetchAllAssets fetches assets from the Horizon public net. If limit = 0, will fetch all assets. -func (c *ScraperConfig) FetchAllAssets(limit int, parallelism int) (assets []FinalAsset, err error) { +// ProcessAllAssets fetches assets from the Horizon public net. If limit = 0, will fetch all assets. +func (c *ScraperConfig) ProcessAllAssets(limit int, parallelism int, assetQueue chan<- FinalAsset) (numNonTrash int, numTrash int) { dirtyAssets, err := c.retrieveAssets(limit) if err != nil { return } - assets, numTrash := c.parallelProcessAssets(dirtyAssets, parallelism) + numNonTrash, numTrash = c.parallelProcessAssets(dirtyAssets, parallelism, assetQueue) c.Logger.Infof( "Scanned %d entries. Trash: %d. Non-trash: %d\n", len(dirtyAssets), numTrash, - len(assets), + numNonTrash, ) return }