Skip to content

Commit

Permalink
exp/lighthorizon: Add basic scaffolding for metrics. (#4456)
Browse files Browse the repository at this point in the history
* Use correct network passphrase when populating transaction
* Add scaffolding for Prom/log metrics and some example ones
* Misc. clarifications and fixes to the index builder
  • Loading branch information
Shaptic authored Jul 11, 2022
1 parent 17e0db9 commit 2f0fa9c
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 59 deletions.
54 changes: 45 additions & 9 deletions exp/lighthorizon/actions/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package actions

import (
"encoding/hex"
"github.com/stellar/go/support/log"
"fmt"
"io"
"net/http"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/stellar/go/support/log"

"github.com/stellar/go/exp/lighthorizon/adapters"
"github.com/stellar/go/exp/lighthorizon/archive"
Expand All @@ -15,8 +20,30 @@ import (
"github.com/stellar/go/toid"
)

// Prometheus metrics for the Transactions handler, registered with the
// default registry at package init time via promauto.
var (
// requestCount counts every request served by the Transactions handler.
requestCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "horizon_lite_request_count",
Help: "How many requests have occurred?",
})
// requestTime is a latency histogram observed in milliseconds (see the
// handler's deferred Observe call): twenty 50ms-wide buckets covering
// 0-950ms, then eight 1s-wide buckets covering 1s-8s for slow requests.
requestTime = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "horizon_lite_request_duration",
Help: "How long do requests take?",
Buckets: append(
prometheus.LinearBuckets(0, 50, 20),
prometheus.LinearBuckets(1000, 1000, 8)...,
),
})
)

func Transactions(archiveWrapper archive.Wrapper, indexStore index.Store) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
defer func() {
duration := time.Since(start)
requestTime.Observe(float64(duration.Milliseconds()))
}()
requestCount.Inc()

// For _links rendering, imitate horizon.stellar.org links for horizon-cmp
r.URL.Scheme = "http"
r.URL.Host = "localhost:8080"
Expand Down Expand Up @@ -58,29 +85,37 @@ func Transactions(archiveWrapper archive.Wrapper, indexStore index.Store) func(h
}

if txId != "" {
// if 'id' is on request, it overrides any paging cursor that may be on request.
// if 'id' is on request, it overrides any paging parameters on the request.
var b []byte
b, err = hex.DecodeString(txId)
if err != nil {
sendErrorResponse(w, http.StatusBadRequest, "Invalid transaction id request parameter, not valid hex encoding")
sendErrorResponse(w, http.StatusBadRequest,
"Invalid transaction id request parameter, not valid hex encoding")
return
}
if len(b) != 32 {
sendErrorResponse(w, http.StatusBadRequest, "Invalid transaction id request parameter, the encoded hex value must decode to length of 32 bytes")
sendErrorResponse(w, http.StatusBadRequest,
"Invalid transaction id request parameter, the encoded hex value must decode to length of 32 bytes")
return
}
var hash [32]byte
copy(hash[:], b)

if paginate.Cursor, err = indexStore.TransactionTOID(hash); err != nil {
var foundTOID int64
foundTOID, err = indexStore.TransactionTOID(hash)
if err == io.EOF {
log.Error(err)
sendErrorResponse(w, http.StatusNotFound,
fmt.Sprintf("Transaction with ID %x does not exist", hash))
return
} else if err != nil {
log.Error(err)
sendErrorResponse(w, http.StatusInternalServerError, "")
}
if err == io.EOF {
page.PopulateLinks()
sendPageResponse(w, page)
return
}

paginate.Cursor = foundTOID
paginate.Limit = 1
}

//TODO - implement paginate.Order(asc/desc)
Expand All @@ -93,6 +128,7 @@ func Transactions(archiveWrapper archive.Wrapper, indexStore index.Store) func(h

for _, txn := range txns {
var response hProtocol.Transaction
txn.NetworkPassphrase = archiveWrapper.Passphrase
response, err = adapters.PopulateTransaction(r, &txn)
if err != nil {
log.Error(err)
Expand Down
2 changes: 2 additions & 0 deletions exp/lighthorizon/archive/ingest_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ func NewIngestArchive(sourceUrl string, networkPassphrase string) (Archive, erro
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
return ingestArchive{ledgerBackend}, nil
}

var _ Archive = (*ingestArchive)(nil) // compile-time assertion that ingestArchive satisfies the Archive interface
9 changes: 8 additions & 1 deletion exp/lighthorizon/common/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"encoding/hex"
"errors"

"github.com/stellar/go/network"
"github.com/stellar/go/toid"
Expand All @@ -13,10 +14,16 @@ type Transaction struct {
TransactionResult *xdr.TransactionResult
LedgerHeader *xdr.LedgerHeader
TxIndex int32

NetworkPassphrase string
}

func (o *Transaction) TransactionHash() (string, error) {
hash, err := network.HashTransactionInEnvelope(*o.TransactionEnvelope, network.PublicNetworkPassphrase)
if o.NetworkPassphrase == "" {
return "", errors.New("network passphrase unspecified")
}

hash, err := network.HashTransactionInEnvelope(*o.TransactionEnvelope, o.NetworkPassphrase)
if err != nil {
return "", err
}
Expand Down
41 changes: 21 additions & 20 deletions exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ func BuildIndices(
modules []string,
workerCount int,
) (*IndexBuilder, error) {
L := log.Ctx(ctx)
L := log.Ctx(ctx).WithField("service", "builder")

indexStore, err := Connect(targetUrl)
indexStore, err := ConnectWithConfig(StoreConfig{
Url: targetUrl,
Workers: uint32(workerCount),
Log: L.WithField("subservice", "index"),
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -99,8 +103,8 @@ func BuildIndices(
checkpoints := historyarchive.NewCheckpointManager(0)
for ledger := range ledgerRange.GenerateCheckpoints(checkpoints) {
chunk := checkpoints.GetCheckpointRange(ledger)
chunk.High = min(chunk.High, ledgerRange.High)
chunk.Low = max(chunk.Low, ledgerRange.Low)
chunk.High = min(chunk.High, ledgerRange.High) // don't exceed upper bound
chunk.Low = max(chunk.Low, ledgerRange.Low) // nor the lower bound

ch <- chunk
}
Expand All @@ -117,11 +121,15 @@ func BuildIndices(
ledgerRange.Low, ledgerRange.High, count)

if err := indexBuilder.Build(ctx, ledgerRange); err != nil {
return errors.Wrap(err, "building indices failed")
return errors.Wrapf(err,
"building indices for ledger range [%d, %d] failed",
ledgerRange.Low, ledgerRange.High)
}

nprocessed := atomic.AddUint64(&processed, uint64(count))
printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)
if nprocessed%19 == 0 {
printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)
}

// Upload indices once per checkpoint to save memory
if err := indexStore.Flush(); err != nil {
Expand All @@ -136,19 +144,19 @@ func BuildIndices(
return indexBuilder, errors.Wrap(err, "one or more workers failed")
}

printProgress("Reading ledgers", uint64(ledgerCount), uint64(ledgerCount), startTime)

// Assertion for testing
if processed != uint64(ledgerCount) {
L.Fatalf("processed %d but expected %d", processed, ledgerCount)
}
printProgress("Reading ledgers", processed, uint64(ledgerCount), startTime)

L.Infof("Processed %d ledgers via %d workers", processed, parallel)
L.Infof("Uploading indices to %s", targetUrl)
if err := indexStore.Flush(); err != nil {
return indexBuilder, errors.Wrap(err, "flushing indices failed")
}

// Assertion for testing
if processed != uint64(ledgerCount) {
L.Warnf("processed %d but expected %d", processed, ledgerCount)
}

return indexBuilder, nil
}

Expand Down Expand Up @@ -214,7 +222,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
ledger, err := builder.ledgerBackend.GetLedger(ctx, ledgerSeq)
if err != nil {
if !os.IsNotExist(err) {
log.WithField("error", err).Errorf("error getting ledger %d", ledgerSeq)
log.Errorf("error getting ledger %d: %v", ledgerSeq, err)
}
return err
}
Expand Down Expand Up @@ -313,13 +321,6 @@ func (builder *IndexBuilder) Watch(ctx context.Context) error {
}

func printProgress(prefix string, done, total uint64, startTime time.Time) {
// This should never happen, more of a runtime assertion for now.
// We can remove it when production-ready.
if done > total {
panic(fmt.Errorf("error for %s: done > total (%d > %d)",
prefix, done, total))
}

progress := float64(done) / float64(total)
elapsed := time.Since(startTime)

Expand Down
30 changes: 18 additions & 12 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,47 @@ import (
)

// Connect opens the index Store identified by backendUrl using default
// store settings. Use ConnectWithConfig for finer control (worker count,
// logging, etc.).
func Connect(backendUrl string) (Store, error) {
	return ConnectWithConfig(StoreConfig{Url: backendUrl})
}

func ConnectWithConfig(config StoreConfig) (Store, error) {
parsed, err := url.Parse(config.Url)
if err != nil {
return nil, err
}
switch parsed.Scheme {
case "s3":
config := &aws.Config{}
awsConfig := &aws.Config{}
query := parsed.Query()
if region := query.Get("region"); region != "" {
config.Region = aws.String(region)
awsConfig.Region = aws.String(region)
}

return NewS3Store(config, parsed.Path, 20)
config.Url = parsed.Path
return NewS3Store(awsConfig, config)

case "file":
return NewFileStore(filepath.Join(parsed.Host, parsed.Path), 20)
config.Url = filepath.Join(parsed.Host, parsed.Path)
return NewFileStore(config)

default:
return nil, fmt.Errorf("unknown URL scheme: '%s' (from %s)",
parsed.Scheme, backendUrl)
parsed.Scheme, config.Url)
}
}

func NewFileStore(dir string, parallel uint32) (Store, error) {
backend, err := backend.NewFileBackend(dir, parallel)
func NewFileStore(config StoreConfig) (Store, error) {
backend, err := backend.NewFileBackend(config.Url, config.Workers)
if err != nil {
return nil, err
}
return NewStore(backend)
return NewStore(backend, config)
}

func NewS3Store(awsConfig *aws.Config, prefix string, parallel uint32) (Store, error) {
backend, err := backend.NewS3Backend(awsConfig, prefix, parallel)
func NewS3Store(awsConfig *aws.Config, indexConfig StoreConfig) (Store, error) {
backend, err := backend.NewS3Backend(awsConfig, indexConfig.Url, indexConfig.Workers)
if err != nil {
return nil, err
}
return NewStore(backend)
return NewStore(backend, indexConfig)
}
Loading

0 comments on commit 2f0fa9c

Please sign in to comment.