diff --git a/agent/stats/engine.go b/agent/stats/engine.go
index 44f1877f37f..9b0a642edfa 100644
--- a/agent/stats/engine.go
+++ b/agent/stats/engine.go
@@ -596,20 +596,27 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]*
 		// Container is not terminal. Get CPU stats set.
 		cpuStatsSet, err := container.statsQueue.GetCPUStatsSet()
 		if err != nil {
-			seelog.Warnf("Error getting cpu stats, err: %v, container: %v", err, dockerID)
+			seelog.Warnf("Error getting cpu stats, skipping container, err: %v, container: %v", err, dockerID)
 			continue
 		}
 
 		memoryStatsSet, err := container.statsQueue.GetMemoryStatsSet()
 		if err != nil {
-			seelog.Warnf("Error getting memory stats, err: %v, container: %v", err, dockerID)
+			seelog.Warnf("Error getting memory stats, skipping container, err: %v, container: %v", err, dockerID)
+			continue
+		}
+
+		storageStatsSet, err := container.statsQueue.GetStorageStatsSet()
+		if err != nil {
+			seelog.Warnf("Error getting storage stats, skipping container, err: %v, container: %v", err, dockerID)
 			continue
 		}
 
 		containerMetric := &ecstcs.ContainerMetric{
-			ContainerName:  &container.containerMetadata.Name,
-			CpuStatsSet:    cpuStatsSet,
-			MemoryStatsSet: memoryStatsSet,
+			ContainerName:   &container.containerMetadata.Name,
+			CpuStatsSet:     cpuStatsSet,
+			MemoryStatsSet:  memoryStatsSet,
+			StorageStatsSet: storageStatsSet,
 		}
 
 		task, err := engine.resolver.ResolveTask(dockerID)
@@ -629,12 +636,6 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]*
 			}
 		}
 
-		storageStatsSet, err := container.statsQueue.GetStorageStatsSet()
-		if err != nil {
-			seelog.Warnf("Error getting storage stats, err: %v, container: %v", err, dockerID)
-		}
-		containerMetric.StorageStatsSet = storageStatsSet
-
 		containerMetrics = append(containerMetrics, containerMetric)
 	}
 
diff --git a/agent/stats/queue.go b/agent/stats/queue.go
index 09c51006ae3..8134cd662e2 100644
--- a/agent/stats/queue.go
+++ b/agent/stats/queue.go
@@ -26,11 +26,11 @@ import (
 
 const (
 	// BytesInMiB is the number of bytes in a MebiByte.
-	BytesInMiB = 1024 * 1024
+	BytesInMiB              = 1024 * 1024
+	minimumQueueDatapoints  = 2
+	MaxCPUUsagePerc float32 = 1024 * 1024
 )
 
-const minimumQueueDatapoints = 2
-
 // Queue abstracts a queue using UsageStats slice.
 type Queue struct {
 	buffer []UsageStats
@@ -92,19 +92,25 @@ func (queue *Queue) add(rawStat *ContainerStats) {
 		// % utilization can be calculated only when queue is non-empty.
 		lastStat := queue.buffer[queueLength-1]
 		timeSinceLastStat := float32(rawStat.timestamp.Sub(lastStat.Timestamp).Nanoseconds())
-		if timeSinceLastStat > 0 {
+		if timeSinceLastStat <= 0 {
+			// if we got a duplicate timestamp, set cpu percentage to the same value as the previous stat
+			seelog.Errorf("Received a docker stat object with a duplicate timestamp")
+			stat.CPUUsagePerc = lastStat.CPUUsagePerc
+		} else {
 			cpuUsageSinceLastStat := float32(rawStat.cpuUsage - lastStat.cpuUsage)
 			stat.CPUUsagePerc = 100 * cpuUsageSinceLastStat / timeSinceLastStat
-		} else {
-			// Ignore the stat if the current timestamp is same as the last one. This
-			// results in the value being set as +infinity
-			// float32(1) / float32(0) = +Inf
-			seelog.Debugf("time since last stat is zero. Ignoring cpu stat")
 		}
+
 		if queue.maxSize == queueLength {
 			// Remove first element if queue is full.
 			queue.buffer = queue.buffer[1:queueLength]
 		}
+
+		if stat.CPUUsagePerc > MaxCPUUsagePerc {
+			// this should never happen; log both stats to help investigate how it did
+			seelog.Errorf("Calculated CPU usage percent (%.1f) is larger than backend maximum (%.1f). lastStatTS=%s lastStatCPUTime=%d thisStatTS=%s thisStatCPUTime=%d queueLength=%d",
+				stat.CPUUsagePerc, MaxCPUUsagePerc, lastStat.Timestamp.Format(time.RFC3339Nano), lastStat.cpuUsage, rawStat.timestamp.Format(time.RFC3339Nano), rawStat.cpuUsage, queueLength)
+		}
 	}
 
 	queue.buffer = append(queue.buffer, stat)
@@ -321,7 +327,7 @@ func (queue *Queue) getCWStatsSet(f getUsageFloatFunc) (*ecstcs.CWStatsSet, erro
 	queueLength := len(queue.buffer)
 	if queueLength < 2 {
 		// Need at least 2 data points to calculate this.
-		return nil, fmt.Errorf("Need at least 2 data points in queue to calculate CW stats set")
+		return nil, fmt.Errorf("need at least 2 data points in queue to calculate CW stats set")
 	}
 
 	var min, max, sum float64
@@ -343,6 +349,11 @@ func (queue *Queue) getCWStatsSet(f getUsageFloatFunc) (*ecstcs.CWStatsSet, erro
 		sum += thisStat
 	}
 
+	// don't emit metrics when sampleCount == 0
+	if sampleCount == 0 {
+		return nil, fmt.Errorf("need at least 1 non-NaN data point in queue to calculate CW stats set")
+	}
+
 	return &ecstcs.CWStatsSet{
 		Max:         &max,
 		Min:         &min,
@@ -362,7 +373,7 @@ func (queue *Queue) getULongStatsSet(f getUsageIntFunc) (*ecstcs.ULongStatsSet,
 	queueLength := len(queue.buffer)
 	if queueLength < 2 {
 		// Need at least 2 data points to calculate this.
-		return nil, fmt.Errorf("Need at least 2 data points in the queue to calculate int stats")
+		return nil, fmt.Errorf("need at least 2 data points in the queue to calculate int stats")
 	}
 
 	var min, max, sum uint64
@@ -384,6 +395,11 @@ func (queue *Queue) getULongStatsSet(f getUsageIntFunc) (*ecstcs.ULongStatsSet,
 		sampleCount++
 	}
 
+	// don't emit metrics when sampleCount == 0
+	if sampleCount == 0 {
+		return nil, fmt.Errorf("need at least 1 non-NaN data point in the queue to calculate int stats")
+	}
+
 	baseMin, overflowMin := getInt64WithOverflow(min)
 	baseMax, overflowMax := getInt64WithOverflow(max)
 	baseSum, overflowSum := getInt64WithOverflow(sum)
diff --git a/agent/stats/queue_test.go b/agent/stats/queue_test.go
index 2b701157715..9b81b9671be 100644
--- a/agent/stats/queue_test.go
+++ b/agent/stats/queue_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/aws/amazon-ecs-agent/agent/tcs/model/ecstcs"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 const (
@@ -512,3 +513,98 @@ func TestEnoughDatapointsInBuffer(t *testing.T) {
 	enoughDataPoints = queue.enoughDatapointsInBuffer()
 	assert.False(t, enoughDataPoints, "Queue is expected to not have enough data points right after RESET")
 }
+
+func TestCPUStatSetFailsWhenSampleCountIsZero(t *testing.T) {
+	timestamps := []time.Time{
+		parseNanoTime("2015-02-12T21:22:05.131117533Z"),
+		parseNanoTime("2015-02-12T21:22:05.131117533Z"),
+	}
+	cpuTimes := []uint64{
+		22400432,
+		116499979,
+	}
+	memoryUtilizationInBytes := []uint64{
+		3649536,
+		3649536,
+	}
+	// create a queue
+	queue := NewQueue(3)
+
+	for i, time := range timestamps {
+		queue.add(&ContainerStats{cpuUsage: cpuTimes[i], memoryUsage: memoryUtilizationInBytes[i], timestamp: time})
+	}
+
+	// Because the two cpu stats have identical timestamps, there are no valid
+	// cpu usage percentage samples in the queue to build a valid CpuStatsSet,
+	// so this function call should fail.
+	_, err := queue.GetCPUStatsSet()
+	require.Error(t, err)
+}
+
+func TestCPUStatsWithIdenticalTimestampsGetSameUsagePercent(t *testing.T) {
+	timestamps := []time.Time{
+		parseNanoTime("2015-02-12T21:22:05.131117000Z"),
+		parseNanoTime("2015-02-12T21:22:05.131117001Z"),
+		parseNanoTime("2015-02-12T21:22:05.131117002Z"),
+		parseNanoTime("2015-02-12T21:22:05.131117002Z"),
+	}
+	cpuTimes := []uint64{
+		0,
+		1,
+		3,
+		4,
+	}
+	memoryUtilizationInBytes := []uint64{
+		3649536,
+		3649536,
+		3649536,
+		3649536,
+	}
+	// create a queue
+	queue := NewQueue(4)
+
+	for i, time := range timestamps {
+		queue.add(&ContainerStats{cpuUsage: cpuTimes[i], memoryUsage: memoryUtilizationInBytes[i], timestamp: time})
+	}
+
+	// The fourth stat has the same timestamp as the third, so it is assigned the
+	// same cpu usage percent (200) as the previous stat, and the stats set is
+	// computed over three valid samples: min=100, max=200, sum=500.
+	statSet, err := queue.GetCPUStatsSet()
+	require.NoError(t, err)
+	require.Equal(t, float64(200), *statSet.Max)
+	require.Equal(t, float64(100), *statSet.Min)
+	require.Equal(t, int64(3), *statSet.SampleCount)
+	require.Equal(t, float64(500), *statSet.Sum)
+}
+
+func TestHugeCPUUsagePercentDoesntGetCapped(t *testing.T) {
+	timestamps := []time.Time{
+		parseNanoTime("2015-02-12T21:22:05.131117000Z"),
+		parseNanoTime("2015-02-12T21:22:05.131117001Z"),
+		parseNanoTime("2015-02-12T21:22:05.131117002Z"),
+	}
+	cpuTimes := []uint64{
+		0,
+		1,
+		300000000,
+	}
+	memoryUtilizationInBytes := []uint64{
+		3649536,
+		3649536,
+		3649536,
+	}
+	// create a queue
+	queue := NewQueue(4)
+
+	for i, time := range timestamps {
+		queue.add(&ContainerStats{cpuUsage: cpuTimes[i], memoryUsage: memoryUtilizationInBytes[i], timestamp: time})
+	}
+
+	statSet, err := queue.GetCPUStatsSet()
+	require.NoError(t, err)
+	require.Equal(t, float64(30000001024), *statSet.Max)
+	require.Equal(t, float64(100), *statSet.Min)
+	require.Equal(t, int64(2), *statSet.SampleCount)
+	require.Equal(t, float64(30000001124), *statSet.Sum)
+}