Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wait for HTTP server serve() termination #348

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions server/serverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
serverTypes "github.com/open-telemetry/opamp-go/server/types"
)

var (
errAlreadyStarted = errors.New("already started")
var errAlreadyStarted = errors.New("already started")

const (
defaultOpAMPPath = "/v1/opamp"
headerContentType = "Content-Type"
headerContentEncoding = "Content-Encoding"
headerAcceptEncoding = "Accept-Encoding"
contentEncodingGzip = "gzip"
contentTypeProtobuf = "application/x-protobuf"
)

const defaultOpAMPPath = "/v1/opamp"
const headerContentType = "Content-Type"
const headerContentEncoding = "Content-Encoding"
const headerAcceptEncoding = "Accept-Encoding"
const contentEncodingGzip = "gzip"
const contentTypeProtobuf = "application/x-protobuf"

type server struct {
logger types.Logger
settings Settings
Expand All @@ -39,7 +39,8 @@

// The listening HTTP Server after successful Start() call. Nil if Start()
// is not called or was not successful.
httpServer *http.Server
httpServer *http.Server
httpServerServeWg *sync.WaitGroup

// The network address Server is listening on. Nil if not started.
addr net.Addr
Expand Down Expand Up @@ -108,6 +109,9 @@
ConnContext: contextWithConn,
}
s.httpServer = hs
httpServerServeWg := sync.WaitGroup{}
httpServerServeWg.Add(1)
s.httpServerServeWg = &httpServerServeWg

listenAddr := s.httpServer.Addr

Expand All @@ -118,15 +122,21 @@
}
err = s.startHttpServer(
listenAddr,
func(l net.Listener) error { return hs.ServeTLS(l, "", "") },
func(l net.Listener) error {
defer httpServerServeWg.Done()
return hs.ServeTLS(l, "", "")
},

Check warning on line 128 in server/serverimpl.go

View check run for this annotation

Codecov / codecov/patch

server/serverimpl.go#L125-L128

Added lines #L125 - L128 were not covered by tests
)
} else {
if listenAddr == "" {
listenAddr = ":http"
}
err = s.startHttpServer(
listenAddr,
func(l net.Listener) error { return hs.Serve(l) },
func(l net.Listener) error {
defer httpServerServeWg.Done()
return hs.Serve(l)
},
)
}
return err
Expand Down Expand Up @@ -159,7 +169,11 @@
defer func() { s.httpServer = nil }()
// This stops accepting new connections. TODO: close existing
// connections and wait them to be terminated.
return s.httpServer.Shutdown(ctx)
err := s.httpServer.Shutdown(ctx)
if err != nil {
return err
}

Check warning on line 175 in server/serverimpl.go

View check run for this annotation

Codecov / codecov/patch

server/serverimpl.go#L174-L175

Added lines #L174 - L175 were not covered by tests
s.httpServerServeWg.Wait()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to honour ctx cancellation. If context is Done(), Stop() needs to return with an error. I am not sure if this is going to be the behavior, given that Wait() can wait indefinitely long. Please add a test that verifies the cancellation case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me, added in 4e4986f

}
return nil
}
Expand Down Expand Up @@ -366,7 +380,6 @@
w.Header().Set(headerContentEncoding, contentEncodingGzip)
}
_, err = w.Write(bodyBytes)

if err != nil {
s.logger.Debugf(req.Context(), "Cannot send HTTP response: %v", err)
}
Expand Down
22 changes: 17 additions & 5 deletions server/serverimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ func TestServerStartStop(t *testing.T) {
assert.NoError(t, err)
}

func TestServerStartStopIdempotency(t *testing.T) {
endpoint := testhelpers.GetAvailableLocalAddress()
for i := 0; i < 10; i++ {
t.Run(fmt.Sprintf("Attempt #%d: ", i), func(t *testing.T) {
srv := startServer(t, &StartSettings{
ListenEndpoint: endpoint,
})

err := srv.Start(StartSettings{})
assert.ErrorIs(t, err, errAlreadyStarted)

err = srv.Stop(context.Background())
assert.NoError(t, err)
})
}
}

func TestServerStartStopWithMiddleware(t *testing.T) {
var addedMiddleware atomic.Bool
assert.False(t, addedMiddleware.Load())
Expand Down Expand Up @@ -620,7 +637,6 @@ func TestServerAttachSendMessagePlainHTTP(t *testing.T) {
}

func TestServerHonoursClientRequestContentEncoding(t *testing.T) {

hc := http.Client{}
var rcvMsg atomic.Value
var onConnectedCalled, onCloseCalled int32
Expand Down Expand Up @@ -698,7 +714,6 @@ func TestServerHonoursClientRequestContentEncoding(t *testing.T) {
}

func TestServerHonoursAcceptEncoding(t *testing.T) {

hc := http.Client{}
var rcvMsg atomic.Value
var onConnectedCalled, onCloseCalled int32
Expand Down Expand Up @@ -985,7 +1000,6 @@ func BenchmarkSendToClient(b *testing.B) {
}
srv := New(&sharedinternal.NopLogger{})
err := srv.Start(*settings)

if err != nil {
b.Error(err)
}
Expand Down Expand Up @@ -1017,7 +1031,6 @@ func BenchmarkSendToClient(b *testing.B) {

for _, conn := range serverConnections {
err := conn.Send(context.Background(), &protobufs.ServerToAgent{})

if err != nil {
b.Error(err)
}
Expand All @@ -1026,5 +1039,4 @@ func BenchmarkSendToClient(b *testing.B) {
for _, conn := range clientConnections {
conn.Close()
}

}
Loading