diff --git a/exporter/stefexporter/exporter.go b/exporter/stefexporter/exporter.go
index befde59b2d6f..a96b97466489 100644
--- a/exporter/stefexporter/exporter.go
+++ b/exporter/stefexporter/exporter.go
@@ -43,6 +43,7 @@ type stefExporter struct {
 	connID      uint64
 	grpcConn    *grpc.ClientConn
 	client      *stefgrpc.Client
+	connCancel  context.CancelFunc
 
 	// The STEF writer we write metrics to and which in turns sends them over gRPC.
 	stefWriter *oteltef.MetricsWriter
@@ -147,8 +148,39 @@ func (s *stefExporter) ensureConnected(ctx context.Context) error {
 	}
 
 	s.client = stefgrpc.NewClient(settings)
 
-	grpcWriter, opts, err := s.client.Connect(ctx)
+	s.connCancel = nil
+	connCtx, connCancel := context.WithCancel(context.Background())
+
+	connectionAttemptDone := make(chan struct{})
+	defer close(connectionAttemptDone)
+
+	// Start a goroutine that waits for success, failure or cancellation of
+	// the connection attempt.
+	go func() {
+		// Wait for either the connection attempt to finish or for the caller
+		// of ensureConnected() to give up.
+		select {
+		case <-ctx.Done():
+			// The caller of ensureConnected() cancelled while we were waiting
+			// for the connection to be established. We have to cancel the
+			// connection attempt (and the whole connection if it raced us and
+			// managed to connect; we will reconnect again later in that case).
+			s.set.Logger.Debug("Canceling connection context because ensureConnected() caller cancelled.")
+			connCancel()
+		case <-connectionAttemptDone:
+			// Connection attempt finished (successfully or not). There is no need
+			// to wait for the previous case; calling connCancel() is not needed
+			// anymore. It will be called later, when disconnecting.
+			// From this moment on we are essentially detaching from the Context
+			// that was passed to ensureConnected(), since we want to honor it only
+			// for the duration of the connection attempt, but not for the
+			// entire existence of the connection.
+		}
+	}()
+
+	grpcWriter, opts, err := s.client.Connect(connCtx)
 	if err != nil {
+		connCancel()
 		return fmt.Errorf("failed to connect to destination: %w", err)
 	}
 
@@ -157,10 +189,17 @@ func (s *stefExporter) ensureConnected(ctx context.Context) error {
 	// Create STEF record writer over gRPC.
 	s.stefWriter, err = oteltef.NewMetricsWriter(grpcWriter, opts)
 	if err != nil {
+		connCancel()
 		return err
 	}
 
+	// From this point on we consider the connection successfully established.
 	s.isConnected = true
+
+	// We need to call the cancel func when this connection is over so that we don't
+	// leak the Context we just created. This will be done in disconnect().
+	s.connCancel = connCancel
+
 	s.set.Logger.Debug("Connected to destination", zap.String("endpoint", s.cfg.Endpoint))
 
 	return nil
@@ -174,6 +213,12 @@ func (s *stefExporter) disconnect(ctx context.Context) {
 		return
 	}
 
+	if s.connCancel != nil {
+		s.set.Logger.Debug("Calling cancel on connection context to avoid leaks")
+		s.connCancel()
+		s.connCancel = nil
+	}
+
 	if err := s.client.Disconnect(ctx); err != nil {
 		s.set.Logger.Error("Failed to disconnect", zap.Error(err))
 	}
@@ -194,6 +239,8 @@ func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) er
 	converter := stefpdatametrics.OtlpToSTEFUnsorted{}
 	err := converter.WriteMetrics(md, s.stefWriter)
 	if err != nil {
+		s.set.Logger.Debug("WriteMetrics failed", zap.Error(err))
+
 		// Error to write to STEF stream typically indicates either:
 		// 1) A problem with the connection. We need to reconnect.
 		// 2) Encoding failure, possibly due to encoder bug. In this case
@@ -221,6 +268,8 @@ func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) er
 	// data is sent to network. This is necessary so that the server receives it and
 	// sends an acknowledgement back.
 	if err = s.stefWriter.Flush(); err != nil {
+		s.set.Logger.Debug("Flush failed", zap.Error(err))
+
 		// Failure to write the gRPC stream normally means something is
 		// wrong with the connection. We need to reconnect. Disconnect here
 		// and the next exportMetrics() call will connect again.
diff --git a/exporter/stefexporter/exporter_test.go b/exporter/stefexporter/exporter_test.go
index 4264593e42e5..f9ede715bdf0 100644
--- a/exporter/stefexporter/exporter_test.go
+++ b/exporter/stefexporter/exporter_test.go
@@ -24,6 +24,8 @@ import (
 	"go.opentelemetry.io/collector/pdata/testdata"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
 )
@@ -332,3 +334,115 @@ func TestStartServerAfterClient(t *testing.T) {
 	// Ensure data is received.
 	assert.EqualValues(t, pointCount, mockSrv.recordsReceived.Load())
 }
+
+func TestCancelBlockedExport(t *testing.T) {
+	logCfg := zap.NewDevelopmentConfig()
+	logCfg.DisableStacktrace = true
+	logger, _ := logCfg.Build()
+
+	// Listen but don't accept connections. This should block the gRPC connection attempt.
+	endpoint := testutil.GetAvailableLocalAddress(t)
+	listener, err := net.Listen("tcp", endpoint)
+	require.NoError(t, err)
+	defer listener.Close()
+
+	// Start an exporter and point it to the listener.
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.ClientConfig = configgrpc.ClientConfig{
+		Endpoint:   endpoint,
+		TLSSetting: configtls.ClientConfig{Insecure: true},
+	}
+
+	set := exportertest.NewNopSettings()
+	set.TelemetrySettings.Logger = logger
+
+	exp := newStefExporter(set.TelemetrySettings, cfg)
+	require.NotNil(t, exp)
+
+	defer func() {
+		assert.NoError(t, exp.Shutdown(context.Background()))
+	}()
+
+	host := componenttest.NewNopHost()
+	ctx, cancel := context.WithCancel(context.Background())
+	require.NoError(t, exp.Start(ctx, host))
+
+	// Cancel after Start() returns.
+	// This should cancel the connection attempt that Start() initiated.
+	go func() { cancel() }()
+
+	md := testdata.GenerateMetrics(1)
+
+	// Do several send attempts with cancellation to help trigger any races.
+	for i := 0; i < 10; i++ {
+		// Try sending with the server down. The connection attempt will block
+		// because the listener does not accept connections. However, exportMetrics()
+		// will return almost immediately because the connection attempt
+		// context is cancelled.
+		ctx, cancel := context.WithCancel(context.Background())
+		go func() { cancel() }()
+		err = exp.exportMetrics(ctx, md)
+
+		// Sending must fail with the Canceled code.
+		require.Error(t, err)
+		stat, ok := status.FromError(err)
+		assert.True(t, ok)
+		assert.EqualValues(t, codes.Canceled, stat.Code())
+	}
+}
+
+func TestCancelAfterExport(t *testing.T) {
+	logCfg := zap.NewDevelopmentConfig()
+	logCfg.DisableStacktrace = true
+	logger, _ := logCfg.Build()
+
+	mockSrv := newMockMetricDestServer(t, logger)
+
+	// Start an exporter and point it to the server.
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.ClientConfig = configgrpc.ClientConfig{
+		Endpoint: mockSrv.endpoint,
+		// Use insecure mode for tests so that we don't bother with certificates.
+		TLSSetting: configtls.ClientConfig{Insecure: true},
+	}
+
+	set := exportertest.NewNopSettings()
+	set.TelemetrySettings.Logger = logger
+
+	exp := newStefExporter(set.TelemetrySettings, cfg)
+	require.NotNil(t, exp)
+
+	defer func() {
+		assert.NoError(t, exp.Shutdown(context.Background()))
+	}()
+
+	// Start the server.
+	mockSrv.start()
+	defer mockSrv.stop()
+
+	host := componenttest.NewNopHost()
+	ctx, cancel := context.WithCancel(context.Background())
+	// Cancel the context to cause the connection attempt in Start() to fail.
+	cancel()
+	require.NoError(t, exp.Start(ctx, host))
+
+	var pointCount int64
+	for i := 0; i < 10; i++ {
+		md := testdata.GenerateMetrics(1)
+		pointCount += int64(md.DataPointCount())
+		ctx, cancel = context.WithCancel(context.Background())
+
+		err := exp.exportMetrics(ctx, md)
+		require.NoError(t, err)
+
+		// Canceling the context should not result in a broken connection.
+		// We had a bug in the exporter implementation that caused the next
+		// exportMetrics() attempt to fail.
+		cancel()
+	}
+
+	// Ensure all data is received.
+	assert.EqualValues(t, pointCount, mockSrv.recordsReceived.Load())
+}
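
The core technique in this patch is detaching the connection's lifetime from the caller's Context: the caller's `ctx` is honored only while the connection attempt is in flight, after which a privately owned Context (cancelled in `disconnect()`) keeps the connection alive. Below is a minimal standalone sketch of that pattern, outside the exporter. `dialSlow` and `connect` are hypothetical names for illustration only; `dialSlow` stands in for a blocking connect call such as stefgrpc's `client.Connect()`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dialSlow is a hypothetical stand-in for a blocking connect call;
// it aborts early if its Context is cancelled.
func dialSlow(ctx context.Context) error {
	select {
	case <-time.After(100 * time.Millisecond): // pretend the handshake takes a while
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// connect honors callerCtx only while the attempt is in flight. The returned
// cancel func owns the connection's lifetime and must be called on disconnect,
// mirroring what the patch stores in s.connCancel.
func connect(callerCtx context.Context) (context.CancelFunc, error) {
	// The connection gets its own Context so it can outlive callerCtx.
	connCtx, connCancel := context.WithCancel(context.Background())

	attemptDone := make(chan struct{})
	defer close(attemptDone)

	go func() {
		select {
		case <-callerCtx.Done():
			// The caller gave up mid-attempt: abort the dial (and the
			// connection, if the dial happened to win the race).
			connCancel()
		case <-attemptDone:
			// The attempt finished; detach from callerCtx. connCancel
			// becomes the caller's responsibility via the return value.
		}
	}()

	if err := dialSlow(connCtx); err != nil {
		connCancel()
		return nil, fmt.Errorf("failed to connect: %w", err)
	}
	return connCancel, nil
}

func main() {
	callerCtx, callerCancel := context.WithCancel(context.Background())
	callerCancel() // simulate the caller cancelling during the attempt

	if _, err := connect(callerCtx); errors.Is(err, context.Canceled) {
		fmt.Println("attempt aborted:", err)
	}
}
```

The alternative of passing the caller's `ctx` straight into `Connect()` is what the patch removes: it ties the long-lived gRPC stream to a Context that only governs one `exportMetrics()` or `Start()` call, so a later cancellation of that Context silently breaks the established connection, which is exactly the behavior `TestCancelAfterExport` guards against.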