From 533d7fd8988cc28afb712d74aca753cd823a443d Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 25 Jan 2023 10:56:25 -0800 Subject: [PATCH] Cherrypick 5959: xds: fix panic involving double close of channel in xDS transport (#5971) --- xds/internal/xdsclient/transport/loadreport.go | 8 ++++++++ .../xdsclient/transport/loadreport_test.go | 14 +++++++++++++- xds/internal/xdsclient/transport/transport.go | 1 - 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/xds/internal/xdsclient/transport/loadreport.go b/xds/internal/xdsclient/transport/loadreport.go index a683afd57938..58a2e5dedb6a 100644 --- a/xds/internal/xdsclient/transport/loadreport.go +++ b/xds/internal/xdsclient/transport/loadreport.go @@ -62,6 +62,11 @@ func (t *Transport) lrsStartStream() { ctx, cancel := context.WithCancel(context.Background()) t.lrsCancelStream = cancel + + // Create a new done channel every time a new stream is created. This ensures + // that we don't close the same channel multiple times (from lrsRunner() + // goroutine) when multiple streams are created and closed. + t.lrsRunnerDoneCh = make(chan struct{}) go t.lrsRunner(ctx) } @@ -78,6 +83,9 @@ func (t *Transport) lrsStopStream() { t.lrsCancelStream() t.logger.Infof("Stopping LRS stream") + + // Wait for the runner goroutine to exit. The done channel will be + // recreated when a new stream is created. <-t.lrsRunnerDoneCh } diff --git a/xds/internal/xdsclient/transport/loadreport_test.go b/xds/internal/xdsclient/transport/loadreport_test.go index f6203c9b4425..815ca25b27b7 100644 --- a/xds/internal/xdsclient/transport/loadreport_test.go +++ b/xds/internal/xdsclient/transport/loadreport_test.go @@ -54,7 +54,7 @@ func (s) TestReportLoad(t *testing.T) { NodeProto: nodeProto, } - // Create a transport to the fake server. + // Create a transport to the fake management server. 
tr, err := transport.New(transport.Options{ ServerCfg: serverCfg, UpdateHandler: func(transport.ResourceUpdate) error { return nil }, // No ADS validation. @@ -190,4 +190,16 @@ func (s) TestReportLoad(t *testing.T) { if _, err := mgmtServer.LRSStreamCloseChan.Receive(ctx); err != nil { t.Fatal("Timeout waiting for LRS stream to close") } + + // Calling the load reporting API again should result in the creation of a + // new LRS stream. This ensures that creating and closing multiple streams + // works smoothly. + _, cancelLRS3 := tr.ReportLoad() + if err != nil { + t.Fatalf("Failed to start LRS load reporting: %v", err) + } + if _, err := mgmtServer.LRSStreamOpenChan.Receive(ctx); err != nil { + t.Fatalf("Timeout when waiting for LRS stream to be created: %v", err) + } + cancelLRS3() } diff --git a/xds/internal/xdsclient/transport/transport.go b/xds/internal/xdsclient/transport/transport.go index e0b9807c1648..814ca5f87263 100644 --- a/xds/internal/xdsclient/transport/transport.go +++ b/xds/internal/xdsclient/transport/transport.go @@ -202,7 +202,6 @@ func New(opts Options) (*Transport, error) { versions: make(map[string]string), nonces: make(map[string]string), adsRunnerDoneCh: make(chan struct{}), - lrsRunnerDoneCh: make(chan struct{}), } // This context is used for sending and receiving RPC requests and