From 0910113d46454c80881db840e21f25485dce2499 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 10 May 2024 07:16:53 -0700 Subject: [PATCH] Backport test & lint fixes from opentelemetry-collector-contrib/31996 (#190) This addresses the test flake occasionally seen in this repository. There was a channel being set to `nil` inappropriately, and uses of a direct call to channel close() when the more appropriate pattern is to cancel a context. This fixes the test flakes and lint problems from the upstream repository in https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/31996. --- .../internal/arrow/common_test.go | 23 ++------ .../internal/arrow/exporter_test.go | 52 +++++++++---------- .../internal/arrow/stream.go | 2 +- 3 files changed, 31 insertions(+), 46 deletions(-) diff --git a/collector/exporter/otelarrowexporter/internal/arrow/common_test.go b/collector/exporter/otelarrowexporter/internal/arrow/common_test.go index 241bba05..ee97e2de 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/common_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/common_test.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "io" - "sync" arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" arrowCollectorMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock" @@ -160,7 +159,7 @@ func (ctc *commonTestCase) repeatedNewStream(nc func() testChannel) func(context arrowpb.ArrowTracesService_ArrowTracesClient, error, ) { - return func(ctx context.Context, opts ...grpc.CallOption) ( + return func(ctx context.Context, _ ...grpc.CallOption) ( arrowpb.ArrowTracesService_ArrowTracesClient, error, ) { @@ -178,7 +177,6 @@ func (ctc *commonTestCase) repeatedNewStream(nc func() testChannel) func(context // healthyTestChannel accepts the connection and returns an OK status immediately. type healthyTestChannel struct { - lock sync.Mutex sent chan *arrowpb.BatchArrowRecords recv chan *arrowpb.BatchStatus } @@ -190,18 +188,7 @@ func newHealthyTestChannel() *healthyTestChannel { } } -func (tc *healthyTestChannel) doClose() { - tc.lock.Lock() - defer tc.lock.Unlock() - if tc.sent != nil { - close(tc.sent) - tc.sent = nil - } -} - func (tc *healthyTestChannel) sendChannel() chan *arrowpb.BatchArrowRecords { - tc.lock.Lock() - defer tc.lock.Unlock() return tc.sent } @@ -211,7 +198,7 @@ func (tc *healthyTestChannel) onConnect(_ context.Context) error { func (tc *healthyTestChannel) onCloseSend() func() error { return func() error { - tc.doClose() + close(tc.sent) return nil } } @@ -265,7 +252,7 @@ func (tc *unresponsiveTestChannel) onCloseSend() func() error { } func (tc *unresponsiveTestChannel) onSend(ctx context.Context) func(*arrowpb.BatchArrowRecords) error { - return func(req *arrowpb.BatchArrowRecords) error { + return func(_ *arrowpb.BatchArrowRecords) error { select { case <-ctx.Done(): return ctx.Err() @@ -313,7 +300,7 @@ func (tc *arrowUnsupportedTestChannel) onCloseSend() func() error { } func (tc *arrowUnsupportedTestChannel) onSend(ctx context.Context) func(*arrowpb.BatchArrowRecords) error { - return func(req *arrowpb.BatchArrowRecords) error { + return func(_ *arrowpb.BatchArrowRecords) error { <-ctx.Done() return ctx.Err() } @@ -346,7 +333,7 @@ func (tc *disconnectedTestChannel) onCloseSend() func() error { } func (tc *disconnectedTestChannel) onSend(_ context.Context) func(*arrowpb.BatchArrowRecords) error { - return func(req *arrowpb.BatchArrowRecords) error { + return func(_ *arrowpb.BatchArrowRecords) error { panic("unreachable") } } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go index 3223d8bb..276e5f3f 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -76,7 +76,7 @@ func newSingleStreamDowngradeDisabledTestCase(t *testing.T, pname PrioritizerNam func newSingleStreamMetadataTestCase(t *testing.T) *exporterTestCase { var count int - return newExporterTestCaseCommon(t, DefaultPrioritizer, NotNoisy, defaultMaxStreamLifetime, 1, false, func(ctx context.Context) (map[string]string, error) { + return newExporterTestCaseCommon(t, DefaultPrioritizer, NotNoisy, defaultMaxStreamLifetime, 1, false, func(_ context.Context) (map[string]string, error) { defer func() { count++ }() if count%2 == 0 { return nil, nil @@ -512,8 +512,10 @@ func TestArrowExporterStreaming(t *testing.T) { tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel)) - bg := context.Background() - require.NoError(t, tc.exporter.Start(bg)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + require.NoError(t, tc.exporter.Start(ctx)) var expectOutput []ptrace.Traces var actualOutput []ptrace.Traces @@ -534,23 +536,21 @@ func TestArrowExporterStreaming(t *testing.T) { for times := 0; times < 10; times++ { input := testdata.GenerateTraces(2) - ctx := context.Background() - sent, err := tc.exporter.SendAndWait(ctx, input) + sent, err := tc.exporter.SendAndWait(context.Background(), input) require.NoError(t, err) require.True(t, sent) expectOutput = append(expectOutput, input) } - // Stop the test conduit started above. If the sender were - // still sending, the test would panic on a closed channel. - channel.doClose() + // Stop the test conduit started above. + cancel() wg.Wait() // As this equality check doesn't support out of order slices, // we sort the slices directly in the GenerateTraces function. require.Equal(t, expectOutput, actualOutput) - require.NoError(t, tc.exporter.Shutdown(bg)) + require.NoError(t, tc.exporter.Shutdown(ctx)) }) } } @@ -562,8 +562,9 @@ func TestArrowExporterHeaders(t *testing.T) { tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel)) - bg := context.Background() - require.NoError(t, tc.exporter.Start(bg)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(t, tc.exporter.Start(ctx)) var expectOutput []metadata.MD var actualOutput []metadata.MD @@ -591,7 +592,6 @@ func TestArrowExporterHeaders(t *testing.T) { for times := 0; times < 10; times++ { input := testdata.GenerateTraces(2) - ctx := context.Background() if times%2 == 1 { md := metadata.MD{ @@ -606,17 +606,16 @@ func TestArrowExporterHeaders(t *testing.T) { }) } - sent, err := tc.exporter.SendAndWait(ctx, input) + sent, err := tc.exporter.SendAndWait(context.Background(), input) require.NoError(t, err) require.True(t, sent) } - // Stop the test conduit started above. If the sender were - // still sending, the test would panic on a closed channel. - channel.doClose() + // Stop the test conduit started above. + cancel() wg.Wait() require.Equal(t, expectOutput, actualOutput) - require.NoError(t, tc.exporter.Shutdown(bg)) + require.NoError(t, tc.exporter.Shutdown(ctx)) } // TestArrowExporterIsTraced tests whether trace and span ID are @@ -631,8 +630,8 @@ func TestArrowExporterIsTraced(t *testing.T) { tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel)) - bg := context.Background() - require.NoError(t, tc.exporter.Start(bg)) + ctx, cancel := context.WithCancel(context.Background()) + require.NoError(t, tc.exporter.Start(ctx)) var expectOutput []metadata.MD var actualOutput []metadata.MD @@ -660,17 +659,17 @@ func TestArrowExporterIsTraced(t *testing.T) { for times := 0; times < 10; times++ { input := testdata.GenerateTraces(2) - ctx := context.Background() + callCtx := context.Background() if times%2 == 1 { - ctx = trace.ContextWithSpanContext(ctx, + callCtx = trace.ContextWithSpanContext(callCtx, trace.NewSpanContext(trace.SpanContextConfig{ TraceID: [16]byte{byte(times), 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf}, SpanID: [8]byte{byte(times), 1, 2, 3, 4, 5, 6, 7}, }), ) expectMap := map[string]string{} - propagation.TraceContext{}.Inject(ctx, propagation.MapCarrier(expectMap)) + propagation.TraceContext{}.Inject(callCtx, propagation.MapCarrier(expectMap)) md := metadata.MD{ "traceparent": []string{expectMap["traceparent"]}, @@ -683,17 +682,16 @@ func TestArrowExporterIsTraced(t *testing.T) { }) } - sent, err := tc.exporter.SendAndWait(ctx, input) + sent, err := tc.exporter.SendAndWait(callCtx, input) require.NoError(t, err) require.True(t, sent) } - // Stop the test conduit started above. If the sender were - // still sending, the test would panic on a closed channel. - channel.doClose() + // Stop the test conduit started above. + cancel() wg.Wait() require.Equal(t, expectOutput, actualOutput) - require.NoError(t, tc.exporter.Shutdown(bg)) + require.NoError(t, tc.exporter.Shutdown(ctx)) }) } } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream.go b/collector/exporter/otelarrowexporter/internal/arrow/stream.go index fb16d4f0..1645937d 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream.go @@ -246,7 +246,7 @@ func (s *Stream) run(ctx context.Context, dc doneCancel, streamClient StreamClie // performs a blocking send(). This returns when the data is in the write buffer, // the caller waiting on its error channel. func (s *Stream) write(ctx context.Context) (retErr error) { - // always close send() + // always close the send channel when this function returns. defer func() { _ = s.client.CloseSend() }() // headers are encoding using hpack, reusing a buffer on each call.