[processor/logstransform]: Fix shutdown ordering leading to panic #31153

Merged
merged 9 commits on Feb 9, 2024
13 changes: 13 additions & 0 deletions .chloggen/fix_log-transform-shutdown.yaml
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: logstransformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix potential panic on shutdown due to incorrect shutdown order

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31139]
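The fix below keeps the processor's list of registered shutdown functions but invokes them in reverse (LIFO) order, so whatever was started last — the stage that feeds data in — is stopped first, and downstream stages can drain before their inputs close. A minimal, self-contained sketch of that idea (hypothetical names, not the processor's actual types):

    // Package shutdownsketch illustrates LIFO shutdown; all names here are hypothetical.
    package shutdownsketch

    import "context"

    type stages struct {
        shutdownFns []func(context.Context) error
    }

    // onShutdown registers cleanup for the stage that was just started.
    func (s *stages) onShutdown(fn func(context.Context) error) {
        s.shutdownFns = append(s.shutdownFns, fn)
    }

    // Shutdown stops stages in reverse order of startup, so the producing side
    // is stopped first and consumers can drain before their input channels close.
    func (s *stages) Shutdown(ctx context.Context) error {
        for i := len(s.shutdownFns) - 1; i >= 0; i-- {
            if err := s.shutdownFns[i](ctx); err != nil {
                return err
            }
        }
        return nil
    }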
125 changes: 91 additions & 34 deletions processor/logstransformprocessor/processor.go
@@ -32,7 +32,6 @@ type logsTransformProcessor struct {
emitter *adapter.LogEmitter
converter *adapter.Converter
fromConverter *adapter.FromPdataConverter
wg sync.WaitGroup
shutdownFns []component.ShutdownFunc
}

@@ -64,25 +63,81 @@ func (ltp *logsTransformProcessor) Capabilities() consumer.Capabilities {
}

func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error {
for _, fn := range ltp.shutdownFns {
ltp.logger.Info("Stopping logs transform processor")
// We call the shutdown functions in reverse order, so that the last thing we started
// is stopped first.
for i := len(ltp.shutdownFns) - 1; i >= 0; i-- {
fn := ltp.shutdownFns[i]

if err := fn(ctx); err != nil {
return err
}
}
ltp.wg.Wait()

return nil
}

func (ltp *logsTransformProcessor) Start(ctx context.Context, _ component.Host) error {
// create all objects before starting them, since the loops (consumerLoop, converterLoop) depend on these converters not being nil.
ltp.converter = adapter.NewConverter(ltp.logger)

wkrCount := int(math.Max(1, float64(runtime.NumCPU())))
ltp.fromConverter = adapter.NewFromPdataConverter(wkrCount, ltp.logger)

// data flows in this order:
// ConsumeLogs: receives logs and forwards them for conversion to stanza format ->
// fromConverter: converts logs to stanza format ->
// converterLoop: forwards converted logs to the stanza pipeline ->
// pipeline: performs user configured operations on the logs ->
// emitterLoop: forwards output stanza logs for conversion to OTLP ->
// converter: converts stanza logs to OTLP ->
// consumerLoop: sends the converted OTLP logs to the next consumer
//
// We should start these components in reverse order of the data flow, then stop them in order of the data flow,
// in order to allow for pipeline draining.
ltp.startConsumerLoop(ctx)
ltp.startConverter()
ltp.startEmitterLoop(ctx)
err := ltp.startPipeline()
if err != nil {
return err
}
ltp.startConverterLoop(ctx)
ltp.startFromConverter()

return nil
}

func (ltp *logsTransformProcessor) startFromConverter() {
ltp.fromConverter.Start()

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.fromConverter.Stop()
return nil
})
}

// startConverterLoop starts the converter loop, which reads all the logs translated by the fromConverter and then forwards
// them to the pipeline
func (ltp *logsTransformProcessor) startConverterLoop(ctx context.Context) {
wg := &sync.WaitGroup{}
wg.Add(1)
go ltp.converterLoop(ctx, wg)

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
wg.Wait()
return nil
})
}

func (ltp *logsTransformProcessor) startPipeline() error {
// There is no need for this processor to use storage
err := ltp.pipe.Start(storage.NewNopClient())
if err != nil {
return err
}

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.logger.Info("Stopping logs transform processor")
return ltp.pipe.Stop()
})

@@ -92,40 +147,42 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, _ component.Host)
}
ltp.firstOperator = pipelineOperators[0]

wkrCount := int(math.Max(1, float64(runtime.NumCPU())))
return nil
}

ltp.converter = adapter.NewConverter(ltp.logger)
// startEmitterLoop starts the loop which reads all the logs modified by the pipeline and then forwards
// them to the converter
func (ltp *logsTransformProcessor) startEmitterLoop(ctx context.Context) {
wg := &sync.WaitGroup{}
wg.Add(1)
go ltp.emitterLoop(ctx, wg)

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
wg.Wait()
return nil
})
}

func (ltp *logsTransformProcessor) startConverter() {
ltp.converter.Start()

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.converter.Stop()
return nil
})
}

// startConsumerLoop starts the loop which reads all the logs produced by the converter
// (aggregated by Resource) and then places them on the next consumer
func (ltp *logsTransformProcessor) startConsumerLoop(ctx context.Context) {
wg := &sync.WaitGroup{}
wg.Add(1)
go ltp.consumerLoop(ctx, wg)

ltp.fromConverter = adapter.NewFromPdataConverter(wkrCount, ltp.logger)
ltp.fromConverter.Start()
ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.fromConverter.Stop()
wg.Wait()
return nil
})
// Below we're starting 3 loops:
// * first which reads all the logs translated by the fromConverter and then forwards
// them to pipeline
// ...
ltp.wg.Add(1)
go ltp.converterLoop(ctx)

// * second which reads all the logs modified by the pipeline and then forwards
// them to converter
// ...
ltp.wg.Add(1)
go ltp.emitterLoop(ctx)

// ...
// * third which reads all the logs produced by the converter
// (aggregated by Resource) and then places them on the next consumer
ltp.wg.Add(1)
go ltp.consumerLoop(ctx)
return nil
}

func (ltp *logsTransformProcessor) ConsumeLogs(_ context.Context, ld plog.Logs) error {
@@ -135,8 +192,8 @@ func (ltp *logsTransformProcessor) ConsumeLogs(_ context.Context, ld plog.Logs)

// converterLoop reads the log entries produced by the fromConverter and sends them
// into the pipeline
func (ltp *logsTransformProcessor) converterLoop(ctx context.Context) {
defer ltp.wg.Done()
func (ltp *logsTransformProcessor) converterLoop(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
@@ -163,8 +220,8 @@ func (ltp *logsTransformProcessor) converterLoop(ctx context.Context) {

// emitterLoop reads the log entries produced by the emitter and batches them
// in the converter.
func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context) {
defer ltp.wg.Done()
func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
@@ -185,8 +242,8 @@ func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context) {
}

// consumerLoop reads converter log entries and calls the consumer to consume them.
func (ltp *logsTransformProcessor) consumerLoop(ctx context.Context) {
defer ltp.wg.Done()
func (ltp *logsTransformProcessor) consumerLoop(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
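Note how each loop above now receives its own *sync.WaitGroup, and the matching Wait is registered as a shutdown function right after the goroutine is started, instead of every loop sharing ltp.wg. Extending the earlier sketch (add "sync" to its imports; the channel-based stage is hypothetical), one stage looks roughly like this:

    // startStage launches one draining goroutine and registers its Wait.
    // Because Shutdown runs the registered functions in reverse, it stops the
    // upstream first and only then blocks here until this stage has drained.
    func (s *stages) startStage(ctx context.Context, in <-chan int) {
        wg := &sync.WaitGroup{}
        wg.Add(1)
        go func() {
            defer wg.Done()
            for range in { // exits once the upstream closes the channel
            }
        }()

        s.onShutdown(func(context.Context) error {
            wg.Wait()
            return nil
        })
    }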
86 changes: 86 additions & 0 deletions processor/logstransformprocessor/processor_test.go
@@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor/processortest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
@@ -195,3 +196,88 @@ func generateLogData(messages []testLogMessage) plog.Logs {

return ld
}

// laggyOperator is a test operator that simulates heavy processing that takes a large amount of time.
// The heavy processing only occurs for every 100th log.
type laggyOperator struct {
helper.WriterOperator
logsCount int
}

func (t *laggyOperator) Process(ctx context.Context, e *entry.Entry) error {

// Wait for a large amount of time every 100 logs
if t.logsCount%100 == 0 {
time.Sleep(100 * time.Millisecond)
}

t.logsCount++

t.Write(ctx, e)
return nil
}

func (t *laggyOperator) CanProcess() bool {
return true
}

type laggyOperatorConfig struct {
helper.WriterConfig
}

func (l *laggyOperatorConfig) Build(s *zap.SugaredLogger) (operator.Operator, error) {
wo, err := l.WriterConfig.Build(s)
if err != nil {
return nil, err
}

return &laggyOperator{
WriterOperator: wo,
}, nil
}

func TestProcessorShutdownWithSlowOperator(t *testing.T) {
operator.Register("laggy", func() operator.Builder { return &laggyOperatorConfig{} })

config := &Config{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{
{
Builder: func() *laggyOperatorConfig {
l := &laggyOperatorConfig{}
l.OperatorType = "laggy"
return l
}(),
},
},
},
}

tln := new(consumertest.LogsSink)
factory := NewFactory()
ltp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), config, tln)
require.NoError(t, err)
assert.True(t, ltp.Capabilities().MutatesData)

err = ltp.Start(context.Background(), nil)
require.NoError(t, err)

testLog := plog.NewLogs()
scopeLogs := testLog.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty()

for i := 0; i < 500; i++ {
lr := scopeLogs.LogRecords().AppendEmpty()
lr.Body().SetStr("Test message")
}

// The idea is to check that shutdown, when there are a lot of entries, doesn't try to write logs to
// a closed channel, since that'll cause a panic.
// In order to test, we send a lot of logs to be consumed, then shut down immediately.

err = ltp.ConsumeLogs(context.Background(), testLog)
require.NoError(t, err)

err = ltp.Shutdown(context.Background())
require.NoError(t, err)
}
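Assuming a local checkout of opentelemetry-collector-contrib, the new regression test can be exercised on its own (the race detector is optional but useful alongside the panic check):

    go test -race -run TestProcessorShutdownWithSlowOperator ./processor/logstransformprocessor/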