diff --git a/CHANGELOG.md b/CHANGELOG.md index 164dc64fed3..66b0887a79e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 1.24.0-dev +* Feature - Configurable poll duration for container stats [@jcbowman](https://github.com/jcbowman) [#1646](https://github.com/aws/amazon-ecs-agent/pull/1646) + ## 1.23.0 * Feature - Add support for ECS Secrets integrating with AWS Secrets Manager [#1713](https://github.com/aws/amazon-ecs-agent/pull/1713) * Enhancement - Add availability zone to task metadata endpoint [#1674](https://github.com/aws/amazon-ecs-agent/pull/1674) diff --git a/Makefile b/Makefile index caf077ff703..178354f615d 100644 --- a/Makefile +++ b/Makefile @@ -193,7 +193,7 @@ test-in-docker: docker run --net=none -v "$(PWD):/go/src/github.com/aws/amazon-ecs-agent" --privileged "amazon/amazon-ecs-agent-test:make" run-functional-tests: testnnp test-registry ecr-execution-role-image telemetry-test-image - . ./scripts/shared_env && go test -tags functional -timeout=32m -v ./agent/functional_tests/... + . ./scripts/shared_env && go test -tags functional -timeout=34m -v ./agent/functional_tests/... .PHONY: build-image-for-ecr ecr-execution-role-image-for-upload upload-images replicate-images diff --git a/README.md b/README.md index a3fe6f72225..c7abe6f0244 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,8 @@ additional details on each available environment variable. | `ECS_UPDATES_ENABLED` | <true | false> | Whether to exit for an updater to apply updates when requested. | false | false | | `ECS_UPDATE_DOWNLOAD_DIR` | /cache | Where to place update tarballs within the container. | | | | `ECS_DISABLE_METRICS` | <true | false> | Whether to disable metrics gathering for tasks. | false | true | +| `ECS_POLL_METRICS` | <true | false> | Whether to poll or stream when gathering metrics for tasks. | false | false | +| `ECS_POLLING_METRICS_WAIT_DURATION` | 30s | Time to wait to poll for new metrics for a task. Only used when ECS_POLL_METRICS is true | 15s | 15s | | `ECS_RESERVED_MEMORY` | 32 | Memory, in MB, to reserve for use by things other than containers managed by Amazon ECS. | 0 | 0 | | `ECS_AVAILABLE_LOGGING_DRIVERS` | `["awslogs","fluentd","gelf","json-file","journald","logentries","splunk","syslog"]` | Which logging drivers are available on the container instance. | `["json-file","none"]` | `["json-file","none"]` | | `ECS_DISABLE_PRIVILEGED` | `true` | Whether launching privileged containers is disabled on the container instance. | `false` | `false` | diff --git a/agent/config/config.go b/agent/config/config.go index a4557d8bb89..8b24fc1be89 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -60,6 +60,10 @@ const ( // clean up task's containers. DefaultTaskCleanupWaitDuration = 3 * time.Hour + // DefaultPollingMetricsWaitDuration specifies the default value for polling metrics wait duration + // This is only used when PollMetrics is set to true + DefaultPollingMetricsWaitDuration = 15 * time.Second + // defaultDockerStopTimeout specifies the value for container stop timeout duration defaultDockerStopTimeout = 30 * time.Second @@ -83,6 +87,14 @@ const ( // 'stuck' in the pull / unpack step. Very small values are unsafe and lead to high failure rate. minimumImagePullInactivityTimeout = 1 * time.Minute + // minimumPollingMetricsWaitDuration specifies the minimum duration to wait before polling for new stats + // from docker. This is only used when PollMetrics is set to true + minimumPollingMetricsWaitDuration = 1 * time.Second + + // maximumPollingMetricsWaitDuration specifies the maximum duration to wait before polling for new stats + // from docker. This is only used when PollMetrics is set to true + maximumPollingMetricsWaitDuration = 20 * time.Second + // minimumDockerStopTimeout specifies the minimum value for docker StopContainer API minimumDockerStopTimeout = 1 * time.Second @@ -303,11 +315,28 @@ func (cfg *Config) validateAndOverrideBounds() error { cfg.TaskMetadataBurstRate = DefaultTaskMetadataBurstRate } + // check the PollMetrics specific configurations + cfg.pollMetricsOverrides() + cfg.platformOverrides() return nil } +func (cfg *Config) pollMetricsOverrides() { + if cfg.PollMetrics { + if cfg.PollingMetricsWaitDuration < minimumPollingMetricsWaitDuration { + seelog.Warnf("Invalid value for polling metrics wait duration, will be overridden with the default value: %s. Parsed value: %v, minimum value: %v.", DefaultPollingMetricsWaitDuration.String(), cfg.PollingMetricsWaitDuration, minimumPollingMetricsWaitDuration) + cfg.PollingMetricsWaitDuration = DefaultPollingMetricsWaitDuration + } + + if cfg.PollingMetricsWaitDuration > maximumPollingMetricsWaitDuration { + seelog.Warnf("Invalid value for polling metrics wait duration, will be overridden with the default value: %s. Parsed value: %v, maximum value: %v.", DefaultPollingMetricsWaitDuration.String(), cfg.PollingMetricsWaitDuration, maximumPollingMetricsWaitDuration) + cfg.PollingMetricsWaitDuration = DefaultPollingMetricsWaitDuration + } + } +} + // checkMissingAndDeprecated checks all zero-valued fields for tags of the form // missing:STRING and acts based on that string. Current options are: fatal, // warn. Fatal will result in an error being returned, warn will result in a @@ -457,6 +486,8 @@ func environmentConfig() (Config, error) { UpdatesEnabled: utils.ParseBool(os.Getenv("ECS_UPDATES_ENABLED"), false), UpdateDownloadDir: os.Getenv("ECS_UPDATE_DOWNLOAD_DIR"), DisableMetrics: utils.ParseBool(os.Getenv("ECS_DISABLE_METRICS"), false), + PollMetrics: utils.ParseBool(os.Getenv("ECS_POLL_METRICS"), false), + PollingMetricsWaitDuration: parseEnvVariableDuration("ECS_POLLING_METRICS_WAIT_DURATION"), ReservedMemory: parseEnvVariableUint16("ECS_RESERVED_MEMORY"), AvailableLoggingDrivers: parseAvailableLoggingDrivers(), PrivilegedDisabled: utils.ParseBool(os.Getenv("ECS_DISABLE_PRIVILEGED"), false), @@ -514,6 +545,8 @@ func (cfg *Config) String() string { "AuthType: %v, "+ "UpdatesEnabled: %v, "+ "DisableMetrics: %v, "+ + "PollMetrics: %v, "+ + "PollingMetricsWaitDuration: %v, "+ "ReservedMem: %v, "+ "TaskCleanupWaitDuration: %v, "+ "DockerStopTimeout: %v, "+ @@ -527,6 +560,8 @@ func (cfg *Config) String() string { cfg.EngineAuthType, cfg.UpdatesEnabled, cfg.DisableMetrics, + cfg.PollMetrics, + cfg.PollingMetricsWaitDuration, cfg.ReservedMemory, cfg.TaskCleanupWaitDuration, cfg.DockerStopTimeout, diff --git a/agent/config/config_test.go b/agent/config/config_test.go index 12e8ed2c221..1b69d0a2098 100644 --- a/agent/config/config_test.go +++ b/agent/config/config_test.go @@ -111,6 +111,8 @@ func TestEnvironmentConfig(t *testing.T) { defer setTestEnv("ECS_ENABLE_TASK_ENI", "true")() defer setTestEnv("ECS_TASK_METADATA_RPS_LIMIT", "1000,1100")() defer setTestEnv("ECS_SHARED_VOLUME_MATCH_FULL_CONFIG", "true")() + defer setTestEnv("ECS_POLL_METRICS", "true")() + defer setTestEnv("ECS_POLLING_METRICS_WAIT_DURATION", "10s")() additionalLocalRoutesJSON := `["1.2.3.4/22","5.6.7.8/32"]` setTestEnv("ECS_AWSVPC_ADDITIONAL_LOCAL_ROUTES", additionalLocalRoutesJSON) setTestEnv("ECS_ENABLE_CONTAINER_METADATA", "true") @@ -134,6 +136,9 @@ func TestEnvironmentConfig(t *testing.T) { assert.True(t, conf.TaskIAMRoleEnabled, "Wrong value for TaskIAMRoleEnabled") assert.True(t, conf.TaskIAMRoleEnabledForNetworkHost, "Wrong value for TaskIAMRoleEnabledForNetworkHost") assert.True(t, conf.ImageCleanupDisabled, "Wrong value for ImageCleanupDisabled") + assert.True(t, conf.PollMetrics, "Wrong value for PollMetrics") + expectedDurationPollingMetricsWaitDuration, _ := time.ParseDuration("10s") + assert.Equal(t, expectedDurationPollingMetricsWaitDuration, conf.PollingMetricsWaitDuration) assert.True(t, conf.TaskENIEnabled, "Wrong value for TaskNetwork") assert.Equal(t, (30 * time.Minute), conf.MinimumImageDeletionAge) @@ -212,6 +217,12 @@ func TestInvalidLoggingDriver(t *testing.T) { assert.Error(t, conf.validateAndOverrideBounds(), "Should be error with invalid-logging-driver") } +func TestDefaultPollMetricsWithoutECSDataDir(t *testing.T) { + conf, err := environmentConfig() + assert.NoError(t, err) + assert.False(t, conf.PollMetrics) +} + func TestDefaultCheckpointWithoutECSDataDir(t *testing.T) { conf, err := environmentConfig() assert.NoError(t, err) @@ -321,6 +332,24 @@ func TestInvalidValueDockerPullInactivityTimeout(t *testing.T) { assert.Equal(t, conf.ImagePullInactivityTimeout, defaultImagePullInactivityTimeout, "Wrong value for ImagePullInactivityTimeout") } +func TestInvalidValueMaxPollingMetricsWaitDuration(t *testing.T) { + defer setTestRegion()() + defer setTestEnv("ECS_POLL_METRICS", "true")() + defer setTestEnv("ECS_POLLING_METRICS_WAIT_DURATION", "21s")() + conf, err := NewConfig(ec2.NewBlackholeEC2MetadataClient()) + assert.NoError(t, err) + assert.Equal(t, conf.PollingMetricsWaitDuration, DefaultPollingMetricsWaitDuration, "Wrong value for PollingMetricsWaitDuration") +} + +func TestInvalidValueMinPollingMetricsWaitDuration(t *testing.T) { + defer setTestRegion()() + defer setTestEnv("ECS_POLL_METRICS", "true")() + defer setTestEnv("ECS_POLLING_METRICS_WAIT_DURATION", "0s")() + conf, err := NewConfig(ec2.NewBlackholeEC2MetadataClient()) + assert.NoError(t, err) + assert.Equal(t, conf.PollingMetricsWaitDuration, DefaultPollingMetricsWaitDuration, "Wrong value for PollingMetricsWaitDuration") +} + func TestInvalidFormatParseEnvVariableUint16(t *testing.T) { defer setTestRegion()() setTestEnv("FOO", "foo") diff --git a/agent/config/config_unix.go b/agent/config/config_unix.go index d59ac8cfb1f..3a7e4369b1b 100644 --- a/agent/config/config_unix.go +++ b/agent/config/config_unix.go @@ -66,6 +66,8 @@ func DefaultConfig() Config { PauseContainerTarballPath: pauseContainerTarballPath, PauseContainerImageName: DefaultPauseContainerImageName, PauseContainerTag: DefaultPauseContainerTag, + PollMetrics: false, + PollingMetricsWaitDuration: DefaultPollingMetricsWaitDuration, AWSVPCBlockInstanceMetdata: false, ContainerMetadataEnabled: false, TaskCPUMemLimit: DefaultEnabled, diff --git a/agent/config/config_windows.go b/agent/config/config_windows.go index 30f10d1cacf..9418ae523d0 100644 --- a/agent/config/config_windows.go +++ b/agent/config/config_windows.go @@ -83,6 +83,8 @@ func DefaultConfig() Config { ReservedMemory: 0, AvailableLoggingDrivers: []dockerclient.LoggingDriver{dockerclient.JSONFileDriver, dockerclient.NoneDriver, dockerclient.AWSLogsDriver}, TaskCleanupWaitDuration: DefaultTaskCleanupWaitDuration, + PollMetrics: false, + PollingMetricsWaitDuration: DefaultPollingMetricsWaitDuration, DockerStopTimeout: defaultDockerStopTimeout, ContainerStartTimeout: defaultContainerStartTimeout, ImagePullInactivityTimeout: defaultImagePullInactivityTimeout, diff --git a/agent/config/types.go b/agent/config/types.go index 99f1d54398a..cf297f853e1 100644 --- a/agent/config/types.go +++ b/agent/config/types.go @@ -89,6 +89,14 @@ type Config struct { // sent to the ECS telemetry endpoint DisableMetrics bool + // PollMetrics configures whether metrics are constantly streamed for each container or + // polled on interval instead. + PollMetrics bool + + // PollingMetricsWaitDuration configures how long a container should wait before polling metrics + // again when PollMetrics is set to true + PollingMetricsWaitDuration time.Duration + // ReservedMemory specifies the amount of memory (in MB) to reserve for things // other than containers managed by ECS ReservedMemory uint16 diff --git a/agent/dockerclient/dockerapi/docker_client.go b/agent/dockerclient/dockerapi/docker_client.go index 1b10d957752..06a4c5449e9 100644 --- a/agent/dockerclient/dockerapi/docker_client.go +++ b/agent/dockerclient/dockerapi/docker_client.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io" + "math/rand" "strconv" "strings" "sync" @@ -1263,41 +1264,76 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou statsChnl := make(chan *types.Stats) var resp types.ContainerStats - go func() { - defer cancelRequest() - defer close(statsChnl) - // ContainerStats outputs a io.ReadCloser and an OSType - stream := true - resp, err = client.ContainerStats(subCtx, id, stream) - if err != nil { - seelog.Warnf("DockerGoClient: Unable to retrieve stats for container %s: %v", id, err) - return - } - - // handle inactivity timeout - var canceled uint32 - var ch chan<- struct{} - resp.Body, ch = handleInactivityTimeout(resp.Body, inactivityTimeout, cancelRequest, &canceled) - defer resp.Body.Close() - defer close(ch) - - // Returns a *Decoder and takes in a readCloser - decoder := json.NewDecoder(resp.Body) - data := new(types.Stats) - for err := decoder.Decode(data); err != io.EOF; err = decoder.Decode(data) { + if !dg.config.PollMetrics { + go func() { + defer cancelRequest() + defer close(statsChnl) + // ContainerStats outputs an io.ReadCloser and an OSType + stream := true + resp, err = client.ContainerStats(subCtx, id, stream) if err != nil { - seelog.Warnf("DockerGoClient: Unable to decode stats for container %s: %v", id, err) + seelog.Warnf("DockerGoClient: Unable to retrieve stats for container %s: %v", id, err) return } - if atomic.LoadUint32(&canceled) != 0 { - seelog.Warnf("DockerGoClient: inactivity time exceeded timeout while retrieving stats for container %s", id) - return + + // handle inactivity timeout + var canceled uint32 + var ch chan<- struct{} + resp.Body, ch = handleInactivityTimeout(resp.Body, inactivityTimeout, cancelRequest, &canceled) + defer resp.Body.Close() + defer close(ch) + + // Returns a *Decoder and takes in a readCloser + decoder := json.NewDecoder(resp.Body) + data := new(types.Stats) + for err := decoder.Decode(data); err != io.EOF; err = decoder.Decode(data) { + if err != nil { + seelog.Warnf("DockerGoClient: Unable to decode stats for container %s: %v", id, err) + return + } + if atomic.LoadUint32(&canceled) != 0 { + seelog.Warnf("DockerGoClient: inactivity time exceeded timeout while retrieving stats for container %s", id) + return + } + + statsChnl <- data + data = new(types.Stats) } + }() + } else { + seelog.Infof("DockerGoClient: Starting to Poll for metrics for container %s", id) + //Sleep for a random period time up to the polling interval. This will help make containers ask for stats at different times + time.Sleep(time.Second * time.Duration(rand.Intn(int(dg.config.PollingMetricsWaitDuration.Seconds())))) + + statPollTicker := time.NewTicker(dg.config.PollingMetricsWaitDuration) + go func() { + defer cancelRequest() + defer close(statsChnl) + defer statPollTicker.Stop() + + for range statPollTicker.C { + // ContainerStats outputs an io.ReadCloser and an OSType + stream := false + resp, err = client.ContainerStats(subCtx, id, stream) + if err != nil { + seelog.Warnf("DockerGoClient: Unable to retrieve stats for container %s: %v", id, err) + return + } - statsChnl <- data - data = new(types.Stats) - } - }() + // Returns a *Decoder and takes in a readCloser + decoder := json.NewDecoder(resp.Body) + data := new(types.Stats) + err := decoder.Decode(data) + if err != nil { + seelog.Warnf("DockerGoClient: Unable to decode stats for container %s: %v", id, err) + return + } + + statsChnl <- data + data = new(types.Stats) + } + }() + } return statsChnl, nil } diff --git a/agent/functional_tests/tests/functionaltests_test.go b/agent/functional_tests/tests/functionaltests_test.go index ebdd0ffddaf..753dc8be98a 100644 --- a/agent/functional_tests/tests/functionaltests_test.go +++ b/agent/functional_tests/tests/functionaltests_test.go @@ -539,6 +539,80 @@ func telemetryTest(t *testing.T, taskDefinition string) { assert.NoError(t, err, "Task stopped, verify metrics for memory utilization failed") } +func telemetryTestWithStatsPolling(t *testing.T, taskDefinition string) { + // Try to use a new cluster for this test, ensure no other task metrics for this cluster + newClusterName := "ecstest-telemetry-polling-" + uuid.New() + _, err := ECS.CreateCluster(&ecsapi.CreateClusterInput{ + ClusterName: aws.String(newClusterName), + }) + require.NoError(t, err, "Failed to create cluster") + defer DeleteCluster(t, newClusterName) + + // additional config fields to use polling instead of stream + agentOptions := AgentOptions{ + ExtraEnvironment: map[string]string{ + "ECS_CLUSTER": newClusterName, + "ECS_POLL_METRICS": "true", + "ECS_POLLING_METRICS_WAIT_DURATION": "15s", + }, + } + agent := RunAgent(t, &agentOptions) + defer agent.Cleanup() + + params := &cloudwatch.GetMetricStatisticsInput{ + MetricName: aws.String("CPUUtilization"), + Namespace: aws.String("AWS/ECS"), + Period: aws.Int64(60), + Statistics: []*string{ + aws.String("Average"), + aws.String("SampleCount"), + }, + Dimensions: []*cloudwatch.Dimension{ + { + Name: aws.String("ClusterName"), + Value: aws.String(newClusterName), + }, + }, + } + + cwclient := cloudwatch.New(session.New(), aws.NewConfig().WithRegion(*ECS.Config.Region)) + cpuNum := runtime.NumCPU() + + tdOverrides := make(map[string]string) + // Set the container cpu percentage 25% + tdOverrides["$$$$CPUSHARE$$$$"] = strconv.Itoa(int(float64(cpuNum*cpuSharesPerCore) * 0.25)) + + testTask, err := agent.StartTaskWithTaskDefinitionOverrides(t, taskDefinition, tdOverrides) + require.NoError(t, err, "Failed to start telemetry task") + // Wait for the task to run and the agent to send back metrics + err = testTask.WaitRunning(waitTaskStateChangeDuration) + require.NoError(t, err, "Error wait telemetry task running") + + time.Sleep(waitMetricsInCloudwatchDuration) + params.EndTime = aws.Time(RoundTimeUp(time.Now(), time.Minute).UTC()) + params.StartTime = aws.Time((*params.EndTime).Add(-waitMetricsInCloudwatchDuration).UTC()) + params.MetricName = aws.String("CPUUtilization") + metrics, err := VerifyMetrics(cwclient, params, false) + assert.NoError(t, err, "Task is running, verify metrics for CPU utilization failed") + // Also verify the cpu usage is around 25% +/- 5% + assert.InDelta(t, 25, *metrics.Average, 5) + + params.MetricName = aws.String("MemoryUtilization") + metrics, err = VerifyMetrics(cwclient, params, false) + assert.NoError(t, err, "Task is running, verify metrics for memory utilization failed") + memInfo, err := system.ReadMemInfo() + require.NoError(t, err, "Acquiring system info failed") + totalMemory := memInfo.MemTotal / bytePerMegabyte + // Verify the memory usage is around 1024/totalMemory +/- 5% + assert.InDelta(t, float32(1024*100)/float32(totalMemory), *metrics.Average, 5) + + err = testTask.Stop() + require.NoError(t, err, "Failed to stop the telemetry task") + + err = testTask.WaitStopped(waitTaskStateChangeDuration) + require.NoError(t, err, "Waiting for task stop failed") +} + // containerHealthMetricsTest tests the container health metrics based on the task definition func containerHealthMetricsTest(t *testing.T, taskDefinition string, overrides map[string]string) { agent := RunAgent(t, nil) diff --git a/agent/functional_tests/tests/functionaltests_unix_test.go b/agent/functional_tests/tests/functionaltests_unix_test.go index 6a0a1d6c6be..d3920a141d8 100644 --- a/agent/functional_tests/tests/functionaltests_unix_test.go +++ b/agent/functional_tests/tests/functionaltests_unix_test.go @@ -368,11 +368,16 @@ func TestAWSLogsDriver(t *testing.T) { assert.Equal(t, *resp.Events[0].Message, "hello world", fmt.Sprintf("Got log events message unexpected: %s", *resp.Events[0].Message)) } -// TestTelemetry tests whether agent can send metrics to TACS +// TestTelemetry tests whether agent can send metrics to TACS, through streaming docker stats func TestTelemetry(t *testing.T) { telemetryTest(t, "telemetry") } +// TestTelemetry tests whether agent can send metrics to TACS, through polling docker stats +func TestTelemetryWithStatsPolling(t *testing.T) { + telemetryTestWithStatsPolling(t, "telemetry") +} + func TestTaskIAMRolesNetHostMode(t *testing.T) { // The test runs only when the environment TEST_IAM_ROLE was set if os.Getenv("TEST_DISABLE_TASK_IAM_ROLE_NET_HOST") == "true" { diff --git a/agent/functional_tests/tests/functionaltests_windows_test.go b/agent/functional_tests/tests/functionaltests_windows_test.go index f1ef997b4b9..caffe91478f 100644 --- a/agent/functional_tests/tests/functionaltests_windows_test.go +++ b/agent/functional_tests/tests/functionaltests_windows_test.go @@ -193,11 +193,16 @@ func TestContainerMetadataFile(t *testing.T) { testContainerMetadataFile(t, "container-metadata-file-validator-windows", "ecs-functional-tests-container-metadata-file-validator-windows") } -// TestTelemetry tests whether agent can send metrics to TACS +// TestTelemetry tests whether agent can send metrics to TACS, through streaming docker stats func TestTelemetry(t *testing.T) { telemetryTest(t, "telemetry-windows") } +// TestTelemetry tests whether agent can send metrics to TACS, through polling docker stats +func TestTelemetryWithStatsPolling(t *testing.T) { + telemetryTestWithStatsPolling(t, "telemetry-windows") +} + // TestOOMContainer verifies that an OOM container returns an error func TestOOMContainer(t *testing.T) { agent := RunAgent(t, nil) diff --git a/agent/stats/engine_integ_test.go b/agent/stats/engine_integ_test.go index 79d6253ba0b..15809cc6329 100644 --- a/agent/stats/engine_integ_test.go +++ b/agent/stats/engine_integ_test.go @@ -340,6 +340,90 @@ func TestStatsEngineWithNewContainers(t *testing.T) { validateEmptyTaskHealthMetrics(t, engine) } +func TestStatsEngineWithNewContainersWithPolling(t *testing.T) { + // additional config fields to use polling instead of stream + cfg.PollMetrics = true + cfg.PollingMetricsWaitDuration = 1 * time.Second + // Create a new docker stats engine + engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNewContainers")) + defer engine.removeAll() + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Assign ContainerStop timeout to addressable variable + timeout := defaultDockerTimeoutSeconds + + container, err := createHealthContainer(client) + require.NoError(t, err, "creating container failed") + defer client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true}) + + engine.cluster = defaultCluster + engine.containerInstanceArn = defaultContainerInstance + + containerChangeEventStream := eventStream("TestStatsEngineWithNewContainers") + taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, + nil, dockerstate.NewTaskEngineState(), nil, nil) + + testTask := createRunningTask() + // enable health check of the container + testTask.Containers[0].HealthCheckType = "docker" + // Populate Tasks and Container map in the engine. + dockerTaskEngine := taskEngine.(*ecsengine.DockerTaskEngine) + dockerTaskEngine.State().AddTask(testTask) + dockerTaskEngine.State().AddContainer( + &apicontainer.DockerContainer{ + DockerID: container.ID, + DockerName: "container-health", + Container: testTask.Containers[0], + }, + testTask) + + err = engine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance) + require.NoError(t, err, "initializing stats engine failed") + defer engine.containerChangeEventStream.Unsubscribe(containerChangeHandler) + + err = client.ContainerStart(ctx, container.ID, types.ContainerStartOptions{}) + require.NoError(t, err, "starting container failed") + defer client.ContainerStop(ctx, container.ID, &timeout) + + // Write the container change event to event stream + err = engine.containerChangeEventStream.WriteToEventStream(dockerapi.DockerContainerChangeEvent{ + Status: apicontainerstatus.ContainerRunning, + DockerContainerMetadata: dockerapi.DockerContainerMetadata{ + DockerID: container.ID, + }, + }) + assert.NoError(t, err, "failed to write to container change event stream") + + // Wait for the stats collection go routine to start. + time.Sleep(checkPointSleep) + validateInstanceMetrics(t, engine) + // Verify the health metrics of container + validateTaskHealthMetrics(t, engine) + + err = client.ContainerStop(ctx, container.ID, &timeout) + require.NoError(t, err, "stopping container failed") + + // Write the container change event to event stream + err = engine.containerChangeEventStream.WriteToEventStream(dockerapi.DockerContainerChangeEvent{ + Status: apicontainerstatus.ContainerStopped, + DockerContainerMetadata: dockerapi.DockerContainerMetadata{ + DockerID: container.ID, + }, + }) + assert.NoError(t, err, "failed to write to container change event stream") + + time.Sleep(waitForCleanupSleep) + + // Should not contain any metrics after cleanup. + validateIdleContainerMetrics(t, engine) + validateEmptyTaskHealthMetrics(t, engine) + + // reset cfg, currently cfg is shared by all tests. + cfg.PollMetrics = false + cfg.PollingMetricsWaitDuration = config.DefaultPollingMetricsWaitDuration +} + func TestStatsEngineWithDockerTaskEngine(t *testing.T) { containerChangeEventStream := eventStream("TestStatsEngineWithDockerTaskEngine") taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream,