exp/lighthorizon: Add test for batch index building map job (#4440)
* Modify the single-process test to generalize to whatever fixture data exists.
This also adds a test checking that single-process indexing works from a
non-checkpoint starting point, which is important.

* Fix map program to properly build sub-paths depending on its job index (see the first sketch below)
Previously, this only happened for explicit S3 backends.

* Make map job default to using all CPUs (also shown in the first sketch below)
* Stop clearing indices from memory when using the unbacked module
* Use historyarchive.CheckpointManager for all checkpoint math (see the second sketch below)
* Update lastBuiltLedger with concurrency-safe writes (see the third sketch below)
Shaptic authored Jun 30, 2022
1 parent 00ae4ed commit 740bc0b
Showing 330 changed files with 674 additions and 297 deletions.
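
Before the diffs, a few hedged sketches of the behaviors described above. First, how each map job derives its own ledger slice, per-job index sub-path, and worker count. This is a minimal illustration, not the commit's code: the numbers and the file:// target URL are made-up example values, and the real program reads everything from environment variables (FIRST_CHECKPOINT, INDEX_TARGET, WORKER_COUNT, plus the batch-size and job-index variables).

package main

import (
	"fmt"
	"os"
	"runtime"
)

func main() {
	// Illustrative inputs; in the real job these come from the environment.
	var (
		firstCheckpoint uint32 = 64  // first ledger of a checkpoint range (63 is a checkpoint ledger)
		batchSize       uint32 = 128 // must be a multiple of the checkpoint frequency (64)
		jobIndex        uint32 = 3   // assigned to each job by the batch system
	)
	indexTargetRootUrl := "file:///tmp/indices" // hypothetical INDEX_TARGET value

	// Each job covers its own contiguous, inclusive slice of ledgers...
	firstLedger := firstCheckpoint + batchSize*jobIndex
	lastLedger := firstLedger + batchSize - 1

	// ...and writes indices under a per-job sub-path, no longer only for S3 targets.
	indexTargetUrl := fmt.Sprintf("%s%cjob_%d", indexTargetRootUrl, os.PathSeparator, jobIndex)

	// WORKER_COUNT falls back to all available CPUs when unset.
	workerCount := runtime.NumCPU()

	fmt.Printf("job %d: ledgers [%d, %d] -> %s (%d workers)\n",
		jobIndex, firstLedger, lastLedger, indexTargetUrl, workerCount)
}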
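
Second, a sketch of the checkpoint math that now goes through historyarchive.CheckpointManager rather than hand-rolled "(x + 1) % 64" arithmetic. It mirrors the work-submission loop in BuildIndices; the range bounds are arbitrary example values.

package main

import (
	"fmt"

	"github.com/stellar/go/historyarchive"
)

func main() {
	// Passing 0 falls back to the default checkpoint frequency (64).
	checkpoints := historyarchive.NewCheckpointManager(0)

	// An arbitrary inclusive range that starts and ends off-checkpoint.
	ledgerRange := historyarchive.Range{Low: 100, High: 300}

	// Walk the checkpoints covering the range and clamp each chunk to it.
	for ledger := range ledgerRange.GenerateCheckpoints(checkpoints) {
		chunk := checkpoints.GetCheckpointRange(ledger)
		if chunk.High > ledgerRange.High {
			chunk.High = ledgerRange.High
		}
		if chunk.Low < ledgerRange.Low {
			chunk.Low = ledgerRange.Low
		}
		fmt.Printf("would submit chunk [%d, %d]\n", chunk.Low, chunk.High)
	}
}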
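
Finally, a sketch of why the lastBuiltLedger update is now guarded by a mutex: multiple workers call Build concurrently, so the read-compare-write on the high-water mark must not interleave. The type and field names mirror the diff; everything else here is illustrative.

package main

import (
	"fmt"
	"sync"
)

type indexBuilder struct {
	lastBuiltLedgerWriteLock sync.Mutex
	lastBuiltLedger          uint32
}

// markBuilt mimics the end of IndexBuilder.Build: record the highest ledger
// built so far without losing updates from concurrent workers.
func (b *indexBuilder) markBuilt(high uint32) {
	b.lastBuiltLedgerWriteLock.Lock()
	defer b.lastBuiltLedgerWriteLock.Unlock()
	if high > b.lastBuiltLedger {
		b.lastBuiltLedger = high
	}
}

func main() {
	b := &indexBuilder{}
	var wg sync.WaitGroup
	for _, high := range []uint32{63, 127, 191, 255} { // one finished chunk per worker
		wg.Add(1)
		go func(h uint32) {
			defer wg.Done()
			b.markBuilt(h)
		}(high)
	}
	wg.Wait()
	fmt.Println(b.lastBuiltLedger) // always 255, regardless of worker scheduling
}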
104 changes: 46 additions & 58 deletions exp/lighthorizon/index/builder.go
@@ -6,6 +6,7 @@ import (
"io"
"math"
"os"
"sync"
"sync/atomic"
"time"

@@ -24,58 +25,52 @@ func BuildIndices(
sourceUrl string, // where is raw txmeta coming from?
targetUrl string, // where should the resulting indices go?
networkPassphrase string,
startLedger, endLedger uint32,
ledgerRange historyarchive.Range, // inclusive
modules []string,
workerCount int,
) (*IndexBuilder, error) {
if endLedger < startLedger {
return nil, fmt.Errorf(
"nothing to do: start > end (%d > %d)", startLedger, endLedger)
}

L := log.Ctx(ctx)

indexStore, indexErr := Connect(targetUrl)
if indexErr != nil {
return nil, indexErr
indexStore, err := Connect(targetUrl)
if err != nil {
return nil, err
}

// We use historyarchive as a backend here just to abstract away dealing
// with the filesystem directly.
source, backendErr := historyarchive.ConnectBackend(
source, err := historyarchive.ConnectBackend(
sourceUrl,
historyarchive.ConnectOptions{
Context: ctx,
NetworkPassphrase: networkPassphrase,
S3Region: "us-east-1",
},
)
if backendErr != nil {
return nil, backendErr
if err != nil {
return nil, err
}

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
defer ledgerBackend.Close()

if endLedger == 0 {

latest, err := ledgerBackend.GetLatestLedgerSequence(ctx)
if err != nil {
return nil, err
if ledgerRange.High == 0 {
var backendErr error
ledgerRange.High, backendErr = ledgerBackend.GetLatestLedgerSequence(ctx)
if backendErr != nil {
return nil, backendErr
}
endLedger = latest
}

if endLedger < startLedger {
return nil, fmt.Errorf("invalid ledger range: end < start (%d < %d)", endLedger, startLedger)
if ledgerRange.High < ledgerRange.Low {
return nil, fmt.Errorf("invalid ledger range: %s", ledgerRange.String())
}

ledgerCount := 1 + (endLedger - startLedger) // +1 because endLedger is inclusive
parallel := max(1, workerCount)
ledgerCount := 1 + (ledgerRange.High - ledgerRange.Low) // +1 bc inclusive
parallel := int(max(1, uint32(workerCount)))

startTime := time.Now()
L.Infof("Creating indices for ledger range: %d through %d (%d ledgers)",
startLedger, endLedger, ledgerCount)
L.Infof("Creating indices for ledger range: [%d, %d] (%d ledgers)",
ledgerRange.Low, ledgerRange.High, ledgerCount)
L.Infof("Using %d workers", parallel)

// Create a bunch of workers that process ledgers a checkpoint range at a
@@ -92,6 +87,7 @@ func BuildIndices(
indexBuilder.RegisterModule(ProcessAccounts)
case "accounts_unbacked":
indexBuilder.RegisterModule(ProcessAccountsWithoutBackend)
indexStore.ClearMemory(false)
default:
return indexBuilder, fmt.Errorf("unknown module '%s'", part)
}
@@ -100,18 +96,13 @@ func BuildIndices(
// Submit the work to the channels, breaking up the range into individual
// checkpoint ranges.
go func() {
// Recall: A ledger X is a checkpoint ledger iff (X + 1) % 64 == 0
nextCheckpoint := (((startLedger / 64) * 64) + 63)

ledger := startLedger
nextLedger := min(endLedger, ledger+(nextCheckpoint-startLedger))
for ledger <= endLedger {
chunk := historyarchive.Range{Low: ledger, High: nextLedger}
L.Debugf("Submitted [%d, %d] for work", chunk.Low, chunk.High)
ch <- chunk
checkpoints := historyarchive.NewCheckpointManager(0)
for ledger := range ledgerRange.GenerateCheckpoints(checkpoints) {
chunk := checkpoints.GetCheckpointRange(ledger)
chunk.High = min(chunk.High, ledgerRange.High)
chunk.Low = max(chunk.Low, ledgerRange.Low)

ledger = nextLedger + 1
nextLedger = min(endLedger, ledger+63) // don't exceed upper bound
ch <- chunk
}

close(ch)
@@ -122,15 +113,14 @@ func BuildIndices(
wg.Go(func() error {
for ledgerRange := range ch {
count := (ledgerRange.High - ledgerRange.Low) + 1
nprocessed := atomic.AddUint64(&processed, uint64(count))

L.Debugf("Working on checkpoint range [%d, %d]",
ledgerRange.Low, ledgerRange.High)
L.Debugf("Working on checkpoint range [%d, %d] (%d ledgers)",
ledgerRange.Low, ledgerRange.High, count)

if err := indexBuilder.Build(ctx, ledgerRange); err != nil {
return errors.Wrap(err, "building indices failed")
}

nprocessed := atomic.AddUint64(&processed, uint64(count))
printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)

// Upload indices once per checkpoint to save memory
@@ -174,7 +164,9 @@ type IndexBuilder struct {
store Store
ledgerBackend ledgerbackend.LedgerBackend
networkPassphrase string
lastBuiltLedger uint32

lastBuiltLedgerWriteLock sync.Mutex
lastBuiltLedger uint32

modules []Module
}
Expand Down Expand Up @@ -247,35 +239,31 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
}
}

builder.lastBuiltLedger = uint32(
max(int(builder.lastBuiltLedger),
int(ledgerRange.High)),
)
builder.lastBuiltLedgerWriteLock.Lock()
builder.lastBuiltLedger = max(builder.lastBuiltLedger, ledgerRange.High)
builder.lastBuiltLedgerWriteLock.Unlock()

return nil
}

func (b *IndexBuilder) Watch(ctx context.Context) error {
latestLedger, seqErr := b.ledgerBackend.GetLatestLedgerSequence(ctx)
if seqErr != nil {
log.Errorf("Failed to retrieve latest ledger: %v", seqErr)
return seqErr
func (builder *IndexBuilder) Watch(ctx context.Context) error {
latestLedger, err := builder.ledgerBackend.GetLatestLedgerSequence(ctx)
if err != nil {
log.Errorf("Failed to retrieve latest ledger: %v", err)
return err
}
nextLedger := builder.lastBuiltLedger + 1

nextLedger := b.lastBuiltLedger + 1

log.Infof("Catching up to latest ledger: (%d, %d]",
nextLedger, latestLedger)

if err := b.Build(ctx, historyarchive.Range{
log.Infof("Catching up to latest ledger: (%d, %d]", nextLedger, latestLedger)
if err = builder.Build(ctx, historyarchive.Range{
Low: nextLedger,
High: latestLedger,
}); err != nil {
log.Errorf("Initial catchup failed: %v", err)
}

for {
nextLedger = b.lastBuiltLedger + 1
nextLedger = builder.lastBuiltLedger + 1
log.Infof("Awaiting next ledger (%d)", nextLedger)

// To keep the MVP simple, let's just naively poll the backend until the
@@ -304,7 +292,7 @@ func (b *IndexBuilder) Watch(ctx context.Context) error {
return errors.Wrap(timedCtx.Err(), "awaiting next ledger failed")

default:
buildErr := b.Build(timedCtx, historyarchive.Range{
buildErr := builder.Build(timedCtx, historyarchive.Range{
Low: nextLedger,
High: nextLedger,
})
@@ -361,7 +349,7 @@ func min(a, b uint32) uint32 {
return b
}

func max(a, b int) int {
func max(a, b uint32) uint32 {
if a > b {
return a
}
83 changes: 44 additions & 39 deletions exp/lighthorizon/index/cmd/batch/map/main.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"runtime"
"strconv"

"github.com/stellar/go/exp/lighthorizon/index"
@@ -15,7 +16,7 @@ import (

type BatchConfig struct {
historyarchive.Range
TxMetaSourceUrl, TargetUrl string
TxMetaSourceUrl, IndexTargetUrl string
}

const (
@@ -24,27 +25,12 @@ const (
firstCheckpointEnv = "FIRST_CHECKPOINT"
txmetaSourceUrlEnv = "TXMETA_SOURCE"
indexTargetUrlEnv = "INDEX_TARGET"

s3BucketName = "sdf-txmeta-pubnet"
workerCountEnv = "WORKER_COUNT"
)

func NewS3BatchConfig() (*BatchConfig, error) {
jobIndex, err := strconv.ParseUint(os.Getenv(jobIndexEnv), 10, 32)
if err != nil {
return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv)
}

url := fmt.Sprintf("s3://%s/job_%d?region=%s", s3BucketName, jobIndex, "us-east-1")
if err := os.Setenv(indexTargetUrlEnv, url); err != nil {
return nil, err
}

return NewBatchConfig()
}

func NewBatchConfig() (*BatchConfig, error) {
targetUrl := os.Getenv(indexTargetUrlEnv)
if targetUrl == "" {
indexTargetRootUrl := os.Getenv(indexTargetUrlEnv)
if indexTargetRootUrl == "" {
return nil, errors.New("required parameter: " + indexTargetUrlEnv)
}

@@ -57,54 +43,73 @@ func NewBatchConfig() (*BatchConfig, error) {
if err != nil {
return nil, errors.Wrap(err, "invalid parameter "+firstCheckpointEnv)
}
if (firstCheckpoint+1)%64 != 0 {
return nil, fmt.Errorf("invalid checkpoint: %d", firstCheckpoint)

checkpoints := historyarchive.NewCheckpointManager(0)
if !checkpoints.IsCheckpoint(uint32(firstCheckpoint - 1)) {
return nil, fmt.Errorf(
"%s (%d) must be the first ledger in a checkpoint range",
firstCheckpointEnv, firstCheckpoint)
}

batchSize, err := strconv.ParseUint(os.Getenv(batchSizeEnv), 10, 32)
if err != nil {
return nil, errors.Wrap(err, "invalid parameter "+batchSizeEnv)
} else if batchSize%uint64(checkpoints.GetCheckpointFrequency()) != 0 {
return nil, fmt.Errorf(
"%s (%d) must be a multiple of checkpoint frequency (%d)",
batchSizeEnv, batchSize, checkpoints.GetCheckpointFrequency())
}

sourceUrl := os.Getenv(txmetaSourceUrlEnv)
if sourceUrl == "" {
txmetaSourceUrl := os.Getenv(txmetaSourceUrlEnv)
if txmetaSourceUrl == "" {
return nil, errors.New("required parameter " + txmetaSourceUrlEnv)
}

log.Debugf("%s: %d", batchSizeEnv, batchSize)
log.Debugf("%s: %d", jobIndexEnv, jobIndex)
log.Debugf("%s: %d", firstCheckpointEnv, firstCheckpoint)
log.Debugf("%s: %v", txmetaSourceUrlEnv, sourceUrl)

startCheckpoint := uint32(firstCheckpoint + batchSize*jobIndex)
endCheckpoint := startCheckpoint + uint32(batchSize) - 1
firstLedger := uint32(firstCheckpoint + batchSize*jobIndex)
lastLedger := firstLedger + uint32(batchSize) - 1
return &BatchConfig{
Range: historyarchive.Range{Low: startCheckpoint, High: endCheckpoint},
TxMetaSourceUrl: sourceUrl,
TargetUrl: targetUrl,
Range: historyarchive.Range{Low: firstLedger, High: lastLedger},
TxMetaSourceUrl: txmetaSourceUrl,
IndexTargetUrl: fmt.Sprintf("%s%cjob_%d", indexTargetRootUrl, os.PathSeparator, jobIndex),
}, nil
}

func main() {
// log.SetLevel(log.DebugLevel)
log.SetLevel(log.InfoLevel)
// log.SetLevel(log.DebugLevel)

batch, err := NewBatchConfig()
if err != nil {
panic(err)
}

var workerCount int
workerCountStr := os.Getenv(workerCountEnv)
if workerCountStr == "" {
workerCount = runtime.NumCPU()
} else {
workerCountParsed, innerErr := strconv.ParseUint(workerCountStr, 10, 8)
if innerErr != nil {
panic(errors.Wrapf(innerErr,
"invalid worker count parameter (%s)", workerCountStr))
}
workerCount = int(workerCountParsed)
}

log.Infof("Uploading ledger range [%d, %d] to %s",
batch.Range.Low, batch.Range.High, batch.TargetUrl)
batch.Range.Low, batch.Range.High, batch.IndexTargetUrl)

if _, err := index.BuildIndices(
context.Background(),
batch.TxMetaSourceUrl,
batch.TargetUrl,
batch.IndexTargetUrl,
network.TestNetworkPassphrase,
batch.Low, batch.High,
[]string{"transactions", "accounts_unbacked"},
1,
batch.Range,
[]string{
"accounts_unbacked",
"transactions",
},
workerCount,
); err != nil {
panic(err)
}
1 change: 1 addition & 0 deletions exp/lighthorizon/index/cmd/fixtures/latest
@@ -0,0 +1 @@
1410367