From 40bfe67e73fd7552ba8ceecff6b8c26494badd02 Mon Sep 17 00:00:00 2001 From: Matthew Kocher Date: Thu, 16 Jun 2022 20:49:35 +0000 Subject: [PATCH 1/2] Fix race conditions in tests * Don't leave worker goroutines running in RLP Gateway tests. * Use contexts to close goroutines in envelope stream connector tests. Signed-off-by: Carson Long --- envelope_stream_connector_test.go | 62 ++++++++++++++----------------- rlp_gateway_client_test.go | 13 ++++++- 2 files changed, 39 insertions(+), 36 deletions(-) diff --git a/envelope_stream_connector_test.go b/envelope_stream_connector_test.go index d39da70..64034bd 100644 --- a/envelope_stream_connector_test.go +++ b/envelope_stream_connector_test.go @@ -69,12 +69,17 @@ var _ = Describe("Connector", func() { tlsConf, ) - go func() { - rx := c.Stream(context.Background(), &loggregator_v2.EgressBatchRequest{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func(ctx context.Context) { + rx := c.Stream(ctx, &loggregator_v2.EgressBatchRequest{}) for { + if ctx.Err() != nil { + return + } rx() } - }() + }(ctx) Eventually(producer.connectionAttempts).Should(Equal(1)) producer.stop() @@ -121,16 +126,23 @@ var _ = Describe("Connector", func() { rx := c.Stream(context.Background(), &loggregator_v2.EgressBatchRequest{}) var count int + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Read to allow the diode to notice it dropped data - go func() { - for range time.Tick(500 * time.Millisecond) { - // Do not invoke rx while mu is locked - l := len(rx()) - mu.Lock() - count += l - mu.Unlock() + go func(ctx context.Context) { + for { + select { + case <-time.Tick(500 * time.Millisecond): + // Do not invoke rx while mu is locked + l := len(rx()) + mu.Lock() + count += l + mu.Unlock() + case <-ctx.Done(): + return + } } - }() + }(ctx) Eventually(func() int { mu.Lock() @@ -144,7 +156,7 @@ var _ = Describe("Connector", func() { Expect(l).ToNot(BeZero()) }) - It("wont panic when context canceled", func() { + It("won't panic when context canceled", func() { producer, err := newFakeEventProducer() Expect(err).NotTo(HaveOccurred()) @@ -155,36 +167,18 @@ var _ = Describe("Connector", func() { ) Expect(err).NotTo(HaveOccurred()) - var ( - mu sync.Mutex - missed int - ) - addr := producer.addr c := loggregator.NewEnvelopeStreamConnector( - addr, + producer.addr, tlsConf, - loggregator.WithEnvelopeStreamBuffer(5, func(m int) { - mu.Lock() - defer mu.Unlock() - missed += m - }), + loggregator.WithEnvelopeStreamBuffer(5, func(m int) {}), ) - // Use a context that can be canceled ctx, cancel := context.WithCancel(context.Background()) rx := c.Stream(ctx, &loggregator_v2.EgressBatchRequest{}) - var rxReturned bool - // Read to allow the diode to notice it dropped data - go func() { - msg := rx() - Expect(msg).To(BeNil()) - rxReturned = true - }() - - // When the context is canceled, the client panics cancel() - Eventually(func() bool { return rxReturned }).Should(BeTrue()) + msg := rx() + Expect(msg).To(BeNil()) }) }) diff --git a/rlp_gateway_client_test.go b/rlp_gateway_client_test.go index ad229bb..9234af4 100644 --- a/rlp_gateway_client_test.go +++ b/rlp_gateway_client_test.go @@ -41,6 +41,7 @@ var _ = Describe("RlpGatewayClient", func() { It("requests envelopes from the RLP", func() { ch := make(chan []byte, 100) + defer close(ch) spyDoer.resps = append(spyDoer.resps, &http.Response{ StatusCode: 200, Body: ioutil.NopCloser(channelReader(ch)), @@ -76,6 +77,7 @@ var _ = Describe("RlpGatewayClient", func() { DescribeTable("encodes selectors correctly", func(selectors []*loggregator_v2.Selector, paramKey string, paramValue []string) { ch := make(chan []byte, 100) + defer close(ch) spyDoer.resps = append(spyDoer.resps, &http.Response{ StatusCode: 200, Body: ioutil.NopCloser(channelReader(ch)), @@ -180,6 +182,7 @@ var _ = Describe("RlpGatewayClient", func() { It("streams envelopes", func() { ch := make(chan []byte, 100) + defer close(ch) spyDoer.resps = append(spyDoer.resps, &http.Response{ StatusCode: 200, Body: ioutil.NopCloser(channelReader(ch)), @@ -229,6 +232,7 @@ var _ = Describe("RlpGatewayClient", func() { for i := 0; i < 10; i++ { ch <- []byte("event: heartbeat\ndata: 1541438163\n\n") } + ch <- []byte("event: closing\n") }() ctx, cancel := context.WithCancel(context.Background()) @@ -245,6 +249,7 @@ var _ = Describe("RlpGatewayClient", func() { It("handles closing events", func() { ch := make(chan []byte, 100) noCloseCh := make(chan []byte, 100) + defer close(noCloseCh) spyDoer.resps = append(spyDoer.resps, &http.Response{ StatusCode: 200, @@ -269,11 +274,13 @@ var _ = Describe("RlpGatewayClient", func() { }) It("reconnects for non-200 requests", func() { + ch := make(chan []byte, 100) + defer close(ch) spyDoer.resps = append(spyDoer.resps, &http.Response{StatusCode: 500}) spyDoer.resps = append(spyDoer.resps, &http.Response{StatusCode: 500}) spyDoer.resps = append(spyDoer.resps, &http.Response{ StatusCode: 200, - Body: ioutil.NopCloser(channelReader(nil)), + Body: ioutil.NopCloser(channelReader(ch)), }) spyDoer.errs = []error{nil, nil, nil} ctx, cancel := context.WithCancel(context.Background()) @@ -291,9 +298,11 @@ var _ = Describe("RlpGatewayClient", func() { spyDoer.resps = append(spyDoer.resps, &http.Response{StatusCode: 200}) spyDoer.errs = append(spyDoer.errs, errors.New("some-error")) + ch := make(chan []byte, 100) + defer close(ch) spyDoer.resps = append(spyDoer.resps, &http.Response{ StatusCode: 200, - Body: ioutil.NopCloser(channelReader(nil)), + Body: ioutil.NopCloser(channelReader(ch)), }) spyDoer.errs = append(spyDoer.errs, nil) ctx, cancel := context.WithCancel(context.Background()) From 5212e528d55269a1493e55f9bc0746c44de8fb64 Mon Sep 17 00:00:00 2001 From: Carson Long Date: Thu, 16 Jun 2022 17:02:28 -0400 Subject: [PATCH 2/2] CI: add `-race` flag to go test --- .github/workflows/go.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index c915835..24169e5 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -21,4 +21,4 @@ jobs: - name: Test run: | source .envrc - go test -v ./... + go test -v -race ./...