From 85fcaf9cb43d80b663eeab900270c66d3b3bf2b2 Mon Sep 17 00:00:00 2001 From: Mythri Garaga Manjunatha Date: Tue, 28 Jun 2022 14:32:04 -0700 Subject: [PATCH] Bugfix - collect and publish Service Connect metrics even when metrics gathering for tasks is explictly disabled --- agent/stats/common_test.go | 15 ++ agent/stats/engine.go | 35 ++++- agent/stats/engine_unix_integ_test.go | 206 +++++++++++++++----------- agent/stats/engine_unix_test.go | 40 +++++ agent/tcs/client/client.go | 13 +- 5 files changed, 208 insertions(+), 101 deletions(-) diff --git a/agent/stats/common_test.go b/agent/stats/common_test.go index ca226be1588..ccb8df05f9e 100644 --- a/agent/stats/common_test.go +++ b/agent/stats/common_test.go @@ -157,6 +157,21 @@ func validateInstanceMetrics(t *testing.T, engine *DockerStatsEngine, includeSer } } +func validateInstanceMetricsWithDisabledMetrics(t *testing.T, engine *DockerStatsEngine, includeServiceConnectStats bool) { + metadata, taskMetrics, err := engine.GetInstanceMetrics(includeServiceConnectStats) + assert.NoError(t, err, "gettting instance metrics failed") + assert.NoError(t, validateMetricsMetadata(metadata), "validating metadata failed") + assert.Len(t, taskMetrics, 1, "incorrect number of tasks") + + taskMetric := taskMetrics[0] + assert.Equal(t, aws.StringValue(taskMetric.TaskDefinitionFamily), taskDefinitionFamily, "unexpected task definition family") + assert.Equal(t, aws.StringValue(taskMetric.TaskDefinitionVersion), taskDefinitionVersion, "unexpected task definition version") + assert.NoError(t, validateContainerMetrics(taskMetric.ContainerMetrics, 0), "validating container metrics failed") + if includeServiceConnectStats { + assert.NoError(t, validateServiceConnectMetrics(taskMetric.ServiceConnectMetricsWrapper, 1), "validating service connect metrics failed") + } +} + func validateContainerMetrics(containerMetrics []*ecstcs.ContainerMetric, expected int) error { if len(containerMetrics) != expected { return fmt.Errorf("Mismatch in number of ContainerStatsSet elements. Expected: %d, Got: %d", expected, len(containerMetrics)) diff --git a/agent/stats/engine.go b/agent/stats/engine.go index 5a99f74bc0d..41ac182d2f6 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -417,7 +417,6 @@ func (engine *DockerStatsEngine) addToStatsContainerMapUnsafe( // GetInstanceMetrics gets all task metrics and instance metadata from stats engine. func (engine *DockerStatsEngine) GetInstanceMetrics(includeServiceConnectStats bool) (*ecstcs.MetricsMetadata, []*ecstcs.TaskMetric, error) { - var taskMetrics []*ecstcs.TaskMetric idle := engine.isIdle() metricsMetadata := &ecstcs.MetricsMetadata{ Cluster: aws.String(engine.cluster), @@ -425,6 +424,8 @@ func (engine *DockerStatsEngine) GetInstanceMetrics(includeServiceConnectStats b Idle: aws.Bool(idle), MessageId: aws.String(uuid.NewRandom().String()), } + + var taskMetrics []*ecstcs.TaskMetric if idle { seelog.Debug("Instance is idle. No task metrics to report") fin := true @@ -442,16 +443,24 @@ func (engine *DockerStatsEngine) GetInstanceMetrics(includeServiceConnectStats b } } - for taskArn := range engine.tasksToContainers { + taskStatsToCollect := engine.getTaskStatsToCollect() + for taskArn := range taskStatsToCollect { + _, isServiceConnectTask := engine.taskToServiceConnectStats[taskArn] containerMetrics, err := engine.taskContainerMetricsUnsafe(taskArn) if err != nil { seelog.Debugf("Error getting container metrics for task: %s, err: %v", taskArn, err) - continue + // skip collecting service connect related metrics, if task is not service connect enabled + if !isServiceConnectTask { + continue + } } if len(containerMetrics) == 0 { seelog.Debugf("Empty containerMetrics for task, ignoring, task: %s", taskArn) - continue + // skip collecting service connect related metrics, if task is not service connect enabled + if !isServiceConnectTask { + continue + } } taskDef, exists := engine.tasksToDefinitions[taskArn] @@ -523,7 +532,7 @@ func (engine *DockerStatsEngine) isIdle() bool { engine.lock.RLock() defer engine.lock.RUnlock() - return len(engine.tasksToContainers) == 0 + return len(engine.tasksToContainers) == 0 && len(engine.taskToServiceConnectStats) == 0 } func (engine *DockerStatsEngine) containerHealthsToMonitor() bool { @@ -878,6 +887,22 @@ func (engine *DockerStatsEngine) ContainerDockerStats(taskARN string, containerI return containerStats, containerNetworkRateStats, nil } +// getTaskStatsToCollect returns a map of taskArns for which task metrics needs to collected +func (engine *DockerStatsEngine) getTaskStatsToCollect() map[string]bool { + taskStatsToCollect := make(map[string]bool) + for taskArn := range engine.tasksToContainers { + if _, taskArnExists := taskStatsToCollect[taskArn]; !taskArnExists { + taskStatsToCollect[taskArn] = true + } + } + for taskArn := range engine.taskToServiceConnectStats { + if _, taskArnExists := taskStatsToCollect[taskArn]; !taskArnExists { + taskStatsToCollect[taskArn] = true + } + } + return taskStatsToCollect +} + // getServiceConnectStats invokes the workflow to retrieve all service connect // related metrics for all service connect enabled tasks func (engine *DockerStatsEngine) getServiceConnectStats() error { diff --git a/agent/stats/engine_unix_integ_test.go b/agent/stats/engine_unix_integ_test.go index 275bd803cc5..cd07434e4a0 100644 --- a/agent/stats/engine_unix_integ_test.go +++ b/agent/stats/engine_unix_integ_test.go @@ -68,96 +68,122 @@ func TestStatsEngineWithNetworkStatsDifferentModes(t *testing.T) { } func TestStatsEngineWithServiceConnectMetrics(t *testing.T) { - testUDSPath := filepath.Join(t.TempDir(), "test_stats_metrics.sock") - - // Create a new docker stats engine - engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithServiceConnectMetrics")) - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - // Assign ContainerStop timeout to addressable variable - timeout := defaultDockerTimeoutSeconds - - // Create a container to get the container id. - container, err := createGremlin(client, "default") - require.NoError(t, err, "creating container failed") - defer client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true}) - - engine.cluster = defaultCluster - engine.containerInstanceArn = defaultContainerInstance - - err = client.ContainerStart(ctx, container.ID, types.ContainerStartOptions{}) - require.NoError(t, err, "starting container failed") - defer client.ContainerStop(ctx, container.ID, &timeout) - - containerChangeEventStream := eventStream("TestStatsEngineWithServiceConnectMetrics") - taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, - nil, dockerstate.NewTaskEngineState(), nil, nil, nil) - testTask := createRunningTask() - testTask.ServiceConnectConfig = &serviceconnect.Config{ - ContainerName: serviceConnectContainerName, - RuntimeConfig: serviceconnect.RuntimeConfig{ - AdminSocketPath: testUDSPath, - StatsRequest: testStatsRestURL, - }, - } - // Populate Tasks and Container map in the engine. - dockerTaskEngine := taskEngine.(*ecsengine.DockerTaskEngine) - dockerTaskEngine.State().AddTask(testTask) - dockerTaskEngine.State().AddContainer( - &apicontainer.DockerContainer{ - DockerID: container.ID, - DockerName: "gremlin", - Container: testTask.Containers[0], + testcases := []struct { + name string + shouldDisableMetrics bool + }{ + { + name: "Test Stats engine for Service Connect task with metrics enabled", }, - testTask) - - // Simulate container start prior to listener initialization. - time.Sleep(checkPointSleep) - err = engine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance) - require.NoError(t, err, "initializing stats engine failed") - serviceConnectStats, err := newServiceConnectStats() - require.NoError(t, err, "expected no error") - engine.taskToServiceConnectStats[taskArn] = serviceConnectStats - assert.Equal(t, 1, len(engine.taskToServiceConnectStats)) - - defer engine.containerChangeEventStream.Unsubscribe(containerChangeHandler) - - // simulate appnet server providing service connect metrics - r := mux.NewRouter() - r.HandleFunc(testStatsRestPath, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "%v", stats) - })) - ts := httptest.NewUnstartedServer(r) - l, err := net.Listen("unix", testUDSPath) - require.NoError(t, err) - - ts.Listener.Close() - ts.Listener = l - ts.Start() - defer ts.Close() - - // Wait for the stats collection go routine to start. - time.Sleep(checkPointSleep) - validateInstanceMetrics(t, engine, true) - scStats := engine.taskToServiceConnectStats[taskArn] - require.True(t, scStats.sent, "expected service connect metrics sent flag to be set") - validateEmptyTaskHealthMetrics(t, engine) - - err = client.ContainerStop(ctx, container.ID, &timeout) - require.NoError(t, err, "stopping container failed") - - err = engine.containerChangeEventStream.WriteToEventStream(dockerapi.DockerContainerChangeEvent{ - Status: apicontainerstatus.ContainerStopped, - DockerContainerMetadata: dockerapi.DockerContainerMetadata{ - DockerID: container.ID, + { + name: "Test Stats engine for Service Connect task with metrics disabled", + shouldDisableMetrics: true, }, - }) - assert.NoError(t, err, "failed to write to container change event stream") - - time.Sleep(waitForCleanupSleep) - - // Should not contain any metrics after cleanup. - validateIdleContainerMetrics(t, engine) - validateEmptyTaskHealthMetrics(t, engine) + } + testUDSPath := filepath.Join(t.TempDir(), "test_stats_metrics.sock") + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + testConfig := cfg + if tc.shouldDisableMetrics { + testConfig.DisableMetrics = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled} + } + + // Create a new docker stats engine + engine := NewDockerStatsEngine(&testConfig, dockerClient, eventStream("TestStatsEngineWithServiceConnectMetrics")) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Assign ContainerStop timeout to addressable variable + timeout := defaultDockerTimeoutSeconds + + // Create a container to get the container id. + container, err := createGremlin(client, "default") + require.NoError(t, err, "creating container failed") + defer client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true}) + + engine.cluster = defaultCluster + engine.containerInstanceArn = defaultContainerInstance + + err = client.ContainerStart(ctx, container.ID, types.ContainerStartOptions{}) + require.NoError(t, err, "starting container failed") + defer client.ContainerStop(ctx, container.ID, &timeout) + + containerChangeEventStream := eventStream("TestStatsEngineWithServiceConnectMetrics") + taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, + nil, dockerstate.NewTaskEngineState(), nil, nil, nil) + testTask := createRunningTask() + testTask.ServiceConnectConfig = &serviceconnect.Config{ + ContainerName: serviceConnectContainerName, + RuntimeConfig: serviceconnect.RuntimeConfig{ + AdminSocketPath: testUDSPath, + StatsRequest: testStatsRestURL, + }, + } + // Populate Tasks and Container map in the engine. + dockerTaskEngine := taskEngine.(*ecsengine.DockerTaskEngine) + dockerTaskEngine.State().AddTask(testTask) + dockerTaskEngine.State().AddContainer( + &apicontainer.DockerContainer{ + DockerID: container.ID, + DockerName: "gremlin", + Container: testTask.Containers[0], + }, + testTask) + + // Simulate container start prior to listener initialization. + time.Sleep(checkPointSleep) + err = engine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance) + require.NoError(t, err, "initializing stats engine failed") + serviceConnectStats, err := newServiceConnectStats() + require.NoError(t, err, "expected no error") + engine.taskToServiceConnectStats[taskArn] = serviceConnectStats + assert.Equal(t, 1, len(engine.taskToServiceConnectStats)) + + defer engine.containerChangeEventStream.Unsubscribe(containerChangeHandler) + + // simulate appnet server providing service connect metrics + r := mux.NewRouter() + r.HandleFunc(testStatsRestPath, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "%v", stats) + })) + ts := httptest.NewUnstartedServer(r) + l, err := net.Listen("unix", testUDSPath) + require.NoError(t, err) + + ts.Listener.Close() + ts.Listener = l + ts.Start() + defer ts.Close() + + // Wait for the stats collection go routine to start. + time.Sleep(checkPointSleep) + if tc.shouldDisableMetrics { + validateInstanceMetricsWithDisabledMetrics(t, engine, true) + } else { + validateInstanceMetrics(t, engine, true) + } + scStats := engine.taskToServiceConnectStats[taskArn] + require.True(t, scStats.sent, "expected service connect metrics sent flag to be set") + validateEmptyTaskHealthMetrics(t, engine) + + err = client.ContainerStop(ctx, container.ID, &timeout) + require.NoError(t, err, "stopping container failed") + + err = engine.containerChangeEventStream.WriteToEventStream(dockerapi.DockerContainerChangeEvent{ + Status: apicontainerstatus.ContainerStopped, + DockerContainerMetadata: dockerapi.DockerContainerMetadata{ + DockerID: container.ID, + }, + }) + assert.NoError(t, err, "failed to write to container change event stream") + + time.Sleep(waitForCleanupSleep) + + // Should not contain any metrics after cleanup. + if !tc.shouldDisableMetrics { + validateIdleContainerMetrics(t, engine) + } + validateEmptyTaskHealthMetrics(t, engine) + }) + } } diff --git a/agent/stats/engine_unix_test.go b/agent/stats/engine_unix_test.go index ed67a1cd26d..d236b64d20d 100644 --- a/agent/stats/engine_unix_test.go +++ b/agent/stats/engine_unix_test.go @@ -22,8 +22,10 @@ import ( apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container" apieni "github.com/aws/amazon-ecs-agent/agent/api/eni" + "github.com/aws/amazon-ecs-agent/agent/api/serviceconnect" apitask "github.com/aws/amazon-ecs-agent/agent/api/task" apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status" + "github.com/aws/amazon-ecs-agent/agent/config" mock_dockerapi "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi/mocks" mock_resolver "github.com/aws/amazon-ecs-agent/agent/stats/resolver/mock" "github.com/docker/docker/api/types" @@ -125,3 +127,41 @@ func TestNetworkModeStatsAWSVPCMode(t *testing.T) { } } } + +func TestServiceConnectWithDisabledMetrics(t *testing.T) { + disableMetricsConfig := cfg + disableMetricsConfig.DisableMetrics = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled} + containerID := "containerID" + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + container := apicontainer.Container{ + Name: "service-connect", + HealthCheckType: "docker", + } + resolver := mock_resolver.NewMockContainerMetadataResolver(mockCtrl) + resolver.EXPECT().ResolveTask(containerID).Return(&apitask.Task{ + Arn: "t1", + KnownStatusUnsafe: apitaskstatus.TaskRunning, + Family: "f1", + ServiceConnectConfig: &serviceconnect.Config{ + ContainerName: "service-connect", + }, + Containers: []*apicontainer.Container{&container}, + }, nil) + resolver.EXPECT().ResolveContainer(containerID).Return(&apicontainer.DockerContainer{ + DockerID: containerID, + Container: &container, + }, nil).Times(2) + + engine := NewDockerStatsEngine(&disableMetricsConfig, nil, eventStream("TestServiceConnectWithDisabledMetrics")) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + engine.ctx = ctx + engine.resolver = resolver + engine.addAndStartStatsContainer(containerID) + + assert.Len(t, engine.tasksToContainers, 0, "No containers should be tracked if metrics is disabled") + assert.Len(t, engine.tasksToHealthCheckContainers, 1) + assert.Len(t, engine.taskToServiceConnectStats, 1) +} diff --git a/agent/tcs/client/client.go b/agent/tcs/client/client.go index 9ba17affc9b..2345c7e0bf7 100644 --- a/agent/tcs/client/client.go +++ b/agent/tcs/client/client.go @@ -116,9 +116,8 @@ func (cs *clientServer) Serve() error { cs.publishHealthTicker = time.NewTicker(cs.publishMetricsInterval) cs.pullInstanceStatusTicker = time.NewTicker(cs.publishMetricsInterval) - if !cs.disableResourceMetrics { - go cs.publishMetrics() - } + go cs.publishMetrics() + go cs.publishHealthMetrics() go cs.publishInstanceStatus() @@ -188,9 +187,11 @@ func (cs *clientServer) publishMetrics() { metricCounter = 0 } cs.statsEngine.SetPublishServiceConnectTickerInterval(metricCounter) - err := cs.publishMetricsOnce(includeServiceConnectStats) - if err != nil { - seelog.Warnf("Error publishing metrics: %v", err) + if !cs.disableResourceMetrics || includeServiceConnectStats { + err := cs.publishMetricsOnce(includeServiceConnectStats) + if err != nil { + seelog.Warnf("Error publishing metrics: %v", err) + } } case <-cs.ctx.Done(): return