Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outport driver stuck prints #4452

Merged
merged 8 commits into from
Sep 14, 2022
68 changes: 60 additions & 8 deletions outport/outport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package outport
import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ElrondNetwork/elrond-go-core/core/check"
Expand All @@ -13,13 +14,17 @@ import (

var log = logger.GetOrCreate("outport")

const maxTimeForDriverCall = time.Second * 30
const minimumRetrialInterval = time.Millisecond * 10

type outport struct {
mutex sync.RWMutex
drivers []Driver
retrialInterval time.Duration
chanClose chan struct{}
mutex sync.RWMutex
drivers []Driver
retrialInterval time.Duration
chanClose chan struct{}
logHandler func(logLevel logger.LogLevel, message string, args ...interface{})
timeForDriverCall time.Duration
messageCounter uint64
}

// NewOutport will create a new instance of proxy
Expand All @@ -29,10 +34,12 @@ func NewOutport(retrialInterval time.Duration) (*outport, error) {
}

return &outport{
drivers: make([]Driver, 0),
mutex: sync.RWMutex{},
retrialInterval: retrialInterval,
chanClose: make(chan struct{}),
drivers: make([]Driver, 0),
mutex: sync.RWMutex{},
retrialInterval: retrialInterval,
chanClose: make(chan struct{}),
logHandler: log.Log,
timeForDriverCall: maxTimeForDriverCall,
}, nil
}

Expand All @@ -46,7 +53,34 @@ func (o *outport) SaveBlock(args *indexer.ArgsSaveBlockData) {
}
}

func (o *outport) monitorCompletionOnDriver(function string, driver Driver) chan struct{} {
counter := atomic.AddUint64(&o.messageCounter, 1)

o.logHandler(logger.LogDebug, "outport.monitorCompletionOnDriver starting",
"function", function, "driver", driverString(driver), "message counter", counter)
ch := make(chan struct{})
go func() {
timer := time.NewTimer(o.timeForDriverCall)

select {
case <-ch:
o.logHandler(logger.LogDebug, "outport.monitorCompletionOnDriver ended",
bogdan-rosianu marked this conversation as resolved.
Show resolved Hide resolved
"function", function, "driver", driverString(driver), "message counter", counter)
case <-timer.C:
o.logHandler(logger.LogWarning, "outport.monitorCompletionOnDriver took too long",
"function", function, "driver", driverString(driver), "message counter", counter, "time", o.timeForDriverCall)
}

timer.Stop()
}()

return ch
}

func (o *outport) saveBlockBlocking(args *indexer.ArgsSaveBlockData, driver Driver) {
ch := o.monitorCompletionOnDriver("saveBlockBlocking", driver)
defer close(ch)

for {
err := driver.SaveBlock(args)
if err == nil {
Expand Down Expand Up @@ -84,6 +118,9 @@ func (o *outport) RevertIndexedBlock(header data.HeaderHandler, body data.BodyHa
}

func (o *outport) revertIndexedBlockBlocking(header data.HeaderHandler, body data.BodyHandler, driver Driver) {
ch := o.monitorCompletionOnDriver("revertIndexedBlockBlocking", driver)
defer close(ch)

for {
err := driver.RevertIndexedBlock(header, body)
if err == nil {
Expand Down Expand Up @@ -112,6 +149,9 @@ func (o *outport) SaveRoundsInfo(roundsInfo []*indexer.RoundInfo) {
}

func (o *outport) saveRoundsInfoBlocking(roundsInfo []*indexer.RoundInfo, driver Driver) {
ch := o.monitorCompletionOnDriver("saveRoundsInfoBlocking", driver)
defer close(ch)

for {
err := driver.SaveRoundsInfo(roundsInfo)
if err == nil {
Expand Down Expand Up @@ -140,6 +180,9 @@ func (o *outport) SaveValidatorsPubKeys(validatorsPubKeys map[uint32][][]byte, e
}

func (o *outport) saveValidatorsPubKeysBlocking(validatorsPubKeys map[uint32][][]byte, epoch uint32, driver Driver) {
ch := o.monitorCompletionOnDriver("saveValidatorsPubKeysBlocking", driver)
defer close(ch)

for {
err := driver.SaveValidatorsPubKeys(validatorsPubKeys, epoch)
if err == nil {
Expand Down Expand Up @@ -168,6 +211,9 @@ func (o *outport) SaveValidatorsRating(indexID string, infoRating []*indexer.Val
}

func (o *outport) saveValidatorsRatingBlocking(indexID string, infoRating []*indexer.ValidatorRatingInfo, driver Driver) {
ch := o.monitorCompletionOnDriver("saveValidatorsRatingBlocking", driver)
defer close(ch)

for {
err := driver.SaveValidatorsRating(indexID, infoRating)
if err == nil {
Expand Down Expand Up @@ -196,6 +242,9 @@ func (o *outport) SaveAccounts(blockTimestamp uint64, acc []data.UserAccountHand
}

func (o *outport) saveAccountsBlocking(blockTimestamp uint64, acc []data.UserAccountHandler, driver Driver) {
ch := o.monitorCompletionOnDriver("saveAccountsBlocking", driver)
defer close(ch)

for {
err := driver.SaveAccounts(blockTimestamp, acc)
if err == nil {
Expand Down Expand Up @@ -224,6 +273,9 @@ func (o *outport) FinalizedBlock(headerHash []byte) {
}

func (o *outport) finalizedBlockBlocking(headerHash []byte, driver Driver) {
ch := o.monitorCompletionOnDriver("finalizedBlockBlocking", driver)
defer close(ch)

for {
err := driver.FinalizedBlock(headerHash)
if err == nil {
Expand Down
Loading