Skip to content

Commit

Permalink
Merge branch 'master' into gregor/evm/direct-call
Browse files Browse the repository at this point in the history
  • Loading branch information
ramtinms authored Feb 20, 2024
2 parents 48df683 + a260653 commit 5340029
Show file tree
Hide file tree
Showing 45 changed files with 781 additions and 511 deletions.
5 changes: 5 additions & 0 deletions admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "stop-at-height", "data": { "height": 1111, "crash": false }}'
```

### Trigger checkpoint creation on execution
```
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "trigger-checkpoint"}'
```

### Add/Remove/Get address to rate limit a payer from adding transactions to collection nodes' mempool
```
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "add", "addresses": "a08d349e8037d6e5,e6765c6113547fb7" }}'
Expand Down
8 changes: 4 additions & 4 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1505,12 +1505,12 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
cacheSize := int(backendConfig.ConnectionPoolSize)

var connBackendCache *rpcConnection.Cache
var err error
if cacheSize > 0 {
backendCache, err := backend.NewCache(node.Logger, accessMetrics, cacheSize)
connBackendCache, err = rpcConnection.NewCache(node.Logger, accessMetrics, cacheSize)
if err != nil {
return nil, fmt.Errorf("could not initialize backend cache: %w", err)
return nil, fmt.Errorf("could not initialize connection cache: %w", err)
}
connBackendCache = rpcConnection.NewCache(backendCache, cacheSize)
}

connFactory := &rpcConnection.ConnectionFactoryImpl{
Expand All @@ -1521,9 +1521,9 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
AccessMetrics: accessMetrics,
Log: node.Logger,
Manager: rpcConnection.NewManager(
connBackendCache,
node.Logger,
accessMetrics,
connBackendCache,
config.MaxMsgSize,
backendConfig.CircuitBreakerConfig,
config.CompressorName,
Expand Down
2 changes: 1 addition & 1 deletion cmd/bootstrap/run/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func GenerateExecutionState(
return flow.DummyStateCommitment, err
}

compactor, err := complete.NewCompactor(ledgerStorage, diskWal, zerolog.Nop(), capacity, checkpointDistance, checkpointsToKeep, atomic.NewBool(false))
compactor, err := complete.NewCompactor(ledgerStorage, diskWal, zerolog.Nop(), capacity, checkpointDistance, checkpointsToKeep, atomic.NewBool(false), metricsCollector)
if err != nil {
return flow.DummyStateCommitment, err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,7 @@ func (exeNode *ExecutionNode) LoadExecutionStateLedgerWALCompactor(
exeNode.exeConf.checkpointDistance,
exeNode.exeConf.checkpointsToKeep,
exeNode.toTriggerCheckpoint, // compactor will listen to the signal from admin tool for force triggering checkpointing
exeNode.collector,
)
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,12 +1210,12 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
cacheSize := int(backendConfig.ConnectionPoolSize)

var connBackendCache *rpcConnection.Cache
var err error
if cacheSize > 0 {
backendCache, err := backend.NewCache(node.Logger, accessMetrics, cacheSize)
connBackendCache, err = rpcConnection.NewCache(node.Logger, accessMetrics, cacheSize)
if err != nil {
return nil, fmt.Errorf("could not initialize backend cache: %w", err)
return nil, fmt.Errorf("could not initialize connection cache: %w", err)
}
connBackendCache = rpcConnection.NewCache(backendCache, cacheSize)
}

connFactory := &rpcConnection.ConnectionFactoryImpl{
Expand All @@ -1226,9 +1226,9 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
AccessMetrics: accessMetrics,
Log: node.Logger,
Manager: rpcConnection.NewManager(
connBackendCache,
node.Logger,
accessMetrics,
connBackendCache,
config.MaxMsgSize,
backendConfig.CircuitBreakerConfig,
config.CompressorName,
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/checkpoint-collect-stats/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func run(*cobra.Command, []string) {
if err != nil {
log.Fatal().Err(err).Msg("cannot create ledger from write-a-head logs and checkpoints")
}
compactor, err := complete.NewCompactor(led, diskWal, zerolog.Nop(), complete.DefaultCacheSize, math.MaxInt, 1, atomic.NewBool(false))
compactor, err := complete.NewCompactor(led, diskWal, zerolog.Nop(), complete.DefaultCacheSize, math.MaxInt, 1, atomic.NewBool(false), &metrics.NoopCollector{})
if err != nil {
log.Fatal().Err(err).Msg("cannot create compactor")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/exec-data-json-export/ledger_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func ExportLedger(ledgerPath string, targetstate string, outputPath string) erro
return fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err)
}

compactor, err := complete.NewCompactor(led, diskWal, zerolog.Nop(), complete.DefaultCacheSize, checkpointDistance, checkpointsToKeep, atomic.NewBool(false))
compactor, err := complete.NewCompactor(led, diskWal, zerolog.Nop(), complete.DefaultCacheSize, checkpointDistance, checkpointsToKeep, atomic.NewBool(false), &metrics.NoopCollector{})
if err != nil {
return fmt.Errorf("cannot create compactor: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func extractExecutionState(

log.Info().Msg("init compactor")

compactor, err := complete.NewCompactor(led, diskWal, log, complete.DefaultCacheSize, checkpointDistance, checkpointsToKeep, atomic.NewBool(false))
compactor, err := complete.NewCompactor(led, diskWal, log, complete.DefaultCacheSize, checkpointDistance, checkpointsToKeep, atomic.NewBool(false), &metrics.NoopCollector{})
if err != nil {
return fmt.Errorf("cannot create compactor: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestExtractExecutionState(t *testing.T) {
require.NoError(t, err)
f, err := complete.NewLedger(diskWal, size*10, metr, zerolog.Nop(), complete.DefaultPathFinderVersion)
require.NoError(t, err)
compactor, err := complete.NewCompactor(f, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep, atomic.NewBool(false))
compactor, err := complete.NewCompactor(f, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep, atomic.NewBool(false), &metrics.NoopCollector{})
require.NoError(t, err)
<-compactor.Ready()

Expand Down Expand Up @@ -166,7 +166,7 @@ func TestExtractExecutionState(t *testing.T) {
checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation.
checkpointsToKeep = 1
)
compactor, err := complete.NewCompactor(storage, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep, atomic.NewBool(false))
compactor, err := complete.NewCompactor(storage, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep, atomic.NewBool(false), &metrics.NoopCollector{})
require.NoError(t, err)

<-compactor.Ready()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *CombinedVoteProcessorV2TestSuite) TestProcess_InvalidSignatureFormat()
rapid.Check(s.T(), func(t *rapid.T) {
// create a signature with invalid length
vote := unittest.VoteForBlockFixture(s.proposal.Block, func(vote *model.Vote) {
vote.SigData = unittest.RandomBytes(generator.Draw(t, "sig-size").(int))
vote.SigData = unittest.RandomBytes(generator.Draw(t, "sig-size"))
})
err := s.processor.Process(vote)
require.Error(s.T(), err)
Expand Down Expand Up @@ -434,8 +434,8 @@ func TestCombinedVoteProcessorV2_PropertyCreatingQCCorrectness(testifyT *testing

rapid.Check(testifyT, func(t *rapid.T) {
// draw participants in range 1 <= participants <= maxParticipants
participants := rapid.Uint64Range(1, maxParticipants).Draw(t, "participants").(uint64)
beaconSignersCount := rapid.Uint64Range(participants/2+1, participants).Draw(t, "beaconSigners").(uint64)
participants := rapid.Uint64Range(1, maxParticipants).Draw(t, "participants")
beaconSignersCount := rapid.Uint64Range(participants/2+1, participants).Draw(t, "beaconSigners")
stakingSignersCount := participants - beaconSignersCount
require.Equal(t, participants, stakingSignersCount+beaconSignersCount)

Expand Down Expand Up @@ -638,20 +638,20 @@ func TestCombinedVoteProcessorV2_PropertyCreatingQCCorrectness(testifyT *testing
func TestCombinedVoteProcessorV2_PropertyCreatingQCLiveness(testifyT *testing.T) {
rapid.Check(testifyT, func(t *rapid.T) {
// draw beacon signers in range 1 <= beaconSignersCount <= 53
beaconSignersCount := rapid.Uint64Range(1, 53).Draw(t, "beaconSigners").(uint64)
beaconSignersCount := rapid.Uint64Range(1, 53).Draw(t, "beaconSigners")
// draw staking signers in range 0 <= stakingSignersCount <= 10
stakingSignersCount := rapid.Uint64Range(0, 10).Draw(t, "stakingSigners").(uint64)
stakingSignersCount := rapid.Uint64Range(0, 10).Draw(t, "stakingSigners")

stakingWeightRange, beaconWeightRange := rapid.Uint64Range(1, 10), rapid.Uint64Range(1, 10)

minRequiredWeight := uint64(0)
// draw weight for each signer randomly
stakingSigners := unittest.IdentityListFixture(int(stakingSignersCount), func(identity *flow.Identity) {
identity.InitialWeight = stakingWeightRange.Draw(t, identity.String()).(uint64)
identity.InitialWeight = stakingWeightRange.Draw(t, identity.String())
minRequiredWeight += identity.InitialWeight
})
beaconSigners := unittest.IdentityListFixture(int(beaconSignersCount), func(identity *flow.Identity) {
identity.InitialWeight = beaconWeightRange.Draw(t, identity.String()).(uint64)
identity.InitialWeight = beaconWeightRange.Draw(t, identity.String())
minRequiredWeight += identity.InitialWeight
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,8 @@ func TestCombinedVoteProcessorV3_PropertyCreatingQCCorrectness(testifyT *testing

rapid.Check(testifyT, func(t *rapid.T) {
// draw participants in range 1 <= participants <= maxParticipants
participants := rapid.Uint64Range(1, maxParticipants).Draw(t, "participants").(uint64)
beaconSignersCount := rapid.Uint64Range(participants/2+1, participants).Draw(t, "beaconSigners").(uint64)
participants := rapid.Uint64Range(1, maxParticipants).Draw(t, "participants")
beaconSignersCount := rapid.Uint64Range(participants/2+1, participants).Draw(t, "beaconSigners")
stakingSignersCount := participants - beaconSignersCount
require.Equal(t, participants, stakingSignersCount+beaconSignersCount)

Expand Down Expand Up @@ -749,20 +749,20 @@ func TestCombinedVoteProcessorV3_OnlyRandomBeaconSigners(testifyT *testing.T) {
func TestCombinedVoteProcessorV3_PropertyCreatingQCLiveness(testifyT *testing.T) {
rapid.Check(testifyT, func(t *rapid.T) {
// draw beacon signers in range 1 <= beaconSignersCount <= 53
beaconSignersCount := rapid.Uint64Range(1, 53).Draw(t, "beaconSigners").(uint64)
beaconSignersCount := rapid.Uint64Range(1, 53).Draw(t, "beaconSigners")
// draw staking signers in range 0 <= stakingSignersCount <= 10
stakingSignersCount := rapid.Uint64Range(0, 10).Draw(t, "stakingSigners").(uint64)
stakingSignersCount := rapid.Uint64Range(0, 10).Draw(t, "stakingSigners")

stakingWeightRange, beaconWeightRange := rapid.Uint64Range(1, 10), rapid.Uint64Range(1, 10)

minRequiredWeight := uint64(0)
// draw weight for each signer randomly
stakingSigners := unittest.IdentityListFixture(int(stakingSignersCount), func(identity *flow.Identity) {
identity.InitialWeight = stakingWeightRange.Draw(t, identity.String()).(uint64)
identity.InitialWeight = stakingWeightRange.Draw(t, identity.String())
minRequiredWeight += identity.InitialWeight
})
beaconSigners := unittest.IdentityListFixture(int(beaconSignersCount), func(identity *flow.Identity) {
identity.InitialWeight = beaconWeightRange.Draw(t, identity.String()).(uint64)
identity.InitialWeight = beaconWeightRange.Draw(t, identity.String())
minRequiredWeight += identity.InitialWeight
})

Expand Down
2 changes: 1 addition & 1 deletion engine/access/apiproxy/access_api_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func TestNewFlowCachedAccessAPIProxy(t *testing.T) {
AccessMetrics: metrics,
CollectionNodeGRPCTimeout: time.Second,
Manager: connection.NewManager(
nil,
unittest.Logger(),
metrics,
nil,
grpcutils.DefaultMaxMsgSize,
connection.CircuitBreakerConfig{},
grpcutils.NoCompressor,
Expand Down
21 changes: 0 additions & 21 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,27 +236,6 @@ func New(params Params) (*Backend, error) {
return b, nil
}

// NewCache constructs cache for storing connections to other nodes.
// No errors are expected during normal operations.
func NewCache(
log zerolog.Logger,
metrics module.AccessMetrics,
connectionPoolSize int,
) (*lru.Cache[string, *connection.CachedClient], error) {
cache, err := lru.NewWithEvict(connectionPoolSize, func(_ string, client *connection.CachedClient) {
go client.Close() // close is blocking, so run in a goroutine

log.Debug().Str("grpc_conn_evicted", client.Address).Msg("closing grpc connection evicted from pool")
metrics.ConnectionFromPoolEvicted()
})

if err != nil {
return nil, fmt.Errorf("could not initialize connection pool cache: %w", err)
}

return cache, nil
}

func identifierList(ids []string) (flow.IdentifierList, error) {
idList := make(flow.IdentifierList, len(ids))
for i, idStr := range ids {
Expand Down
Loading

0 comments on commit 5340029

Please sign in to comment.