Skip to content

Commit

Permalink
Use atomic.Bool when int was used as a bool variable (open-telemetry#…
Browse files Browse the repository at this point in the history
…5218)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored and Nicholaswang committed Jun 7, 2022
1 parent cbc5c5c commit 389715e
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 39 deletions.
14 changes: 7 additions & 7 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
import (
"sync"

uatomic "go.uber.org/atomic"
"go.uber.org/atomic"
)

// boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue,
Expand All @@ -29,8 +29,8 @@ import (
// the items from the top of the queue until its size drops back to maxSize
type boundedMemoryQueue struct {
stopWG sync.WaitGroup
size *uatomic.Uint32
stopped *uatomic.Uint32
size *atomic.Uint32
stopped *atomic.Bool
items chan interface{}
onDroppedItem func(item interface{})
factory func() consumer
Expand All @@ -45,8 +45,8 @@ func NewBoundedMemoryQueue(capacity int, onDroppedItem func(item interface{})) P
onDroppedItem: onDroppedItem,
items: make(chan interface{}, capacity),
stopCh: make(chan struct{}),
stopped: uatomic.NewUint32(0),
size: uatomic.NewUint32(0),
stopped: atomic.NewBool(false),
size: atomic.NewUint32(0),
capacity: uint32(capacity),
}
}
Expand Down Expand Up @@ -97,7 +97,7 @@ func (c consumerFunc) consume(item interface{}) {

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (q *boundedMemoryQueue) Produce(item interface{}) bool {
if q.stopped.Load() != 0 {
if q.stopped.Load() {
q.onDroppedItem(item)
return false
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func (q *boundedMemoryQueue) Produce(item interface{}) bool {
// Stop stops all consumers, as well as the length reporter if started,
// and releases the items channel. It blocks until all consumers have stopped.
func (q *boundedMemoryQueue) Stop() {
q.stopped.Store(1) // disable producer
q.stopped.Store(true) // disable producer
close(q.stopCh)
q.stopWG.Wait()
close(q.items)
Expand Down
13 changes: 4 additions & 9 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,22 @@ type consumerState struct {
sync.Mutex
t *testing.T
consumed map[string]bool
consumedOnce *atomic.Int32
consumedOnce *atomic.Bool
}

func newConsumerState(t *testing.T) *consumerState {
return &consumerState{
t: t,
consumed: make(map[string]bool),
consumedOnce: atomic.NewInt32(0),
consumedOnce: atomic.NewBool(false),
}
}

func (s *consumerState) record(val string) {
s.Lock()
defer s.Unlock()
s.consumed[val] = true
s.consumedOnce.Store(1)
s.consumedOnce.Store(true)
}

func (s *consumerState) snapshot() map[string]bool {
Expand All @@ -128,12 +128,7 @@ func (s *consumerState) snapshot() map[string]bool {
}

func (s *consumerState) waitToConsumeOnce() {
for i := 0; i < 1000; i++ {
if s.consumedOnce.Load() == 0 {
time.Sleep(time.Millisecond)
}
}
require.EqualValues(s.t, 1, s.consumedOnce.Load(), "expected to consumer once")
require.Eventually(s.t, s.consumedOnce.Load, 2*time.Second, 10*time.Millisecond, "expected to consumer once")
}

func (s *consumerState) assertConsumed(expected map[string]bool) {
Expand Down
27 changes: 7 additions & 20 deletions processor/memorylimiterprocessor/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type memoryLimiter struct {
ballastSize uint64

// forceDrop is used atomically to indicate when data should be dropped.
forceDrop *atomic.Int64
forceDrop *atomic.Bool

ticker *time.Ticker

Expand Down Expand Up @@ -119,7 +119,7 @@ func newMemoryLimiter(set component.ProcessorCreateSettings, cfg *Config) (*memo
ticker: time.NewTicker(cfg.CheckInterval),
readMemStatsFn: runtime.ReadMemStats,
logger: logger,
forceDrop: atomic.NewInt64(0),
forceDrop: atomic.NewBool(false),
obsrep: obsreport.NewProcessor(obsreport.ProcessorSettings{
Level: set.MetricsLevel,
ProcessorID: cfg.ID(),
Expand Down Expand Up @@ -174,7 +174,7 @@ func (ml *memoryLimiter) shutdown(context.Context) error {

func (ml *memoryLimiter) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
numSpans := td.SpanCount()
if ml.forcingDrop() {
if ml.forceDrop.Load() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
Expand All @@ -193,7 +193,7 @@ func (ml *memoryLimiter) processTraces(ctx context.Context, td ptrace.Traces) (p

func (ml *memoryLimiter) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
numDataPoints := md.DataPointCount()
if ml.forcingDrop() {
if ml.forceDrop.Load() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
Expand All @@ -211,7 +211,7 @@ func (ml *memoryLimiter) processMetrics(ctx context.Context, md pmetric.Metrics)

func (ml *memoryLimiter) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
numRecords := ld.LogRecordCount()
if ml.forcingDrop() {
if ml.forceDrop.Load() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
Expand Down Expand Up @@ -260,19 +260,6 @@ func (ml *memoryLimiter) startMonitoring() {
}
}

// forcingDrop indicates when memory resources need to be released.
func (ml *memoryLimiter) forcingDrop() bool {
return ml.forceDrop.Load() != 0
}

func (ml *memoryLimiter) setForcingDrop(b bool) {
var i int64
if b {
i = 1
}
ml.forceDrop.Store(i)
}

func memstatToZapField(ms *runtime.MemStats) zap.Field {
return zap.Uint64("cur_mem_mib", ms.Alloc/mibBytes)
}
Expand All @@ -296,7 +283,7 @@ func (ml *memoryLimiter) checkMemLimits() {
}

// Remember current dropping state.
wasForcingDrop := ml.forcingDrop()
wasForcingDrop := ml.forceDrop.Load()

// Check if the memory usage is above the soft limit.
mustForceDrop := ml.usageChecker.aboveSoftLimit(ms)
Expand All @@ -321,7 +308,7 @@ func (ml *memoryLimiter) checkMemLimits() {
}
}

ml.setForcingDrop(mustForceDrop)
ml.forceDrop.Store(mustForceDrop)
}

type memUsageChecker struct {
Expand Down
6 changes: 3 additions & 3 deletions processor/memorylimiterprocessor/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
usageChecker: memUsageChecker{
memAllocLimit: 1024,
},
forceDrop: atomic.NewInt64(0),
forceDrop: atomic.NewBool(false),
readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc
},
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
usageChecker: memUsageChecker{
memAllocLimit: 1024,
},
forceDrop: atomic.NewInt64(0),
forceDrop: atomic.NewBool(false),
readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc
},
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
usageChecker: memUsageChecker{
memAllocLimit: 1024,
},
forceDrop: atomic.NewInt64(0),
forceDrop: atomic.NewBool(false),
readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc
},
Expand Down

0 comments on commit 389715e

Please sign in to comment.