[chore][pkg/stanza] Fix goroutine leak in regex parser (#31108)
Fixes #31093
djaglowski authored Feb 8, 2024
1 parent 66a5fa5 commit 82cbc43
Showing 5 changed files with 57 additions and 3 deletions.
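
The core of the fix: atomicLimiter previously ran a background goroutine that slept in an endless loop and could never be told to exit, so every regex parser with a cache leaked one goroutine. This commit replaces the sleep with a time.Ticker plus a done channel, and threads a stop() method from the limiter up through the cache and the Parser so the goroutine can be shut down. A minimal sketch of the pattern follows; the worker type and names are illustrative only and are not part of the repository:

```go
package main

import (
	"fmt"
	"time"
)

// worker owns a background goroutine that wakes up on a ticker.
// Closing done lets the goroutine stop its ticker and return,
// which is the shutdown pattern this commit applies to atomicLimiter.
type worker struct {
	done chan struct{}
}

func newWorker(interval time.Duration) *worker {
	w := &worker{done: make(chan struct{})}
	go func() {
		ticker := time.NewTicker(interval)
		for {
			select {
			case <-w.done:
				ticker.Stop() // release the ticker's resources
				return        // the goroutine exits instead of leaking
			case <-ticker.C:
				fmt.Println("periodic work")
			}
		}
	}()
	return w
}

// stop signals the goroutine to exit; call it exactly once.
func (w *worker) stop() {
	close(w.done)
}

func main() {
	w := newWorker(100 * time.Millisecond)
	time.Sleep(250 * time.Millisecond)
	w.stop()
}
```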
24 changes: 21 additions & 3 deletions pkg/stanza/operator/parser/regex/cache.go
@@ -16,6 +16,7 @@ type cache interface {
add(key string, data any) bool
copy() map[string]any
maxSize() uint16
stop()
}

// newMemoryCache takes a cache size and a limiter interval and
@@ -111,6 +112,10 @@ func (m *memoryCache) maxSize() uint16 {
return uint16(cap(m.keys))
}

func (m *memoryCache) stop() {
m.limiter.stop()
}

// limiter provides rate limiting methods for
// the cache
type limiter interface {
@@ -120,6 +125,7 @@ type limiter interface {
limit() uint64
resetInterval() time.Duration
throttled() bool
stop()
}

// newStartedAtomicLimiter returns a started atomicLimiter
@@ -132,6 +138,7 @@ func newStartedAtomicLimiter(max uint64, interval uint64) *atomicLimiter {
count: &atomic.Uint64{},
max: max,
interval: time.Second * time.Duration(interval),
done: make(chan struct{}),
}

a.init()
@@ -146,6 +153,7 @@ type atomicLimiter struct {
max uint64
interval time.Duration
start sync.Once
done chan struct{}
}

var _ limiter = &atomicLimiter{count: &atomic.Uint64{}}
@@ -158,10 +166,16 @@ func (l *atomicLimiter) init() {
// During every interval period, reduce the counter
// by 10%
x := math.Round(-0.10 * float64(l.max))
+ ticker := time.NewTicker(l.interval)
for {
- time.Sleep(l.interval)
- if l.currentCount() > 0 {
- l.count.Add(^uint64(x))
+ select {
+ case <-l.done:
+ ticker.Stop()
+ return
+ case <-ticker.C:
+ if l.currentCount() > 0 {
+ l.count.Add(^uint64(x))
+ }
}
}
}()
@@ -196,3 +210,7 @@ func (l *atomicLimiter) limit() uint64 {
func (l *atomicLimiter) resetInterval() time.Duration {
return l.interval
}

func (l *atomicLimiter) stop() {
close(l.done)
}
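
Taken together, the cache.go changes give teardown a clear path: Parser.Stop calls cache.stop, memoryCache.stop calls limiter.stop, and atomicLimiter.stop closes the done channel, which lets the ticker goroutine in init() stop its ticker and return.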
8 changes: 8 additions & 0 deletions pkg/stanza/operator/parser/regex/cache_test.go
@@ -32,6 +32,7 @@ func TestNewMemoryCache(t *testing.T) {

for _, tc := range cases {
output := newMemoryCache(tc.maxSize, 0)
defer output.stop()
require.Equal(t, tc.expect.cache, output.cache)
require.Len(t, output.cache, 0, "new memory should always be empty")
require.Len(t, output.keys, 0, "new memory should always be empty")
@@ -72,6 +73,7 @@ func TestMemory(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
defer tc.cache.stop()
for key, value := range tc.input {
tc.cache.add(key, value)
out := tc.cache.get(key)
@@ -95,6 +97,7 @@ func TestCleanupLast(t *testing.T) {
maxSize := 10

m := newMemoryCache(uint16(maxSize), 0)
defer m.stop()

// Add to cache until it is full
for i := 0; i <= cap(m.keys); i++ {
@@ -175,6 +178,7 @@ func TestNewStartedAtomicLimiter(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
l := newStartedAtomicLimiter(tc.max, tc.interval)
require.Equal(t, tc.max, l.max)
defer l.stop()
if tc.interval == 0 {
// default
tc.interval = 5
@@ -192,6 +196,7 @@ func TestLimiter(t *testing.T) {
l := newStartedAtomicLimiter(max, 120)
require.NotNil(t, l)
require.Equal(t, max, l.max)
defer l.stop()

require.False(t, l.throttled(), "new limiter should not be throttling")
require.Equal(t, uint64(0), l.currentCount())
@@ -219,6 +224,7 @@ func TestThrottledLimiter(t *testing.T) {
max: max,
count: count,
interval: 1,
done: make(chan struct{}),
}

require.True(t, l.throttled())
@@ -227,6 +233,7 @@
// for it to reset the counter. The limiter will no longer
// be in a throttled state and the count will be reset.
l.init()
defer l.stop()
wait := 2 * l.interval
time.Sleep(time.Second * wait)
require.False(t, l.throttled())
@@ -235,6 +242,7 @@

func TestThrottledCache(t *testing.T) {
c := newMemoryCache(3, 120)
defer c.stop()
require.False(t, c.limiter.throttled())
require.Equal(t, 4, int(c.limiter.limit()), "expected limit be cache size + 1")
require.Equal(t, float64(120), c.limiter.resetInterval().Seconds(), "expected reset interval to be 120 seconds")
14 changes: 14 additions & 0 deletions pkg/stanza/operator/parser/regex/package_test.go
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package regex

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
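
The new package_test.go wires go.uber.org/goleak into the package's TestMain, so the whole suite fails if any test leaves a goroutine running. The same library also supports per-test checks; a sketch of how that would look for the limiter, as an illustration rather than part of this commit:

```go
package regex

import (
	"testing"

	"go.uber.org/goleak"
)

// Illustrative per-test check: goleak.VerifyNone fails the test if
// goroutines started during the test are still running when it returns.
func TestLimiterDoesNotLeak(t *testing.T) {
	defer goleak.VerifyNone(t)

	l := newStartedAtomicLimiter(3, 1)
	l.stop() // without this call, goleak would report the limiter's ticker goroutine
}
```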
7 changes: 7 additions & 0 deletions pkg/stanza/operator/parser/regex/regex.go
@@ -94,6 +94,13 @@ type Parser struct {
cache cache
}

func (r *Parser) Stop() error {
if r.cache != nil {
r.cache.stop()
}
return nil
}

// Process will parse an entry for regex.
func (r *Parser) Process(ctx context.Context, entry *entry.Entry) error {
return r.ParserOperator.ProcessWith(ctx, entry, r.parse)
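
With Stop defined, the regex parser now participates in operator shutdown: stopping the operator tears down the cache's limiter goroutine instead of leaving it running for the life of the process. The updated tests below rely on this by deferring Stop on every parser they build.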
7 changes: 7 additions & 0 deletions pkg/stanza/operator/parser/regex/regex_test.go
@@ -60,6 +60,9 @@ func TestParserInvalidType(t *testing.T) {

func TestParserCache(t *testing.T) {
parser := newTestParser(t, "^(?P<key>cache)", 200)
defer func() {
require.NoError(t, parser.Stop())
}()
_, err := parser.parse([]int{})
require.Error(t, err)
require.Contains(t, err.Error(), "type '[]int' cannot be parsed as regex")
@@ -135,6 +138,10 @@ func TestParserRegex(t *testing.T) {
op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)

defer func() {
require.NoError(t, op.Stop())
}()

fake := testutil.NewFakeOutput(t)
require.NoError(t, op.SetOutputs([]operator.Operator{fake}))
