-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add support for idle connection timeouts and automatic reconnection #16
Conversation
@estensen can we double check that this will be compatible with the Fiber grpc server? https://github.com/grpc/grpc-go/blob/master/Documentation/keepalive.md |
To add more context here the concern here is that the server might disconnect clients that send keepalive too often I've reviewed the Fiber server and Tonic and can't see that it will disconnect clients because of keepalives. |
streams.go
Outdated
|
||
// Use exponential backoff with jitter | ||
sleepTime := backoff + time.Duration(rand.Int63n(int64(backoff/2))) | ||
fmt.Printf("Subscription error, retrying in %v: %v\n", sleepTime, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably implement a logger instead of this. But can we live with it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have gotten feedback that people don't like it, so let's rm it for now and do a logger later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice job, some questions and nits
if err == nil { | ||
// Successfully created new connection | ||
c.conn = newConn | ||
c.client = api.NewAPIClient(newConn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this also reconnect any existing streams that are open? Or should we do that manually?
|
||
// Use exponential backoff with jitter | ||
sleepTime := *backoff + time.Duration(rand.Int63n(int64(*backoff/2))) | ||
fmt.Printf("%s subscription error, retrying in %v: %v\n", subscriptionName, sleepTime, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fmt.Printf("%s subscription error, retrying in %v: %v\n", subscriptionName, sleepTime, err) |
break | ||
} | ||
|
||
fmt.Printf("Stream error, reconnecting: %v\n", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fmt.Printf("Stream error, reconnecting: %v\n", err) |
time.Sleep(time.Second * 2) | ||
continue outer | ||
if err == io.EOF { | ||
fmt.Println("Stream completed") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fmt.Println("Stream completed") |
if err := tx.UnmarshalBinary(proto.RlpTransaction); err != nil { | ||
continue outer | ||
if err := tx.UnmarshalBinary(msg.RlpTransaction); err != nil { | ||
fmt.Printf("Error decoding transaction: %v\n", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fmt.Printf("Error decoding transaction: %v\n", err) |
time.Sleep(time.Second * 2) | ||
continue outer | ||
if err == io.EOF { | ||
fmt.Println("Stream completed") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fmt.Println("Stream completed") |
break | ||
} | ||
|
||
fmt.Printf("Block stream error, reconnecting: %v\n", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fmt.Printf("Block stream error, reconnecting: %v\n", err) |
} | ||
|
||
if decodeErr != nil { | ||
fmt.Printf("Failed to decode execution payload: %v\n", decodeErr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fmt.Printf("Failed to decode execution payload: %v\n", decodeErr) |
time.Sleep(time.Second * 2) | ||
continue outer | ||
if err == io.EOF { | ||
fmt.Println("Stream completed") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fmt.Println("Stream completed") |
break | ||
} | ||
|
||
fmt.Printf("Beacon block stream error, reconnecting: %v\n", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fmt.Printf("Beacon block stream error, reconnecting: %v\n", err) |
chore: add test CI
Like
http.Client
we don't set timeouts by default, which can lead to connection leaks and silent disconnections in long-running applications.This PR allows
IdleTimout
to be set to configure the timeout duration. Connections that are broken are also automatically reconnected.To keep things backwards compatible
IdleTimout
is set to0
and is disabled by default.Added test to simluate network and will add it to CI next: