Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more prometheus metrics #1054

Merged
merged 3 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ COPY . /app

ENV CGO_ENABLED=0

RUN make install
RUN make release-install TAGS=monitoring

# FINAL IMAGE
FROM alpine as final
Expand Down
120 changes: 120 additions & 0 deletions monitoring/db_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package monitoring

import (
"context"
"errors"
"sync"

"github.com/prometheus/client_golang/prometheus"
)

const (
dbSizeMetric = "total_db_size"

assetProofSizesHistogram = "asset_proofs_sizes"
)

// dbCollector is a Prometheus collector that exports metrics related to the
// daemon's database.
type dbCollector struct {
collectMx sync.Mutex

cfg *PrometheusConfig
registry *prometheus.Registry

dbSize prometheus.Gauge
proofSizesHistogram prometheus.Histogram
}

func newDbCollector(cfg *PrometheusConfig,
registry *prometheus.Registry) (*dbCollector, error) {

if cfg == nil {
return nil, errors.New("db collector prometheus cfg is nil")
}

if cfg.AssetStore == nil {
return nil, errors.New("db collector asset store is nil")
}

return &dbCollector{
cfg: cfg,
registry: registry,
dbSize: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: dbSizeMetric,
Help: "Total size of db",
},
),
proofSizesHistogram: newProofSizesHistogram(),
}, nil
}

// newProofSizesHistogram generates a fresh instance of the proof sizes
// histogram.
func newProofSizesHistogram() (h prometheus.Histogram) {
return prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: assetProofSizesHistogram,
Help: "Histogram of asset proof sizes",
Buckets: prometheus.ExponentialBuckets(
2, 2, 32,
),
},
)
}

// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once the
// last descriptor has been sent.
//
// NOTE: Part of the prometheus.Collector interface.
func (a *dbCollector) Describe(ch chan<- *prometheus.Desc) {
a.collectMx.Lock()
defer a.collectMx.Unlock()

a.dbSize.Describe(ch)
a.proofSizesHistogram.Describe(ch)
}

// Collect is called by the Prometheus registry when collecting metrics.
//
// NOTE: Part of the prometheus.Collector interface.
func (a *dbCollector) Collect(ch chan<- prometheus.Metric) {
a.collectMx.Lock()
defer a.collectMx.Unlock()

ctxdb, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()

// Fetch the db size.
dbSize, err := a.cfg.AssetStore.AssetsDBSize(ctxdb)
if err != nil {
log.Errorf("unable to fetch db size: %v", err)
return
}

a.dbSize.Set(float64(dbSize))

// Fetch all proof sizes.
proofSizes, err := a.cfg.AssetStore.FetchAssetProofsSizes(ctxdb)
if err != nil {
log.Errorf("unable to fetch asset proofs: %v", err)
return
}

// We use the histogram in a non-standard way. Everytime we collect data
// we ask the database to return all proof sizes and then we feed them
// to the histogram. That's why on every different pass we need to reset
// the histogram instance, in order to not duplicate data on every
// prometheus pass.
a.proofSizesHistogram = newProofSizesHistogram()

// We'll feed the proof sizes to the histogram.
for _, p := range proofSizes {
a.proofSizesHistogram.Observe(p.ProofFileLength)
}

a.proofSizesHistogram.Collect(ch)
a.dbSize.Collect(ch)
}
6 changes: 6 additions & 0 deletions monitoring/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (p *PrometheusExporter) Start() error {
}
p.registry.MustRegister(gardenCollector)

dbCollector, err := newDbCollector(p.config, p.registry)
if err != nil {
return err
}
p.registry.MustRegister(dbCollector)

// Make ensure that all metrics exist when collecting and querying.
serverMetrics.InitializeMetrics(p.config.RPCServer)

Expand Down
19 changes: 16 additions & 3 deletions tapcfg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,25 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
lndServices *lndclient.LndServices, enableChannelFeatures bool,
mainErrChan chan<- error) (*tap.Config, error) {

var err error
var (
err error
db databaseBackend
dbType sqlc.BackendType
)

// Now that we know where the database will live, we'll go ahead and
// open up the default implementation of it.
var db databaseBackend
switch cfg.DatabaseBackend {
case DatabaseBackendSqlite:
dbType = sqlc.BackendTypeSqlite

cfgLogger.Infof("Opening sqlite3 database at: %v",
cfg.Sqlite.DatabaseFileName)
db, err = tapdb.NewSqliteStore(cfg.Sqlite)

case DatabaseBackendPostgres:
dbType = sqlc.BackendTypePostgres

cfgLogger.Infof("Opening postgres database at: %v",
cfg.Postgres.DSN(true))
db, err = tapdb.NewPostgresStore(cfg.Postgres)
Expand Down Expand Up @@ -85,6 +92,12 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
},
)

metaDB := tapdb.NewTransactionExecutor(
db, func(tx *sql.Tx) tapdb.MetaStore {
return db.WithTx(tx)
},
)

addrBookDB := tapdb.NewTransactionExecutor(
db, func(tx *sql.Tx) tapdb.AddrBook {
return db.WithTx(tx)
Expand All @@ -94,7 +107,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
tapdbAddrBook := tapdb.NewTapAddressBook(
addrBookDB, &tapChainParams, defaultClock,
)
assetStore := tapdb.NewAssetStore(assetDB, defaultClock)
assetStore := tapdb.NewAssetStore(assetDB, metaDB, defaultClock, dbType)

keyRing := tap.NewLndRpcKeyRing(lndServices)
walletAnchor := tap.NewLndRpcWalletAnchor(lndServices)
Expand Down
8 changes: 7 additions & 1 deletion tapdb/asset_minting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,18 @@ func newAssetStoreFromDB(db *BaseDB) (*AssetMintingStore, *AssetStore) {
return db.WithTx(tx)
}

metaTxCreator := func(tx *sql.Tx) MetaStore {
return db.WithTx(tx)
}

assetMintingDB := NewTransactionExecutor(db, txCreator)
assetsDB := NewTransactionExecutor(db, activeTxCreator)
metaDB := NewTransactionExecutor(db, metaTxCreator)

testClock := clock.NewTestClock(time.Now())

return NewAssetMintingStore(assetMintingDB),
NewAssetStore(assetsDB, testClock)
NewAssetStore(assetsDB, metaDB, testClock, db.Backend())
}

func assertBatchState(t *testing.T, batch *tapgarden.MintingBatch,
Expand Down
110 changes: 109 additions & 1 deletion tapdb/assets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type (
// script key.
AssetProof = sqlc.FetchAssetProofsRow

// AssetProofSize is the asset proof size for a given asset, identified
// by its script key.
AssetProofSize = sqlc.FetchAssetProofsSizesRow

// AssetProofI is identical to AssetProof but is used for the case
// where the proofs for a specific asset are fetched.
AssetProofI = sqlc.FetchAssetProofRow
Expand Down Expand Up @@ -195,6 +199,10 @@ type ActiveAssetsStore interface {
// disk.
FetchAssetProofs(ctx context.Context) ([]AssetProof, error)

// FetchAssetsProofsSizes fetches all the asset proofs lengths that are
// stored on disk.
FetchAssetProofsSizes(ctx context.Context) ([]AssetProofSize, error)

// FetchAssetProof fetches the asset proof for a given asset identified
// by its script key.
FetchAssetProof(ctx context.Context,
Expand Down Expand Up @@ -339,6 +347,18 @@ type ActiveAssetsStore interface {
assetID []byte) (sqlc.FetchAssetMetaForAssetRow, error)
}

// MetaStore is a sub-set of the main sqlc.Querier interface that contains
// methods related to metadata of the daemon.
type MetaStore interface {
// AssetsDBSize returns the total size of the taproot assets sqlite
// database.
AssetsDBSizeSqlite(ctx context.Context) (int32, error)

// AssetsDBSize returns the total size of the taproot assets postgres
// database.
AssetsDBSizePostgres(ctx context.Context) (int64, error)
}

// AssetBalance holds a balance query result for a particular asset or all
// assets tracked by this daemon.
type AssetBalance struct {
Expand Down Expand Up @@ -378,29 +398,46 @@ type BatchedAssetStore interface {
BatchedTx[ActiveAssetsStore]
}

// BatchedMetaStore combines the MetaStore interface with the BatchedTx
// interface, allowing for multiple queries to be executed in a single SQL
// transaction.
type BatchedMetaStore interface {
MetaStore

BatchedTx[MetaStore]
}

// AssetStore is used to query for the set of pending and confirmed assets.
type AssetStore struct {
db BatchedAssetStore

metaDb BatchedMetaStore

// eventDistributor is an event distributor that will be used to notify
// subscribers about new proofs that are added to the archiver.
eventDistributor *fn.EventDistributor[proof.Blob]

clock clock.Clock

txHeights *lru.Cache[chainhash.Hash, cacheableBlockHeight]

dbType sqlc.BackendType
}

// NewAssetStore creates a new AssetStore from the specified BatchedAssetStore
// interface.
func NewAssetStore(db BatchedAssetStore, clock clock.Clock) *AssetStore {
func NewAssetStore(db BatchedAssetStore, metaDB BatchedMetaStore,
clock clock.Clock, dbType sqlc.BackendType) *AssetStore {

return &AssetStore{
db: db,
metaDb: metaDB,
eventDistributor: fn.NewEventDistributor[proof.Blob](),
clock: clock,
txHeights: lru.NewCache[chainhash.Hash, cacheableBlockHeight](
10_000,
),
dbType: dbType,
}
}

Expand Down Expand Up @@ -1171,6 +1208,38 @@ func (a *AssetStore) FetchManagedUTXOs(ctx context.Context) (
return managedUtxos, nil
}

// FetchAssetProofsSizes fetches the sizes of the proofs in the db.
func (a *AssetStore) FetchAssetProofsSizes(
ctx context.Context) ([]AssetProofSize, error) {

var pSizes []AssetProofSize

readOpts := NewAssetStoreReadTx()
dbErr := a.db.ExecTx(ctx, &readOpts, func(q ActiveAssetsStore) error {
proofSizes, err := q.FetchAssetProofsSizes(ctx)
if err != nil {
return err
}

for _, v := range proofSizes {
pSizes = append(
pSizes, AssetProofSize{
ScriptKey: v.ScriptKey,
ProofFileLength: v.ProofFileLength,
},
)
}

return nil
})

if dbErr != nil {
return nil, dbErr
}

return pSizes, nil
}

// FetchAssetProofs returns the latest proof file for either the set of target
// assets, or all assets if no script keys for an asset are passed in.
//
Expand Down Expand Up @@ -3267,6 +3336,45 @@ func (a *AssetStore) FetchAssetMetaForAsset(ctx context.Context,
return assetMeta, nil
}

// AssetsDBSize returns the total size of the taproot assets database.
func (a *AssetStore) AssetsDBSize(ctx context.Context) (int64, error) {
var totalSize int64

readOpts := NewAssetStoreReadTx()
dbErr := a.metaDb.ExecTx(ctx, &readOpts, func(q MetaStore) error {
var (
size int64
err error
)
switch a.dbType {
case sqlc.BackendTypePostgres:
size, err = q.AssetsDBSizePostgres(ctx)

case sqlc.BackendTypeSqlite:
var res int32
res, err = q.AssetsDBSizeSqlite(ctx)
size = int64(res)

default:
return fmt.Errorf("unsupported db backend type")
}

if err != nil {
return err
}

totalSize = size

return nil
})

if dbErr != nil {
return 0, dbErr
}

return totalSize, nil
}

// FetchAssetMetaByHash attempts to fetch an asset meta based on an asset hash.
func (a *AssetStore) FetchAssetMetaByHash(ctx context.Context,
metaHash [asset.MetaHashLen]byte) (*proof.MetaReveal, error) {
Expand Down
Loading
Loading