Skip to content

Commit

Permalink
feat: flags for conn params
Browse files Browse the repository at this point in the history
  • Loading branch information
aeddi committed Jan 19, 2025
1 parent 19f3dbb commit bdaddaf
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 80 deletions.
48 changes: 38 additions & 10 deletions contribs/gnokms/internal/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,30 @@ import (
"time"

"github.com/gnolang/gno/gno.land/pkg/log"
"github.com/gnolang/gno/tm2/pkg/bft/privval"
"go.uber.org/zap/zapcore"
)

type Flags struct {
ChainID string
NodeAddr string
DialTimeout time.Duration
LogLevel string
LogFormat string
ChainID string
NodeAddr string
DialTimeout time.Duration
DialMaxRetries uint
DialRetryInterval time.Duration
ReadWriteTimeout time.Duration
LogLevel string
LogFormat string
}

var defaultFlags = Flags{
ChainID: "dev",
NodeAddr: "tcp://127.0.0.1:26659",
DialTimeout: time.Millisecond * 3000,
LogLevel: zapcore.DebugLevel.String(),
LogFormat: log.ConsoleFormat.String(),
ChainID: "dev",
NodeAddr: "tcp://127.0.0.1:26659",
DialTimeout: time.Second * 3,
DialMaxRetries: privval.DefaultMaxDialRetries,
DialRetryInterval: privval.DefaultDialRetryIntervalMS * time.Millisecond,
ReadWriteTimeout: privval.DefaultReadWriteTimeoutSeconds * time.Second,
LogLevel: zapcore.DebugLevel.String(),
LogFormat: log.ConsoleFormat.String(),
}

func (f *Flags) RegisterFlags(fs *flag.FlagSet) {
Expand All @@ -46,6 +53,27 @@ func (f *Flags) RegisterFlags(fs *flag.FlagSet) {
"timeout for dialing node using TCP",
)

fs.UintVar(
&f.DialMaxRetries,
"max-retries",
defaultFlags.DialMaxRetries,
"maximum number of retries to dial node",
)

fs.DurationVar(
&f.DialRetryInterval,
"retry-interval",
defaultFlags.DialRetryInterval,
"interval to wait between dial retries",
)

fs.DurationVar(
&f.ReadWriteTimeout,
"rw-timeout",
defaultFlags.ReadWriteTimeout,
"timeout for read/write operations",
)

fs.StringVar(
&f.LogLevel,
"log-level",
Expand Down
14 changes: 9 additions & 5 deletions contribs/gnokms/internal/common/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,17 @@ func NewSignerServer(
return nil, nil, err
}

// Initialize the remote signer server with the dialer and the gnokey private validator.
server := privval.NewSignerServer(
privval.NewSignerDialerEndpoint(logger, dialer),
commonFlags.ChainID,
privVal,
endpoint := privval.NewSignerDialerEndpoint(
logger,
dialer,
privval.SignerDialerEndpointMaxDialRetries(commonFlags.DialMaxRetries),
privval.SignerDialerEndpointDialRetryInterval(commonFlags.DialRetryInterval),
privval.SignerDialerEndpointReadWriteTimeout(commonFlags.ReadWriteTimeout),
)

// Initialize the remote signer server with the dialer and the gnokey private validator.
server := privval.NewSignerServer(endpoint, commonFlags.ChainID, privVal)

return server, flush, nil
}

Expand Down
4 changes: 2 additions & 2 deletions tm2/pkg/bft/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestNodeSetPrivValTCP(t *testing.T) {
log.NewTestingLogger(t),
dialer,
)
privval.SignerDialerEndpointTimeoutReadWrite(100 * time.Millisecond)(dialerEndpoint)
privval.SignerDialerEndpointReadWriteTimeout(100 * time.Millisecond)(dialerEndpoint)

signerServer := privval.NewSignerServer(
dialerEndpoint,
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestNodeSetPrivValIPC(t *testing.T) {
log.NewTestingLogger(t),
dialer,
)
privval.SignerDialerEndpointTimeoutReadWrite(100 * time.Millisecond)(dialerEndpoint)
privval.SignerDialerEndpointReadWriteTimeout(100 * time.Millisecond)(dialerEndpoint)

pvsc := privval.NewSignerServer(
dialerEndpoint,
Expand Down
70 changes: 48 additions & 22 deletions tm2/pkg/bft/privval/signer_dialer_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,40 @@ import (
)

const (
defaultMaxDialRetries = 10
defaultRetryWaitMilliseconds = 100
DefaultMaxDialRetries = 10
DefaultDialRetryIntervalMS = 100
)

// SignerServiceEndpointOption sets an optional parameter on the SignerDialerEndpoint.
type SignerServiceEndpointOption func(*SignerDialerEndpoint)

// SignerDialerEndpointTimeoutReadWrite sets the read and write timeout for connections
// from external signing processes.
func SignerDialerEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.timeoutReadWrite = timeout }
// SignerDialerEndpointReadWriteTimeout sets the read and write timeout for
// connections from client processes.
func SignerDialerEndpointReadWriteTimeout(timeout time.Duration) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.readWriteTimeout = timeout }
}

// SignerDialerEndpointConnRetries sets the amount of attempted retries to acceptNewConnection.
func SignerDialerEndpointConnRetries(retries int) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.maxConnRetries = retries }
// SignerDialerEndpointMaxDialRetries sets the amount of attempted retries to
// acceptNewConnection.
func SignerDialerEndpointMaxDialRetries(retries uint) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.maxDialRetries = retries }
}

// SignerDialerEndpoint dials using its dialer and responds to any
// signature requests using its privVal.
// SignerDialerEndpointDialRetryInterval sets the retry wait interval to a
// custom value.
func SignerDialerEndpointDialRetryInterval(interval time.Duration) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.dialRetryInterval = interval }

Check warning on line 33 in tm2/pkg/bft/privval/signer_dialer_endpoint.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/privval/signer_dialer_endpoint.go#L32-L33

Added lines #L32 - L33 were not covered by tests
}

// SignerDialerEndpoint dials using its dialer and responds to any signature
// requests using its privVal.
type SignerDialerEndpoint struct {
signerEndpoint

dialer SocketDialer

retryWait time.Duration
maxConnRetries int
maxDialRetries uint
dialRetryInterval time.Duration
}

// NewSignerDialerEndpoint returns a SignerDialerEndpoint that will dial using the given
Expand All @@ -43,15 +50,20 @@ type SignerDialerEndpoint struct {
func NewSignerDialerEndpoint(
logger *slog.Logger,
dialer SocketDialer,
options ...SignerServiceEndpointOption,
) *SignerDialerEndpoint {
sd := &SignerDialerEndpoint{
dialer: dialer,
retryWait: defaultRetryWaitMilliseconds * time.Millisecond,
maxConnRetries: defaultMaxDialRetries,
dialer: dialer,
dialRetryInterval: DefaultDialRetryIntervalMS * time.Millisecond,
maxDialRetries: DefaultMaxDialRetries,
}

sd.BaseService = *service.NewBaseService(logger, "SignerDialerEndpoint", sd)
sd.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
sd.signerEndpoint.readWriteTimeout = DefaultReadWriteTimeoutSeconds * time.Second

for _, option := range options {
option(sd)
}

Check warning on line 66 in tm2/pkg/bft/privval/signer_dialer_endpoint.go

View check run for this annotation

Codecov / codecov/patch

tm2/pkg/bft/privval/signer_dialer_endpoint.go#L65-L66

Added lines #L65 - L66 were not covered by tests

return sd
}
Expand All @@ -61,23 +73,37 @@ func (sd *SignerDialerEndpoint) ensureConnection() error {
return nil
}

retries := 0
for retries < sd.maxConnRetries {
retries := uint(0)
for retries < sd.maxDialRetries {
conn, err := sd.dialer()

if err != nil {
retries++
sd.Logger.Debug("SignerDialer: Reconnection failed", "retries", retries, "max", sd.maxConnRetries, "err", err)
sd.Logger.Debug(
"SignerDialer: Reconnection failed",
"retries",
retries,
"max",
sd.maxDialRetries,
"err",
err,
)
// Wait between retries
time.Sleep(sd.retryWait)
time.Sleep(sd.dialRetryInterval)
} else {
sd.SetConnection(conn)
sd.Logger.Debug("SignerDialer: Connection Ready")
return nil
}
}

sd.Logger.Debug("SignerDialer: Max retries exceeded", "retries", retries, "max", sd.maxConnRetries)
sd.Logger.Debug(
"SignerDialer: Max retries exceeded",
"retries",
retries,
"max",
sd.maxDialRetries,
)

return ErrNoConnection
}
8 changes: 4 additions & 4 deletions tm2/pkg/bft/privval/signer_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

const (
defaultTimeoutReadWriteSeconds = 3
DefaultReadWriteTimeoutSeconds = 3
)

type signerEndpoint struct {
Expand All @@ -21,7 +21,7 @@ type signerEndpoint struct {
connMtx sync.Mutex
conn net.Conn

timeoutReadWrite time.Duration
readWriteTimeout time.Duration
}

// Close closes the underlying net.Conn.
Expand Down Expand Up @@ -89,7 +89,7 @@ func (se *signerEndpoint) ReadMessage() (msg SignerMessage, err error) {
}

// Reset read deadline
deadline := time.Now().Add(se.timeoutReadWrite)
deadline := time.Now().Add(se.readWriteTimeout)

err = se.conn.SetReadDeadline(deadline)
if err != nil {
Expand Down Expand Up @@ -121,7 +121,7 @@ func (se *signerEndpoint) WriteMessage(msg SignerMessage) (err error) {
}

// Reset read deadline
deadline := time.Now().Add(se.timeoutReadWrite)
deadline := time.Now().Add(se.readWriteTimeout)
se.Logger.Debug("Write::Error Resetting deadline", "obj", se)

err = se.conn.SetWriteDeadline(deadline)
Expand Down
6 changes: 3 additions & 3 deletions tm2/pkg/bft/privval/signer_listener_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func NewSignerListenerEndpoint(
) *SignerListenerEndpoint {
sc := &SignerListenerEndpoint{
listener: listener,
timeoutAccept: defaultTimeoutAcceptSeconds * time.Second,
timeoutAccept: DefaultTimeoutAcceptSeconds * time.Second,
}

sc.BaseService = *service.NewBaseService(logger, "SignerListenerEndpoint", sc)
sc.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
sc.signerEndpoint.readWriteTimeout = DefaultReadWriteTimeoutSeconds * time.Second
return sc
}

Expand All @@ -48,7 +48,7 @@ func (sl *SignerListenerEndpoint) OnStart() error {
sl.connectRequestCh = make(chan struct{})
sl.connectionAvailableCh = make(chan net.Conn)

sl.pingTimer = time.NewTicker(defaultPingPeriodMilliseconds * time.Millisecond)
sl.pingTimer = time.NewTicker(DefaultPingPeriodMilliseconds * time.Millisecond)

go sl.serviceLoop()
go sl.pingLoop()
Expand Down
22 changes: 11 additions & 11 deletions tm2/pkg/bft/privval/signer_listener_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

var (
testTimeoutAccept = defaultTimeoutAcceptSeconds * time.Second
testTimeoutAccept = DefaultTimeoutAcceptSeconds * time.Second

testTimeoutReadWrite = 100 * time.Millisecond
testTimeoutReadWrite2o3 = 60 * time.Millisecond // 2/3 of the other one
Expand All @@ -36,16 +36,16 @@ func TestSignerRemoteRetryTCPOnly(t *testing.T) {
t.Parallel()

var (
attemptCh = make(chan int)
retries = 10
attemptCh = make(chan uint)
retries = uint(10)
)

ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

// Continuously Accept connection and close {attempts} times
go func(ln net.Listener, attemptCh chan<- int) {
attempts := 0
go func(ln net.Listener, attemptCh chan<- uint) {
attempts := uint(0)
for {
conn, err := ln.Accept()
require.NoError(t, err)
Expand All @@ -66,8 +66,8 @@ func TestSignerRemoteRetryTCPOnly(t *testing.T) {
log.NewTestingLogger(t),
DialTCPFn(ln.Addr().String(), testTimeoutReadWrite, ed25519.GenPrivKey(), nil),
)
SignerDialerEndpointTimeoutReadWrite(time.Millisecond)(dialerEndpoint)
SignerDialerEndpointConnRetries(retries)(dialerEndpoint)
SignerDialerEndpointReadWriteTimeout(time.Millisecond)(dialerEndpoint)
SignerDialerEndpointMaxDialRetries(retries)(dialerEndpoint)

chainID := random.RandStr(12)
mockPV := types.NewMockPV()
Expand Down Expand Up @@ -102,8 +102,8 @@ func TestRetryConnToRemoteSigner(t *testing.T) {
logger,
tc.dialer,
)
SignerDialerEndpointTimeoutReadWrite(testTimeoutReadWrite)(dialerEndpoint)
SignerDialerEndpointConnRetries(10)(dialerEndpoint)
SignerDialerEndpointReadWriteTimeout(testTimeoutReadWrite)(dialerEndpoint)
SignerDialerEndpointMaxDialRetries(10)(dialerEndpoint)

signerServer := NewSignerServer(dialerEndpoint, chainID, mockPV)

Expand Down Expand Up @@ -184,8 +184,8 @@ func getMockEndpoints(
listenerEndpoint = newSignerListenerEndpoint(logger, l, testTimeoutReadWrite)
)

SignerDialerEndpointTimeoutReadWrite(testTimeoutReadWrite)(dialerEndpoint)
SignerDialerEndpointConnRetries(1e6)(dialerEndpoint)
SignerDialerEndpointReadWriteTimeout(testTimeoutReadWrite)(dialerEndpoint)
SignerDialerEndpointMaxDialRetries(1e6)(dialerEndpoint)

startListenerEndpointAsync(t, listenerEndpoint, endpointIsOpenCh)

Expand Down
12 changes: 6 additions & 6 deletions tm2/pkg/bft/privval/socket_listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
)

const (
defaultTimeoutAcceptSeconds = 3
defaultPingPeriodMilliseconds = 100
DefaultTimeoutAcceptSeconds = 3
DefaultPingPeriodMilliseconds = 100
)

// timeoutError can be used to check if an error returned from the netp package
Expand Down Expand Up @@ -63,8 +63,8 @@ func NewTCPListener(
TCPListener: ln.(*net.TCPListener),
listenerKey: listenerKey,
authorizedKeys: authorizedKeys,
timeoutAccept: time.Second * defaultTimeoutAcceptSeconds,
timeoutReadWrite: time.Second * defaultTimeoutReadWriteSeconds,
timeoutAccept: time.Second * DefaultTimeoutAcceptSeconds,
timeoutReadWrite: time.Second * DefaultReadWriteTimeoutSeconds,
}
}

Expand Down Expand Up @@ -131,8 +131,8 @@ type unixListener struct {
func NewUnixListener(ln net.Listener) *unixListener {
return &unixListener{
UnixListener: ln.(*net.UnixListener),
timeoutAccept: time.Second * defaultTimeoutAcceptSeconds,
timeoutReadWrite: time.Second * defaultTimeoutReadWriteSeconds,
timeoutAccept: time.Second * DefaultTimeoutAcceptSeconds,
timeoutReadWrite: time.Second * DefaultReadWriteTimeoutSeconds,
}
}

Expand Down
Loading

0 comments on commit bdaddaf

Please sign in to comment.