Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/lighthorizon: Add test for batch index building map job #4440

Merged
merged 17 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
63 changes: 25 additions & 38 deletions exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,10 @@ func BuildIndices(
sourceUrl string, // where is raw txmeta coming from?
targetUrl string, // where should the resulting indices go?
networkPassphrase string,
startLedger, endLedger uint32,
ledgerRange historyarchive.Range, // inclusive
modules []string,
workerCount int,
) (*IndexBuilder, error) {
if endLedger < startLedger {
return nil, fmt.Errorf(
"nothing to do: start > end (%d > %d)", startLedger, endLedger)
}

L := log.Ctx(ctx)

indexStore, err := Connect(targetUrl)
Expand All @@ -57,24 +52,24 @@ func BuildIndices(
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
defer ledgerBackend.Close()

if endLedger == 0 {
latest, err := ledgerBackend.GetLatestLedgerSequence(ctx)
if err != nil {
return nil, err
if ledgerRange.High == 0 {
var backendErr error
ledgerRange.High, backendErr = ledgerBackend.GetLatestLedgerSequence(ctx)
if backendErr != nil {
return nil, backendErr
}
endLedger = latest
}

if endLedger < startLedger {
return nil, fmt.Errorf("invalid ledger range: end < start (%d < %d)", endLedger, startLedger)
if ledgerRange.High < ledgerRange.Low {
return nil, fmt.Errorf("invalid ledger range: %s", ledgerRange.String())
}

ledgerCount := 1 + (endLedger - startLedger) // +1 because endLedger is inclusive
parallel := max(1, workerCount)
ledgerCount := 1 + (ledgerRange.High - ledgerRange.Low) // +1 bc inclusive
parallel := int(max(1, uint32(workerCount)))

startTime := time.Now()
L.Infof("Creating indices for ledger range: %d through %d (%d ledgers)",
startLedger, endLedger, ledgerCount)
L.Infof("Creating indices for ledger range: [%d, %d] (%d ledgers)",
ledgerRange.Low, ledgerRange.High, ledgerCount)
L.Infof("Using %d workers", parallel)

// Create a bunch of workers that process ledgers a checkpoint range at a
Expand All @@ -91,6 +86,7 @@ func BuildIndices(
indexBuilder.RegisterModule(ProcessAccounts)
case "accounts_unbacked":
indexBuilder.RegisterModule(ProcessAccountsWithoutBackend)
indexStore.ClearMemory(false)
default:
return indexBuilder, fmt.Errorf("unknown module '%s'", part)
}
Expand All @@ -99,18 +95,14 @@ func BuildIndices(
// Submit the work to the channels, breaking up the range into individual
// checkpoint ranges.
go func() {
// Recall: A ledger X is a checkpoint ledger iff (X + 1) % 64 == 0
nextCheckpoint := (((startLedger / 64) * 64) + 63)

ledger := startLedger
nextLedger := min(endLedger, ledger+(nextCheckpoint-startLedger))
for ledger <= endLedger {
chunk := historyarchive.Range{Low: ledger, High: nextLedger}
L.Debugf("Submitted [%d, %d] for work", chunk.Low, chunk.High)
ch <- chunk
checkpoints := historyarchive.NewCheckpointManager(0)
for ledger := range ledgerRange.GenerateCheckpoints(checkpoints) {
chunk := checkpoints.GetCheckpointRange(ledger)
chunk.High = min(chunk.High, ledgerRange.High)
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
chunk.Low = max(chunk.Low, ledgerRange.Low)

ledger = nextLedger + 1
nextLedger = min(endLedger, ledger+63) // don't exceed upper bound
fmt.Printf("Submitted [%d, %d] for work\n", chunk.Low, chunk.High)
ch <- chunk
}

close(ch)
Expand All @@ -121,15 +113,14 @@ func BuildIndices(
wg.Go(func() error {
for ledgerRange := range ch {
count := (ledgerRange.High - ledgerRange.Low) + 1
nprocessed := atomic.AddUint64(&processed, uint64(count))

L.Debugf("Working on checkpoint range [%d, %d]",
ledgerRange.Low, ledgerRange.High)
L.Debugf("Working on checkpoint range [%d, %d] (%d ledgers)",
ledgerRange.Low, ledgerRange.High, count)

if err = indexBuilder.Build(ctx, ledgerRange); err != nil {
return errors.Wrap(err, "building indices failed")
}

nprocessed := atomic.AddUint64(&processed, uint64(count))
printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)

// Upload indices once per checkpoint to save memory
Expand Down Expand Up @@ -246,11 +237,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
}
}

builder.lastBuiltLedger = uint32(
max(int(builder.lastBuiltLedger),
int(ledgerRange.High)),
)

builder.lastBuiltLedger = max(builder.lastBuiltLedger, ledgerRange.High)
return nil
}

Expand Down Expand Up @@ -360,7 +347,7 @@ func min(a, b uint32) uint32 {
return b
}

func max(a, b int) int {
func max(a, b uint32) uint32 {
if a > b {
return a
}
Expand Down
83 changes: 44 additions & 39 deletions exp/lighthorizon/index/cmd/batch/map/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"runtime"
"strconv"

"github.com/stellar/go/exp/lighthorizon/index"
Expand All @@ -15,7 +16,7 @@ import (

type BatchConfig struct {
historyarchive.Range
TxMetaSourceUrl, TargetUrl string
TxMetaSourceUrl, IndexTargetUrl string
}

const (
Expand All @@ -24,27 +25,12 @@ const (
firstCheckpointEnv = "FIRST_CHECKPOINT"
txmetaSourceUrlEnv = "TXMETA_SOURCE"
indexTargetUrlEnv = "INDEX_TARGET"

s3BucketName = "sdf-txmeta-pubnet"
workerCountEnv = "WORKER_COUNT"
)

func NewS3BatchConfig() (*BatchConfig, error) {
jobIndex, err := strconv.ParseUint(os.Getenv(jobIndexEnv), 10, 32)
if err != nil {
return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv)
}

url := fmt.Sprintf("s3://%s/job_%d?region=%s", s3BucketName, jobIndex, "us-east-1")
if err := os.Setenv(indexTargetUrlEnv, url); err != nil {
return nil, err
}

return NewBatchConfig()
}

func NewBatchConfig() (*BatchConfig, error) {
targetUrl := os.Getenv(indexTargetUrlEnv)
if targetUrl == "" {
indexTargetRootUrl := os.Getenv(indexTargetUrlEnv)
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
if indexTargetRootUrl == "" {
return nil, errors.New("required parameter: " + indexTargetUrlEnv)
}

Expand All @@ -57,54 +43,73 @@ func NewBatchConfig() (*BatchConfig, error) {
if err != nil {
return nil, errors.Wrap(err, "invalid parameter "+firstCheckpointEnv)
}
if (firstCheckpoint+1)%64 != 0 {
return nil, fmt.Errorf("invalid checkpoint: %d", firstCheckpoint)

checkpoints := historyarchive.NewCheckpointManager(0)
if checkpoints.IsCheckpoint(uint32(firstCheckpoint) - 1) {
return nil, fmt.Errorf(
"%s (%d) must be the first ledger in a checkpoint range",
firstCheckpointEnv, firstCheckpoint)
}

batchSize, err := strconv.ParseUint(os.Getenv(batchSizeEnv), 10, 32)
if err != nil {
return nil, errors.Wrap(err, "invalid parameter "+batchSizeEnv)
} else if batchSize%uint64(checkpoints.GetCheckpointFrequency()) != 0 {
return nil, fmt.Errorf(
"%s (%d) must be a multiple of checkpoint frequency (%d)",
batchSizeEnv, batchSize, checkpoints.GetCheckpointFrequency())
}

sourceUrl := os.Getenv(txmetaSourceUrlEnv)
if sourceUrl == "" {
txmetaSourceUrl := os.Getenv(txmetaSourceUrlEnv)
if txmetaSourceUrl == "" {
return nil, errors.New("required parameter " + txmetaSourceUrlEnv)
}

log.Debugf("%s: %d", batchSizeEnv, batchSize)
log.Debugf("%s: %d", jobIndexEnv, jobIndex)
log.Debugf("%s: %d", firstCheckpointEnv, firstCheckpoint)
log.Debugf("%s: %v", txmetaSourceUrlEnv, sourceUrl)

startCheckpoint := uint32(firstCheckpoint + batchSize*jobIndex)
endCheckpoint := startCheckpoint + uint32(batchSize) - 1
firstLedger := uint32(firstCheckpoint + batchSize*jobIndex)
lastLedger := firstLedger + uint32(batchSize)
return &BatchConfig{
Range: historyarchive.Range{Low: startCheckpoint, High: endCheckpoint},
TxMetaSourceUrl: sourceUrl,
TargetUrl: targetUrl,
Range: historyarchive.Range{Low: firstLedger, High: lastLedger},
TxMetaSourceUrl: txmetaSourceUrl,
IndexTargetUrl: fmt.Sprintf("%s%cjob_%d", indexTargetRootUrl, os.PathSeparator, jobIndex),
}, nil
}

func main() {
// log.SetLevel(log.DebugLevel)
log.SetLevel(log.InfoLevel)
// log.SetLevel(log.DebugLevel)

batch, err := NewBatchConfig()
if err != nil {
panic(err)
}

var workerCount int
workerCountStr := os.Getenv(workerCountEnv)
if workerCountStr == "" {
workerCount = runtime.NumCPU()
} else {
workerCountParsed, innerErr := strconv.ParseUint(workerCountStr, 10, 8)
if innerErr != nil {
panic(errors.Wrapf(innerErr,
"invalid worker count parameter (%s)", workerCountStr))
}
workerCount = int(workerCountParsed)
}

log.Infof("Uploading ledger range [%d, %d] to %s",
batch.Range.Low, batch.Range.High, batch.TargetUrl)
batch.Range.Low, batch.Range.High, batch.IndexTargetUrl)

if _, err := index.BuildIndices(
context.Background(),
batch.TxMetaSourceUrl,
batch.TargetUrl,
batch.IndexTargetUrl,
network.TestNetworkPassphrase,
batch.Low, batch.High,
[]string{"transactions", "accounts_unbacked"},
1,
batch.Range,
[]string{
"accounts_unbacked",
"transactions",
},
workerCount,
); err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions exp/lighthorizon/index/cmd/fixtures/latest
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1410367
Loading