Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix spammer state bugs #37

Merged
merged 4 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ linters-settings:
initialisms: ["ACL", "API", "ASCII", "CPU", "CSS", "DNS", "EOF", "GUID", "HTML", "HTTP", "HTTPS", "ID", "IP", "JSON", "QPS", "RAM", "RPC", "SLA", "SMTP", "SQL", "SSH", "TCP", "TLS", "TTL", "UDP", "UI", "GID", "UID", "UUID", "URI", "URL", "UTF8", "VM", "XML", "XMPP", "XSRF", "XSS", "SIP", "RTP", "AMQP", "DB", "TS"]

linters:
# Disable all linters.
disable-all: true
# Enable specific linter
enable:
- deadcode
- errcheck
Expand Down Expand Up @@ -107,3 +110,23 @@ issues:
# Set to 0 to disable.
# Default: 3
max-same-issues: 0
#exclude:
# - 'Error return value of .((os\.)?std(out|err)\..*|.*Close|.*Flush|os\.Remove(All)?|.*print(f|ln)?|os\.(Un)?Setenv). is not checked' # errcheck
# - "err113: do not define dynamic errors, use wrapped static errors instead:" # goerr113
# - "type name will be used as [0-9A-Za-z_.]+ by other packages, and that stutters; consider calling this" # golint
# - "Potential file inclusion via variable" # gosec
# - "G404: Use of weak random number generator" # gosec
# - "Subprocess launch(ed with variable|ing should be audited)" # gosec
# - "Use of unsafe calls should be audited" # gosec
# - "G108: Profiling endpoint is automatically exposed on /debug/pprof" # gosec
# - "(Expect directory permissions to be 0750 or less|Expect file permissions to be 0600 or less)" # gosec
# - "G101: Potential hardcoded credentials" # gosec
# - "(G104|G307)" # gosec Duplicated errcheck checks.
# - "`[0-9A-Za-z_.]+` - `[0-9A-Za-z_.]+` always receives `[0-9A-Za-z_.]+`" # unparam
# - "should have comment .*or be unexported" # revive
# - "exported: comment on exported" # revive
# - "package-comments: package comment should be of the form" # revive
# - "blank-imports" # revive
# - "var-naming: don't use leading k in Go names;" #revive
# - 'shadow: declaration of "err"' # govet

6 changes: 5 additions & 1 deletion core/spammer/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func run() error {

CoreComponent.LogInfo("Starting API server...")

//nolint:contextcheck // false positive
_ = spammer.NewServer(deps.Spammer, e.Group(""))

go func() {
Expand All @@ -263,10 +264,13 @@ func run() error {
}

shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCtxCancel()

//nolint:contextcheck // false positive
if err := e.Shutdown(shutdownCtx); err != nil {
CoreComponent.LogWarn(err)
}
shutdownCtxCancel()

CoreComponent.LogInfo("Stopping API ... done")
}, daemon.PriorityStopSpammerAPI); err != nil {
CoreComponent.LogPanicf("failed to start worker: %s", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/spammer/cpu_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (c *CPUUsageUpdater) Run(ctx context.Context) {
return
}

//nolint:contextcheck // false positive
cpuUsagePSutil, err := cpu.Percent(c.sampleTime, false)
c.Lock()
if err != nil {
Expand Down
64 changes: 38 additions & 26 deletions pkg/spammer/spammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (

const (
IndexerQueryMaxResults = 1000
IndexerQueryTimeout = 30 * time.Second
IndexerQueryTimeout = 15 * time.Second
)

type outputState byte
Expand Down Expand Up @@ -234,7 +234,7 @@ func New(
workersCount = runtime.NumCPU() - 1
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), IndexerQueryTimeout)
defer cancel()

var err error
Expand Down Expand Up @@ -563,8 +563,8 @@ func (s *Spammer) startSpammerWorkers(valueSpamEnabled bool, bpsRateLimit float6
spammerWorkerCount = 1
}

var rateLimitChannel chan struct{} = nil
var rateLimitAbortSignal chan struct{} = nil
var rateLimitChannel chan struct{}
var rateLimitAbortSignal chan struct{}
currentProcessID := s.processID.Load()

if bpsRateLimit != 0.0 {
Expand Down Expand Up @@ -797,7 +797,7 @@ func (s *Spammer) Start(valueSpamEnabled *bool, bpsRateLimit *float64, cpuMaxUsa
workersCountCfg = 1
}

if err := s.getCurrentSpammerLedgerState(); err != nil {
if err := s.getCurrentSpammerLedgerState(context.Background()); err != nil {
return err
}

Expand Down Expand Up @@ -955,20 +955,12 @@ func (s *Spammer) ApplyNewLedgerUpdate(ctx context.Context, msIndex iotago.Miles

if conflicting {
// there was a conflict in the chain
// forget all known outputs
s.accountSender.ResetOutputs()
s.accountReceiver.ResetOutputs()

// recreate the pending transactions map
s.pendingTransactionsMap = make(map[iotago.BlockID]*pendingTransaction)
s.resetSpammerState()

// wait until the indexer got updated
if err := s.waitForIndexerUpdate(ctx, msIndex); err != nil {
return err
}

// reset the state if there was a conflict
s.outputState = stateBasicOutputCreate
} else {
// we only allow the spammer to create new transactions if there was no conflict in the last milestone.
s.currentLedgerMilestoneIndex = s.ledgerMilestoneIndex.Load()
Expand All @@ -980,22 +972,38 @@ func (s *Spammer) ApplyNewLedgerUpdate(ctx context.Context, msIndex iotago.Miles
// but there are no new outputs created by the spammer.
// in this case we query the indexer again to get the latest state.
if s.accountSender.Empty() && s.accountReceiver.Empty() && s.isValueSpamEnabled {
if err := s.getCurrentSpammerLedgerState(); err != nil {
return err
if err := s.getCurrentSpammerLedgerState(ctx); err != nil {
// there was an error getting the current ledger state
s.resetSpammerState()

s.LogWarnf("failed to get current spammer ledger state: %s", err.Error())
}
}

return nil
}

// resetSpammerState resets the spammer state in case of a conflict.
func (s *Spammer) resetSpammerState() {
// forget all known outputs
s.accountSender.ResetOutputs()
s.accountReceiver.ResetOutputs()

// recreate the pending transactions map
s.pendingTransactionsMap = make(map[iotago.BlockID]*pendingTransaction)

// reset the state if there was a conflict
s.outputState = stateBasicOutputCreate
}

// waitForIndexerUpdate waits until the indexer got updated to the expected milestone index.
func (s *Spammer) waitForIndexerUpdate(ctx context.Context, msIndex iotago.MilestoneIndex) error {

if s.indexer == nil {
return nodeclient.ErrIndexerPluginNotAvailable
}

ctxWaitForUpdate, cancelWaitForUpdate := context.WithTimeout(ctx, 10*time.Second)
ctxWaitForUpdate, cancelWaitForUpdate := context.WithTimeout(ctx, IndexerQueryTimeout)
defer cancelWaitForUpdate()

for ctxWaitForUpdate.Err() == nil {
Expand Down Expand Up @@ -1023,7 +1031,7 @@ func (s *Spammer) waitForIndexerUpdate(ctx context.Context, msIndex iotago.Miles
return ctxWaitForUpdate.Err()
}

func (s *Spammer) getCurrentSpammerLedgerState() error {
func (s *Spammer) getCurrentSpammerLedgerState(ctx context.Context) error {

if s.accountSender == nil || s.accountReceiver == nil {
return nil
Expand All @@ -1035,25 +1043,25 @@ func (s *Spammer) getCurrentSpammerLedgerState() error {

ts := time.Now()

ctx, cancel := context.WithTimeout(context.Background(), IndexerQueryTimeout)
defer cancel()
ctxQuery, cancelQuery := context.WithTimeout(ctx, IndexerQueryTimeout)
defer cancelQuery()

// only query basic outputs with native tokens if we want to melt them
allowNativeTokens := s.valueSpamCreateAlias && s.valueSpamCreateFoundry && s.valueSpamMintNativeToken && s.valueSpamMeltNativeToken

// get all known outputs from the indexer (sender)
if err := s.accountSender.QueryOutputsFromIndexer(ctx, s.indexer, allowNativeTokens, true, s.valueSpamCreateAlias, s.valueSpamCreateAlias, s.valueSpamCreateNFT, IndexerQueryMaxResults); err != nil {
if err := s.accountSender.QueryOutputsFromIndexer(ctxQuery, s.indexer, allowNativeTokens, true, s.valueSpamCreateAlias, s.valueSpamCreateAlias, s.valueSpamCreateNFT, IndexerQueryMaxResults); err != nil {
return err
}

receiversAliasOutputsUsed := s.valueSpamCreateAlias && s.valueSpamDestroyAlias && ((!s.valueSpamCreateFoundry || s.valueSpamDestroyFoundry) && (!s.valueSpamMintNativeToken || s.valueSpamMeltNativeToken))

// get all known outputs from the indexer (receiver)
if err := s.accountReceiver.QueryOutputsFromIndexer(ctx, s.indexer, allowNativeTokens, s.valueSpamCollectBasicOutput, receiversAliasOutputsUsed, receiversAliasOutputsUsed, s.valueSpamDestroyNFT, IndexerQueryMaxResults); err != nil {
if err := s.accountReceiver.QueryOutputsFromIndexer(ctxQuery, s.indexer, allowNativeTokens, s.valueSpamCollectBasicOutput, receiversAliasOutputsUsed, receiversAliasOutputsUsed, s.valueSpamDestroyNFT, IndexerQueryMaxResults); err != nil {
return err
}

s.LogDebugf(`getCurrentSpammerLedgerState finised, took: %v
s.LogDebugf(`getCurrentSpammerLedgerState finished, took: %v
outputs sender: basic: %d, alias: %d, foundry: %d, nft: %d
outputs receiver: basic: %d, alias: %d, foundry: %d, nft: %d`, time.Since(ts).Truncate(time.Millisecond),
s.accountSender.BasicOutputsCount(), s.accountSender.AliasOutputsCount(), s.accountSender.FoundryOutputsCount(), s.accountSender.NFTOutputsCount(),
Expand Down Expand Up @@ -1223,7 +1231,7 @@ func (s *Spammer) BuildTransactionPayloadBlockAndSend(ctx context.Context, spamB
senderAddress := spamBuilder.accountSender.Address()

// add all inputs
var remainder int64 = 0
var remainder int64
consumedInputIDs := iotago.OutputIDs{}
for _, input := range spamBuilder.consumedInputs {
remainder += int64(input.Output().Deposit())
Expand Down Expand Up @@ -1313,7 +1321,7 @@ func (s *Spammer) BuildTransactionPayloadBlockAndSend(ctx context.Context, spamB
}

// add all outputs and calculate the remainder
var remainderOutputIndex uint16 = 0
var remainderOutputIndex uint16
for i, outputWithOwnership := range spamBuilder.createdOutputs {
output := outputWithOwnership.Output

Expand Down Expand Up @@ -1404,6 +1412,10 @@ func (s *Spammer) BuildTransactionPayloadBlockAndSend(ctx context.Context, spamB

blockID, err := s.sendBlockFunc(ctx, block)
if err != nil {
// there was an error during sending a transaction
// it is high likely that something is non-solid or below max depth
s.resetSpammerState()

return nil, nil, fmt.Errorf("send transaction block failed, error: %w", err)
}

Expand All @@ -1423,7 +1435,7 @@ func (s *Spammer) BuildTransactionPayloadBlockAndSend(ctx context.Context, spamB

createdOutputs := make([]UTXOInterface, 0)

var outputIndex uint16 = 0
var outputIndex uint16
for _, outputWithOwnership := range spamBuilder.createdOutputs {
output := outputWithOwnership.Output
if output.Type() == iotago.OutputAlias {
Expand Down
5 changes: 4 additions & 1 deletion plugins/prometheus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,14 @@ func run() error {
Plugin.LogInfo("Stopping Prometheus exporter ...")

shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCtxCancel()

//nolint:contextcheck // false positive
err := deps.PrometheusEcho.Shutdown(shutdownCtx)
if err != nil {
Plugin.LogWarn(err)
}
shutdownCtxCancel()

Plugin.LogInfo("Stopping Prometheus exporter ... done")
}, daemon.PriorityStopPrometheus)
}
Expand Down