Skip to content

Commit

Permalink
[exporter/stefexporter] Fix a context cancellation bug in STEF export…
Browse files Browse the repository at this point in the history
…er (#37944)

#### Description

STEF exporter used context that was passed to exportMetrics() as the
context for the entire gRPC stream.

This was wrong since the context is cancelled immediately after
exportMetrics returns.

The exporter now uses a separate context for the gRPC stream. This
context has a longer duration, matching the duration of the gRPC stream.

The context passed to exportMetrics() continues to be used as the
context for connection attempt. If connection is not established within
the limits of that context then the attempt to connect is correctly
aborted.

This now correctly decouples the context (and cancellation) for
connection attempt from the context of the connected gRPC stream.

#### Testing

Added unit tests to verify the bug fix.
  • Loading branch information
tigrannajaryan authored Feb 18, 2025
1 parent 3fea1c2 commit d983217
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 1 deletion.
51 changes: 50 additions & 1 deletion exporter/stefexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type stefExporter struct {
connID uint64
grpcConn *grpc.ClientConn
client *stefgrpc.Client
connCancel context.CancelFunc

// The STEF writer we write metrics to and which in turns sends them over gRPC.
stefWriter *oteltef.MetricsWriter
Expand Down Expand Up @@ -147,8 +148,39 @@ func (s *stefExporter) ensureConnected(ctx context.Context) error {
}
s.client = stefgrpc.NewClient(settings)

grpcWriter, opts, err := s.client.Connect(ctx)
s.connCancel = nil
connCtx, connCancel := context.WithCancel(context.Background())

connectionAttemptDone := make(chan struct{})
defer close(connectionAttemptDone)

// Start a goroutine that waits for success, failure or cancellation of
// the connection attempt.
go func() {
// Wait for either connection attempt to be done or for the caller
// of ensureConnected() to give up.
select {
case <-ctx.Done():
// The caller of ensureConnected() cancelled while we are waiting
// for connection to be established. We have to cancel the
// connection attempt (and the whole connection if it raced us and
// managed to connect - we will reconnect later again in that case).
s.set.Logger.Debug("Canceling connection context because ensureConnected() caller cancelled.")
connCancel()
case <-connectionAttemptDone:
// Connection attempt finished (successfully or no). No need to wait for the
// previous case, calling connCancel() is not needed anymore now. It will be
// called later, when disconnecting.
// From this moment we are essentially detaching from the Context
// that passed to ensureConnected() since we wanted to honor it only
// for the duration of the connection attempt, but not for the duration
// of the entire existence of the connection.
}
}()

grpcWriter, opts, err := s.client.Connect(connCtx)
if err != nil {
connCancel()
return fmt.Errorf("failed to connect to destination: %w", err)
}

Expand All @@ -157,10 +189,17 @@ func (s *stefExporter) ensureConnected(ctx context.Context) error {
// Create STEF record writer over gRPC.
s.stefWriter, err = oteltef.NewMetricsWriter(grpcWriter, opts)
if err != nil {
connCancel()
return err
}

// From this point on we consider the connection successfully established.
s.isConnected = true

// We need to call the cancel func when this connection is over so that we don't
// leak the Context we just created. This will be done in disconnect().
s.connCancel = connCancel

s.set.Logger.Debug("Connected to destination", zap.String("endpoint", s.cfg.Endpoint))

return nil
Expand All @@ -174,6 +213,12 @@ func (s *stefExporter) disconnect(ctx context.Context) {
return
}

if s.connCancel != nil {
s.set.Logger.Debug("Calling cancel on connection context to avoid leaks")
s.connCancel()
s.connCancel = nil
}

if err := s.client.Disconnect(ctx); err != nil {
s.set.Logger.Error("Failed to disconnect", zap.Error(err))
}
Expand All @@ -194,6 +239,8 @@ func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) er
converter := stefpdatametrics.OtlpToSTEFUnsorted{}
err := converter.WriteMetrics(md, s.stefWriter)
if err != nil {
s.set.Logger.Debug("WriteMetrics failed", zap.Error(err))

// Error to write to STEF stream typically indicates either:
// 1) A problem with the connection. We need to reconnect.
// 2) Encoding failure, possibly due to encoder bug. In this case
Expand Down Expand Up @@ -221,6 +268,8 @@ func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) er
// data is sent to network. This is necessary so that the server receives it and
// sends an acknowledgement back.
if err = s.stefWriter.Flush(); err != nil {
s.set.Logger.Debug("Flush failed", zap.Error(err))

// Failure to write the gRPC stream normally means something is
// wrong with the connection. We need to reconnect. Disconnect here
// and the next exportMetrics() call will connect again.
Expand Down
114 changes: 114 additions & 0 deletions exporter/stefexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"go.opentelemetry.io/collector/pdata/testdata"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
)
Expand Down Expand Up @@ -332,3 +334,115 @@ func TestStartServerAfterClient(t *testing.T) {
// Ensure data is received.
assert.EqualValues(t, pointCount, mockSrv.recordsReceived.Load())
}

func TestCancelBlockedExport(t *testing.T) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true
logger, _ := logCfg.Build()

// Listen but don't accept connections. This should block gRPC connection attempt.
endpoint := testutil.GetAvailableLocalAddress(t)
listener, err := net.Listen("tcp", endpoint)
require.NoError(t, err)
defer listener.Close()

// Start an exporter and point to the listener.
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ClientConfig = configgrpc.ClientConfig{
Endpoint: endpoint,
TLSSetting: configtls.ClientConfig{Insecure: true},
}

set := exportertest.NewNopSettings()
set.TelemetrySettings.Logger = logger

exp := newStefExporter(set.TelemetrySettings, cfg)
require.NotNil(t, exp)

defer func() {
assert.NoError(t, exp.Shutdown(context.Background()))
}()

host := componenttest.NewNopHost()
ctx, cancel := context.WithCancel(context.Background())
require.NoError(t, exp.Start(ctx, host))

// Cancel after Start() returns.
// This should cancel the connection attempt that Start() initiated.
go func() { cancel() }()

md := testdata.GenerateMetrics(1)

// Do some attempts send with cancellation to help trigger races if there is any.
for i := 0; i < 10; i++ {
// Trying sending with server down. The connection attempt will block
// because listener does not accept connections. However exportMetrics()
// will return almost immediately because connection attempt
// context is cancelled.
ctx, cancel := context.WithCancel(context.Background())
go func() { cancel() }()
err = exp.exportMetrics(ctx, md)

// Sending must fail with Cancelled code.
require.Error(t, err)
stat, ok := status.FromError(err)
assert.True(t, ok)
assert.EqualValues(t, codes.Canceled, stat.Code())
}
}

func TestCancelAfterExport(t *testing.T) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true
logger, _ := logCfg.Build()

mockSrv := newMockMetricDestServer(t, logger)

// Start an exporter and point to the server.
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ClientConfig = configgrpc.ClientConfig{
Endpoint: mockSrv.endpoint,
// Use insecure mode for tests so that we don't bother with certificates.
TLSSetting: configtls.ClientConfig{Insecure: true},
}

set := exportertest.NewNopSettings()
set.TelemetrySettings.Logger = logger

exp := newStefExporter(set.TelemetrySettings, cfg)
require.NotNil(t, exp)

defer func() {
assert.NoError(t, exp.Shutdown(context.Background()))
}()

// Start the server.
mockSrv.start()
defer mockSrv.stop()

host := componenttest.NewNopHost()
ctx, cancel := context.WithCancel(context.Background())
// Cancel the context to cause connection attempt in Start() to fail.
cancel()
require.NoError(t, exp.Start(ctx, host))

var pointCount int64
for i := 0; i < 10; i++ {
md := testdata.GenerateMetrics(1)
pointCount += int64(md.DataPointCount())
ctx, cancel = context.WithCancel(context.Background())

err := exp.exportMetrics(ctx, md)
require.NoError(t, err)

// Canceling context should not result in broken connection.
// We had a bug in exporter implementation that was causing the next
// exportMetrics() attempt to fail.
cancel()
}

// Ensure all data is received.
assert.EqualValues(t, pointCount, mockSrv.recordsReceived.Load())
}

0 comments on commit d983217

Please sign in to comment.