Skip to content

Commit

Permalink
Address CPU usage metric corner cases (#2366)
Browse files Browse the repository at this point in the history
* Address CPU usage metric corner cases

1. If two metrics arrive with identical timestamps, set cpu usage to
previous value rather than leaving it at NaN
2. Ensure that we don't emit cloudwatch metrics with sampleCount == 0
3. Make storage sets behavior consistent with cpu and memory
4. Log when CPU usage is larger than the TACS cap

* Do not cap CPU Usage percent

We want to be aware of when this happens; a detailed
log message was also added for further debugging.

* code review: fix capital letters
  • Loading branch information
sparrc authored Feb 19, 2020
1 parent c8781fd commit db9dad9
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 22 deletions.
23 changes: 12 additions & 11 deletions agent/stats/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,20 +596,27 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]*
// Container is not terminal. Get CPU stats set.
cpuStatsSet, err := container.statsQueue.GetCPUStatsSet()
if err != nil {
seelog.Warnf("Error getting cpu stats, err: %v, container: %v", err, dockerID)
seelog.Warnf("Error getting cpu stats, skipping container, err: %v, container: %v", err, dockerID)
continue
}

memoryStatsSet, err := container.statsQueue.GetMemoryStatsSet()
if err != nil {
seelog.Warnf("Error getting memory stats, err: %v, container: %v", err, dockerID)
seelog.Warnf("Error getting memory stats, skipping container, err: %v, container: %v", err, dockerID)
continue
}

storageStatsSet, err := container.statsQueue.GetStorageStatsSet()
if err != nil {
seelog.Warnf("Error getting storage stats, skipping container, err: %v, container: %v", err, dockerID)
continue
}

containerMetric := &ecstcs.ContainerMetric{
ContainerName: &container.containerMetadata.Name,
CpuStatsSet: cpuStatsSet,
MemoryStatsSet: memoryStatsSet,
ContainerName: &container.containerMetadata.Name,
CpuStatsSet: cpuStatsSet,
MemoryStatsSet: memoryStatsSet,
StorageStatsSet: storageStatsSet,
}

task, err := engine.resolver.ResolveTask(dockerID)
Expand All @@ -629,12 +636,6 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]*
}
}

storageStatsSet, err := container.statsQueue.GetStorageStatsSet()
if err != nil {
seelog.Warnf("Error getting storage stats, err: %v, container: %v", err, dockerID)
}
containerMetric.StorageStatsSet = storageStatsSet

containerMetrics = append(containerMetrics, containerMetric)
}

Expand Down
38 changes: 27 additions & 11 deletions agent/stats/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (

const (
// BytesInMiB is the number of bytes in a MebiByte.
BytesInMiB = 1024 * 1024
BytesInMiB = 1024 * 1024
minimumQueueDatapoints = 2
MaxCPUUsagePerc float32 = 1024 * 1024
)

const minimumQueueDatapoints = 2

// Queue abstracts a queue using UsageStats slice.
type Queue struct {
buffer []UsageStats
Expand Down Expand Up @@ -92,19 +92,25 @@ func (queue *Queue) add(rawStat *ContainerStats) {
// % utilization can be calculated only when queue is non-empty.
lastStat := queue.buffer[queueLength-1]
timeSinceLastStat := float32(rawStat.timestamp.Sub(lastStat.Timestamp).Nanoseconds())
if timeSinceLastStat > 0 {
if timeSinceLastStat <= 0 {
// if we got a duplicate timestamp, set cpu percentage to the same value as the previous stat
seelog.Errorf("Received a docker stat object with duplicate timestamp")
stat.CPUUsagePerc = lastStat.CPUUsagePerc
} else {
cpuUsageSinceLastStat := float32(rawStat.cpuUsage - lastStat.cpuUsage)
stat.CPUUsagePerc = 100 * cpuUsageSinceLastStat / timeSinceLastStat
} else {
// Ignore the stat if the current timestamp is same as the last one. This
// results in the value being set as +infinity
// float32(1) / float32(0) = +Inf
seelog.Debugf("time since last stat is zero. Ignoring cpu stat")
}

if queue.maxSize == queueLength {
// Remove first element if queue is full.
queue.buffer = queue.buffer[1:queueLength]
}

if stat.CPUUsagePerc > MaxCPUUsagePerc {
// what in the world happened
seelog.Errorf("Calculated CPU usage percent (%.1f) is larger than backend maximum (%.1f). lastStatTS=%s lastStatCPUTime=%d thisStatTS=%s thisStatCPUTime=%d queueLength=%d",
stat.CPUUsagePerc, MaxCPUUsagePerc, lastStat.Timestamp.Format(time.RFC3339Nano), lastStat.cpuUsage, rawStat.timestamp.Format(time.RFC3339Nano), rawStat.cpuUsage, queueLength)
}
}

queue.buffer = append(queue.buffer, stat)
Expand Down Expand Up @@ -321,7 +327,7 @@ func (queue *Queue) getCWStatsSet(f getUsageFloatFunc) (*ecstcs.CWStatsSet, erro
queueLength := len(queue.buffer)
if queueLength < 2 {
// Need at least 2 data points to calculate this.
return nil, fmt.Errorf("Need at least 2 data points in queue to calculate CW stats set")
return nil, fmt.Errorf("need at least 2 data points in queue to calculate CW stats set")
}

var min, max, sum float64
Expand All @@ -343,6 +349,11 @@ func (queue *Queue) getCWStatsSet(f getUsageFloatFunc) (*ecstcs.CWStatsSet, erro
sum += thisStat
}

// don't emit metrics when sampleCount == 0
if sampleCount == 0 {
return nil, fmt.Errorf("need at least 1 non-NaN data points in queue to calculate CW stats set")
}

return &ecstcs.CWStatsSet{
Max: &max,
Min: &min,
Expand All @@ -362,7 +373,7 @@ func (queue *Queue) getULongStatsSet(f getUsageIntFunc) (*ecstcs.ULongStatsSet,
queueLength := len(queue.buffer)
if queueLength < 2 {
// Need at least 2 data points to calculate this.
return nil, fmt.Errorf("Need at least 2 data points in the queue to calculate int stats")
return nil, fmt.Errorf("need at least 2 data points in the queue to calculate int stats")
}

var min, max, sum uint64
Expand All @@ -384,6 +395,11 @@ func (queue *Queue) getULongStatsSet(f getUsageIntFunc) (*ecstcs.ULongStatsSet,
sampleCount++
}

// don't emit metrics when sampleCount == 0
if sampleCount == 0 {
return nil, fmt.Errorf("need at least 1 non-NaN data points in queue to calculate CW stats set")
}

baseMin, overflowMin := getInt64WithOverflow(min)
baseMax, overflowMax := getInt64WithOverflow(max)
baseSum, overflowSum := getInt64WithOverflow(sum)
Expand Down
96 changes: 96 additions & 0 deletions agent/stats/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/tcs/model/ecstcs"
"github.com/aws/aws-sdk-go/aws"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -512,3 +513,98 @@ func TestEnoughDatapointsInBuffer(t *testing.T) {
enoughDataPoints = queue.enoughDatapointsInBuffer()
assert.False(t, enoughDataPoints, "Queue is expected to not have enough data points right after RESET")
}

func TestCPUStatSetFailsWhenSampleCountIsZero(t *testing.T) {
timestamps := []time.Time{
parseNanoTime("2015-02-12T21:22:05.131117533Z"),
parseNanoTime("2015-02-12T21:22:05.131117533Z"),
}
cpuTimes := []uint64{
22400432,
116499979,
}
memoryUtilizationInBytes := []uint64{
3649536,
3649536,
}
// create a queue
queue := NewQueue(3)

for i, time := range timestamps {
queue.add(&ContainerStats{cpuUsage: cpuTimes[i], memoryUsage: memoryUtilizationInBytes[i], timestamp: time})
}

// if two cpu had identical timestamps,
// then there will not be enough valid cpu percentage stats to create
// a valid CpuStatsSet, and this function call should fail.
_, err := queue.GetCPUStatsSet()
require.Error(t, err)
}

// TestCPUStatsWithIdenticalTimestampsGetSameUsagePercent verifies that a stat
// arriving with the same timestamp as the previous stat gets its CPU usage
// percent copied from that previous stat, and still counts as a valid sample.
func TestCPUStatsWithIdenticalTimestampsGetSameUsagePercent(t *testing.T) {
	timestamps := []time.Time{
		parseNanoTime("2015-02-12T21:22:05.131117000Z"),
		parseNanoTime("2015-02-12T21:22:05.131117001Z"),
		parseNanoTime("2015-02-12T21:22:05.131117002Z"),
		parseNanoTime("2015-02-12T21:22:05.131117002Z"),
	}
	cpuTimes := []uint64{
		0,
		1,
		3,
		4,
	}
	memoryUtilizationInBytes := []uint64{
		3649536,
		3649536,
		3649536,
		3649536,
	}
	// create a queue
	queue := NewQueue(4)

	for i, time := range timestamps {
		queue.add(&ContainerStats{cpuUsage: cpuTimes[i], memoryUsage: memoryUtilizationInBytes[i], timestamp: time})
	}

	// The first datapoint has no CPU percentage. The remaining three are
	// 100 (delta 1 over 1ns), 200 (delta 2 over 1ns), and — because the
	// fourth stat duplicates the third's timestamp — a copy of the previous
	// value, 200. That gives 3 valid samples: min=100, max=200, sum=500,
	// so GetCPUStatsSet must succeed.
	statSet, err := queue.GetCPUStatsSet()
	require.NoError(t, err)
	require.Equal(t, float64(200), *statSet.Max)
	require.Equal(t, float64(100), *statSet.Min)
	require.Equal(t, int64(3), *statSet.SampleCount)
	require.Equal(t, float64(500), *statSet.Sum)
}

func TestHugeCPUUsagePercentDoesntGetCapped(t *testing.T) {
timestamps := []time.Time{
parseNanoTime("2015-02-12T21:22:05.131117000Z"),
parseNanoTime("2015-02-12T21:22:05.131117001Z"),
parseNanoTime("2015-02-12T21:22:05.131117002Z"),
}
cpuTimes := []uint64{
0,
1,
300000000,
}
memoryUtilizationInBytes := []uint64{
3649536,
3649536,
3649536,
}
// create a queue
queue := NewQueue(4)

for i, time := range timestamps {
queue.add(&ContainerStats{cpuUsage: cpuTimes[i], memoryUsage: memoryUtilizationInBytes[i], timestamp: time})
}

statSet, err := queue.GetCPUStatsSet()
require.NoError(t, err)
require.Equal(t, float64(30000001024), *statSet.Max)
require.Equal(t, float64(100), *statSet.Min)
require.Equal(t, int64(2), *statSet.SampleCount)
require.Equal(t, float64(30000001124), *statSet.Sum)
}

0 comments on commit db9dad9

Please sign in to comment.