Skip to content

Commit

Permalink
add overflow field for int64
Browse files Browse the repository at this point in the history
Updating to go 1.12

Make CGroups CPU period configurable

Signed-off-by: Mohamad Arab <boynux@gmail.com>

taskresources: set terminal reason msg for all errors (aws#2004)

* taskresources: set terminal reason msg for all errors

closes aws#1631

return a default terminal reason

remove terminalReason code for cgroups based on PR review

Test that volume errors are propogated

* fix cleanup error test and add string check

Fix telemetry test for ws2019

On WS2019 functional test image mcr.microsoft.com/windows/servercore:ltsc2019,
the behavior to handle path changed from ws2016 image, it needs to set entrypoint with
absolute path or have entrypoint set to powershell to run executable without absolute path.
Also removed unused binary in windows telemetry test.

add storageReadBytes storageWriteBytes stats
  • Loading branch information
fierlion committed Apr 30, 2019
1 parent f213fb0 commit 799142b
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 59 deletions.
4 changes: 2 additions & 2 deletions agent/stats/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ func validateEmptyTaskHealthMetrics(t *testing.T, engine *DockerStatsEngine) {

func createFakeContainerStats() []*ContainerStats {
return []*ContainerStats{
{22400432, 1839104, parseNanoTime("2015-02-12T21:22:05.131117533Z")},
{116499979, 3649536, parseNanoTime("2015-02-12T21:22:05.232291187Z")},
{22400432, 1839104, uint64(0), uint64(0), parseNanoTime("2015-02-12T21:22:05.131117533Z")},
{116499979, 3649536, uint64(0), uint64(0), parseNanoTime("2015-02-12T21:22:05.232291187Z")},
}
}

Expand Down
6 changes: 3 additions & 3 deletions agent/stats/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,10 @@ func TestStatsEngineMetadataInStatsSets(t *testing.T) {
ts1 := parseNanoTime("2015-02-12T21:22:05.131117533Z")
ts2 := parseNanoTime("2015-02-12T21:22:05.232291187Z")
containerStats := []*ContainerStats{
{22400432, 1839104, ts1},
{116499979, 3649536, ts2},
{22400432, 1839104, uint64(0), uint64(0), ts1},
{116499979, 3649536, uint64(0), uint64(0), ts2},
}
dockerStats := []*types.StatsJSON{{}, {},}
dockerStats := []*types.StatsJSON{{}, {}}
dockerStats[0].Read = ts1
dockerStats[1].Read = ts2
containers, _ := engine.tasksToContainers["t1"]
Expand Down
94 changes: 87 additions & 7 deletions agent/stats/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (queue *Queue) add(rawStat *ContainerStats) {
stat := UsageStats{
CPUUsagePerc: float32(nan32()),
MemoryUsageInMegs: uint32(rawStat.memoryUsage / BytesInMiB),
StorageReadBytes: rawStat.storageReadBytes,
StorageWriteBytes: rawStat.storageWriteBytes,
Timestamp: rawStat.timestamp,
cpuUsage: rawStat.cpuUsage,
}
Expand Down Expand Up @@ -125,6 +127,16 @@ func (queue *Queue) GetMemoryStatsSet() (*ecstcs.CWStatsSet, error) {
return queue.getCWStatsSet(getMemoryUsagePerc)
}

// GetStorageReadStatsSet gets the stats set for aggregate storage read
func (queue *Queue) GetStorageReadStatsSet() (*ecstcs.ULongStatsSet, error) {
return queue.getULongStatsSet(getStorageReadBytes)
}

// GetStorageWriteStatsSet gets the stats set for aggregate storage written
func (queue *Queue) GetStorageWriteStatsSet() (*ecstcs.ULongStatsSet, error) {
return queue.getULongStatsSet(getStorageWriteBytes)
}

// GetRawUsageStats gets the array of most recent raw UsageStats, in descending
// order of timestamps.
func (queue *Queue) GetRawUsageStats(numStats int) ([]UsageStats, error) {
Expand All @@ -147,6 +159,8 @@ func (queue *Queue) GetRawUsageStats(numStats int) ([]UsageStats, error) {
usageStats[i] = UsageStats{
CPUUsagePerc: rawUsageStat.CPUUsagePerc,
MemoryUsageInMegs: rawUsageStat.MemoryUsageInMegs,
StorageReadBytes: rawUsageStat.StorageReadBytes,
StorageWriteBytes: rawUsageStat.StorageWriteBytes,
Timestamp: rawUsageStat.Timestamp,
}
}
Expand All @@ -162,7 +176,24 @@ func getMemoryUsagePerc(s *UsageStats) float64 {
return float64(s.MemoryUsageInMegs)
}

type getUsageFunc func(*UsageStats) float64
func getStorageReadBytes(s *UsageStats) uint64 {
return s.StorageReadBytes
}

func getStorageWriteBytes(s *UsageStats) uint64 {
return s.StorageWriteBytes
}

func getOverflow(uintStat uint64) (int64, int64) {
if uintStat > math.MaxInt64 {
overflow := int64(uintStat % uint64(math.MaxInt64))
return math.MaxInt64, overflow
}
return int64(uintStat), int64(0)
}

type getUsageFloatFunc func(*UsageStats) float64
type getUsageIntFunc func(*UsageStats) uint64

func (queue *Queue) resetThresholdElapsed(timeout time.Duration) bool {
queue.lock.RLock()
Expand All @@ -179,7 +210,8 @@ func (queue *Queue) enoughDatapointsInBuffer() bool {

// getCWStatsSet gets the stats set for either CPU or Memory based on the
// function pointer.
func (queue *Queue) getCWStatsSet(f getUsageFunc) (*ecstcs.CWStatsSet, error) {
// stats are float64 type
func (queue *Queue) getCWStatsSet(f getUsageFloatFunc) (*ecstcs.CWStatsSet, error) {
queue.lock.Lock()
defer queue.lock.Unlock()

Expand All @@ -197,15 +229,15 @@ func (queue *Queue) getCWStatsSet(f getUsageFunc) (*ecstcs.CWStatsSet, error) {
sampleCount = 0

for _, stat := range queue.buffer {
perc := f(&stat)
if math.IsNaN(perc) {
thisStat := f(&stat)
if math.IsNaN(thisStat) {
continue
}

min = math.Min(min, perc)
max = math.Max(max, perc)
min = math.Min(min, thisStat)
max = math.Max(max, thisStat)
sampleCount++
sum += perc
sum += thisStat
}

return &ecstcs.CWStatsSet{
Expand All @@ -215,3 +247,51 @@ func (queue *Queue) getCWStatsSet(f getUsageFunc) (*ecstcs.CWStatsSet, error) {
Sum: &sum,
}, nil
}

// getULongStatsSet gets the stats set for the specified raw stat type
// stats come from docker as uint64 type, and by neccesity are packed into int64 type
// where there is overflow (math.MaxInt64 + 1 or greater)
// we capture the excess in optional overflow fields.
func (queue *Queue) getULongStatsSet(f getUsageIntFunc) (*ecstcs.ULongStatsSet, error) {
queue.lock.Lock()
defer queue.lock.Unlock()

queueLength := len(queue.buffer)
if queueLength < 2 {
// Need at least 2 data points to calculate this.
return nil, fmt.Errorf("No data in the queue")
}

var min, max, sum uint64
var sampleCount int64
min = math.MaxUint64
max = 0
sum = 0
sampleCount = 0

for _, stat := range queue.buffer {
thisStat := f(&stat)
if thisStat < min {
min = thisStat
}
if thisStat > max {
max = thisStat
}
sum += thisStat
sampleCount++
}

baseMin, overflowMin := getOverflow(min)
baseMax, overflowMax := getOverflow(max)
baseSum, overflowSum := getOverflow(sum)

return &ecstcs.ULongStatsSet{
Max: &baseMax,
OverflowMax: &overflowMax,
Min: &baseMin,
OverflowMin: &overflowMin,
SampleCount: &sampleCount,
Sum: &baseSum,
OverflowSum: &overflowSum,
}, nil
}
132 changes: 118 additions & 14 deletions agent/stats/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ import (

const (
predictableHighMemoryUtilizationInBytes = 7377772544

// predictableHighMemoryUtilizationInMiB is the expected Memory usage in MiB for
// the "predictableHighMemoryUtilizationInBytes" value (7377772544 / (1024 * 1024))
predictableHighMemoryUtilizationInMiB = 7035
// the "predictableInt64Overflow" requires 3 metrics to guarantee overflow
// note math.MaxInt64 is odd, so integer division will trim off 1
predictableInt64Overflow = math.MaxInt64 / int64(2)
predictableUint64 = 200000000
)

func getTimestamps() []time.Time {
Expand Down Expand Up @@ -60,7 +63,7 @@ func getTimestamps() []time.Time {

}

func getCPUTimes() []uint64 {
func getUintStats() []uint64 {
return []uint64{
22400432,
116499979,
Expand Down Expand Up @@ -122,26 +125,70 @@ func getPredictableHighMemoryUtilizationInBytes(size int) []uint64 {
return memBytes
}

func createQueue(size int, predictableHighMemoryUtilization bool) *Queue {
func getPredictableInt64Overflow(size int) []uint64 {
var uintStats []uint64
for i := 0; i < size; i++ {
uintStats = append(uintStats, uint64(predictableInt64Overflow))
}
return uintStats

}

func createQueue(size int, predictableHighUtilization bool) *Queue {
timestamps := getTimestamps()
cpuTimes := getCPUTimes()
cpuTimes := getUintStats()
var memoryUtilizationInBytes []uint64
if predictableHighMemoryUtilization {
var uintStats []uint64
if predictableHighUtilization {
memoryUtilizationInBytes = getPredictableHighMemoryUtilizationInBytes(len(cpuTimes))
uintStats = getPredictableInt64Overflow(len(cpuTimes))
} else {
memoryUtilizationInBytes = getRandomMemoryUtilizationInBytes()
uintStats = getUintStats()
}
queue := NewQueue(size)
for i, time := range timestamps {
queue.add(&ContainerStats{cpuUsage: cpuTimes[i], memoryUsage: memoryUtilizationInBytes[i], timestamp: time})
queue.add(&ContainerStats{
cpuUsage: cpuTimes[i],
memoryUsage: memoryUtilizationInBytes[i],
storageReadBytes: uintStats[i],
storageWriteBytes: uintStats[i],
timestamp: time})
}
return queue
}

func TestQueueUintStats(t *testing.T) {
queueLength := 3
queue := createQueue(queueLength, true)
buf := queue.buffer
if len(buf) != queueLength {
t.Errorf("Buffer size is incorrect. Expected: %d, Got: %d", queueLength, len(buf))
}

storageReadStatsSet, err := queue.GetStorageReadStatsSet()

if err != nil {
t.Error("Error getting storage read stats set:", err)
}
// assuming min is initialized to math.MaxUint64 then truncated
// min/max should be the same as predictableInt64Overflow
// their overflow should be 0
assert.Equal(t, *storageReadStatsSet.Min, predictableInt64Overflow)
assert.Equal(t, *storageReadStatsSet.OverflowMin, int64(0))
assert.Equal(t, *storageReadStatsSet.Max, predictableInt64Overflow)
assert.Equal(t, *storageReadStatsSet.OverflowMax, int64(0))
// the sum of three predictableInt64Overflow should be equal to MaxInt64
// with an overflow of predictableInt64Overflow - 1
// (see the definition of predictableInt64Overflow for why -1)
assert.Equal(t, *storageReadStatsSet.Sum, int64(math.MaxInt64))
assert.Equal(t, *storageReadStatsSet.OverflowSum, predictableInt64Overflow-1)
}

func TestQueueAddRemove(t *testing.T) {
timestamps := getTimestamps()
queueLength := 5
// Set predictableHighMemoryUtilization to false, expect random values when aggregated.
// Set predictableHighUtilization to false, expect random values when aggregated.
queue := createQueue(queueLength, false)
buf := queue.buffer
if len(buf) != queueLength {
Expand All @@ -157,7 +204,7 @@ func TestQueueAddRemove(t *testing.T) {

cpuStatsSet, err := queue.GetCPUStatsSet()
if err != nil {
t.Error("Error gettting cpu stats set:", err)
t.Error("Error getting cpu stats set:", err)
}
if *cpuStatsSet.Min == math.MaxFloat64 || math.IsNaN(*cpuStatsSet.Min) {
t.Error("Min value incorrectly set: ", *cpuStatsSet.Min)
Expand All @@ -174,12 +221,12 @@ func TestQueueAddRemove(t *testing.T) {

memStatsSet, err := queue.GetMemoryStatsSet()
if err != nil {
t.Error("Error gettting memory stats set:", err)
t.Error("Error getting memory stats set:", err)
}
if *memStatsSet.Min == float64(-math.MaxFloat32) {
if *memStatsSet.Min == math.MaxFloat64 || math.IsNaN(*memStatsSet.Min) {
t.Error("Min value incorrectly set: ", *memStatsSet.Min)
}
if *memStatsSet.Max == 0 {
if *memStatsSet.Max == -math.MaxFloat64 || math.IsNaN(*memStatsSet.Max) {
t.Error("Max value incorrectly set: ", *memStatsSet.Max)
}
if *memStatsSet.SampleCount != int64(queueLength) {
Expand All @@ -189,9 +236,45 @@ func TestQueueAddRemove(t *testing.T) {
t.Error("Sum value incorrectly set: ", *memStatsSet.Sum)
}

storageReadStatsSet, err := queue.GetStorageReadStatsSet()
if err != nil {
t.Error("Error getting storage read stats set:", err)
}
// assuming min is initialized to math.MaxUint64 then truncated
if *storageReadStatsSet.Min == int64(math.MaxInt64) &&
*storageReadStatsSet.OverflowMin == int64(math.MaxInt64) {
t.Error("Min value incorrectly set: ", *storageReadStatsSet.Min)
}
if *storageReadStatsSet.Max == 0 {
t.Error("Max value incorrectly set: ", *storageReadStatsSet.Max)
}
if *storageReadStatsSet.SampleCount != int64(queueLength) {
t.Error("Expected samplecount: ", queueLength, " got: ", *storageReadStatsSet.SampleCount)
}
if *storageReadStatsSet.Sum == 0 {
t.Error("Sum value incorrectly set: ", *storageReadStatsSet.Sum)
}

storageWriteStatsSet, err := queue.GetStorageWriteStatsSet()
if err != nil {
t.Error("Error getting storage read stats set:", err)
}
if *storageWriteStatsSet.Min == int64(math.MaxInt64) {
t.Error("Min value incorrectly set: ", *storageWriteStatsSet.Min)
}
if *storageWriteStatsSet.Max == 0 {
t.Error("Max value incorrectly set: ", *storageWriteStatsSet.Max)
}
if *storageWriteStatsSet.SampleCount != int64(queueLength) {
t.Error("Expected samplecount: ", queueLength, " got: ", *storageWriteStatsSet.SampleCount)
}
if *storageWriteStatsSet.Sum == 0 {
t.Error("Sum value incorrectly set: ", *storageWriteStatsSet.Sum)
}

rawUsageStats, err := queue.GetRawUsageStats(2 * queueLength)
if err != nil {
t.Error("Error gettting raw usage stats: ", err)
t.Error("Error getting raw usage stats: ", err)
}

if len(rawUsageStats) != queueLength {
Expand All @@ -218,7 +301,7 @@ func TestQueueAddRemove(t *testing.T) {
func TestQueueAddPredictableHighMemoryUtilization(t *testing.T) {
timestamps := getTimestamps()
queueLength := 5
// Set predictableHighMemoryUtilization to true
// Set predictableHighUtilization to true
// This lets us compare the computed values against pre-computed expected values
queue := createQueue(queueLength, true)
buf := queue.buffer
Expand All @@ -235,7 +318,7 @@ func TestQueueAddPredictableHighMemoryUtilization(t *testing.T) {

memStatsSet, err := queue.GetMemoryStatsSet()
if err != nil {
t.Error("Error gettting memory stats set:", err)
t.Error("Error getting memory stats set:", err)
}

// Test if both min and max for memory utilization are set to 7035MiB
Expand Down Expand Up @@ -312,6 +395,27 @@ func TestCpuStatsSetNotSetToInfinity(t *testing.T) {
}
}

func TestUintOverflow(t *testing.T) {
var maxInt, overInt int64
var underUint, overUint uint64
underUint = uint64(10)
maxInt = math.MaxInt64
overInt = int64(20)
overUint = uint64(maxInt) + uint64(overInt)

baseUnderUint, overflowUnderUint := getOverflow(underUint)
baseMaxInt, overflowMaxInt := getOverflow(uint64(maxInt))
baseOverUint, overflowOverUint := getOverflow(overUint)

assert.Equal(t, baseUnderUint, int64(10))
assert.Equal(t, overflowUnderUint, int64(0))
assert.Equal(t, baseMaxInt, int64(math.MaxInt64))
assert.Equal(t, overflowMaxInt, int64(0))
assert.Equal(t, baseOverUint, int64(math.MaxInt64))
assert.Equal(t, overflowOverUint, int64(20))

}

func TestResetThresholdElapsed(t *testing.T) {
// create a queue
queueLength := 3
Expand Down
Loading

0 comments on commit 799142b

Please sign in to comment.