Skip to content

Commit

Permalink
Merge pull request #6817 from The-K-R-O-K/UlianaAndrukhiv/6799-inacti…
Browse files Browse the repository at this point in the history
…vity-tracker

[Access] Implement inactivity tracker
  • Loading branch information
peterargue authored Jan 2, 2025
2 parents 0f4013e + 1c4f2a7 commit d71546c
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 10 deletions.
11 changes: 9 additions & 2 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/onflow/flow-go/engine/access/rest"
commonrest "github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/router"
"github.com/onflow/flow-go/engine/access/rest/websockets"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
rpcConnection "github.com/onflow/flow-go/engine/access/rpc/connection"
Expand Down Expand Up @@ -227,8 +228,9 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
IdleTimeout: rest.DefaultIdleTimeout,
MaxRequestSize: commonrest.DefaultMaxRequestSize,
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
CompressorName: grpcutils.NoCompressor,
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
CompressorName: grpcutils.NoCompressor,
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
},
stateStreamConf: statestreambackend.Config{
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
Expand Down Expand Up @@ -1450,6 +1452,11 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"registerdb-pruning-threshold",
defaultConfig.registerDBPruneThreshold,
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))

flags.DurationVar(&builder.rpcConf.WebSocketConfig.InactivityTimeout,
"websocket-inactivity-timeout",
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
}).ValidateFlags(func() error {
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
return errors.New("public-network-address must be set if supports-observer is true")
Expand Down
12 changes: 8 additions & 4 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ type ObserverServiceConfig struct {
registerCacheSize uint
programCacheSize uint
registerDBPruneThreshold uint64
websocketConfig websockets.Config
}

// DefaultObserverServiceConfig defines all the default values for the ObserverServiceConfig
Expand Down Expand Up @@ -200,8 +199,9 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
IdleTimeout: rest.DefaultIdleTimeout,
MaxRequestSize: commonrest.DefaultMaxRequestSize,
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
CompressorName: grpcutils.NoCompressor,
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
CompressorName: grpcutils.NoCompressor,
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
},
stateStreamConf: statestreambackend.Config{
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
Expand Down Expand Up @@ -254,7 +254,6 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
registerCacheSize: 0,
programCacheSize: 0,
registerDBPruneThreshold: pruner.DefaultThreshold,
websocketConfig: websockets.NewDefaultWebsocketConfig(),
}
}

Expand Down Expand Up @@ -814,6 +813,11 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"registerdb-pruning-threshold",
defaultConfig.registerDBPruneThreshold,
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))

flags.DurationVar(&builder.rpcConf.WebSocketConfig.InactivityTimeout,
"websocket-inactivity-timeout",
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
}).ValidateFlags(func() error {
if builder.executionDataSyncEnabled {
if builder.executionDataConfig.FetchTimeout <= 0 {
Expand Down
8 changes: 8 additions & 0 deletions engine/access/rest/websockets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,24 @@ const (
// if the client is slow or unresponsive. This prevents resource exhaustion
// and allows the server to gracefully handle timeouts for delayed writes.
WriteWait = 10 * time.Second

// DefaultInactivityTimeout is the default duration a WebSocket connection can remain open without any active subscriptions
// before being automatically closed
DefaultInactivityTimeout time.Duration = 1 * time.Minute
)

type Config struct {
MaxSubscriptionsPerConnection uint64
MaxResponsesPerSecond uint64
// InactivityTimeout specifies the duration a WebSocket connection can remain open without any active subscriptions
// before being automatically closed
InactivityTimeout time.Duration
}

func NewDefaultWebsocketConfig() Config {
return Config{
MaxSubscriptionsPerConnection: 1000,
MaxResponsesPerSecond: 1000,
InactivityTimeout: DefaultInactivityTimeout,
}
}
29 changes: 27 additions & 2 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (c *Controller) HandleConnection(ctx context.Context) {

err := c.configureKeepalive()
if err != nil {
c.logger.Error().Err(err).Msg("error configuring connection")
c.logger.Error().Err(err).Msg("error configuring keepalive connection")
return
}

Expand Down Expand Up @@ -237,8 +237,16 @@ func (c *Controller) keepalive(ctx context.Context) error {
}

// writeMessages reads a messages from multiplexed stream and passes them on to a client WebSocket connection.
// The multiplexed stream channel is filled by data providers
// The multiplexed stream channel is filled by data providers.
// The function tracks the last message sent and periodically checks for inactivity.
// If no messages are sent within InactivityTimeout and no active data providers exist,
// the connection will be closed.
func (c *Controller) writeMessages(ctx context.Context) error {
inactivityTicker := time.NewTicker(c.config.InactivityTimeout / 10)
defer inactivityTicker.Stop()

lastMessageSentAt := time.Now()

defer func() {
// drain the channel as some providers may still send data to it after this routine shutdowns
// so, in order to not run into deadlock there should be at least 1 reader on the channel
Expand All @@ -257,13 +265,30 @@ func (c *Controller) writeMessages(ctx context.Context) error {
return nil
}

// Specifies a timeout for the write operation. If the write
// isn't completed within this duration, it fails with a timeout error.
// SetWriteDeadline ensures the write operation does not block indefinitely
// if the client is slow or unresponsive. This prevents resource exhaustion
// and allows the server to gracefully handle timeouts for delayed writes.
if err := c.conn.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
return fmt.Errorf("failed to set the write deadline: %w", err)
}

if err := c.conn.WriteJSON(message); err != nil {
return err
}

lastMessageSentAt = time.Now()

case <-inactivityTicker.C:
hasNoActiveSubscriptions := c.dataProviders.Size() == 0
exceedsInactivityTimeout := time.Since(lastMessageSentAt) > c.config.InactivityTimeout
if hasNoActiveSubscriptions && exceedsInactivityTimeout {
c.logger.Debug().
Dur("timeout", c.config.InactivityTimeout).
Msg("connection inactive, closing due to timeout")
return fmt.Errorf("no recent activity for %v", c.config.InactivityTimeout)
}
}
}
}
Expand Down
34 changes: 32 additions & 2 deletions engine/access/rest/websockets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers"
Expand Down Expand Up @@ -809,6 +808,37 @@ func (s *WsControllerSuite) TestControllerShutdown() {

conn.AssertExpectations(t)
})

s.T().Run("Inactivity tracking", func(t *testing.T) {
t.Parallel()

conn := connmock.NewWebsocketConnection(t)
conn.On("Close").Return(nil).Once()
conn.On("SetReadDeadline", mock.Anything).Return(nil).Once()
conn.On("SetPongHandler", mock.AnythingOfType("func(string) error")).Return(nil).Once()

factory := dpmock.NewDataProviderFactory(t)
// Mock with short inactivity timeout for testing
wsConfig := s.wsConfig

wsConfig.InactivityTimeout = 50 * time.Millisecond
controller := NewWebSocketController(s.logger, wsConfig, conn, factory)

conn.
On("ReadJSON", mock.Anything).
Return(func(interface{}) error {
// waiting more than InactivityTimeout to make sure that read message routine busy and do not return
// an error before than inactivity tracker initiate shut down
<-time.After(wsConfig.InactivityTimeout)
return websocket.ErrCloseSent
}).
Once()

controller.HandleConnection(context.Background())
time.Sleep(wsConfig.InactivityTimeout)

conn.AssertExpectations(t)
})
}

func (s *WsControllerSuite) TestKeepaliveRoutine() {
Expand Down

0 comments on commit d71546c

Please sign in to comment.