Added ability to poll for container stats instead of constant stream #1646
Changes from 8 commits
@@ -60,6 +60,10 @@ const (
	// clean up task's containers.
	DefaultTaskCleanupWaitDuration = 3 * time.Hour

	// DefaultPollingMetricsWaitDuration specifies the default value for polling metrics wait duration
	// This is only used when PollMetrics is set to true
	DefaultPollingMetricsWaitDuration = 15 * time.Second

	// defaultDockerStopTimeout specifies the value for container stop timeout duration
	defaultDockerStopTimeout = 30 * time.Second
@@ -83,6 +87,14 @@ const (
	// 'stuck' in the pull / unpack step. Very small values are unsafe and lead to high failure rate.
	minimumImagePullInactivityTimeout = 1 * time.Minute

	// minimumPollingMetricsWaitDuration specifies the minimum duration to wait before polling for new stats
	// from docker. This is only used when PollMetrics is set to true
	minimumPollingMetricsWaitDuration = 1 * time.Second

	// maximumPollingMetricsWaitDuration specifies the maximum duration to wait before polling for new stats
	// from docker. This is only used when PollMetrics is set to true
	maximumPollingMetricsWaitDuration = 20 * time.Second

	// minimumDockerStopTimeout specifies the minimum value for docker StopContainer API
	minimumDockerStopTimeout = 1 * time.Second

Review comment on minimumPollingMetricsWaitDuration: just curious, how did we come up with the min and max values?
Reply: The customer did; there is some testing/benchmarking info in the original PR.
@@ -303,11 +315,28 @@ func (cfg *Config) validateAndOverrideBounds() error {
		cfg.TaskMetadataBurstRate = DefaultTaskMetadataBurstRate
	}

	// check the PollMetrics specific configurations
	cfg.pollMetricsOverrides()

	cfg.platformOverrides()

	return nil
}

func (cfg *Config) pollMetricsOverrides() {
	if cfg.PollMetrics {
		if cfg.PollingMetricsWaitDuration < minimumPollingMetricsWaitDuration {
			seelog.Warnf("Invalid value for polling metrics wait duration, will be overridden with the default value: %s. Parsed value: %v, minimum value: %v.",
				DefaultPollingMetricsWaitDuration.String(), cfg.PollingMetricsWaitDuration, minimumPollingMetricsWaitDuration)
			cfg.PollingMetricsWaitDuration = DefaultPollingMetricsWaitDuration
		}

		if cfg.PollingMetricsWaitDuration > maximumPollingMetricsWaitDuration {
			seelog.Warnf("Invalid value for polling metrics wait duration, will be overridden with the default value: %s. Parsed value: %v, maximum value: %v.",
				DefaultPollingMetricsWaitDuration.String(), cfg.PollingMetricsWaitDuration, maximumPollingMetricsWaitDuration)
			cfg.PollingMetricsWaitDuration = DefaultPollingMetricsWaitDuration
		}
	}
}

// checkMissingAndDeprecated checks all zero-valued fields for tags of the form
// missing:STRING and acts based on that string. Current options are: fatal,
// warn. Fatal will result in an error being returned, warn will result in a
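As an illustration of the clamping in pollMetricsOverrides above, here is a minimal sketch that is not part of the PR, written as if it lived next to pollMetricsOverrides in the same package (assuming fmt and time are imported); the field, method, and constant names are taken from this diff, while the literal durations are arbitrary:

	func ExamplePollMetricsClamping() {
		cfg := &Config{
			PollMetrics:                true,
			PollingMetricsWaitDuration: 25 * time.Second, // above maximumPollingMetricsWaitDuration (20s)
		}
		cfg.pollMetricsOverrides()
		fmt.Println(cfg.PollingMetricsWaitDuration) // 15s: reset to DefaultPollingMetricsWaitDuration

		cfg.PollingMetricsWaitDuration = 500 * time.Millisecond // below minimumPollingMetricsWaitDuration (1s)
		cfg.pollMetricsOverrides()
		fmt.Println(cfg.PollingMetricsWaitDuration) // clamped back to the 15s default again
	}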
@@ -457,6 +486,8 @@ func environmentConfig() (Config, error) {
		UpdatesEnabled:             utils.ParseBool(os.Getenv("ECS_UPDATES_ENABLED"), false),
		UpdateDownloadDir:          os.Getenv("ECS_UPDATE_DOWNLOAD_DIR"),
		DisableMetrics:             utils.ParseBool(os.Getenv("ECS_DISABLE_METRICS"), false),
		PollMetrics:                utils.ParseBool(os.Getenv("ECS_POLL_METRICS"), false),
		PollingMetricsWaitDuration: parseEnvVariableDuration("ECS_POLLING_METRICS_WAIT_DURATION"),
		ReservedMemory:             parseEnvVariableUint16("ECS_RESERVED_MEMORY"),
		AvailableLoggingDrivers:    parseAvailableLoggingDrivers(),
		PrivilegedDisabled:         utils.ParseBool(os.Getenv("ECS_DISABLE_PRIVILEGED"), false),
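For reference, a hedged end-to-end sketch of how the two new environment variables would flow through environmentConfig and the bounds check; the function names come from the hunks above, the values are only examples, and ECS_POLL_METRICS defaults to false when unset:

	os.Setenv("ECS_POLL_METRICS", "true")
	os.Setenv("ECS_POLLING_METRICS_WAIT_DURATION", "10s")

	cfg, err := environmentConfig()
	if err != nil {
		seelog.Errorf("Error reading environment config: %v", err)
		return
	}
	// pollMetricsOverrides (called from validateAndOverrideBounds) replaces any
	// wait duration outside [1s, 20s] with the 15s default.
	if err := cfg.validateAndOverrideBounds(); err != nil {
		seelog.Errorf("Invalid config: %v", err)
	}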
@@ -514,6 +545,8 @@ func (cfg *Config) String() string {
		"AuthType: %v, "+
		"UpdatesEnabled: %v, "+
		"DisableMetrics: %v, "+
		"PollMetrics: %v, "+
		"PollingMetricsWaitDuration: %v, "+
		"ReservedMem: %v, "+
		"TaskCleanupWaitDuration: %v, "+
		"DockerStopTimeout: %v, "+
@@ -527,6 +560,8 @@
		cfg.EngineAuthType,
		cfg.UpdatesEnabled,
		cfg.DisableMetrics,
		cfg.PollMetrics,
		cfg.PollingMetricsWaitDuration,
		cfg.ReservedMemory,
		cfg.TaskCleanupWaitDuration,
		cfg.DockerStopTimeout,
@@ -111,6 +111,8 @@ func TestEnvironmentConfig(t *testing.T) {
	defer setTestEnv("ECS_ENABLE_TASK_ENI", "true")()
	defer setTestEnv("ECS_TASK_METADATA_RPS_LIMIT", "1000,1100")()
	defer setTestEnv("ECS_SHARED_VOLUME_MATCH_FULL_CONFIG", "true")()
	defer setTestEnv("ECS_POLL_METRICS", "true")()
	defer setTestEnv("ECS_POLLING_METRICS_WAIT_DURATION", "10s")()
	additionalLocalRoutesJSON := `["1.2.3.4/22","5.6.7.8/32"]`
	setTestEnv("ECS_AWSVPC_ADDITIONAL_LOCAL_ROUTES", additionalLocalRoutesJSON)
	setTestEnv("ECS_ENABLE_CONTAINER_METADATA", "true")
@@ -134,6 +136,9 @@ func TestEnvironmentConfig(t *testing.T) {
	assert.True(t, conf.TaskIAMRoleEnabled, "Wrong value for TaskIAMRoleEnabled")
	assert.True(t, conf.TaskIAMRoleEnabledForNetworkHost, "Wrong value for TaskIAMRoleEnabledForNetworkHost")
	assert.True(t, conf.ImageCleanupDisabled, "Wrong value for ImageCleanupDisabled")
	assert.True(t, conf.PollMetrics, "Wrong value for PollMetrics")
	expectedDurationPollingMetricsWaitDuration, _ := time.ParseDuration("10s")
	assert.Equal(t, expectedDurationPollingMetricsWaitDuration, conf.PollingMetricsWaitDuration)

	assert.True(t, conf.TaskENIEnabled, "Wrong value for TaskNetwork")
	assert.Equal(t, (30 * time.Minute), conf.MinimumImageDeletionAge)
@@ -212,6 +217,12 @@ func TestInvalidLoggingDriver(t *testing.T) {
	assert.Error(t, conf.validateAndOverrideBounds(), "Should be error with invalid-logging-driver")
}

func TestDefaultPollMetricsWithoutECSDataDir(t *testing.T) {
	conf, err := environmentConfig()
	assert.NoError(t, err)
	assert.False(t, conf.PollMetrics)
}

func TestDefaultCheckpointWithoutECSDataDir(t *testing.T) {
	conf, err := environmentConfig()
	assert.NoError(t, err)

Review comment on TestDefaultPollMetricsWithoutECSDataDir: what does …
Reply: I think that's what it means (see TestDefaultCheckpointWithoutECSDataDir below); if we do not have a config file it should take the default value.
@@ -321,6 +332,24 @@ func TestInvalidValueDockerPullInactivityTimeout(t *testing.T) {
	assert.Equal(t, conf.ImagePullInactivityTimeout, defaultImagePullInactivityTimeout, "Wrong value for ImagePullInactivityTimeout")
}

func TestInvalidValueMaxPollingMetricsWaitDuration(t *testing.T) {
	defer setTestRegion()()
	defer setTestEnv("ECS_POLL_METRICS", "true")()
	defer setTestEnv("ECS_POLLING_METRICS_WAIT_DURATION", "21s")()
	conf, err := NewConfig(ec2.NewBlackholeEC2MetadataClient())
	assert.NoError(t, err)
	assert.Equal(t, conf.PollingMetricsWaitDuration, DefaultPollingMetricsWaitDuration, "Wrong value for PollingMetricsWaitDuration")
}

func TestInvalidValueMinPollingMetricsWaitDuration(t *testing.T) {
	defer setTestRegion()()
	defer setTestEnv("ECS_POLL_METRICS", "true")()
	defer setTestEnv("ECS_POLLING_METRICS_WAIT_DURATION", "0s")()
	conf, err := NewConfig(ec2.NewBlackholeEC2MetadataClient())
	assert.NoError(t, err)
	assert.Equal(t, conf.PollingMetricsWaitDuration, DefaultPollingMetricsWaitDuration, "Wrong value for PollingMetricsWaitDuration")
}

func TestInvalidFormatParseEnvVariableUint16(t *testing.T) {
	defer setTestRegion()()
	setTestEnv("FOO", "foo")
@@ -21,6 +21,7 @@ import (
	"errors"
	"fmt"
	"io"
	"math/rand"
	"strconv"
	"strings"
	"sync"
@@ -1263,41 +1264,76 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeout
	statsChnl := make(chan *types.Stats)
	var resp types.ContainerStats

	if !dg.config.PollMetrics {
		go func() {
			defer cancelRequest()
			defer close(statsChnl)
			// ContainerStats outputs an io.ReadCloser and an OSType
			stream := true
			resp, err = client.ContainerStats(subCtx, id, stream)
			if err != nil {
				seelog.Warnf("DockerGoClient: Unable to retrieve stats for container %s: %v", id, err)
				return
			}

			// handle inactivity timeout
			var canceled uint32
			var ch chan<- struct{}
			resp.Body, ch = handleInactivityTimeout(resp.Body, inactivityTimeout, cancelRequest, &canceled)
			defer resp.Body.Close()
			defer close(ch)

			// Returns a *Decoder and takes in a readCloser
			decoder := json.NewDecoder(resp.Body)
			data := new(types.Stats)
			for err := decoder.Decode(data); err != io.EOF; err = decoder.Decode(data) {
				if err != nil {
					seelog.Warnf("DockerGoClient: Unable to decode stats for container %s: %v", id, err)
					return
				}
				if atomic.LoadUint32(&canceled) != 0 {
					seelog.Warnf("DockerGoClient: inactivity time exceeded timeout while retrieving stats for container %s", id)
					return
				}

				statsChnl <- data
				data = new(types.Stats)
			}
		}()
	} else {
		seelog.Infof("DockerGoClient: Starting to Poll for metrics for container %s", id)
		// Sleep for a random period of time up to the polling interval. This will help make containers ask for stats at different times
		time.Sleep(time.Second * time.Duration(rand.Intn(int(dg.config.PollingMetricsWaitDuration.Seconds()))))

		statPollTicker := time.NewTicker(dg.config.PollingMetricsWaitDuration)
		go func() {
			defer cancelRequest()
			defer close(statsChnl)
			defer statPollTicker.Stop()

			for range statPollTicker.C {
				// ContainerStats outputs an io.ReadCloser and an OSType
				stream := false
				resp, err = client.ContainerStats(subCtx, id, stream)
				if err != nil {
					seelog.Warnf("DockerGoClient: Unable to retrieve stats for container %s: %v", id, err)
					return
				}

				// Returns a *Decoder and takes in a readCloser
				decoder := json.NewDecoder(resp.Body)
				data := new(types.Stats)
				err := decoder.Decode(data)
				if err != nil {
					seelog.Warnf("DockerGoClient: Unable to decode stats for container %s: %v", id, err)
					return
				}

				statsChnl <- data
				data = new(types.Stats)
			}
		}()
	}

	return statsChnl, nil
}

Review discussion on the randomized pre-poll sleep (jitter):
Comment: Good intuition, but I don't think this is needed. The only time all containers of a Task would poll at the same time is if Agent stops and starts while the Task is running. Otherwise, assuming goroutines run at approximately the same speed (or even randomly), containers are added to the stats engine iteratively, so they will already be staggered to some degree.
Reply: I think there is value in adding extra jitter here.
Comment: Does this mean this could end up sleeping for the whole wait duration, skipping a cycle of poll metrics? Not sure if this could lead to any unexpected behavior.
Reply: At this point the polling cycle has not started; this is adding jitter before starting it.
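For orientation, a minimal consumer sketch (not from the PR): the channel-based signature is unchanged, so callers look the same whether the agent streams or polls. Here ctx, containerID, and the process helper are hypothetical names assumed to exist in the caller:

	statsChnl, err := dg.Stats(ctx, containerID, 30*time.Second /* inactivity timeout */)
	if err != nil {
		return err
	}
	for stat := range statsChnl {
		// In streaming mode Docker pushes a sample roughly every second;
		// in polling mode one arrives about every PollingMetricsWaitDuration.
		process(stat)
	}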
@@ -539,6 +539,82 @@ func telemetryTest(t *testing.T, taskDefinition string) {
	assert.NoError(t, err, "Task stopped, verify metrics for memory utilization failed")
}

func telemetryTestWithStatsPolling(t *testing.T, taskDefinition string) {
	// Try to use a new cluster for this test, ensure no other task metrics for this cluster
	newClusterName := "ecstest-telemetry-polling-" + uuid.New()
	_, err := ECS.CreateCluster(&ecsapi.CreateClusterInput{
		ClusterName: aws.String(newClusterName),
	})
	require.NoError(t, err, "Failed to create cluster")
	defer DeleteCluster(t, newClusterName)

	// additional config fields to use polling instead of stream
	agentOptions := AgentOptions{
		ExtraEnvironment: map[string]string{
			"ECS_CLUSTER":                       newClusterName,
			"ECS_POLL_METRICS":                  "true",
			"ECS_POLLING_METRICS_WAIT_DURATION": "15s",
		},
	}
	agent := RunAgent(t, &agentOptions)
	defer agent.Cleanup()

	params := &cloudwatch.GetMetricStatisticsInput{
		MetricName: aws.String("CPUUtilization"),
		Namespace:  aws.String("AWS/ECS"),
		Period:     aws.Int64(60),
		Statistics: []*string{
			aws.String("Average"),
			aws.String("SampleCount"),
		},
		Dimensions: []*cloudwatch.Dimension{
			{
				Name:  aws.String("ClusterName"),
				Value: aws.String(newClusterName),
			},
		},
	}
	params.StartTime = aws.Time(RoundTimeUp(time.Now(), time.Minute).UTC())
	params.EndTime = aws.Time((*params.StartTime).Add(waitMetricsInCloudwatchDuration).UTC())

	cwclient := cloudwatch.New(session.New(), aws.NewConfig().WithRegion(*ECS.Config.Region))
	cpuNum := runtime.NumCPU()

	tdOverrides := make(map[string]string)
	// Set the container cpu percentage to 25%
	tdOverrides["$$$$CPUSHARE$$$$"] = strconv.Itoa(int(float64(cpuNum*cpuSharesPerCore) * 0.25))

	testTask, err := agent.StartTaskWithTaskDefinitionOverrides(t, taskDefinition, tdOverrides)
	require.NoError(t, err, "Failed to start telemetry task")
	// Wait for the task to run and the agent to send back metrics
	err = testTask.WaitRunning(waitTaskStateChangeDuration)
	require.NoError(t, err, "Error wait telemetry task running")

	time.Sleep(waitMetricsInCloudwatchDuration)
	params.EndTime = aws.Time(RoundTimeUp(time.Now(), time.Minute).UTC())
	params.StartTime = aws.Time((*params.EndTime).Add(-waitMetricsInCloudwatchDuration).UTC())
	params.MetricName = aws.String("CPUUtilization")
	metrics, err := VerifyMetrics(cwclient, params, false)
	assert.NoError(t, err, "Task is running, verify metrics for CPU utilization failed")
	// Also verify the cpu usage is around 25% +/- 5%
	assert.InDelta(t, 25, *metrics.Average, 5)

	params.MetricName = aws.String("MemoryUtilization")
	metrics, err = VerifyMetrics(cwclient, params, false)
	assert.NoError(t, err, "Task is running, verify metrics for memory utilization failed")
	memInfo, err := system.ReadMemInfo()
	require.NoError(t, err, "Acquiring system info failed")
	totalMemory := memInfo.MemTotal / bytePerMegabyte
	// Verify the memory usage is around 1024/totalMemory +/- 5%
	assert.InDelta(t, float32(1024*100)/float32(totalMemory), *metrics.Average, 5)

	err = testTask.Stop()
	require.NoError(t, err, "Failed to stop the telemetry task")

	err = testTask.WaitStopped(waitTaskStateChangeDuration)
	require.NoError(t, err, "Waiting for task stop failed")
}

// containerHealthMetricsTest tests the container health metrics based on the task definition
func containerHealthMetricsTest(t *testing.T, taskDefinition string, overrides map[string]string) {
	agent := RunAgent(t, nil)

Review comment on the initial params.StartTime/EndTime setup: I don't think the params here are used?
Reply: Good catch! I shortened this test's checks and didn't remove the setup part. Removed.
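Restating the arithmetic behind the two InDelta checks above (the 1024 MiB figure and cpuSharesPerCore come from the surrounding test suite, not from this hunk):

	// CPU: the task definition is overridden to request 25% of the instance's
	// CPU shares (cpuNum * cpuSharesPerCore * 0.25), so the test expects the
	// reported average CPUUtilization to land near 25, within +/- 5.
	expectedCPUUtilization := 25.0

	// Memory: with the task using roughly 1024 MiB on a host with totalMemory
	// MiB, the expected average MemoryUtilization percentage is below, again
	// checked within +/- 5.
	expectedMemoryUtilization := float64(1024*100) / float64(totalMemory)
	_, _ = expectedCPUUtilization, expectedMemoryUtilization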
Review comment on the ECS_POLL_METRICS setting: Is this an Agent-specific decision to be able to turn off metrics completely? (How does this affect TACS if two Agents in a cluster poll at different frequencies?)
Reply: Seems that way; if this is turned off we fall back to the streaming behavior.