Skip to content

Commit

Permalink
Merge pull request #160 from SiaFoundation/christopher/scanning-expon…
Browse files Browse the repository at this point in the history
…ential-backoff

Exponential delay if scan fails
  • Loading branch information
chris124567 authored Jan 23, 2025
2 parents 9ab3c45 + 183f432 commit 04df7ba
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 101 deletions.
2 changes: 1 addition & 1 deletion cmd/explored/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var cfg = config.Config{
Threads: 10,
Timeout: 30 * time.Second,
MaxLastScan: 3 * time.Hour,
MinLastAnnouncement: 90 * 24 * time.Hour,
MinLastAnnouncement: 365 * 24 * time.Hour,
},
ExchangeRates: config.ExchangeRates{
Refresh: 3 * time.Second,
Expand Down
2 changes: 1 addition & 1 deletion explorer/explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Store interface {
Search(id string) (SearchType, error)

QueryHosts(params HostQuery, sortBy HostSortColumn, dir HostSortDir, offset, limit uint64) ([]Host, error)
HostsForScanning(maxLastScan, minLastAnnouncement time.Time, limit uint64) ([]Host, error)
HostsForScanning(minLastAnnouncement time.Time, limit uint64) ([]UnscannedHost, error)
}

// Explorer implements a Sia explorer.
Expand Down
21 changes: 13 additions & 8 deletions explorer/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package explorer
import (
"context"
"fmt"
"math"
"net"
"time"

Expand Down Expand Up @@ -51,7 +52,7 @@ func (e *Explorer) waitForSync() error {
return nil
}

func (e *Explorer) scanV1Host(locator geoip.Locator, host Host) (HostScan, error) {
func (e *Explorer) scanV1Host(locator geoip.Locator, host UnscannedHost) (HostScan, error) {
ctx, cancel := context.WithTimeout(e.ctx, e.scanCfg.Timeout)
defer cancel()

Expand Down Expand Up @@ -112,7 +113,7 @@ func (e *Explorer) scanV1Host(locator geoip.Locator, host Host) (HostScan, error
}, nil
}

func (e *Explorer) scanV2Host(locator geoip.Locator, host Host) (HostScan, error) {
func (e *Explorer) scanV2Host(locator geoip.Locator, host UnscannedHost) (HostScan, error) {
ctx, cancel := context.WithTimeout(e.ctx, e.scanCfg.Timeout)
defer cancel()

Expand Down Expand Up @@ -184,10 +185,10 @@ func (e *Explorer) scanHosts() {
defer locator.Close()

for !e.isClosed() {
lastScanCutoff := time.Now().Add(-e.scanCfg.MaxLastScan)
lastAnnouncementCutoff := time.Now().Add(-e.scanCfg.MinLastAnnouncement)
now := types.CurrentTimestamp()
lastAnnouncementCutoff := now.Add(-e.scanCfg.MinLastAnnouncement)

batch, err := e.s.HostsForScanning(lastScanCutoff, lastAnnouncementCutoff, scanBatchSize)
batch, err := e.s.HostsForScanning(lastAnnouncementCutoff, scanBatchSize)
if err != nil {
e.log.Info("failed to get hosts for scanning:", zap.Error(err))
return
Expand All @@ -204,23 +205,27 @@ func (e *Explorer) scanHosts() {
results := make([]HostScan, len(batch))
for i, host := range batch {
e.wg.Add(1)
go func(i int, host Host) {
go func(i int, host UnscannedHost) {
defer e.wg.Done()

var err error
if len(host.V2NetAddresses) > 0 {
if host.IsV2() {
results[i], err = e.scanV2Host(locator, host)
} else {
results[i], err = e.scanV1Host(locator, host)
}
now := types.CurrentTimestamp()
if err != nil {
e.log.Debug("host scan failed", zap.Stringer("pk", host.PublicKey), zap.Error(err))
results[i] = HostScan{
PublicKey: host.PublicKey,
Success: false,
Timestamp: types.CurrentTimestamp(),
Timestamp: now,
NextScan: now.Add(e.scanCfg.MaxLastScan * time.Duration(math.Pow(2, float64(host.FailedInteractionsStreak)+1))),
}
return
} else {
results[i].NextScan = now.Add(e.scanCfg.MaxLastScan)
}
}(i, host)
}
Expand Down
44 changes: 27 additions & 17 deletions explorer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,40 @@ type HostScan struct {
CountryCode string `json:"countryCode"`
Success bool `json:"success"`
Timestamp time.Time `json:"timestamp"`
NextScan time.Time `json:"nextScan"`

Settings rhpv2.HostSettings `json:"settings"`
PriceTable rhpv3.HostPriceTable `json:"priceTable"`

RHPV4Settings rhpv4.HostSettings `json:"rhpV4Settings"`
}

// UnscannedHost represents the metadata needed to scan a host.
type UnscannedHost struct {
PublicKey types.PublicKey `json:"publicKey"`
V2 bool `json:"v2"`
NetAddress string `json:"netAddress"`
V2NetAddresses []chain.NetAddress `json:"v2NetAddresses,omitempty"`
FailedInteractionsStreak uint64 `json:"failedInteractionsStreak"`
}

// V2SiamuxAddr returns the `Address` of the first TCP siamux `NetAddress` it
// finds in the host's list of net addresses. The protocol for this address is
// ProtocolTCPSiaMux.
func (h UnscannedHost) V2SiamuxAddr() (string, bool) {
for _, netAddr := range h.V2NetAddresses {
if netAddr.Protocol == crhpv4.ProtocolTCPSiaMux {
return netAddr.Address, true
}
}
return "", false
}

// IsV2 returns whether a host supports V2 or not.
func (h UnscannedHost) IsV2() bool {
return len(h.V2NetAddresses) > 0
}

// Host represents a host and the information gathered from scanning it.
type Host struct {
PublicKey types.PublicKey `json:"publicKey"`
Expand All @@ -360,23 +387,6 @@ type Host struct {
RHPV4Settings rhpv4.HostSettings `json:"rhpV4Settings"`
}

// V2SiamuxAddr returns the `Address` of the first TCP siamux `NetAddress` it
// finds in the host's list of net addresses. The protocol for this address is
// ProtocolTCPSiaMux.
func (h Host) V2SiamuxAddr() (string, bool) {
for _, netAddr := range h.V2NetAddresses {
if netAddr.Protocol == crhpv4.ProtocolTCPSiaMux {
return netAddr.Address, true
}
}
return "", false
}

// IsV2 returns whether a host supports V2 or not.
func (h Host) IsV2() bool {
return len(h.V2NetAddresses) > 0
}

// HostMetrics represents averages of scanned information from hosts.
type HostMetrics struct {
// Number of hosts that were up as of there last scan
Expand Down
110 changes: 53 additions & 57 deletions persist/sqlite/consensus.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions persist/sqlite/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1688,8 +1688,7 @@ func TestHostAnnouncement(t *testing.T) {
testutil.CheckTransaction(t, txn3, dbTxns[0])
}

ts := time.Unix(0, 0)
hosts, err := db.HostsForScanning(ts, ts, 100)
hosts, err := db.HostsForScanning(time.Unix(0, 0), 100)
if err != nil {
t.Fatal(err)
}
Expand Down
17 changes: 10 additions & 7 deletions persist/sqlite/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ import (
"strings"
"time"

"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/explored/explorer"
)

// HostsForScanning returns hosts ordered by the transaction they were created in.
// Note that only the PublicKey, V2, NetAddress, and V2NetAddresses fields are
// populated.
func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, limit uint64) (result []explorer.Host, err error) {
// HostsForScanning returns hosts ordered by their time to next scan. Hosts
// which are repeatedly offline will face an exponentially growing next scan
// time to avoid wasting resources.
// Note that only the PublicKey, V2, NetAddress, V2NetAddresses,
// FailedInteractionsStreak fields are populated.
func (s *Store) HostsForScanning(minLastAnnouncement time.Time, limit uint64) (result []explorer.UnscannedHost, err error) {
err = s.transaction(func(tx *txn) error {
rows, err := tx.Query(`SELECT public_key, v2, net_address FROM host_info WHERE last_scan <= ? AND last_announcement >= ? ORDER BY last_scan ASC LIMIT ?`, encode(maxLastScan), encode(minLastAnnouncement), limit)
rows, err := tx.Query(`SELECT public_key, v2, net_address, failed_interactions_streak FROM host_info WHERE next_scan <= ? AND last_announcement >= ? ORDER BY next_scan ASC LIMIT ?`, encode(types.CurrentTimestamp()), encode(minLastAnnouncement), limit)
if err != nil {
return err
}
Expand All @@ -27,8 +30,8 @@ func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, lim
defer v2AddrStmt.Close()

for rows.Next() {
var host explorer.Host
if err := rows.Scan(decode(&host.PublicKey), &host.V2, &host.NetAddress); err != nil {
var host explorer.UnscannedHost
if err := rows.Scan(decode(&host.PublicKey), &host.V2, &host.NetAddress, &host.FailedInteractionsStreak); err != nil {
return err
}

Expand Down
3 changes: 3 additions & 0 deletions persist/sqlite/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,13 @@ CREATE TABLE host_info (
known_since INTEGER NOT NULL,
last_scan INTEGER NOT NULL,
last_scan_successful INTEGER NOT NULL,
next_scan INTEGER NOT NULL,
last_announcement INTEGER NOT NULL,
total_scans INTEGER NOT NULL,
successful_interactions INTEGER NOT NULL,
failed_interactions INTEGER NOT NULL,
-- number of failed interactions since the last successful interaction
failed_interactions_streak INTEGER NOT NULL,
-- settings
settings_accepting_contracts INTEGER NOT NULL,
settings_max_download_batch_size BLOB NOT NULL,
Expand Down
16 changes: 8 additions & 8 deletions persist/sqlite/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ func TestScan(t *testing.T) {
}

{
lastScanCutoff := time.Now().Add(-cfg.MaxLastScan)
lastAnnouncementCutoff := time.Now().Add(-cfg.MinLastAnnouncement)
now := types.CurrentTimestamp()
lastAnnouncementCutoff := now.Add(-cfg.MinLastAnnouncement)

dbHosts, err := db.HostsForScanning(lastScanCutoff, lastAnnouncementCutoff, 100)
dbHosts, err := db.HostsForScanning(lastAnnouncementCutoff, 100)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestScan(t *testing.T) {
t.Fatal(err)
}

time.Sleep(cfg.Timeout)
time.Sleep(2 * cfg.Timeout)

{
dbHosts, err := e.Hosts([]types.PublicKey{pubkey3, pubkey2, pubkey1})
Expand Down Expand Up @@ -343,13 +343,13 @@ func TestScan(t *testing.T) {
}

{
lastScanCutoff := time.Now().Add(-cfg.MaxLastScan)
lastAnnouncementCutoff := time.Now().Add(-cfg.MinLastAnnouncement)
now := types.CurrentTimestamp()
lastAnnouncementCutoff := now.Add(-cfg.MinLastAnnouncement)

hosts, err := db.HostsForScanning(lastScanCutoff, lastAnnouncementCutoff, 100)
dbHosts, err := db.HostsForScanning(lastAnnouncementCutoff, 100)
if err != nil {
t.Fatal(err)
}
testutil.Equal(t, "len(hostsForScanning)", 0, len(hosts))
testutil.Equal(t, "len(hostsForScanning)", 0, len(dbHosts))
}
}

0 comments on commit 04df7ba

Please sign in to comment.