Skip to content

Commit

Permalink
services/horizon: Add horizon db detect-gaps command (#3672)
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio authored Jun 8, 2021
1 parent 96920ca commit 812c0b5
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 0 deletions.
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

* Fix bug in `horizon db reingest range` command, which would throw a duplicate entry conflict error from the DB. ([3661](https://github.com/stellar/go/pull/3661)).
* Fix bug in DB metrics preventing Horizon from starting when read-only replica middleware is enabled. ([3668](https://github.com/stellar/go/pull/3668)).
* Add new command `horizon db detect-gaps`, which detects ingestion gaps in the database. The command prints out the `db reingest` commands to run in order to fill the gaps found.

## v2.4.0

Expand Down
37 changes: 37 additions & 0 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stellar/go/services/horizon/internal/db2/history"

horizon "github.com/stellar/go/services/horizon/internal"
"github.com/stellar/go/services/horizon/internal/db2/schema"
Expand Down Expand Up @@ -295,6 +296,41 @@ func runDBReingestRange(from, to uint32, reingestForce bool, parallelWorkers uin
)
}

var dbDetectGapsCmd = &cobra.Command{
Use: "detect-gaps",
Short: "detects ingestion gaps in Horizon's database",
Long: "detects ingestion gaps in Horizon's database and prints a list of reingest commands needed to fill the gaps",
Run: func(cmd *cobra.Command, args []string) {
requireAndSetFlag(horizon.DatabaseURLFlagName)
if len(args) != 0 {
cmd.Usage()
os.Exit(1)
}
gaps, err := runDBDetectGaps(*config)
if err != nil {
log.Fatal(err)
}
if len(gaps) == 0 {
hlog.Info("No gaps found")
return
}
fmt.Println("Horizon commands to run in order to fill in the gaps:")
cmdname := os.Args[0]
for _, g := range gaps {
fmt.Printf("%s db reingest %d %d\n", cmdname, g.StartSequence, g.EndSequence)
}
},
}

func runDBDetectGaps(config horizon.Config) ([]history.LedgerGap, error) {
horizonSession, err := db.Open("postgres", config.DatabaseURL)
if err != nil {
return nil, err
}
q := &history.Q{horizonSession}
return q.GetLedgerGaps(context.Background())
}

func init() {
for _, co := range reingestRangeCmdOpts {
err := co.Init(dbReingestRangeCmd)
Expand All @@ -311,6 +347,7 @@ func init() {
dbMigrateCmd,
dbReapCmd,
dbReingestCmd,
dbDetectGapsCmd,
)
dbReingestCmd.AddCommand(dbReingestRangeCmd)
}
16 changes: 16 additions & 0 deletions services/horizon/internal/db2/history/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,22 @@ func (q *Q) InsertLedger(ctx context.Context,
return result.RowsAffected()
}

// GetLedgerGaps, obtains ingestion gaps in the history_ledgers table.
// Returns the gaps and error.
func (q *Q) GetLedgerGaps(ctx context.Context) ([]LedgerGap, error) {
var result []LedgerGap
err := q.SelectRaw(ctx, &result, `
SELECT sequence + 1 AS gap_start,
next_number - 1 AS gap_end
FROM (
SELECT sequence,
LEAD(sequence) OVER (ORDER BY sequence) AS next_number
FROM history_ledgers
) number
WHERE sequence + 1 <> next_number;`)
return result, err
}

func ledgerHeaderToMap(
ledger xdr.LedgerHeaderHistoryEntry,
successTxsCount int,
Expand Down
126 changes: 126 additions & 0 deletions services/horizon/internal/db2/history/ledger_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package history

import (
"context"
"database/sql"
"encoding/hex"
"fmt"
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -148,3 +151,126 @@ func TestInsertLedger(t *testing.T) {
tt.Assert.True(exists)
tt.Assert.Equal(expectedLedger.LedgerHash, hash)
}

func insertLedgerWithSequence(tt *test.T, q *Q, seq uint32) {
// generate random hashes to avoid insert clashes due to UNIQUE constraints
var rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
ledgerHashHex := fmt.Sprintf("%064x", rnd.Uint32())
previousLedgerHashHex := fmt.Sprintf("%064x", rnd.Uint32())

expectedLedger := Ledger{
Sequence: int32(seq),
LedgerHash: ledgerHashHex,
PreviousLedgerHash: null.NewString(previousLedgerHashHex, true),
TotalOrderID: TotalOrderID{toid.New(int32(69859), 0, 0).ToInt64()},
ImporterVersion: 123,
TransactionCount: 12,
SuccessfulTransactionCount: new(int32),
FailedTransactionCount: new(int32),
TxSetOperationCount: new(int32),
OperationCount: 23,
TotalCoins: 23451,
FeePool: 213,
BaseReserve: 687,
MaxTxSetSize: 345,
ProtocolVersion: 12,
BaseFee: 100,
ClosedAt: time.Now().UTC().Truncate(time.Second),
}
*expectedLedger.SuccessfulTransactionCount = 12
*expectedLedger.FailedTransactionCount = 3
*expectedLedger.TxSetOperationCount = 26

var ledgerHash, previousLedgerHash xdr.Hash

written, err := hex.Decode(ledgerHash[:], []byte(expectedLedger.LedgerHash))
tt.Assert.NoError(err)
tt.Assert.Equal(len(ledgerHash), written)

written, err = hex.Decode(previousLedgerHash[:], []byte(expectedLedger.PreviousLedgerHash.String))
tt.Assert.NoError(err)
tt.Assert.Equal(len(previousLedgerHash), written)

ledgerEntry := xdr.LedgerHeaderHistoryEntry{
Hash: ledgerHash,
Header: xdr.LedgerHeader{
LedgerVersion: 12,
PreviousLedgerHash: previousLedgerHash,
LedgerSeq: xdr.Uint32(expectedLedger.Sequence),
TotalCoins: xdr.Int64(expectedLedger.TotalCoins),
FeePool: xdr.Int64(expectedLedger.FeePool),
BaseFee: xdr.Uint32(expectedLedger.BaseFee),
BaseReserve: xdr.Uint32(expectedLedger.BaseReserve),
MaxTxSetSize: xdr.Uint32(expectedLedger.MaxTxSetSize),
ScpValue: xdr.StellarValue{
CloseTime: xdr.TimePoint(expectedLedger.ClosedAt.Unix()),
},
},
}
ledgerHeaderBase64, err := xdr.MarshalBase64(ledgerEntry.Header)
tt.Assert.NoError(err)
expectedLedger.LedgerHeaderXDR = null.NewString(ledgerHeaderBase64, true)
rowsAffected, err := q.InsertLedger(tt.Ctx,
ledgerEntry,
12,
3,
23,
26,
int(expectedLedger.ImporterVersion),
)
tt.Assert.NoError(err)
tt.Assert.Equal(rowsAffected, int64(1))
}

func TestGetLedgerGaps(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)

q := &Q{tt.HorizonSession()}

// The DB is empty, so there shouldn't be any gaps
gaps, err := q.GetLedgerGaps(context.Background())
tt.Assert.NoError(err)
tt.Assert.Len(gaps, 0)

// Lets insert a few gaps and make sure they are detected incrementally
insertLedgerWithSequence(tt, q, 4)
insertLedgerWithSequence(tt, q, 5)
insertLedgerWithSequence(tt, q, 6)
insertLedgerWithSequence(tt, q, 7)

// since there is a single ledger cluster, there should still be no gaps
// (we don't start from ledger 0)
gaps, err = q.GetLedgerGaps(context.Background())
tt.Assert.NoError(err)
tt.Assert.Len(gaps, 0)

var expectedGaps []LedgerGap

insertLedgerWithSequence(tt, q, 99)
insertLedgerWithSequence(tt, q, 100)
insertLedgerWithSequence(tt, q, 101)
insertLedgerWithSequence(tt, q, 102)

gaps, err = q.GetLedgerGaps(context.Background())
tt.Assert.NoError(err)
expectedGaps = append(expectedGaps, LedgerGap{8, 98})
tt.Assert.Equal(expectedGaps, gaps)

// Yet another gap, this time to a single-ledger cluster
insertLedgerWithSequence(tt, q, 1000)

gaps, err = q.GetLedgerGaps(context.Background())
tt.Assert.NoError(err)
expectedGaps = append(expectedGaps, LedgerGap{103, 999})
tt.Assert.Equal(expectedGaps, gaps)

// Yet another gap, this time the gap only contains a ledger
insertLedgerWithSequence(tt, q, 1002)
gaps, err = q.GetLedgerGaps(context.Background())
tt.Assert.NoError(err)
expectedGaps = append(expectedGaps, LedgerGap{1001, 1001})
tt.Assert.Equal(expectedGaps, gaps)

}
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,11 @@ type LedgerCache struct {
queued map[int32]struct{}
}

type LedgerGap struct {
StartSequence uint32 `db:"gap_start"`
EndSequence uint32 `db:"gap_end"`
}

// LedgersQ is a helper struct to aid in configuring queries that loads
// slices of Ledger structs.
type LedgersQ struct {
Expand Down

0 comments on commit 812c0b5

Please sign in to comment.