Added traces tests and comments
akats7 committed Dec 12, 2023
1 parent 6883e32 commit 456f607
Showing 6 changed files with 299 additions and 95 deletions.
13 changes: 8 additions & 5 deletions connector/failoverconnector/factory_test.go
@@ -15,20 +15,23 @@ import (
)

func TestNewFactory(t *testing.T) {
traces0 := component.NewIDWithName(component.DataTypeTraces, "0")
traces1 := component.NewIDWithName(component.DataTypeTraces, "1")
traces2 := component.NewIDWithName(component.DataTypeTraces, "2")
cfg := &Config{
PipelinePriority: [][]component.ID{{component.NewIDWithName(component.DataTypeTraces, "0"), component.NewIDWithName(component.DataTypeTraces, "1")}, {component.NewIDWithName(component.DataTypeTraces, "2")}},
PipelinePriority: [][]component.ID{{traces0, traces1}, {traces2}},
RetryInterval: 5 * time.Minute,
RetryGap: 10 * time.Second,
MaxRetries: 5,
}

router := connectortest.NewTracesRouter(
connectortest.WithNopTraces(component.NewIDWithName(component.DataTypeTraces, "0")),
connectortest.WithNopTraces(component.NewIDWithName(component.DataTypeTraces, "1")),
connectortest.WithNopTraces(traces0),
connectortest.WithNopTraces(traces1),
connectortest.WithNopTraces(traces2),
)

factory := NewFactory()
conn, err := factory.CreateTracesToTraces(context.Background(),
conn, err := NewFactory().CreateTracesToTraces(context.Background(),
connectortest.NewNopCreateSettings(), cfg, router.(consumer.Traces))

assert.NoError(t, err)
54 changes: 15 additions & 39 deletions connector/failoverconnector/failover.go
@@ -40,7 +40,7 @@ func newFailoverRouter[C any](provider consumerProvider[C], cfg *Config) *failov
func (f *failoverRouter[C]) getCurrentConsumer() (C, int, bool) {
// if currentIndex was incremented past the bounds of the pipeline list
var nilConsumer C
idx := f.pS.GetCurrentIndex()
idx := f.pS.CurrentIndex()
if idx >= len(f.cfg.PipelinePriority) {
return nilConsumer, -1, false
}
@@ -63,7 +63,7 @@ func (f *failoverRouter[C]) registerConsumers() error {
func (f *failoverRouter[C]) handlePipelineError(idx int) {
// avoids race condition in case of consumeSIGNAL invocations
// where index was updated during execution
if idx != f.pS.GetCurrentIndex() {
if idx != f.pS.CurrentIndex() {
return
}
doRetry := f.pS.IndexIsStable(idx)
@@ -88,7 +88,7 @@ func (f *failoverRouter[C]) enableRetry(ctx context.Context) {
ticker := time.NewTicker(f.cfg.RetryInterval)
defer ticker.Stop()

stableIndex := f.pS.GetStableIndex()
stableIndex := f.pS.StableIndex()
var cancelFunc context.CancelFunc
// checkContinueRetry checks that any higher priority levels have retries remaining
// (have not exceeded their maxRetries)
@@ -113,41 +113,14 @@ func (f *failoverRouter[C]) enableRetry(ctx context.Context) {
// interval starts in the middle of the execution
func (f *failoverRouter[C]) handleRetry(parentCtx context.Context, stableIndex int) context.CancelFunc {
retryCtx, cancelFunc := context.WithCancel(parentCtx)
go f.retryHighPriorityPipelines(retryCtx, stableIndex)
go f.pS.RetryHighPriorityPipelines(retryCtx, stableIndex, f.cfg.RetryGap)
return cancelFunc
}

// retryHighPriorityPipelines responsible for single iteration through all higher priority pipelines
func (f *failoverRouter[C]) retryHighPriorityPipelines(ctx context.Context, stableIndex int) {
ticker := time.NewTicker(f.cfg.RetryGap)

defer ticker.Stop()

for i := 0; i < stableIndex; i++ {
// if stableIndex was updated to a higher priority level during the execution of the goroutine
// will return to avoid overwriting higher priority level with lower one
if stableIndex > f.pS.GetStableIndex() {
return
}
// checks that max retries were not used for this index
if f.pS.MaxRetriesUsed(i) {
continue
}
select {
// return when context is cancelled by parent goroutine
case <-ctx.Done():
return
case <-ticker.C:
// when ticker triggers currentIndex is updated
f.pS.SetToRetryIndex(i)
}
}
}

// checkContinueRetry checks whether retry should continue; it returns false once all higher priority levels have exceeded their max retries
func (f *failoverRouter[C]) checkContinueRetry(index int) bool {
for i := 0; i < index; i++ {
if f.pS.PipelineRetries[i] < f.cfg.MaxRetries {
if f.pS.IndexRetryCount(i) < f.cfg.MaxRetries {
return true
}
}
@@ -157,11 +130,14 @@ func (f *failoverRouter[C]) checkContinueRetry(index int) bool {
// reportStable reports back to the failoverRouter that the current priority level that was called by Consume.SIGNAL was
// stable
func (f *failoverRouter[C]) reportStable(idx int) {
// if idx is already the known stableIndex, return
if f.pS.IndexIsStable(idx) {
return
}
// if idx is a retried index, then update the stable index to the retried index
// NOTE retry will not stop due to potential higher priority index still available
f.pS.SetNewStableIndex(idx)
f.pS.ReportStable(idx)
}

// For Testing
func (f *failoverRouter[C]) GetConsumerAtIndex(idx int) C {
return f.consumers[idx]
}

func (f *failoverRouter[C]) ModifyConsumerAtIndex(idx int, c C) {
f.consumers[idx] = c
}
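
The retry scheduling above can be hard to follow from the diff alone. The following is a minimal, self-contained sketch of the ticker-plus-context pattern that handleRetry and RetryHighPriorityPipelines rely on; the function name and the retry callback are hypothetical stand-ins, not part of the connector's API.

package main

import (
	"context"
	"fmt"
	"time"
)

// retryHigherPriority walks priority levels 0..stable-1 on a fixed gap and
// stops early when the context is cancelled (e.g. by a later stable report).
func retryHigherPriority(ctx context.Context, stable int, gap time.Duration, retry func(int)) {
	ticker := time.NewTicker(gap)
	defer ticker.Stop()

	for i := 0; i < stable; i++ {
		select {
		case <-ctx.Done():
			// parent cancelled the retry cycle
			return
		case <-ticker.C:
			// each tick moves the retry target to the next higher-priority level
			retry(i)
		}
	}
}

func main() {
	// cancel after roughly two and a half gaps, so only two levels get retried
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	retryHigherPriority(ctx, 5, 100*time.Millisecond, func(i int) {
		fmt.Println("retrying priority level", i)
	})
}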
108 changes: 78 additions & 30 deletions connector/failoverconnector/internal/state/pipeline_selector.go
@@ -4,15 +4,17 @@
package state // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector/internal/state"

import (
"context"
"sync"
"time"
)

// PipelineSelector is meant to serve as the source of truth for the target priority level
type PipelineSelector struct {
CurrentIndex int
StableIndex int
currentIndex int
stableIndex int
lock sync.RWMutex
PipelineRetries []int
pipelineRetries []int
maxRetry int
}

@@ -21,81 +23,127 @@ type PipelineSelector struct {
// priority index that was set during a retry, in which case we return to the stable index
func (p *PipelineSelector) UpdatePipelineIndex(idx int) {
if p.IndexIsStable(idx) {
p.SetToNextPriorityPipeline()
p.setToNextPriorityPipeline(idx)
return
}
p.SetToStableIndex()
p.setToStableIndex(idx)
}

// setToNextPriorityPipeline skips through any lower priority pipelines that have exceeded their maxRetries
// and sets the first that has not as the new stable
func (p *PipelineSelector) SetToNextPriorityPipeline() {
func (p *PipelineSelector) setToNextPriorityPipeline(idx int) {
p.lock.Lock()
defer p.lock.Unlock()
for ok := true; ok; ok = p.exceededMaxRetries() {
p.CurrentIndex++
for ok := true; ok; ok = p.exceededMaxRetries(idx) {
idx++
}
p.StableIndex = p.CurrentIndex
p.stableIndex = idx
}

func (p *PipelineSelector) exceededMaxRetries() bool {
return p.CurrentIndex < len(p.PipelineRetries) && (p.PipelineRetries[p.CurrentIndex] >= p.maxRetry)
// RetryHighPriorityPipelines is responsible for a single iteration through all higher priority pipelines
func (p *PipelineSelector) RetryHighPriorityPipelines(ctx context.Context, stableIndex int, retryGap time.Duration) {
ticker := time.NewTicker(retryGap)

defer ticker.Stop()

for i := 0; i < stableIndex; i++ {
// if stableIndex was updated to a higher priority level during the execution of the goroutine
// will return to avoid overwriting higher priority level with lower one
if stableIndex > p.StableIndex() {
return
}
// checks that max retries were not used for this index
if p.MaxRetriesUsed(i) {
continue
}
select {
// return when context is cancelled by parent goroutine
case <-ctx.Done():
return
case <-ticker.C:
// when ticker triggers currentIndex is updated
p.setToCurrentIndex(i)
}
}
}

func (p *PipelineSelector) exceededMaxRetries(idx int) bool {
return idx < len(p.pipelineRetries) && (p.pipelineRetries[idx] >= p.maxRetry)
}

// setToStableIndex returns the currentIndex to the known stable index
func (p *PipelineSelector) SetToStableIndex() {
func (p *PipelineSelector) setToStableIndex(idx int) {
p.lock.Lock()
defer p.lock.Unlock()
p.PipelineRetries[p.CurrentIndex]++
p.CurrentIndex = p.StableIndex
p.pipelineRetries[idx]++
p.currentIndex = p.stableIndex
}

// setToCurrentIndex sets the currentIndex to the passed index value
func (p *PipelineSelector) SetToRetryIndex(index int) {
func (p *PipelineSelector) setToCurrentIndex(index int) {
p.lock.Lock()
defer p.lock.Unlock()
p.CurrentIndex = index
p.currentIndex = index
}

// MaxRetriesUsed provides exported access to check whether an index has used up its max retries
func (p *PipelineSelector) MaxRetriesUsed(index int) bool {
func (p *PipelineSelector) MaxRetriesUsed(idx int) bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.PipelineRetries[index] >= p.maxRetry
return p.pipelineRetries[idx] >= p.maxRetry
}

// setNewStableIndex updates stableIndex to the passed index
func (p *PipelineSelector) SetNewStableIndex(idx int) {
func (p *PipelineSelector) setNewStableIndex(idx int) {
p.lock.Lock()
defer p.lock.RUnlock()
p.PipelineRetries[p.CurrentIndex] = 0
p.StableIndex = idx
defer p.lock.Unlock()
p.pipelineRetries[idx] = 0
p.stableIndex = idx
}

// IndexIsStable returns if index passed is the stable index
func (p *PipelineSelector) IndexIsStable(idx int) bool {
p.lock.RLock()
defer p.lock.RUnlock()
return idx == p.StableIndex
return p.stableIndex == idx
}

func (p *PipelineSelector) GetStableIndex() int {
func (p *PipelineSelector) StableIndex() int {
p.lock.RLock()
defer p.lock.RUnlock()
return p.StableIndex
return p.stableIndex
}

func (p *PipelineSelector) GetCurrentIndex() int {
func (p *PipelineSelector) CurrentIndex() int {
p.lock.RLock()
defer p.lock.RUnlock()
return p.CurrentIndex
return p.currentIndex
}

func (p *PipelineSelector) IndexRetryCount(idx int) int {
p.lock.RLock()
defer p.lock.RUnlock()
return p.pipelineRetries[idx]
}

// ReportStable records that the priority level called by Consume.SIGNAL was found to be stable
func (p *PipelineSelector) ReportStable(idx int) {
// if idx is already the known stableIndex, return
if p.IndexIsStable(idx) {
return
}
// if idx is a retried index, then update the stable index to the retried index
// NOTE retry will not stop due to potential higher priority index still available
p.setNewStableIndex(idx)
}

func NewPipelineSelector(lenPriority int, maxRetries int) *PipelineSelector {
return &PipelineSelector{
CurrentIndex: 0,
StableIndex: 0,
currentIndex: 0,
stableIndex: 0,
lock: sync.RWMutex{},
PipelineRetries: make([]int, lenPriority),
pipelineRetries: make([]int, lenPriority),
maxRetry: maxRetries,
}
}
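
To make the selector's promotion and demotion rules concrete, here is a hypothetical test-style sketch (not part of this commit) that could live inside the state package and exercises only the exported methods shown above:

package state

import "testing"

// Sketch only: a failed consume on the stable level demotes the stable index,
// and a later successful retry promotes the original level back.
func TestPipelineSelectorSketch(t *testing.T) {
	ps := NewPipelineSelector(3, 5) // three priority levels, five retries per level

	// level 0 is stable and fails: the stable index moves to the next priority level
	ps.UpdatePipelineIndex(0)
	if ps.StableIndex() != 1 {
		t.Fatalf("expected stable index 1, got %d", ps.StableIndex())
	}

	// a retry of level 0 succeeds: it becomes stable again and its retry count resets
	ps.ReportStable(0)
	if ps.StableIndex() != 0 || ps.IndexRetryCount(0) != 0 {
		t.Fatalf("expected stable index 0 with zero retries, got %d and %d",
			ps.StableIndex(), ps.IndexRetryCount(0))
	}
}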
24 changes: 6 additions & 18 deletions connector/failoverconnector/internal/state/utils.go
@@ -9,30 +9,18 @@ import (
)

type TryLock struct {
lock chan struct{}
lock sync.Mutex
}

func NewTryLock() *TryLock {
return &TryLock{
lock: make(chan struct{}, 1),
}
}

// Lock tries to write to a channel of size 1 to maintain a single access point to a resource
// if not default case will return
// NOTE: may need to update logic in future so that concurrent calls to consume<SIGNAL> block while the lock is acquired
// and then return automatically once lock is released to avoid repeated calls to consume<SIGNAL> before indexes are updated
func (tl *TryLock) Lock(fn func(int), arg int) {
select {
case tl.lock <- struct{}{}:
defer tl.Unlock()
func (t *TryLock) TryExecute(fn func(int), arg int) {
if t.lock.TryLock() {
defer t.lock.Unlock()
fn(arg)
default:
}
}

func (tl *TryLock) Unlock() {
<-tl.lock
func NewTryLock() *TryLock {
return &TryLock{}
}

// Manages cancel function for retry goroutine, ends up cleaner than using channels
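
The new TryExecute above is a thin wrapper around sync.Mutex.TryLock (available since Go 1.18): the first caller runs the callback while concurrent callers return immediately instead of blocking. A small, self-contained sketch of the same pattern, with illustrative names only:

package main

import (
	"fmt"
	"sync"
	"time"
)

// tryExecute runs fn(arg) only if the lock is free; concurrent callers return
// immediately instead of waiting, mirroring TryLock.TryExecute.
func tryExecute(mu *sync.Mutex, fn func(int), arg int) {
	if mu.TryLock() {
		defer mu.Unlock()
		fn(arg)
	}
}

func main() {
	var mu sync.Mutex
	var wg sync.WaitGroup

	slowUpdate := func(idx int) {
		// simulate a slow state update; later concurrent callers are skipped
		time.Sleep(50 * time.Millisecond)
		fmt.Println("updated state for index", idx)
	}

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tryExecute(&mu, slowUpdate, 0)
		}()
	}
	wg.Wait() // typically prints the update line only once
}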
6 changes: 3 additions & 3 deletions connector/failoverconnector/traces.go
@@ -41,7 +41,7 @@ func (f *tracesFailover) ConsumeTraces(ctx context.Context, td ptrace.Traces) er
if err == nil {
// trylock makes sure that concurrent invocations don't try to update the failover state
// simultaneously and don't wait to acquire the lock in the pipeline selector
f.stableTryLock.Lock(f.failover.reportStable, idx)
f.stableTryLock.TryExecute(f.failover.reportStable, idx)
return nil
}
return f.FailoverTraces(ctx, td)
@@ -56,11 +56,11 @@ func (f *tracesFailover) FailoverTraces(ctx context.Context, td ptrace.Traces) e
// in case of err handlePipelineError is called through tryLock
// tryLock is to avoid race conditions from concurrent calls to handlePipelineError, only first call should enter
// see state.TryLock
f.errTryLock.Lock(f.failover.handlePipelineError, idx)
f.errTryLock.TryExecute(f.failover.handlePipelineError, idx)
continue
}
// when a healthy pipeline is found, it is reported back to the failover component
f.stableTryLock.Lock(f.failover.reportStable, idx)
f.stableTryLock.TryExecute(f.failover.reportStable, idx)
return nil
}
f.logger.Error("All provided pipelines return errors, dropping data")
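
Stripped of the TryLock guards and retry bookkeeping, the consume path above reduces to a priority-ordered failover loop. The following is a rough, self-contained sketch of that flow; the names and callback signatures are hypothetical, not the connector's API.

package main

import (
	"context"
	"errors"
	"fmt"
)

// consumeWithFailover walks a priority-ordered list of consumers and returns on
// the first success, reporting the healthy index; if every level fails, the
// data is dropped and an error is returned.
func consumeWithFailover(ctx context.Context, consumers []func(context.Context) error, reportStable func(int)) error {
	for idx, consume := range consumers {
		if err := consume(ctx); err != nil {
			continue // try the next priority level
		}
		reportStable(idx)
		return nil
	}
	return errors.New("all provided pipelines return errors, dropping data")
}

func main() {
	consumers := []func(context.Context) error{
		func(context.Context) error { return errors.New("primary down") },
		func(context.Context) error { return nil }, // secondary is healthy
	}
	err := consumeWithFailover(context.Background(), consumers,
		func(idx int) { fmt.Println("stable index:", idx) })
	fmt.Println("err:", err)
}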