Skip to content

Commit

Permalink
Merge pull request #63 from keep-network/pipe-it
Browse files Browse the repository at this point in the history
New Ethereum event subscription API, background event pull loop

# Overview 

There are two major changes to Ethereum subscription API proposed here:
- new subscription API with `OnEvent` and `Pipe` functions,
- background monitoring loop pulling past events from the chain.

The first change should allow implementing some handler logic easier and to avoid complex logic leading to bugs such as keep-network/keep-core#1333 or keep-network/keep-core#2052.

The second change should improve client responsiveness for operators running their nodes against Ethereum deployments that are not very reliable on the event delivery front.
  
This code has been integrated with ECDSA keep client in `keep-ecdsa` repository and can be tested there on the branch `pipe-it` (keep-network/keep-ecdsa#671).

# New API

Event subscription API has been refactored to resemble the proposition from keep-network/keep-core#491. The new event subscription mechanism allows installing event callback handler function with `OnEvent` function as well as piping events from a subscription to a channel with `Pipe` function.

Example usage of `OnEvent`:
```
handlerFn := func(
    submittingMember common.Address,
    conflictingPublicKey []byte,
    blockNumber uint64,
) {
   // (...)
}
subscription := keepContract.ConflictingPublicKeySubmitted(
	nil, // default SubscribeOpts
	nil, // no filtering on submitting member
).OnEvent(handlerFn)
```

The same subscription but with a `Pipe`:
```
sink := make(chan *abi.BondedECDSAKeepConflictingPublicKeySubmitted)

subscription := keepContract.ConflictingPublicKeySubmitted(
	nil, // default SubscribeOpts
	nil, // no filtering on submitting member
).Pipe(sink)
```

Currently, all our event subscriptions use function handlers. While it is convenient in some cases, in some other cases it is the opposite. For example, `OnBondedECDSAKeepCreated` handler in ECDSA client works perfectly fine as a function. It triggers the protocol and does not have to constantly monitor the state of the chain. On the other hand, `OnDKGResultSubmitted` handler from the beacon client needs to monitor the chain and exit the process of event publication in case another node has published the result. In this case, the code could be better structured with a channel-based subscription that would allow listening for block counter events and events from DKG result submitted subscription in one for-select loop.

# Background monitoring loop

Some nodes in the network are running against Ethereum setups that are not particularly reliable in delivering events. Events are not delivered, nodes are not starting key-generation, or are not participating in redemption signing. Another problem is the stability of the event subscription mechanism (see #62). If the web socket connection is dropped too often, the resubscription mechanism is not enough to receive events emitted when the connection was in a weird, stale state.

To address this problem, we introduce a background loop periodically pulling past events from the chain next to a regular `watchLogs` subscription. How often events are pulled and how many blocks are taken into account can be configured with `SubscribeOpts` parameters. 

This way, even if the event was lost by `watchLogs` subscription for whatever reason, it should be pulled by a background monitoring loop later. An extremely important implication of this change is that handlers should have a logic in place allowing them to de-duplicate received events even if a lot of time passed between receiving the original event and the duplicate.

I have been experimenting with various options here, including de-duplication events in the chain implementation layer, but none of them proved to be successful as the correct de-duplication algorithm requires domain knowledge about a certain type of an event and in what circumstances identical event emitted later should or should not be identified as a duplicate.

De-duplicator implementations should be added to `keep-core` and `keep-ecdsa` clients and are out of the scope of `keep-common` and this PR.
  • Loading branch information
nkuba authored Jan 28, 2021
2 parents bb217b9 + b3b56f6 commit 34905d2
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 120 deletions.
52 changes: 52 additions & 0 deletions pkg/chain/ethereum/ethutil/subscribe_opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ethutil

import "time"

const (
// DefaultSubscribeOptsTick is the default duration with which
// past events are pulled from the chain by the subscription monitoring
// mechanism if no other value is provided in SubscribeOpts when creating
// the subscription.
DefaultSubscribeOptsTick = 15 * time.Minute

// DefaultSubscribeOptsPastBlocks is the default number of past blocks
// pulled from the chain by the subscription monitoring mechanism if no
// other value is provided in SubscribeOpts when creating the subscription.
DefaultSubscribeOptsPastBlocks = 100

// SubscriptionBackoffMax is the maximum backoff time between event
// resubscription attempts.
SubscriptionBackoffMax = 2 * time.Minute

// SubscriptionAlertThreshold is time threshold below which event
// resubscription emits an error to the logs.
// WS connection can be dropped at any moment and event resubscription will
// follow. However, if WS connection for event subscription is getting
// dropped too often, it may indicate something is wrong with Ethereum
// client. This constant defines the minimum lifetime of an event
// subscription required before the subscription failure happens and
// resubscription follows so that the resubscription does not emit an error
// to the logs alerting about potential problems with Ethereum client
// connection.
SubscriptionAlertThreshold = 15 * time.Minute
)

// SubscribeOpts specifies optional configuration options that can be passed
// when creating Ethereum event subscription.
type SubscribeOpts struct {

// Tick is the duration with which subscription monitoring mechanism
// pulls events from the chain. This mechanism is an additional process
// next to a regular watchLogs subscription making sure no events are lost
// even in case the regular subscription missed them because of, for
// example, connectivity problems.
Tick time.Duration

// PastBlocks is the number of past blocks subscription monitoring mechanism
// takes into consideration when pulling past events from the chain.
// This event pull mechanism is an additional process next to a regular
// watchLogs subscription making sure no events are lost even in case the
// regular subscription missed them because of, for example, connectivity
// problems.
PastBlocks uint64
}
10 changes: 10 additions & 0 deletions tools/generators/ethereum/command.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"

"github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter"
"github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil"
"github.com/keep-network/keep-common/pkg/cmd"

Expand Down Expand Up @@ -226,6 +227,14 @@ func initialize{{.Class}}(c *cli.Context) (*contract.{{.Class}}, error) {

miningWaiter := ethutil.NewMiningWaiter(client, checkInterval, maxGasPrice)

blockCounter, err := blockcounter.CreateBlockCounter(client)
if err != nil {
return nil, fmt.Errorf(
"failed to create Ethereum blockcounter: [%v]",
err,
)
}

address := common.HexToAddress(config.ContractAddresses["{{.Class}}"])

return contract.New{{.Class}}(
Expand All @@ -234,6 +243,7 @@ func initialize{{.Class}}(c *cli.Context) (*contract.{{.Class}}, error) {
client,
ethutil.NewNonceManager(key.Address, client),
miningWaiter,
blockCounter,
&sync.Mutex{},
)
}
10 changes: 10 additions & 0 deletions tools/generators/ethereum/command_template_content.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter"
"github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil"
"github.com/keep-network/keep-common/pkg/cmd"
Expand Down Expand Up @@ -229,6 +230,14 @@ func initialize{{.Class}}(c *cli.Context) (*contract.{{.Class}}, error) {
miningWaiter := ethutil.NewMiningWaiter(client, checkInterval, maxGasPrice)
blockCounter, err := blockcounter.CreateBlockCounter(client)
if err != nil {
return nil, fmt.Errorf(
"failed to create Ethereum blockcounter: [%v]",
err,
)
}
address := common.HexToAddress(config.ContractAddresses["{{.Class}}"])
return contract.New{{.Class}}(
Expand All @@ -237,6 +246,7 @@ func initialize{{.Class}}(c *cli.Context) (*contract.{{.Class}}, error) {
client,
ethutil.NewNonceManager(key.Address, client),
miningWaiter,
blockCounter,
&sync.Mutex{},
)
}
Expand Down
19 changes: 4 additions & 15 deletions tools/generators/ethereum/contract.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/ipfs/go-log"

"github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter"
"github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil"
"github.com/keep-network/keep-common/pkg/subscription"
)
Expand All @@ -25,21 +26,6 @@ import (
// included or excluded from logging at startup by name.
var {{.ShortVar}}Logger = log.Logger("keep-contract-{{.Class}}")

const (
// Maximum backoff time between event resubscription attempts.
{{.ShortVar}}SubscriptionBackoffMax = 2 * time.Minute

// Threshold below which event resubscription emits an error to the logs.
// WS connection can be dropped at any moment and event resubscription will
// follow. However, if WS connection for event subscription is getting
// dropped too often, it may indicate something is wrong with Ethereum
// client. This constant defines the minimum lifetime of an event
// subscription required before the subscription failure happens and
// resubscription follows so that the resubscription does not emit an error
// to the logs alerting about potential problems with Ethereum client.
{{.ShortVar}}SubscriptionAlertThreshold = 15 * time.Minute
)

type {{.Class}} struct {
contract *abi.{{.AbiClass}}
contractAddress common.Address
Expand All @@ -51,6 +37,7 @@ type {{.Class}} struct {
errorResolver *ethutil.ErrorResolver
nonceManager *ethutil.NonceManager
miningWaiter *ethutil.MiningWaiter
blockCounter *blockcounter.EthereumBlockCounter

transactionMutex *sync.Mutex
}
Expand All @@ -61,6 +48,7 @@ func New{{.Class}}(
backend bind.ContractBackend,
nonceManager *ethutil.NonceManager,
miningWaiter *ethutil.MiningWaiter,
blockCounter *blockcounter.EthereumBlockCounter,
transactionMutex *sync.Mutex,
) (*{{.Class}}, error) {
callerOptions := &bind.CallOpts{
Expand Down Expand Up @@ -99,6 +87,7 @@ func New{{.Class}}(
errorResolver: ethutil.NewErrorResolver(backend, &contractABI, &contractAddress),
nonceManager: nonceManager,
miningWaiter: miningWaiter,
blockCounter: blockCounter,
transactionMutex: transactionMutex,
}, nil
}
Expand Down
178 changes: 134 additions & 44 deletions tools/generators/ethereum/contract_events.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,66 +2,132 @@
{{- $logger := (print $contract.ShortVar "Logger") -}}
{{- range $i, $event := .Events }}

type {{$contract.FullVar}}{{$event.CapsName}}Func func(
{{$event.ParamDeclarations -}}
)

func ({{$contract.ShortVar}} *{{$contract.Class}}) Past{{$event.CapsName}}Events(
startBlock uint64,
endBlock *uint64,
func ({{$contract.ShortVar}} *{{$contract.Class}}) {{$event.CapsName}}(
opts *ethutil.SubscribeOpts,
{{$event.IndexedFilterDeclarations -}}
) ([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, error){
iterator, err := {{$contract.ShortVar}}.contract.Filter{{$event.CapsName}}(
&bind.FilterOpts{
Start: startBlock,
End: endBlock,
},
{{$event.IndexedFilters}}
)
if err != nil {
return nil, fmt.Errorf(
"error retrieving past {{$event.CapsName}} events: [%v]",
err,
)
) *{{$event.SubscriptionCapsName}} {
if opts == nil {
opts = new(ethutil.SubscribeOpts)
}
if opts.Tick == 0 {
opts.Tick = ethutil.DefaultSubscribeOptsTick
}
if opts.PastBlocks == 0 {
opts.PastBlocks = ethutil.DefaultSubscribeOptsPastBlocks
}

events := make([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, 0)

for iterator.Next() {
event := iterator.Event
events = append(events, event)
return &{{$event.SubscriptionCapsName}}{
{{$contract.ShortVar}},
opts,
{{$event.IndexedFilters}}
}
}

return events, nil
type {{$event.SubscriptionCapsName}} struct {
contract *{{$contract.Class}}
opts *ethutil.SubscribeOpts
{{$event.IndexedFilterFields -}}
}

func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}(
success {{$contract.FullVar}}{{$event.CapsName}}Func,
{{$event.IndexedFilterDeclarations -}}
) (subscription.EventSubscription) {
eventOccurred := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}})
type {{$contract.FullVar}}{{$event.CapsName}}Func func(
{{$event.ParamDeclarations -}}
)

func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent(
handler {{$contract.FullVar}}{{$event.CapsName}}Func,
) subscription.EventSubscription {
eventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}})
ctx, cancelCtx := context.WithCancel(context.Background())

// TODO: Watch* function will soon accept channel as a parameter instead
// of the callback. This loop will be eliminated then.
go func() {
for {
select {
case <-ctx.Done():
return
case event := <-eventOccurred:
success(
{{$event.ParamExtractors}}
case event := <- eventChan:
handler(
{{$event.ParamExtractors}}
)
}
}
}()

sub := {{$event.SubscriptionShortVar}}.Pipe(eventChan)
return subscription.NewEventSubscription(func() {
sub.Unsubscribe()
cancelCtx()
})
}

func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe(
sink chan *abi.{{$contract.AbiClass}}{{$event.CapsName}},
) subscription.EventSubscription {
ctx, cancelCtx := context.WithCancel(context.Background())
go func() {
ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.Tick)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
lastBlock, err := {{$event.SubscriptionShortVar}}.contract.blockCounter.CurrentBlock()
if err != nil {
{{$logger}}.Errorf(
"subscription failed to pull events: [%v]",
err,
)
}
fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.PastBlocks

{{$logger}}.Infof(
"subscription monitoring fetching past {{$event.CapsName}} events " +
"starting from block [%v]",
fromBlock,
)
events, err := {{$event.SubscriptionShortVar}}.contract.Past{{$event.CapsName}}Events(
fromBlock,
nil,
{{$event.IndexedFilterExtractors}}
)
if err != nil {
{{$logger}}.Errorf(
"subscription failed to pull events: [%v]",
err,
)
continue
}
{{$logger}}.Infof(
"subscription monitoring fetched [%v] past {{$event.CapsName}} events",
len(events),
)

for _, event := range events {
sink <- event
}
}
}
}()

sub := {{$event.SubscriptionShortVar}}.contract.watch{{$event.CapsName}}(
sink,
{{$event.IndexedFilterExtractors}}
)

return subscription.NewEventSubscription(func() {
sub.Unsubscribe()
cancelCtx()
})
}

func ({{$contract.ShortVar}} *{{$contract.Class}}) watch{{$event.CapsName}}(
sink chan *abi.{{$contract.AbiClass}}{{$event.CapsName}},
{{$event.IndexedFilterDeclarations -}}
) event.Subscription {
subscribeFn := func(ctx context.Context) (event.Subscription, error) {
return {{$contract.ShortVar}}.contract.Watch{{$event.CapsName}}(
&bind.WatchOpts{Context: ctx},
eventOccurred,
sink,
{{$event.IndexedFilters}}
)
}
Expand All @@ -84,18 +150,42 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}(
)
}

sub := ethutil.WithResubscription(
{{$contract.ShortVar}}SubscriptionBackoffMax,
return ethutil.WithResubscription(
ethutil.SubscriptionBackoffMax,
subscribeFn,
{{$contract.ShortVar}}SubscriptionAlertThreshold,
ethutil.SubscriptionAlertThreshold,
thresholdViolatedFn,
subscriptionFailedFn,
)
}

return subscription.NewEventSubscription(func() {
sub.Unsubscribe()
cancelCtx()
})
func ({{$contract.ShortVar}} *{{$contract.Class}}) Past{{$event.CapsName}}Events(
startBlock uint64,
endBlock *uint64,
{{$event.IndexedFilterDeclarations -}}
) ([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, error){
iterator, err := {{$contract.ShortVar}}.contract.Filter{{$event.CapsName}}(
&bind.FilterOpts{
Start: startBlock,
End: endBlock,
},
{{$event.IndexedFilters}}
)
if err != nil {
return nil, fmt.Errorf(
"error retrieving past {{$event.CapsName}} events: [%v]",
err,
)
}

events := make([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, 0)

for iterator.Next() {
event := iterator.Event
events = append(events, event)
}

return events, nil
}

{{- end -}}
Loading

0 comments on commit 34905d2

Please sign in to comment.