Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/stefexporter] Fix a context cancellation bug in STEF exporter #37944

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion exporter/stefexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type stefExporter struct {
connID uint64
grpcConn *grpc.ClientConn
client *stefgrpc.Client
connCancel context.CancelFunc

// The STEF writer we write metrics to and which in turns sends them over gRPC.
stefWriter *oteltef.MetricsWriter
Expand Down Expand Up @@ -147,8 +148,39 @@ func (s *stefExporter) ensureConnected(ctx context.Context) error {
}
s.client = stefgrpc.NewClient(settings)

grpcWriter, opts, err := s.client.Connect(ctx)
s.connCancel = nil
connCtx, connCancel := context.WithCancel(context.Background())

connectionAttemptDone := make(chan struct{})
defer close(connectionAttemptDone)

// Start a goroutine that waits for success, failure or cancellation of
// the connection attempt.
go func() {
// Wait for either connection attempt to be done or for the caller
// of ensureConnected() to give up.
select {
case <-ctx.Done():
// The caller of ensureConnected() cancelled while we are waiting
// for connection to be established. We have to cancel the
// connection attempt (and the whole connection if it raced us and
// managed to connect - we will reconnect later again in that case).
s.set.Logger.Debug("Canceling connection context because ensureConnected() caller cancelled.")
connCancel()
case <-connectionAttemptDone:
// Connection attempt finished (successfully or no). No need to wait for the
// previous case, calling connCancel() is not needed anymore now. It will be
// called later, when disconnecting.
// From this moment we are essentially detaching from the Context
// that passed to ensureConnected() since we wanted to honor it only
// for the duration of the connection attempt, but not for the duration
// of the entire existence of the connection.
}
}()

grpcWriter, opts, err := s.client.Connect(connCtx)
if err != nil {
connCancel()
return fmt.Errorf("failed to connect to destination: %w", err)
}

Expand All @@ -157,10 +189,17 @@ func (s *stefExporter) ensureConnected(ctx context.Context) error {
// Create STEF record writer over gRPC.
s.stefWriter, err = oteltef.NewMetricsWriter(grpcWriter, opts)
if err != nil {
connCancel()
return err
}

// From this point on we consider the connection successfully established.
s.isConnected = true

// We need to call the cancel func when this connection is over so that we don't
// leak the Context we just created. This will be done in disconnect().
s.connCancel = connCancel

s.set.Logger.Debug("Connected to destination", zap.String("endpoint", s.cfg.Endpoint))

return nil
Expand All @@ -174,6 +213,12 @@ func (s *stefExporter) disconnect(ctx context.Context) {
return
}

if s.connCancel != nil {
s.set.Logger.Debug("Calling cancel on connection context to avoid leaks")
s.connCancel()
s.connCancel = nil
}

if err := s.client.Disconnect(ctx); err != nil {
s.set.Logger.Error("Failed to disconnect", zap.Error(err))
}
Expand All @@ -194,6 +239,8 @@ func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) er
converter := stefpdatametrics.OtlpToSTEFUnsorted{}
err := converter.WriteMetrics(md, s.stefWriter)
if err != nil {
s.set.Logger.Debug("WriteMetrics failed", zap.Error(err))

// Error to write to STEF stream typically indicates either:
// 1) A problem with the connection. We need to reconnect.
// 2) Encoding failure, possibly due to encoder bug. In this case
Expand Down Expand Up @@ -221,6 +268,8 @@ func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) er
// data is sent to network. This is necessary so that the server receives it and
// sends an acknowledgement back.
if err = s.stefWriter.Flush(); err != nil {
s.set.Logger.Debug("Flush failed", zap.Error(err))

// Failure to write the gRPC stream normally means something is
// wrong with the connection. We need to reconnect. Disconnect here
// and the next exportMetrics() call will connect again.
Expand Down
114 changes: 114 additions & 0 deletions exporter/stefexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"go.opentelemetry.io/collector/pdata/testdata"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
)
Expand Down Expand Up @@ -332,3 +334,115 @@ func TestStartServerAfterClient(t *testing.T) {
// Ensure data is received.
assert.EqualValues(t, pointCount, mockSrv.recordsReceived.Load())
}

func TestCancelBlockedExport(t *testing.T) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true
logger, _ := logCfg.Build()

// Listen but don't accept connections. This should block gRPC connection attempt.
endpoint := testutil.GetAvailableLocalAddress(t)
listener, err := net.Listen("tcp", endpoint)
require.NoError(t, err)
defer listener.Close()

// Start an exporter and point to the listener.
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ClientConfig = configgrpc.ClientConfig{
Endpoint: endpoint,
TLSSetting: configtls.ClientConfig{Insecure: true},
}

set := exportertest.NewNopSettings()
set.TelemetrySettings.Logger = logger

exp := newStefExporter(set.TelemetrySettings, cfg)
require.NotNil(t, exp)

defer func() {
assert.NoError(t, exp.Shutdown(context.Background()))
}()

host := componenttest.NewNopHost()
ctx, cancel := context.WithCancel(context.Background())
require.NoError(t, exp.Start(ctx, host))

// Cancel after Start() returns.
// This should cancel the connection attempt that Start() initiated.
go func() { cancel() }()

md := testdata.GenerateMetrics(1)

// Do some attempts send with cancellation to help trigger races if there is any.
for i := 0; i < 10; i++ {
// Trying sending with server down. The connection attempt will block
// because listener does not accept connections. However exportMetrics()
// will return almost immediately because connection attempt
// context is cancelled.
ctx, cancel := context.WithCancel(context.Background())
go func() { cancel() }()
err = exp.exportMetrics(ctx, md)

// Sending must fail with Cancelled code.
require.Error(t, err)
stat, ok := status.FromError(err)
assert.True(t, ok)
assert.EqualValues(t, codes.Canceled, stat.Code())
}
}

func TestCancelAfterExport(t *testing.T) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true
logger, _ := logCfg.Build()

mockSrv := newMockMetricDestServer(t, logger)

// Start an exporter and point to the server.
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ClientConfig = configgrpc.ClientConfig{
Endpoint: mockSrv.endpoint,
// Use insecure mode for tests so that we don't bother with certificates.
TLSSetting: configtls.ClientConfig{Insecure: true},
}

set := exportertest.NewNopSettings()
set.TelemetrySettings.Logger = logger

exp := newStefExporter(set.TelemetrySettings, cfg)
require.NotNil(t, exp)

defer func() {
assert.NoError(t, exp.Shutdown(context.Background()))
}()

// Start the server.
mockSrv.start()
defer mockSrv.stop()

host := componenttest.NewNopHost()
ctx, cancel := context.WithCancel(context.Background())
// Cancel the context to cause connection attempt in Start() to fail.
cancel()
require.NoError(t, exp.Start(ctx, host))

var pointCount int64
for i := 0; i < 10; i++ {
md := testdata.GenerateMetrics(1)
pointCount += int64(md.DataPointCount())
ctx, cancel = context.WithCancel(context.Background())

err := exp.exportMetrics(ctx, md)
require.NoError(t, err)

// Canceling context should not result in broken connection.
// We had a bug in exporter implementation that was causing the next
// exportMetrics() attempt to fail.
cancel()
}

// Ensure all data is received.
assert.EqualValues(t, pointCount, mockSrv.recordsReceived.Load())
}
Loading