From 99259dc5232f3c95a5cc223977ee6ed444e5788c Mon Sep 17 00:00:00 2001 From: Ryan Staudt Date: Sun, 18 Apr 2021 06:39:43 -0500 Subject: [PATCH] multi: Introduce UTXO database. This introduces a separate UTXO database that holds the UTXO database info. In a subsequent commit it will additionally hold the UTXO set and state. An overview of the changes is as follows: - Introduce LoadUtxoDB to load (or create when needed) the new UTXO database on startup - Add utxoDb and utxoDbInfo to BlockChain to hold the new UTXO database - Initialize the UTXO database info in the UTXO database on startup - Populate utxoDb for mock chains in tests This is part of moving the UTXO set and state to a separate database. --- blockchain/chain.go | 23 ++++++- blockchain/chainio.go | 17 +++--- blockchain/common_test.go | 63 +++++++++++++------ blockchain/fullblocks_test.go | 63 +++++++++++++------ blockchain/utxodb.go | 95 ++++++++++++++++++++++++++++ blockchain/utxoio.go | 112 ++++++++++++++++++++++++++++++++++ blockchain/validate_test.go | 21 ++++--- blockdb.go | 4 +- dcrd.go | 22 ++++++- server.go | 6 +- 10 files changed, 367 insertions(+), 59 deletions(-) create mode 100644 blockchain/utxodb.go diff --git a/blockchain/chain.go b/blockchain/chain.go index 27adba5a07..a4100b3ae0 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -137,6 +137,8 @@ type BlockChain struct { deploymentVers map[string]uint32 db database.DB dbInfo *databaseInfo + utxoDb database.DB + utxoDbInfo *utxoDatabaseInfo chainParams *chaincfg.Params timeSource MedianTimeSource notifications NotificationCallback @@ -2102,11 +2104,17 @@ func (q *chainQueryerAdapter) PrevScripts(dbTx database.Tx, block *dcrutil.Block // Config is a descriptor which specifies the blockchain instance configuration. type Config struct { // DB defines the database which houses the blocks and will be used to - // store all metadata created by this package such as the utxo set. + // store all metadata created by this package outside of the UTXO set, which + // is stored in a separate database. // // This field is required. DB database.DB + // UtxoDB defines the database which houses the UTXO set. + // + // This field is required. + UtxoDB database.DB + // ChainParams identifies which chain parameters the chain is associated // with. // @@ -2175,6 +2183,9 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) { if config.DB == nil { return nil, AssertError("blockchain.New database is nil") } + if config.UtxoDB == nil { + return nil, AssertError("blockchain.New UTXO database is nil") + } if config.ChainParams == nil { return nil, AssertError("blockchain.New chain parameters nil") } @@ -2215,6 +2226,7 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) { checkpointsByHeight: checkpointsByHeight, deploymentVers: deploymentVers, db: config.DB, + utxoDb: config.UtxoDB, chainParams: params, timeSource: config.TimeSource, notifications: config.Notifications, @@ -2243,6 +2255,12 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) { return nil, err } + // Initialize the UTXO state. This entails running any database migrations as + // necessary as well as initializing the UTXO cache. + if err := b.initUtxoState(ctx); err != nil { + return nil, err + } + // Initialize and catch up all of the currently active optional indexes // as needed. queryAdapter := chainQueryerAdapter{BlockChain: &b} @@ -2257,6 +2275,9 @@ func New(ctx context.Context, config *Config) (*BlockChain, error) { "%d, block index: %d", b.dbInfo.version, b.dbInfo.compVer, b.dbInfo.bidxVer) + log.Infof("UTXO database version info: version: %d, compression: %d, utxo "+ + "set: %d", b.utxoDbInfo.version, b.utxoDbInfo.compVer, b.utxoDbInfo.utxoVer) + b.index.RLock() bestHdr := b.index.bestHeader b.index.RUnlock() diff --git a/blockchain/chainio.go b/blockchain/chainio.go index 82fc3c681f..afdd67bf7c 100644 --- a/blockchain/chainio.go +++ b/blockchain/chainio.go @@ -1362,6 +1362,14 @@ func (b *BlockChain) initChainState(ctx context.Context) error { } } + // Initialize the UTXO database info. This must be initialized after the + // block database info is loaded, but before block database migrations are + // run, since setting the initial UTXO set version depends on the block + // database version as that is where it originally resided. + if err := b.initUtxoDbInfo(ctx); err != nil { + return err + } + // Upgrade the database as needed. err = upgradeDB(ctx, b.db, b.chainParams, b.dbInfo) if err != nil { @@ -1463,14 +1471,7 @@ func (b *BlockChain) initChainState(ctx context.Context) error { } // Upgrade the database post block index load as needed. - err = upgradeDBPostBlockIndexLoad(ctx, b) - if err != nil { - return err - } - - // Initialize the utxo cache to ensure that the state of the utxo set is - // caught up to the tip of the best chain. - return b.utxoCache.Initialize(b, tip) + return upgradeDBPostBlockIndexLoad(ctx, b) } // dbFetchBlockByNode uses an existing database transaction to retrieve the raw diff --git a/blockchain/common_test.go b/blockchain/common_test.go index 7c1e939789..8a3a91b8c5 100644 --- a/blockchain/common_test.go +++ b/blockchain/common_test.go @@ -52,32 +52,27 @@ func isSupportedDbType(dbType string) bool { return false } -// chainSetup is used to create a new db and chain instance with the genesis -// block already inserted. In addition to the new chain instance, it returns -// a teardown function the caller should invoke when done testing to clean up. -func chainSetup(dbName string, params *chaincfg.Params) (*BlockChain, func(), error) { - if !isSupportedDbType(testDbType) { - return nil, nil, fmt.Errorf("unsupported db type %v", testDbType) - } - - // Handle memory database specially since it doesn't need the disk - // specific handling. +// createTestDatabase creates a test database with the provided database name +// and database type for the given network. +func createTestDatabase(dbName string, dbType string, net wire.CurrencyNet) (database.DB, func(), error) { + // Handle memory database specially since it doesn't need the disk specific + // handling. var db database.DB var teardown func() - if testDbType == "memdb" { - ndb, err := database.Create(testDbType) + if dbType == "memdb" { + ndb, err := database.Create(dbType) if err != nil { return nil, nil, fmt.Errorf("error creating db: %w", err) } db = ndb - // Setup a teardown function for cleaning up. This function is - // returned to the caller to be invoked when it is done testing. + // Setup a teardown function for cleaning up. This function is returned to + // the caller to be invoked when it is done testing. teardown = func() { db.Close() } } else { - // Create the directory for test database. + // Create the directory for the test database. dbPath, err := ioutil.TempDir("", dbName) if err != nil { err := fmt.Errorf("unable to create test db path: %w", @@ -85,22 +80,51 @@ func chainSetup(dbName string, params *chaincfg.Params) (*BlockChain, func(), er return nil, nil, err } - // Create a new database to store the accepted blocks into. - ndb, err := database.Create(testDbType, dbPath, blockDataNet) + // Create the test database. + ndb, err := database.Create(dbType, dbPath, net) if err != nil { os.RemoveAll(dbPath) return nil, nil, fmt.Errorf("error creating db: %w", err) } db = ndb - // Setup a teardown function for cleaning up. This function is - // returned to the caller to be invoked when it is done testing. + // Setup a teardown function for cleaning up. This function is returned to + // the caller to be invoked when it is done testing. teardown = func() { db.Close() os.RemoveAll(dbPath) } } + return db, teardown, nil +} + +// chainSetup is used to create a new db and chain instance with the genesis +// block already inserted. In addition to the new chain instance, it returns +// a teardown function the caller should invoke when done testing to clean up. +func chainSetup(dbName string, params *chaincfg.Params) (*BlockChain, func(), error) { + if !isSupportedDbType(testDbType) { + return nil, nil, fmt.Errorf("unsupported db type %v", testDbType) + } + + // Create a test block database. + db, teardownDb, err := createTestDatabase(dbName, testDbType, blockDataNet) + if err != nil { + return nil, nil, err + } + + // Create a test UTXO database. + utxoDb, teardownUtxoDb, err := createTestDatabase(dbName+"_utxo", testDbType, + blockDataNet) + if err != nil { + teardownDb() + return nil, nil, err + } + teardown := func() { + teardownUtxoDb() + teardownDb() + } + // Copy the chain params to ensure any modifications the tests do to // the chain parameters do not affect the global instance. paramsCopy := *params @@ -115,6 +139,7 @@ func chainSetup(dbName string, params *chaincfg.Params) (*BlockChain, func(), er chain, err := New(context.Background(), &Config{ DB: db, + UtxoDB: utxoDb, ChainParams: ¶msCopy, TimeSource: NewMedianTime(), SigCache: sigCache, diff --git a/blockchain/fullblocks_test.go b/blockchain/fullblocks_test.go index f59ae7c188..9c5b1a1834 100644 --- a/blockchain/fullblocks_test.go +++ b/blockchain/fullblocks_test.go @@ -45,32 +45,27 @@ func isSupportedDbType(dbType string) bool { return false } -// chainSetup is used to create a new db and chain instance with the genesis -// block already inserted. In addition to the new chain instance, it returns -// a teardown function the caller should invoke when done testing to clean up. -func chainSetup(dbName string, params *chaincfg.Params) (*blockchain.BlockChain, func(), error) { - if !isSupportedDbType(testDbType) { - return nil, nil, fmt.Errorf("unsupported db type %v", testDbType) - } - - // Handle memory database specially since it doesn't need the disk - // specific handling. +// createTestDatabase creates a test database with the provided database name +// and database type for the given network. +func createTestDatabase(dbName string, dbType string, net wire.CurrencyNet) (database.DB, func(), error) { + // Handle memory database specially since it doesn't need the disk specific + // handling. var db database.DB var teardown func() - if testDbType == "memdb" { - ndb, err := database.Create(testDbType) + if dbType == "memdb" { + ndb, err := database.Create(dbType) if err != nil { return nil, nil, fmt.Errorf("error creating db: %w", err) } db = ndb - // Setup a teardown function for cleaning up. This function is - // returned to the caller to be invoked when it is done testing. + // Setup a teardown function for cleaning up. This function is returned to + // the caller to be invoked when it is done testing. teardown = func() { db.Close() } } else { - // Create the directory for test database. + // Create the directory for the test database. dbPath, err := ioutil.TempDir("", dbName) if err != nil { err := fmt.Errorf("unable to create test db path: %w", @@ -78,22 +73,51 @@ func chainSetup(dbName string, params *chaincfg.Params) (*blockchain.BlockChain, return nil, nil, err } - // Create a new database to store the accepted blocks into. - ndb, err := database.Create(testDbType, dbPath, blockDataNet) + // Create the test database. + ndb, err := database.Create(dbType, dbPath, net) if err != nil { os.RemoveAll(dbPath) return nil, nil, fmt.Errorf("error creating db: %w", err) } db = ndb - // Setup a teardown function for cleaning up. This function is - // returned to the caller to be invoked when it is done testing. + // Setup a teardown function for cleaning up. This function is returned to + // the caller to be invoked when it is done testing. teardown = func() { db.Close() os.RemoveAll(dbPath) } } + return db, teardown, nil +} + +// chainSetup is used to create a new db and chain instance with the genesis +// block already inserted. In addition to the new chain instance, it returns +// a teardown function the caller should invoke when done testing to clean up. +func chainSetup(dbName string, params *chaincfg.Params) (*blockchain.BlockChain, func(), error) { + if !isSupportedDbType(testDbType) { + return nil, nil, fmt.Errorf("unsupported db type %v", testDbType) + } + + // Create a test block database. + db, teardownDb, err := createTestDatabase(dbName, testDbType, blockDataNet) + if err != nil { + return nil, nil, err + } + + // Create a test UTXO database. + utxoDb, teardownUtxoDb, err := createTestDatabase(dbName+"_utxo", testDbType, + blockDataNet) + if err != nil { + teardownDb() + return nil, nil, err + } + teardown := func() { + teardownUtxoDb() + teardownDb() + } + // Copy the chain params to ensure any modifications the tests do to // the chain parameters do not affect the global instance. paramsCopy := *params @@ -108,6 +132,7 @@ func chainSetup(dbName string, params *chaincfg.Params) (*blockchain.BlockChain, chain, err := blockchain.New(context.Background(), &blockchain.Config{ DB: db, + UtxoDB: utxoDb, ChainParams: ¶msCopy, TimeSource: blockchain.NewMedianTime(), SigCache: sigCache, diff --git a/blockchain/utxodb.go b/blockchain/utxodb.go new file mode 100644 index 0000000000..4399a64cc9 --- /dev/null +++ b/blockchain/utxodb.go @@ -0,0 +1,95 @@ +// Copyright (c) 2021 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package blockchain + +import ( + "errors" + "os" + "path/filepath" + + "github.com/decred/dcrd/chaincfg/v3" + "github.com/decred/dcrd/database/v2" + "github.com/decred/dcrd/wire" +) + +const ( + // utxoDbName is the UTXO database name. + utxoDbName = "utxodb" + + // utxoDbDefaultDriver is the default driver to use for the UTXO database. + utxoDbDefaultDriver = "ffldb" +) + +// removeDB removes the database at the provided path. The fi parameter MUST +// agree with the provided path. +func removeDB(dbPath string, fi os.FileInfo) error { + if fi.IsDir() { + return os.RemoveAll(dbPath) + } + + return os.Remove(dbPath) +} + +// removeRegressionDB removes the existing regression test database if running +// in regression test mode and it already exists. +func removeRegressionDB(net wire.CurrencyNet, dbPath string) error { + // Don't do anything if not in regression test mode. + if net != wire.RegNet { + return nil + } + + // Remove the old regression test database if it already exists. + fi, err := os.Stat(dbPath) + if err == nil { + log.Infof("Removing regression test UTXO database from '%s'", dbPath) + return removeDB(dbPath, fi) + } + + return nil +} + +// LoadUtxoDB loads (or creates when needed) the UTXO database and returns a +// handle to it. It also contains additional logic such as ensuring the +// regression test database is clean when in regression test mode. +func LoadUtxoDB(params *chaincfg.Params, dataDir string) (database.DB, error) { + // Set the database path based on the data directory and database name. + dbPath := filepath.Join(dataDir, utxoDbName) + + // The regression test is special in that it needs a clean database for each + // run, so remove it now if it already exists. + removeRegressionDB(params.Net, dbPath) + + // createDB is a convenience func that creates the database with the type and + // network specified in the config at the path determined above while also + // creating any intermediate directories in the configured data directory path + // as needed. + createDB := func() (database.DB, error) { + // Create the data dir if it does not exist. + err := os.MkdirAll(dataDir, 0700) + if err != nil { + return nil, err + } + return database.Create(utxoDbDefaultDriver, dbPath, params.Net) + } + + // Open the existing database or create a new one as needed. + log.Infof("Loading UTXO database from '%s'", dbPath) + db, err := database.Open(utxoDbDefaultDriver, dbPath, params.Net) + if err != nil { + // Return the error if it's not because the database doesn't exist. + if !errors.Is(err, database.ErrDbDoesNotExist) { + return nil, err + } + + db, err = createDB() + if err != nil { + return nil, err + } + } + + log.Info("UTXO database loaded") + + return db, nil +} diff --git a/blockchain/utxoio.go b/blockchain/utxoio.go index cf7cc592ea..f9695e6502 100644 --- a/blockchain/utxoio.go +++ b/blockchain/utxoio.go @@ -6,6 +6,7 @@ package blockchain import ( + "context" "fmt" "sync" "time" @@ -16,6 +17,14 @@ import ( "github.com/decred/dcrd/wire" ) +const ( + // currentUtxoDatabaseVersion indicates the current UTXO database version. + currentUtxoDatabaseVersion = 1 + + // currentUtxoSetVersion indicates the current UTXO set database version. + currentUtxoSetVersion = 3 +) + var ( // utxoDbInfoBucketName is the name of the database bucket used to house // global versioning and date information for the UTXO database. @@ -612,3 +621,106 @@ func dbFetchUtxoSetState(dbTx database.Tx) (*utxoSetState, error) { // Deserialize the utxo set state and return it. return deserializeUtxoSetState(serialized) } + +// createUtxoDbInfo initializes the UTXO database info. It must only be called +// on an uninitialized database. +func (b *BlockChain) createUtxoDbInfo() error { + // Create the initial UTXO database state. + err := b.utxoDb.Update(func(dbTx database.Tx) error { + meta := dbTx.Metadata() + + // Create the bucket that houses information about the database's creation + // and version. + _, err := meta.CreateBucket(utxoDbInfoBucketName) + if err != nil { + return err + } + + // Initialize the UTXO set version. If the block database version is before + // version 9, then initialize the UTXO set version based on the block + // database version since that is what tracked the UTXO set version at that + // point in time. + utxoVer := uint32(currentUtxoSetVersion) + if b.dbInfo.version >= 7 && b.dbInfo.version < 9 { + utxoVer = 2 + } else if b.dbInfo.version < 7 { + utxoVer = 1 + } + + // Write the creation and version information to the database. + b.utxoDbInfo = &utxoDatabaseInfo{ + version: currentUtxoDatabaseVersion, + compVer: currentCompressionVersion, + utxoVer: utxoVer, + created: time.Now(), + } + err = dbPutUtxoDatabaseInfo(dbTx, b.utxoDbInfo) + if err != nil { + return err + } + + // Create the bucket that houses the UTXO set. + _, err = meta.CreateBucket(utxoSetBucketName) + if err != nil { + return err + } + + return err + }) + return err +} + +// initUtxoDbInfo loads (or creates if necessary) the UTXO database info. +func (b *BlockChain) initUtxoDbInfo(ctx context.Context) error { + // Determine the state of the database. + var isStateInitialized bool + err := b.utxoDb.View(func(dbTx database.Tx) error { + // Fetch the database versioning information. + dbInfo := dbFetchUtxoDatabaseInfo(dbTx) + + // The database bucket for the versioning information is missing. + if dbInfo == nil { + return nil + } + + // Don't allow downgrades of the UTXO database. + if dbInfo.version > currentUtxoDatabaseVersion { + return fmt.Errorf("the current UTXO database is no longer compatible "+ + "with this version of the software (%d > %d)", dbInfo.version, + currentUtxoDatabaseVersion) + } + + // Don't allow downgrades of the database compression version. + if dbInfo.compVer > currentCompressionVersion { + return fmt.Errorf("the current database compression version is no "+ + "longer compatible with this version of the software (%d > %d)", + dbInfo.compVer, currentCompressionVersion) + } + + b.utxoDbInfo = dbInfo + isStateInitialized = true + + return nil + }) + if err != nil { + return err + } + + // Initialize the database if it has not already been done. + if !isStateInitialized { + if err := b.createUtxoDbInfo(); err != nil { + return err + } + } + + return nil +} + +// initUtxoState attempts to load and initialize the UTXO state from the +// database. This entails running any database migrations as necessary as well +// as initializing the UTXO cache. +func (b *BlockChain) initUtxoState(ctx context.Context) error { + // Initialize the UTXO cache to ensure that the state of the UTXO set is + // caught up to the tip of the best chain. + return b.utxoCache.Initialize(b, b.bestChain.tip()) +} diff --git a/blockchain/validate_test.go b/blockchain/validate_test.go index e008542948..9ded976b62 100644 --- a/blockchain/validate_test.go +++ b/blockchain/validate_test.go @@ -271,23 +271,30 @@ func TestCheckBlockSanity(t *testing.T) { // TestCheckBlockHeaderContext tests that genesis block passes context headers // because its parent is nil. func TestCheckBlockHeaderContext(t *testing.T) { - // Create a new database for the blocks. + // Create a test block database. + const testDbType = "ffldb" + const dbName = "examplecheckheadercontext" params := chaincfg.RegNetParams() - dbPath := filepath.Join(os.TempDir(), "examplecheckheadercontext") - _ = os.RemoveAll(dbPath) - db, err := database.Create("ffldb", dbPath, params.Net) + db, teardownDb, err := createTestDatabase(dbName, testDbType, params.Net) if err != nil { t.Fatalf("Failed to create database: %v\n", err) - return } - defer os.RemoveAll(dbPath) - defer db.Close() + defer teardownDb() + + // Create a test UTXO database. + utxoDb, teardownUtxoDb, err := createTestDatabase(dbName+"_utxo", testDbType, + params.Net) + if err != nil { + t.Fatalf("Failed to create UTXO database: %v\n", err) + } + defer teardownUtxoDb() // Create a new BlockChain instance using the underlying database for // the simnet network. chain, err := New(context.Background(), &Config{ DB: db, + UtxoDB: utxoDb, ChainParams: params, TimeSource: NewMedianTime(), UtxoCache: NewUtxoCache(&UtxoCacheConfig{ diff --git a/blockdb.go b/blockdb.go index 977ace0374..fc5e25398e 100644 --- a/blockdb.go +++ b/blockdb.go @@ -125,8 +125,8 @@ func loadBlockDB(params *chaincfg.Params) (database.DB, error) { // createDB is a convenience func that creates the database with the type // and network specified in the config at the path determined above while - // also creating any any intermediate directories in the configured data - // directory path as needed. + // also creating any intermediate directories in the configured data directory + // path as needed. createDB := func() (database.DB, error) { // Create the data dir if it does not exist. err := os.MkdirAll(cfg.DataDir, 0700) diff --git a/dcrd.go b/dcrd.go index 20e44b6fcb..cde8675a78 100644 --- a/dcrd.go +++ b/dcrd.go @@ -17,6 +17,7 @@ import ( "runtime/pprof" "strings" + "github.com/decred/dcrd/blockchain/v4" "github.com/decred/dcrd/blockchain/v4/indexers" "github.com/decred/dcrd/internal/limits" "github.com/decred/dcrd/internal/version" @@ -140,7 +141,7 @@ func dcrdMain() error { defer func() { // Ensure the database is sync'd and closed on shutdown. lifetimeNotifier.notifyShutdownEvent(lifetimeEventDBOpen) - dcrdLog.Infof("Gracefully shutting down the database...") + dcrdLog.Infof("Gracefully shutting down the block database...") db.Close() }() @@ -149,6 +150,23 @@ func dcrdMain() error { return nil } + // Load the UTXO database. + utxoDb, err := blockchain.LoadUtxoDB(cfg.params.Params, cfg.DataDir) + if err != nil { + dcrdLog.Errorf("%v", err) + return err + } + defer func() { + // Ensure the database is sync'd and closed on shutdown. + dcrdLog.Infof("Gracefully shutting down the UTXO database...") + utxoDb.Close() + }() + + // Return now if a shutdown signal was triggered. + if shutdownRequested(ctx) { + return nil + } + // Drop indexes and exit if requested. // // NOTE: The order is important here because dropping the tx index also @@ -186,7 +204,7 @@ func dcrdMain() error { // Create server. lifetimeNotifier.notifyStartupEvent(lifetimeEventP2PServer) - svr, err := newServer(ctx, cfg.Listeners, db, cfg.params.Params, + svr, err := newServer(ctx, cfg.Listeners, db, utxoDb, cfg.params.Params, cfg.DataDir) if err != nil { dcrdLog.Errorf("Unable to start server: %v", err) diff --git a/server.go b/server.go index 7814300a84..2cbca8fa82 100644 --- a/server.go +++ b/server.go @@ -3289,7 +3289,10 @@ func setupRPCListeners() ([]net.Listener, error) { // newServer returns a new dcrd server configured to listen on addr for the // decred network type specified by chainParams. Use start to begin accepting // connections from peers. -func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainParams *chaincfg.Params, dataDir string) (*server, error) { +func newServer(ctx context.Context, listenAddrs []string, db database.DB, + utxoDb database.DB, chainParams *chaincfg.Params, + dataDir string) (*server, error) { + amgr := addrmgr.New(cfg.DataDir, dcrdLookup) services := defaultServices @@ -3406,6 +3409,7 @@ func newServer(ctx context.Context, listenAddrs []string, db database.DB, chainP s.chain, err = blockchain.New(ctx, &blockchain.Config{ DB: s.db, + UtxoDB: utxoDb, ChainParams: s.chainParams, Checkpoints: checkpoints, TimeSource: s.timeSource,