Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wait for HTTP server serve() termination #348

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ type OpAMPServer interface {
// accept connections.
Start(settings StartSettings) error

// Stop accepting new connections and close all current connections. This should
// block until all connections are closed.
// Stop accepting new connections and close all current connections.
// This operation should block until both the server socket and all
// connections have been closed.
Stop(ctx context.Context) error

// Addr returns the network address Server is listening on. Nil if not started.
Expand Down
46 changes: 32 additions & 14 deletions server/serverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
serverTypes "github.com/open-telemetry/opamp-go/server/types"
)

var (
errAlreadyStarted = errors.New("already started")
var errAlreadyStarted = errors.New("already started")

const (
defaultOpAMPPath = "/v1/opamp"
headerContentType = "Content-Type"
headerContentEncoding = "Content-Encoding"
headerAcceptEncoding = "Accept-Encoding"
contentEncodingGzip = "gzip"
contentTypeProtobuf = "application/x-protobuf"
)

const defaultOpAMPPath = "/v1/opamp"
const headerContentType = "Content-Type"
const headerContentEncoding = "Content-Encoding"
const headerAcceptEncoding = "Accept-Encoding"
const contentEncodingGzip = "gzip"
const contentTypeProtobuf = "application/x-protobuf"

type server struct {
logger types.Logger
settings Settings
Expand All @@ -39,7 +39,8 @@

// The listening HTTP Server after successful Start() call. Nil if Start()
// is not called or was not successful.
httpServer *http.Server
httpServer *http.Server
httpServerServeWg *sync.WaitGroup

// The network address Server is listening on. Nil if not started.
addr net.Addr
Expand Down Expand Up @@ -108,6 +109,9 @@
ConnContext: contextWithConn,
}
s.httpServer = hs
httpServerServeWg := sync.WaitGroup{}
httpServerServeWg.Add(1)
s.httpServerServeWg = &httpServerServeWg

listenAddr := s.httpServer.Addr

Expand All @@ -118,15 +122,21 @@
}
err = s.startHttpServer(
listenAddr,
func(l net.Listener) error { return hs.ServeTLS(l, "", "") },
func(l net.Listener) error {
defer httpServerServeWg.Done()
return hs.ServeTLS(l, "", "")
},

Check warning on line 128 in server/serverimpl.go

View check run for this annotation

Codecov / codecov/patch

server/serverimpl.go#L125-L128

Added lines #L125 - L128 were not covered by tests
)
} else {
if listenAddr == "" {
listenAddr = ":http"
}
err = s.startHttpServer(
listenAddr,
func(l net.Listener) error { return hs.Serve(l) },
func(l net.Listener) error {
defer httpServerServeWg.Done()
return hs.Serve(l)
},
)
}
return err
Expand Down Expand Up @@ -159,7 +169,16 @@
defer func() { s.httpServer = nil }()
// This stops accepting new connections. TODO: close existing
// connections and wait them to be terminated.
return s.httpServer.Shutdown(ctx)
err := s.httpServer.Shutdown(ctx)
if err != nil {
return err
}

Check warning on line 175 in server/serverimpl.go

View check run for this annotation

Codecov / codecov/patch

server/serverimpl.go#L174-L175

Added lines #L174 - L175 were not covered by tests
select {
case <-ctx.Done():
return ctx.Err()
default:
s.httpServerServeWg.Wait()
}
}
return nil
}
Expand Down Expand Up @@ -366,7 +385,6 @@
w.Header().Set(headerContentEncoding, contentEncodingGzip)
}
_, err = w.Write(bodyBytes)

if err != nil {
s.logger.Debugf(req.Context(), "Cannot send HTTP response: %v", err)
}
Expand Down
35 changes: 30 additions & 5 deletions server/serverimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,36 @@ func TestServerStartStop(t *testing.T) {
assert.NoError(t, err)
}

func TestServerStartStopWithCancel(t *testing.T) {
srv := startServer(t, &StartSettings{})

err := srv.Start(StartSettings{})
assert.ErrorIs(t, err, errAlreadyStarted)

canceledCtx, cancel := context.WithCancel(context.Background())
cancel()

err = srv.Stop(canceledCtx)
assert.ErrorIs(t, err, context.Canceled)
}

func TestServerStartStopIdempotency(t *testing.T) {
endpoint := testhelpers.GetAvailableLocalAddress()
for i := 0; i < 10; i++ {
t.Run(fmt.Sprintf("Attempt #%d: ", i), func(t *testing.T) {
srv := startServer(t, &StartSettings{
ListenEndpoint: endpoint,
})

err := srv.Start(StartSettings{})
assert.ErrorIs(t, err, errAlreadyStarted)

err = srv.Stop(context.Background())
assert.NoError(t, err)
})
}
}

func TestServerStartStopWithMiddleware(t *testing.T) {
var addedMiddleware atomic.Bool
assert.False(t, addedMiddleware.Load())
Expand Down Expand Up @@ -620,7 +650,6 @@ func TestServerAttachSendMessagePlainHTTP(t *testing.T) {
}

func TestServerHonoursClientRequestContentEncoding(t *testing.T) {

hc := http.Client{}
var rcvMsg atomic.Value
var onConnectedCalled, onCloseCalled int32
Expand Down Expand Up @@ -698,7 +727,6 @@ func TestServerHonoursClientRequestContentEncoding(t *testing.T) {
}

func TestServerHonoursAcceptEncoding(t *testing.T) {

hc := http.Client{}
var rcvMsg atomic.Value
var onConnectedCalled, onCloseCalled int32
Expand Down Expand Up @@ -985,7 +1013,6 @@ func BenchmarkSendToClient(b *testing.B) {
}
srv := New(&sharedinternal.NopLogger{})
err := srv.Start(*settings)

if err != nil {
b.Error(err)
}
Expand Down Expand Up @@ -1017,7 +1044,6 @@ func BenchmarkSendToClient(b *testing.B) {

for _, conn := range serverConnections {
err := conn.Send(context.Background(), &protobufs.ServerToAgent{})

if err != nil {
b.Error(err)
}
Expand All @@ -1026,5 +1052,4 @@ func BenchmarkSendToClient(b *testing.B) {
for _, conn := range clientConnections {
conn.Close()
}

}
Loading