Skip to content

Commit

Permalink
Rely on consensus event channel for block and virtual change set proc…
Browse files Browse the repository at this point in the history
…essing
  • Loading branch information
tiram88 committed Jun 20, 2022
1 parent 0233365 commit 1ed802c
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 63 deletions.
26 changes: 25 additions & 1 deletion processing/infrastructure/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package logging

import (
"fmt"
"github.com/kaspanet/kaspad/infrastructure/logger"
"os"
"time"

"github.com/kaspanet/kaspad/infrastructure/logger"
)

var (
Expand Down Expand Up @@ -36,3 +38,25 @@ func UpdateLogLevels() {
func Logger() *logger.Logger {
return log
}

func LogErrorAndExit(errorLog string, logParameters ...interface{}) {
// If LoadConfig failed, the logger backend may not have been run yet
if !log.Backend().IsRunning() {
logger.InitLogStdout(logger.LevelInfo)
UpdateLogLevels()
}

log.Errorf(errorLog, logParameters...)

exitHandlerDone := make(chan struct{})
go func() {
log.Backend().Close()
close(exitHandlerDone)
}()
select {
case <-time.After(1 * time.Second):
case <-exitHandlerDone:
}

os.Exit(1)
}
63 changes: 21 additions & 42 deletions processing/main.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,63 @@
package main

import (
"os"
"time"

databasePackage "github.com/kaspa-live/kaspa-graph-inspector/processing/database"
configPackage "github.com/kaspa-live/kaspa-graph-inspector/processing/infrastructure/config"
"github.com/kaspa-live/kaspa-graph-inspector/processing/infrastructure/logging"
kaspadPackage "github.com/kaspa-live/kaspa-graph-inspector/processing/kaspad"
processingPackage "github.com/kaspa-live/kaspa-graph-inspector/processing/processing"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/infrastructure/logger"
)

var log = logging.Logger()

func main() {
config, err := configPackage.LoadConfig()
if err != nil {
logErrorAndExit("Could not parse command line arguments.\n%s", err)
logging.LogErrorAndExit("Could not parse command line arguments.\n%s", err)
}

database, err := databasePackage.Connect(config.DatabaseConnectionString)
if err != nil {
logErrorAndExit("Could not connect to database %s: %s", config.DatabaseConnectionString, err)
logging.LogErrorAndExit("Could not connect to database %s: %s", config.DatabaseConnectionString, err)
}
defer database.Close()

kaspad, err := kaspadPackage.New(config)
if err != nil {
logErrorAndExit("Could not create kaspad: %s", err)
logging.LogErrorAndExit("Could not create kaspad: %s", err)
}
processing, err := processingPackage.NewProcessing(config, database, kaspad)
if err != nil {
logErrorAndExit("Could not initialize processing: %s", err)
logging.LogErrorAndExit("Could not initialize processing: %s", err)
}
kaspad.SetOnBlockAddedListener(func(block *externalapi.DomainBlock) {
err := processing.ProcessBlock(block)
if err != nil {
logErrorAndExit("Could not process block: %s", err)
}
})

// This is no longer useful since kaspad v0.12.2
// that introduce a consensus event channel.
// See processing.initConsensusEventsHandler.

// kaspad.SetOnBlockAddedListener(func(block *externalapi.DomainBlock) {
// blockHash := consensushashing.BlockHash(block)
// blockInfo, err := kaspad.Domain().Consensus().GetBlockInfo(blockHash)
// if err != nil {
// logging.LogErrorAndExit("Consensus ValidateAndInsertBlock listener could not get block info for block %s: %s", blockHash, err)
// }
// logging.Logger().Debugf("Consensus ValidateAndInsertBlock listener gets block %s with status %s", blockHash, blockInfo.BlockStatus.String())
// })

kaspad.SetOnVirtualResolvedListener(func() {
err := processing.ResyncVirtualSelectedParentChain()
if err != nil {
logErrorAndExit("Could not resync the virtual selected parent chain: %s", err)
logging.LogErrorAndExit("Could not resync the virtual selected parent chain: %s", err)
}
})
kaspad.SetOnConsensusResetListener(func() {
err := processing.ResyncDatabase()
if err != nil {
logErrorAndExit("Could not resync database: %s", err)
logging.LogErrorAndExit("Could not resync database: %s", err)
}
})
err = kaspad.Start()
if err != nil {
logErrorAndExit("Could not start kaspad: %s", err)
logging.LogErrorAndExit("Could not start kaspad: %s", err)
}

<-make(chan struct{})
}

func logErrorAndExit(errorLog string, logParameters ...interface{}) {
// If LoadConfig failed, the logger backend may not have been run yet
if !log.Backend().IsRunning() {
logger.InitLogStdout(logger.LevelInfo)
logging.UpdateLogLevels()
}

log.Errorf(errorLog, logParameters...)

exitHandlerDone := make(chan struct{})
go func() {
log.Backend().Close()
close(exitHandlerDone)
}()
select {
case <-time.After(1 * time.Second):
case <-exitHandlerDone:
}

os.Exit(1)
}
6 changes: 6 additions & 0 deletions processing/processing/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package batch
import (
"github.com/go-pg/pg/v10"
databasePackage "github.com/kaspa-live/kaspa-graph-inspector/processing/database"
"github.com/kaspa-live/kaspa-graph-inspector/processing/infrastructure/logging"
kaspadPackage "github.com/kaspa-live/kaspa-graph-inspector/processing/kaspad"
"github.com/kaspanet/kaspad/domain/consensus/database"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/pkg/errors"
)

var log = logging.Logger()

type Batch struct {
database *databasePackage.Database
kaspad *kaspadPackage.Kaspad
Expand Down Expand Up @@ -125,9 +128,12 @@ func (b *Batch) CollectDirectDependencies(databaseTransaction *pg.Tx, hash *exte
// to include it in the batch
if !errors.Is(err, database.ErrNotFound) {
return err
} else {
log.Warnf("Parent %s for block %s not found by kaspad domain consensus; the missing dependency is ignored", parentHash, hash)
}
} else {
b.Add(parentHash, parentBlock)
log.Warnf("Parent %s for block %s found by kaspad domain consensus; the missing dependency is registered for processing", parentHash, hash)
}
}
}
Expand Down
54 changes: 34 additions & 20 deletions processing/processing/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/kaspa-live/kaspa-graph-inspector/processing/processing/batch"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing"
"github.com/kaspanet/kaspad/infrastructure/db/database"
"github.com/pkg/errors"
)

Expand All @@ -34,7 +35,7 @@ func NewProcessing(config *configPackage.Config,
database: database,
kaspad: kaspad,
}
processing.initConsensusEventsHandler(kaspad.Domain().ConsensusEventsChannel())
processing.initConsensusEventsHandler()

err := processing.ResyncDatabase()
if err != nil {
Expand All @@ -44,7 +45,7 @@ func NewProcessing(config *configPackage.Config,
return processing, nil
}

func (p *Processing) initConsensusEventsHandler(consensusEventsChan chan externalapi.ConsensusEvent) {
func (p *Processing) initConsensusEventsHandler() {
go func() {
for {
consensusEvent, ok := <-p.kaspad.Domain().ConsensusEventsChannel()
Expand All @@ -55,21 +56,21 @@ func (p *Processing) initConsensusEventsHandler(consensusEventsChan chan externa
case *externalapi.VirtualChangeSet:
err := p.ProcessVirtualChange(event)
if err != nil {
panic(err)
logging.LogErrorAndExit("Failed to process virtual change consensus event: %s", err)
}
case *externalapi.BlockAdded:
log.Debugf("Consensus event handler gets block %s", consensushashing.BlockHash(event.Block))
err := p.ProcessBlock(event.Block)
if err != nil {
panic(err)
logging.LogErrorAndExit("Failed to process block added consensus event: %s", err)
}
default:
panic(errors.Errorf("Got event of unsupported type %T", consensusEvent))
logging.LogErrorAndExit("Failed to process consensus event: %s", errors.Errorf("Got event of unsupported type %T", consensusEvent))
}
}
}()
}


func (p *Processing) ResyncDatabase() error {
p.Lock()
defer p.Unlock()
Expand Down Expand Up @@ -218,7 +219,7 @@ func (p *Processing) ResyncDatabase() error {
}
}

return p.resyncVirtualSelectedParentChain(databaseTransaction)
return p.resyncVirtualSelectedParentChain(databaseTransaction, false)
})
}

Expand All @@ -227,17 +228,17 @@ func (p *Processing) ResyncVirtualSelectedParentChain() error {
defer p.Unlock()

return p.database.RunInTransaction(func(databaseTransaction *pg.Tx) error {
return p.resyncVirtualSelectedParentChain(databaseTransaction)
return p.resyncVirtualSelectedParentChain(databaseTransaction, false)
})
}

func (p *Processing) resyncVirtualSelectedParentChain(databaseTransaction *pg.Tx) error {
func (p *Processing) resyncVirtualSelectedParentChain(databaseTransaction *pg.Tx, withDependencies bool) error {
log.Infof("Resyncing virtual selected parent chain")
defer log.Infof("Finished resyncing virtual selected parent chain")

highestBlockVirtualSelectedParentChain, err := p.database.HighestBlockInVirtualSelectedParentChain(databaseTransaction)
if err != nil {
return err
return errors.Wrapf(err, "Could not get highest block in virtual selected parent chain")
}
highestBlockHash, err := externalapi.NewDomainHashFromString(highestBlockVirtualSelectedParentChain.BlockHash)
if err != nil {
Expand All @@ -247,6 +248,12 @@ func (p *Processing) resyncVirtualSelectedParentChain(databaseTransaction *pg.Tx

virtualSelectedParentChain, err := p.kaspad.Domain().Consensus().GetVirtualSelectedParentChainFromBlock(highestBlockHash)
if err != nil {
if database.IsNotFoundError(err) {
// This may occur when restoring a kgi database on a system which kaspad database
// is older than the kgi database.
log.Errorf("Could not get virtual selected parent chain from block %s: %s", highestBlockHash, err)
return nil
}
return err
}
if len(virtualSelectedParentChain.Added) > 0 {
Expand All @@ -258,11 +265,13 @@ func (p *Processing) resyncVirtualSelectedParentChain(databaseTransaction *pg.Tx
blockInsertionResult := &externalapi.VirtualChangeSet{
VirtualSelectedParentChainChanges: virtualSelectedParentChain,
}
err = p.processBlockAndDependencies(databaseTransaction, virtualSelectedParentHash, virtualSelectedParentBlock, nil)
if err != nil {
return err
if withDependencies {
err = p.processBlockAndDependencies(databaseTransaction, virtualSelectedParentHash, virtualSelectedParentBlock, nil)
if err != nil {
return err
}
}
err = p.processVirtualChange(databaseTransaction, blockInsertionResult)
err = p.processVirtualChange(databaseTransaction, blockInsertionResult, withDependencies)
if err != nil {
return err
}
Expand Down Expand Up @@ -294,6 +303,9 @@ func (p *Processing) processBlockAndDependencies(databaseTransaction *pg.Tx, has
if !ok {
break
}
if !batch.Empty() {
log.Warnf("Handling missing dependency block %s", consensushashing.BlockHash(block))
}
err = p.processBlock(databaseTransaction, block)
if err != nil {
return err
Expand Down Expand Up @@ -407,6 +419,8 @@ func (p *Processing) processBlock(databaseTransaction *pg.Tx, block *externalapi
return errors.Wrapf(err, "Could not insert edge from block %s to parent id %d", blockHash, parentID)
}
}
} else {
log.Debugf("Block %s already exists in database; not processed", blockHash)
}

blockInfo, err := p.kaspad.Domain().Consensus().GetBlockInfo(blockHash)
Expand Down Expand Up @@ -474,11 +488,11 @@ func (p *Processing) ProcessVirtualChange(blockInsertionResult *externalapi.Virt
defer p.Unlock()

return p.database.RunInTransaction(func(databaseTransaction *pg.Tx) error {
return p.processVirtualChange(databaseTransaction, blockInsertionResult)
return p.processVirtualChange(databaseTransaction, blockInsertionResult, true)
})
}

func (p *Processing) processVirtualChange(databaseTransaction *pg.Tx, blockInsertionResult *externalapi.VirtualChangeSet) error {
func (p *Processing) processVirtualChange(databaseTransaction *pg.Tx, blockInsertionResult *externalapi.VirtualChangeSet, withDependencies bool) error {
if blockInsertionResult == nil || blockInsertionResult.VirtualSelectedParentChainChanges == nil {
return nil
}
Expand All @@ -492,7 +506,7 @@ func (p *Processing) processVirtualChange(databaseTransaction *pg.Tx, blockInser
if err == nil {
blockColors[removedBlockID] = model.ColorGray
blockIsInVirtualSelectedParentChain[removedBlockID] = false
} else {
} else if withDependencies {
log.Errorf("Could not get id of removed block %s", removedBlockHash)
}
}
Expand All @@ -504,7 +518,7 @@ func (p *Processing) processVirtualChange(databaseTransaction *pg.Tx, blockInser
addedBlockID, err := p.database.BlockIDByHash(databaseTransaction, addedBlockHash)
if err == nil {
blockIsInVirtualSelectedParentChain[addedBlockID] = true
} else {
} else if withDependencies {
log.Errorf("Could not get id of added block %s", addedBlockHash)
}
}
Expand All @@ -527,7 +541,7 @@ func (p *Processing) processVirtualChange(databaseTransaction *pg.Tx, blockInser
blueBlockID, err := p.database.BlockIDByHash(databaseTransaction, blueHash)
if err == nil {
blockColors[blueBlockID] = model.ColorBlue
} else {
} else if withDependencies {
log.Errorf("Could not get id of merge set blue block %s", blueHash)
}
}
Expand All @@ -539,7 +553,7 @@ func (p *Processing) processVirtualChange(databaseTransaction *pg.Tx, blockInser
redBlockID, err := p.database.BlockIDByHash(databaseTransaction, redHash)
if err == nil {
blockColors[redBlockID] = model.ColorRed
} else {
} else if withDependencies {
log.Errorf("Could not get id of merge set red block %s", redHash)
}
}
Expand Down

0 comments on commit 1ed802c

Please sign in to comment.