Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create memhttp package to debug flaky testcases #594

Merged
merged 23 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4c43e4b
Debug flaky testcases with in memory network
emcfarlane Sep 18, 2023
1f1df5d
Fix lint
emcfarlane Sep 18, 2023
225aa0d
Add TODO for flaky test
emcfarlane Sep 18, 2023
27410bd
Use local networking for benchmarks
emcfarlane Sep 18, 2023
facfa53
Merge branch 'main' into ed/memtest
emcfarlane Sep 19, 2023
877b4df
Create memhttp and memhttp test packages
emcfarlane Oct 6, 2023
932533f
Fix race on RegisterShutdown
emcfarlane Oct 10, 2023
c057a62
Fix transport desc
emcfarlane Oct 10, 2023
f6a2bd7
Fix feedback
emcfarlane Oct 11, 2023
40594b7
Fix description
emcfarlane Oct 11, 2023
b9fccc6
Add Cleanup method test servers
emcfarlane Oct 13, 2023
dd98dd4
Revert moving options
emcfarlane Oct 13, 2023
716794b
Ensure response errors are reported consistently
emcfarlane Oct 22, 2023
ccd39d4
Document BlockUntilResponseReady behaviour
emcfarlane Oct 22, 2023
91afa1e
Ensure CloseWrite is called
emcfarlane Oct 23, 2023
ea743b3
Feedback remove changes to duplexHTTPCall
emcfarlane Oct 23, 2023
628915a
Add comment to clarify behaviour
emcfarlane Oct 23, 2023
23dc2da
Restrict log errors to New|Logger|Lshortfile
emcfarlane Oct 23, 2023
2ee0def
Document response close error handling
emcfarlane Oct 25, 2023
00d7f6a
Fix and document Close behaviour for pipe
emcfarlane Oct 25, 2023
758f889
Move responseBodyReady to group what it protects
emcfarlane Oct 25, 2023
fa4a554
Add CloseResponse checks in TestServer
emcfarlane Oct 25, 2023
7b062d1
Merge branch 'main' into ed/memtest
emcfarlane Nov 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,28 @@ func TestServer(t *testing.T) {
connect.CodeOf(stream.Err()),
connect.CodeInvalidArgument,
)
assert.Nil(t, stream.Close())
})
t.Run("count_up_timeout", func(t *testing.T) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
t.Cleanup(cancel)
_, err := client.CountUp(ctx, connect.NewRequest(&pingv1.CountUpRequest{Number: 1}))
assert.NotNil(t, err)
assert.Equal(t, connect.CodeOf(err), connect.CodeDeadlineExceeded)
})
t.Run("count_up_cancel_after_first_response", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
request := connect.NewRequest(&pingv1.CountUpRequest{Number: 5})
request.Header().Set(clientHeader, headerValue)
stream, err := client.CountUp(ctx, request)
assert.Nil(t, err)
assert.True(t, stream.Receive())
cancel()
assert.False(t, stream.Receive())
assert.NotNil(t, stream.Err())
assert.Equal(t, connect.CodeOf(stream.Err()), connect.CodeCanceled)
assert.Nil(t, stream.Close())
})
}
testCumSum := func(t *testing.T, client pingv1connect.PingServiceClient, expectSuccess bool) { //nolint:thelper
t.Run("cumsum", func(t *testing.T) {
Expand Down Expand Up @@ -285,6 +299,7 @@ func TestServer(t *testing.T) {
assert.Equal(t, connect.CodeOf(err), connect.CodeCanceled)
assert.Equal(t, got, expect)
assert.False(t, connect.IsWireError(err))
assert.Nil(t, stream.CloseResponse())
})
t.Run("cumsum_cancel_before_send", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -302,6 +317,8 @@ func TestServer(t *testing.T) {
err := stream.Send(&pingv1.CumSumRequest{Number: 19})
assert.Equal(t, connect.CodeOf(err), connect.CodeCanceled, assert.Sprintf("%v", err))
assert.False(t, connect.IsWireError(err))
assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
})
}
testErrors := func(t *testing.T, client pingv1connect.PingServiceClient) { //nolint:thelper
Expand Down
18 changes: 12 additions & 6 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ type duplexHTTPCall struct {
requestBodyReader *io.PipeReader
requestBodyWriter *io.PipeWriter

// sendRequestOnce ensures we only send the request once.
sendRequestOnce sync.Once
request *http.Request
response *http.Response

// responseReady is closed when the response is ready or when the request
// fails. Any error on request initialisation will be set on the
// responseErr. There's always a response if responseErr is nil.
responseReady chan struct{}
response *http.Response
responseErr error
}

Expand Down Expand Up @@ -177,7 +178,9 @@ func (d *duplexHTTPCall) CloseRead() error {
if d.response == nil {
return nil
}
if _, err := discard(d.response.Body); err != nil {
if _, err := discard(d.response.Body); err != nil &&
!errors.Is(err, context.Canceled) &&
!errors.Is(err, context.DeadlineExceeded) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added these error checks to make CloseResponse behaviour more consistent. Otherwise Close does race with context cancel on errors causing either nil or a context error. Maybe we can revisit the discard behaviour in another PR.

_ = d.response.Body.Close()
return wrapIfRSTError(err)
}
Expand Down Expand Up @@ -247,6 +250,9 @@ func (d *duplexHTTPCall) makeRequest() {
}
// Once we send a message to the server, they send a message back and
// establish the receive side of the stream.
// On error, we close the request body using the Write side of the pipe.
// This ensures HTTP2 streams receive an io.EOF from the Read side of the
// pipe. Write's check for io.ErrClosedPipe and will convert this to io.EOF.
response, err := d.httpClient.Do(d.request) //nolint:bodyclose
if err != nil {
err = wrapIfContextError(err)
Expand All @@ -257,15 +263,15 @@ func (d *duplexHTTPCall) makeRequest() {
err = NewError(CodeUnavailable, err)
}
d.responseErr = err
d.requestBodyReader.CloseWithError(io.EOF)
d.requestBodyWriter.Close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this changed? Since it was using CloseWithError and not simply Close, this would not be the source of io.ErrPipeClosed issues. In fact, I believe this will cause attempts to send on the broken stream to now return io.ErrPipeClosed instead of io.EOF.

In order to control the error received by the write half (senders), you must close the read half (which is what it was doing before), and vice versa.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close is just calling CloseWithError(nil) which then gets set to io.EOF: https://cs.opensource.google/go/go/+/refs/tags/go1.21.3:src/io/pipe.go;l=100
It's always the opposite side of the pipe that gets the error with the current side getting: io.ErrClosedPipe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SetError called close on read:

_ = d.requestBodyReader.Close()

Was my mistake switching to Writer, it's now fixed in the latest PR.

Copy link
Member

@jhump jhump Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current side getting: io.ErrClosedPipe
SetError called close on read:

Exactly. By closing the writer here, we will induce io.ErrClosedPipe in calls to send (which is what used the writer). The old behavior, closing the reader, is better since it results in EOF errors instead.

Was my mistake switching to Writer, it's now fixed in the latest PR.

I don't follow this. I'm looking at refreshed diff and it's still closing the writer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I confused myself and switched the ordering. My previous comments don't align.

Closing the write side makes Write on the pipe error with io.ErrClosedPipe. Connect checks for this when sending:

if err != nil && errors.Is(err, io.ErrClosedPipe) {

However, on the read side we need to return io.EOF to avoid HTTP2 setting the io.ErrClosedPipe as the error for the response body: #594 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CloseWrite behaviour was different to SetError:

return d.requestBodyWriter.Close()

_ = d.requestBodyReader.Close()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thank you for explaining!

return
}
// We've got a response. We can now read from the response body.
// Closing the response body is delegated to the caller.
// Closing the response body is delegated to the caller even on error.
d.response = response
if err := d.validateResponse(response); err != nil {
d.responseErr = err
d.requestBodyReader.CloseWithError(io.EOF)
d.requestBodyWriter.Close()
return
}
if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
Expand All @@ -278,7 +284,7 @@ func (d *duplexHTTPCall) makeRequest() {
response.ProtoMajor,
response.ProtoMinor,
)
d.requestBodyReader.CloseWithError(io.EOF)
d.requestBodyWriter.Close()
}
}

Expand Down