From 456f607476b04c5401dcb2525ccf38cb381152f9 Mon Sep 17 00:00:00 2001 From: akats7 Date: Mon, 11 Dec 2023 18:26:38 -0500 Subject: [PATCH] Added traces tests and comments --- connector/failoverconnector/factory_test.go | 13 +- connector/failoverconnector/failover.go | 54 ++--- .../internal/state/pipeline_selector.go | 108 +++++++--- .../failoverconnector/internal/state/utils.go | 24 +-- connector/failoverconnector/traces.go | 6 +- connector/failoverconnector/traces_test.go | 189 ++++++++++++++++++ 6 files changed, 299 insertions(+), 95 deletions(-) diff --git a/connector/failoverconnector/factory_test.go b/connector/failoverconnector/factory_test.go index 9a7c573b792e..671193416add 100644 --- a/connector/failoverconnector/factory_test.go +++ b/connector/failoverconnector/factory_test.go @@ -15,20 +15,23 @@ import ( ) func TestNewFactory(t *testing.T) { + traces0 := component.NewIDWithName(component.DataTypeTraces, "0") + traces1 := component.NewIDWithName(component.DataTypeTraces, "1") + traces2 := component.NewIDWithName(component.DataTypeTraces, "2") cfg := &Config{ - PipelinePriority: [][]component.ID{{component.NewIDWithName(component.DataTypeTraces, "0"), component.NewIDWithName(component.DataTypeTraces, "1")}, {component.NewIDWithName(component.DataTypeTraces, "2")}}, + PipelinePriority: [][]component.ID{{traces0, traces1}, {traces2}}, RetryInterval: 5 * time.Minute, RetryGap: 10 * time.Second, MaxRetries: 5, } router := connectortest.NewTracesRouter( - connectortest.WithNopTraces(component.NewIDWithName(component.DataTypeTraces, "0")), - connectortest.WithNopTraces(component.NewIDWithName(component.DataTypeTraces, "1")), + connectortest.WithNopTraces(traces0), + connectortest.WithNopTraces(traces1), + connectortest.WithNopTraces(traces2), ) - factory := NewFactory() - conn, err := factory.CreateTracesToTraces(context.Background(), + conn, err := NewFactory().CreateTracesToTraces(context.Background(), connectortest.NewNopCreateSettings(), cfg, router.(consumer.Traces)) assert.NoError(t, err) diff --git a/connector/failoverconnector/failover.go b/connector/failoverconnector/failover.go index 97a4e3cc3631..8bae5c3d6f1e 100644 --- a/connector/failoverconnector/failover.go +++ b/connector/failoverconnector/failover.go @@ -40,7 +40,7 @@ func newFailoverRouter[C any](provider consumerProvider[C], cfg *Config) *failov func (f *failoverRouter[C]) getCurrentConsumer() (C, int, bool) { // if currentIndex incremented passed bounds of pipeline list var nilConsumer C - idx := f.pS.GetCurrentIndex() + idx := f.pS.CurrentIndex() if idx >= len(f.cfg.PipelinePriority) { return nilConsumer, -1, false } @@ -63,7 +63,7 @@ func (f *failoverRouter[C]) registerConsumers() error { func (f *failoverRouter[C]) handlePipelineError(idx int) { // avoids race condition in case of consumeSIGNAL invocations // where index was updated during execution - if idx != f.pS.GetCurrentIndex() { + if idx != f.pS.CurrentIndex() { return } doRetry := f.pS.IndexIsStable(idx) @@ -88,7 +88,7 @@ func (f *failoverRouter[C]) enableRetry(ctx context.Context) { ticker := time.NewTicker(f.cfg.RetryInterval) defer ticker.Stop() - stableIndex := f.pS.GetStableIndex() + stableIndex := f.pS.StableIndex() var cancelFunc context.CancelFunc // checkContinueRetry checks that any higher priority levels have retries remaining // (have not exceeded their maxRetries) @@ -113,41 +113,14 @@ func (f *failoverRouter[C]) enableRetry(ctx context.Context) { // interval starts in the middle of the execution func (f *failoverRouter[C]) 
handleRetry(parentCtx context.Context, stableIndex int) context.CancelFunc { retryCtx, cancelFunc := context.WithCancel(parentCtx) - go f.retryHighPriorityPipelines(retryCtx, stableIndex) + go f.pS.RetryHighPriorityPipelines(retryCtx, stableIndex, f.cfg.RetryGap) return cancelFunc } -// retryHighPriorityPipelines responsible for single iteration through all higher priority pipelines -func (f *failoverRouter[C]) retryHighPriorityPipelines(ctx context.Context, stableIndex int) { - ticker := time.NewTicker(f.cfg.RetryGap) - - defer ticker.Stop() - - for i := 0; i < stableIndex; i++ { - // if stableIndex was updated to a higher priority level during the execution of the goroutine - // will return to avoid overwriting higher priority level with lower one - if stableIndex > f.pS.GetStableIndex() { - return - } - // checks that max retries were not used for this index - if f.pS.MaxRetriesUsed(i) { - continue - } - select { - // return when context is cancelled by parent goroutine - case <-ctx.Done(): - return - case <-ticker.C: - // when ticker triggers currentIndex is updated - f.pS.SetToRetryIndex(i) - } - } -} - // checkStopRetry checks if retry should be suspended if all higher priority levels have exceeded their max retries func (f *failoverRouter[C]) checkContinueRetry(index int) bool { for i := 0; i < index; i++ { - if f.pS.PipelineRetries[i] < f.cfg.MaxRetries { + if f.pS.IndexRetryCount(i) < f.cfg.MaxRetries { return true } } @@ -157,11 +130,14 @@ func (f *failoverRouter[C]) checkContinueRetry(index int) bool { // reportStable reports back to the failoverRouter that the current priority level that was called by Consume.SIGNAL was // stable func (f *failoverRouter[C]) reportStable(idx int) { - // is stableIndex is already the known stableIndex return - if f.pS.IndexIsStable(idx) { - return - } - // if the stableIndex is a retried index, the update the stable index to the retried index - // NOTE retry will not stop due to potential higher priority index still available - f.pS.SetNewStableIndex(idx) + f.pS.ReportStable(idx) +} + +// For Testing +func (f *failoverRouter[C]) GetConsumerAtIndex(idx int) C { + return f.consumers[idx] +} + +func (f *failoverRouter[C]) ModifyConsumerAtIndex(idx int, c C) { + f.consumers[idx] = c } diff --git a/connector/failoverconnector/internal/state/pipeline_selector.go b/connector/failoverconnector/internal/state/pipeline_selector.go index e415b6613364..8b1d410b95ed 100644 --- a/connector/failoverconnector/internal/state/pipeline_selector.go +++ b/connector/failoverconnector/internal/state/pipeline_selector.go @@ -4,15 +4,17 @@ package state // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector/internal/state" import ( + "context" "sync" + "time" ) // PipelineSelector is meant to serve as the source of truth for the target priority level type PipelineSelector struct { - CurrentIndex int - StableIndex int + currentIndex int + stableIndex int lock sync.RWMutex - PipelineRetries []int + pipelineRetries []int maxRetry int } @@ -21,81 +23,127 @@ type PipelineSelector struct { // priority index that was set during a retry, in which case we return to the stable index func (p *PipelineSelector) UpdatePipelineIndex(idx int) { if p.IndexIsStable(idx) { - p.SetToNextPriorityPipeline() + p.setToNextPriorityPipeline(idx) + return } - p.SetToStableIndex() + p.setToStableIndex(idx) } // NextPipeline skips through any lower priority pipelines that have exceeded their maxRetries // and sets the first that has not as the new stable 
-func (p *PipelineSelector) SetToNextPriorityPipeline() {
+func (p *PipelineSelector) setToNextPriorityPipeline(idx int) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	for ok := true; ok; ok = p.exceededMaxRetries() {
-		p.CurrentIndex++
+	for ok := true; ok; ok = p.exceededMaxRetries(idx) {
+		idx++
 	}
-	p.StableIndex = p.CurrentIndex
+	p.stableIndex = idx
 }
 
-func (p *PipelineSelector) exceededMaxRetries() bool {
-	return p.CurrentIndex < len(p.PipelineRetries) && (p.PipelineRetries[p.CurrentIndex] >= p.maxRetry)
+// RetryHighPriorityPipelines is responsible for a single iteration through all higher priority pipelines
+func (p *PipelineSelector) RetryHighPriorityPipelines(ctx context.Context, stableIndex int, retryGap time.Duration) {
+	ticker := time.NewTicker(retryGap)
+
+	defer ticker.Stop()
+
+	for i := 0; i < stableIndex; i++ {
+		// if stableIndex was updated to a higher priority level during the execution of the goroutine
+		// will return to avoid overwriting higher priority level with lower one
+		if stableIndex > p.StableIndex() {
+			return
+		}
+		// checks that max retries were not used for this index
+		if p.MaxRetriesUsed(i) {
+			continue
+		}
+		select {
+		// return when context is cancelled by parent goroutine
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			// when ticker triggers currentIndex is updated
+			p.setToCurrentIndex(i)
+		}
+	}
+}
+
+func (p *PipelineSelector) exceededMaxRetries(idx int) bool {
+	return idx < len(p.pipelineRetries) && (p.pipelineRetries[idx] >= p.maxRetry)
 }
 
 // SetToStableIndex returns the CurrentIndex to the known Stable Index
-func (p *PipelineSelector) SetToStableIndex() {
+func (p *PipelineSelector) setToStableIndex(idx int) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	p.PipelineRetries[p.CurrentIndex]++
-	p.CurrentIndex = p.StableIndex
+	p.pipelineRetries[idx]++
+	p.currentIndex = p.stableIndex
 }
 
 // SetToRetryIndex accepts a param and sets the CurrentIndex to this index value
-func (p *PipelineSelector) SetToRetryIndex(index int) {
+func (p *PipelineSelector) setToCurrentIndex(index int) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	p.CurrentIndex = index
+	p.currentIndex = index
 }
 
 // MaxRetriesUsed exported access to maxRetriesUsed
-func (p *PipelineSelector) MaxRetriesUsed(index int) bool {
+func (p *PipelineSelector) MaxRetriesUsed(idx int) bool {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
-	return p.PipelineRetries[index] >= p.maxRetry
+	return p.pipelineRetries[idx] >= p.maxRetry
 }
 
 // SetNewStableIndex Update stableIndex to the passed stable index
-func (p *PipelineSelector) SetNewStableIndex(idx int) {
+func (p *PipelineSelector) setNewStableIndex(idx int) {
 	p.lock.Lock()
-	defer p.lock.RUnlock()
-	p.PipelineRetries[p.CurrentIndex] = 0
-	p.StableIndex = idx
+	defer p.lock.Unlock()
+	p.pipelineRetries[idx] = 0
+	p.stableIndex = idx
 }
 
 // IndexIsStable returns if index passed is the stable index
 func (p *PipelineSelector) IndexIsStable(idx int) bool {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
-	return idx == p.StableIndex
+	return p.stableIndex == idx
 }
 
-func (p *PipelineSelector) GetStableIndex() int {
+func (p *PipelineSelector) StableIndex() int {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
-	return p.StableIndex
+	return p.stableIndex
 }
 
-func (p *PipelineSelector) GetCurrentIndex() int {
+func (p *PipelineSelector) CurrentIndex() int {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
-	return p.CurrentIndex
+	return p.currentIndex
+}
+
+func (p *PipelineSelector) IndexRetryCount(idx int) int {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	return p.pipelineRetries[idx]
+}
+
+// ReportStable reports back to the failoverRouter that the current priority level that was called by Consume.SIGNAL was
+// stable
+func (p *PipelineSelector) ReportStable(idx int) {
+	// if stableIndex is already the known stableIndex, return
+	if p.IndexIsStable(idx) {
+		return
+	}
+	// if the stableIndex is a retried index, then update the stable index to the retried index
+	// NOTE retry will not stop due to potential higher priority index still available
+	p.setNewStableIndex(idx)
 }
 
 func NewPipelineSelector(lenPriority int, maxRetries int) *PipelineSelector {
 	return &PipelineSelector{
-		CurrentIndex:    0,
-		StableIndex:     0,
+		currentIndex:    0,
+		stableIndex:     0,
 		lock:            sync.RWMutex{},
-		PipelineRetries: make([]int, lenPriority),
+		pipelineRetries: make([]int, lenPriority),
 		maxRetry:        maxRetries,
 	}
 }
diff --git a/connector/failoverconnector/internal/state/utils.go b/connector/failoverconnector/internal/state/utils.go
index 5a8120057495..c84814e63ec4 100644
--- a/connector/failoverconnector/internal/state/utils.go
+++ b/connector/failoverconnector/internal/state/utils.go
@@ -9,30 +9,18 @@ import (
 )
 
 type TryLock struct {
-	lock chan struct{}
+	lock sync.Mutex
 }
 
-func NewTryLock() *TryLock {
-	return &TryLock{
-		lock: make(chan struct{}, 1),
-	}
-}
-
-// Lock tries to write to a channel of size 1 to maintain a single access point to a resource
-// if not default case will return
-// NOTE: may need to update logic in future so that concurrent calls to consume block while the lock is acquired
-// and then return automatically once lock is released to avoid repeated calls to consume before indexes are updated
-func (tl *TryLock) Lock(fn func(int), arg int) {
-	select {
-	case tl.lock <- struct{}{}:
-		defer tl.Unlock()
+func (t *TryLock) TryExecute(fn func(int), arg int) {
+	if t.lock.TryLock() {
+		defer t.lock.Unlock()
 		fn(arg)
-	default:
 	}
 }
 
-func (tl *TryLock) Unlock() {
-	<-tl.lock
+func NewTryLock() *TryLock {
+	return &TryLock{}
 }
 
 // Manages cancel function for retry goroutine, ends up cleaner than using channels
diff --git a/connector/failoverconnector/traces.go b/connector/failoverconnector/traces.go
index acc45f3df163..135a39ff7401 100644
--- a/connector/failoverconnector/traces.go
+++ b/connector/failoverconnector/traces.go
@@ -41,7 +41,7 @@ func (f *tracesFailover) ConsumeTraces(ctx context.Context, td ptrace.Traces) er
 	if err == nil {
 		// trylock to make sure for concurrent calls multiple invocations don't try to update the failover
 		// state simultaneously and don't wait to acquire lock in pipeline selector
-		f.stableTryLock.Lock(f.failover.reportStable, idx)
+		f.stableTryLock.TryExecute(f.failover.reportStable, idx)
 		return nil
 	}
 	return f.FailoverTraces(ctx, td)
@@ -56,11 +56,11 @@ func (f *tracesFailover) FailoverTraces(ctx context.Context, td ptrace.Traces) e
 			// in case of err handlePipelineError is called through tryLock
 			// tryLock is to avoid race conditions from concurrent calls to handlePipelineError, only first call should enter
 			// see state.TryLock
-			f.errTryLock.Lock(f.failover.handlePipelineError, idx)
+			f.errTryLock.TryExecute(f.failover.handlePipelineError, idx)
 			continue
 		}
 		// when healthy pipeline is found, reported back to failover component
-		f.stableTryLock.Lock(f.failover.reportStable, idx)
+		f.stableTryLock.TryExecute(f.failover.reportStable, idx)
 		return nil
 	}
 	f.logger.Error("All provided pipelines return errors, dropping data")
diff --git a/connector/failoverconnector/traces_test.go b/connector/failoverconnector/traces_test.go
index 6b681925623a..b5747db9e9aa 100644
---
a/connector/failoverconnector/traces_test.go +++ b/connector/failoverconnector/traces_test.go @@ -2,3 +2,192 @@ // SPDX-License-Identifier: Apache-2.0 package failoverconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector" +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector/connectortest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +var errTracesConsumer = errors.New("Error from ConsumeTraces") + +func TestTracesRegisterConsumers(t *testing.T) { + var sinkFirst, sinkSecond, sinkThird consumertest.TracesSink + tracesFirst := component.NewIDWithName(component.DataTypeTraces, "traces/first") + tracesSecond := component.NewIDWithName(component.DataTypeTraces, "traces/second") + tracesThird := component.NewIDWithName(component.DataTypeTraces, "traces/third") + + cfg := &Config{ + PipelinePriority: [][]component.ID{{tracesFirst}, {tracesSecond}, {tracesThird}}, + RetryInterval: 5 * time.Minute, + RetryGap: 10 * time.Second, + MaxRetries: 5, + } + + router := connectortest.NewTracesRouter( + connectortest.WithTracesSink(tracesFirst, &sinkFirst), + connectortest.WithTracesSink(tracesSecond, &sinkSecond), + connectortest.WithTracesSink(tracesThird, &sinkThird), + ) + + conn, err := NewFactory().CreateTracesToTraces(context.Background(), + connectortest.NewNopCreateSettings(), cfg, router.(consumer.Traces)) + + failoverConnector := conn.(*tracesFailover) + defer func() { + assert.NoError(t, failoverConnector.Shutdown(context.Background())) + }() + + require.NoError(t, err) + require.NotNil(t, conn) + + tc, idx, ok := failoverConnector.failover.getCurrentConsumer() + tc1 := failoverConnector.failover.GetConsumerAtIndex(1) + tc2 := failoverConnector.failover.GetConsumerAtIndex(2) + + assert.True(t, ok) + require.Equal(t, idx, 0) + require.Implements(t, (*consumer.Traces)(nil), tc) + require.Implements(t, (*consumer.Traces)(nil), tc1) + require.Implements(t, (*consumer.Traces)(nil), tc2) +} + +func TestTracesWithValidFailover(t *testing.T) { + var sinkSecond, sinkThird consumertest.TracesSink + tracesFirst := component.NewIDWithName(component.DataTypeTraces, "traces/first") + tracesSecond := component.NewIDWithName(component.DataTypeTraces, "traces/second") + tracesThird := component.NewIDWithName(component.DataTypeTraces, "traces/third") + + cfg := &Config{ + PipelinePriority: [][]component.ID{{tracesFirst}, {tracesSecond}, {tracesThird}}, + RetryInterval: 5 * time.Minute, + RetryGap: 10 * time.Second, + MaxRetries: 5, + } + + router := connectortest.NewTracesRouter( + connectortest.WithNopTraces(tracesFirst), + connectortest.WithTracesSink(tracesSecond, &sinkSecond), + connectortest.WithTracesSink(tracesThird, &sinkThird), + ) + + conn, err := NewFactory().CreateTracesToTraces(context.Background(), + connectortest.NewNopCreateSettings(), cfg, router.(consumer.Traces)) + + require.NoError(t, err) + + failoverConnector := conn.(*tracesFailover) + failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer)) + defer func() { + assert.NoError(t, failoverConnector.Shutdown(context.Background())) + }() + + tr := sampleTrace() + + require.NoError(t, conn.ConsumeTraces(context.Background(), tr)) + _, idx, ok := 
failoverConnector.failover.getCurrentConsumer() + assert.True(t, ok) + require.Equal(t, idx, 1) +} + +func TestTracesWithFailoverError(t *testing.T) { + var sinkSecond, sinkThird consumertest.TracesSink + tracesFirst := component.NewIDWithName(component.DataTypeTraces, "traces/first") + tracesSecond := component.NewIDWithName(component.DataTypeTraces, "traces/second") + tracesThird := component.NewIDWithName(component.DataTypeTraces, "traces/third") + + cfg := &Config{ + PipelinePriority: [][]component.ID{{tracesFirst}, {tracesSecond}, {tracesThird}}, + RetryInterval: 5 * time.Minute, + RetryGap: 10 * time.Second, + MaxRetries: 5, + } + + router := connectortest.NewTracesRouter( + connectortest.WithNopTraces(tracesFirst), + connectortest.WithTracesSink(tracesSecond, &sinkSecond), + connectortest.WithTracesSink(tracesThird, &sinkThird), + ) + + conn, err := NewFactory().CreateTracesToTraces(context.Background(), + connectortest.NewNopCreateSettings(), cfg, router.(consumer.Traces)) + + require.NoError(t, err) + + failoverConnector := conn.(*tracesFailover) + failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer)) + failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer)) + failoverConnector.failover.ModifyConsumerAtIndex(2, consumertest.NewErr(errTracesConsumer)) + defer func() { + assert.NoError(t, failoverConnector.Shutdown(context.Background())) + }() + + tr := sampleTrace() + + assert.EqualError(t, conn.ConsumeTraces(context.Background(), tr), "All provided pipelines return errors") +} + +func TestTracesWithFailoverRecovery(t *testing.T) { + var sinkSecond, sinkThird consumertest.TracesSink + tracesFirst := component.NewIDWithName(component.DataTypeTraces, "traces/first") + tracesSecond := component.NewIDWithName(component.DataTypeTraces, "traces/second") + tracesThird := component.NewIDWithName(component.DataTypeTraces, "traces/third") + + cfg := &Config{ + PipelinePriority: [][]component.ID{{tracesFirst}, {tracesSecond}, {tracesThird}}, + RetryInterval: 50 * time.Millisecond, + RetryGap: 10 * time.Millisecond, + MaxRetries: 1000, + } + + router := connectortest.NewTracesRouter( + connectortest.WithNopTraces(tracesFirst), + connectortest.WithTracesSink(tracesSecond, &sinkSecond), + connectortest.WithTracesSink(tracesThird, &sinkThird), + ) + + conn, err := NewFactory().CreateTracesToTraces(context.Background(), + connectortest.NewNopCreateSettings(), cfg, router.(consumer.Traces)) + + require.NoError(t, err) + + failoverConnector := conn.(*tracesFailover) + failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer)) + defer func() { + assert.NoError(t, failoverConnector.Shutdown(context.Background())) + }() + + tr := sampleTrace() + + require.NoError(t, conn.ConsumeTraces(context.Background(), tr)) + _, idx, ok := failoverConnector.failover.getCurrentConsumer() + + assert.True(t, ok) + require.Equal(t, idx, 1) + + // Simulate recovery of exporter + failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewNop()) + + time.Sleep(100 * time.Millisecond) + + _, idx, ok = failoverConnector.failover.getCurrentConsumer() + assert.True(t, ok) + require.Equal(t, idx, 0) +} + +func sampleTrace() ptrace.Traces { + tr := ptrace.NewTraces() + rl := tr.ResourceSpans().AppendEmpty() + rl.Resource().Attributes().PutStr("conn", "failover") + span := rl.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("SampleSpan") + return tr +}
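
A note on the try-lock pattern these changes lean on: state.TryLock.TryExecute lets concurrent ConsumeTraces calls skip failover-state updates rather than queue behind them, so only the first caller to observe an error (or a recovery) touches the pipeline selector. Below is a minimal, self-contained sketch of that behavior, not the connector's own code; the names are illustrative and it assumes Go 1.18+ for sync.Mutex.TryLock.

package main

import (
	"fmt"
	"sync"
)

// tryLock mirrors the patch's state.TryLock: at most one caller runs fn at a
// time, and concurrent callers return immediately instead of blocking.
type tryLock struct {
	mu sync.Mutex
}

func (t *tryLock) tryExecute(fn func(int), arg int) {
	// TryLock acquires the mutex only if it is currently free; losers fall through.
	if t.mu.TryLock() {
		defer t.mu.Unlock()
		fn(arg)
	}
}

func main() {
	var tl tryLock
	var wg sync.WaitGroup
	executed := 0 // mutated only inside fn, which always runs under the mutex

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			tl.tryExecute(func(int) { executed++ }, idx)
		}(i)
	}
	wg.Wait()
	// Anywhere from 1 to 5 goroutines win the lock; the rest skip instead of waiting.
	fmt.Println("executions:", executed)
}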