Skip to content

Commit

Permalink
Bugfix - collect and publish Service Connect metrics even when metric…
Browse files Browse the repository at this point in the history
…s gathering for tasks is explictly disabled
  • Loading branch information
mythri-garaga authored and yinyic committed Oct 3, 2022
1 parent 993b655 commit 85fcaf9
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 101 deletions.
15 changes: 15 additions & 0 deletions agent/stats/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,21 @@ func validateInstanceMetrics(t *testing.T, engine *DockerStatsEngine, includeSer
}
}

func validateInstanceMetricsWithDisabledMetrics(t *testing.T, engine *DockerStatsEngine, includeServiceConnectStats bool) {
metadata, taskMetrics, err := engine.GetInstanceMetrics(includeServiceConnectStats)
assert.NoError(t, err, "gettting instance metrics failed")
assert.NoError(t, validateMetricsMetadata(metadata), "validating metadata failed")
assert.Len(t, taskMetrics, 1, "incorrect number of tasks")

taskMetric := taskMetrics[0]
assert.Equal(t, aws.StringValue(taskMetric.TaskDefinitionFamily), taskDefinitionFamily, "unexpected task definition family")
assert.Equal(t, aws.StringValue(taskMetric.TaskDefinitionVersion), taskDefinitionVersion, "unexpected task definition version")
assert.NoError(t, validateContainerMetrics(taskMetric.ContainerMetrics, 0), "validating container metrics failed")
if includeServiceConnectStats {
assert.NoError(t, validateServiceConnectMetrics(taskMetric.ServiceConnectMetricsWrapper, 1), "validating service connect metrics failed")
}
}

func validateContainerMetrics(containerMetrics []*ecstcs.ContainerMetric, expected int) error {
if len(containerMetrics) != expected {
return fmt.Errorf("Mismatch in number of ContainerStatsSet elements. Expected: %d, Got: %d", expected, len(containerMetrics))
Expand Down
35 changes: 30 additions & 5 deletions agent/stats/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,14 +417,15 @@ func (engine *DockerStatsEngine) addToStatsContainerMapUnsafe(

// GetInstanceMetrics gets all task metrics and instance metadata from stats engine.
func (engine *DockerStatsEngine) GetInstanceMetrics(includeServiceConnectStats bool) (*ecstcs.MetricsMetadata, []*ecstcs.TaskMetric, error) {
var taskMetrics []*ecstcs.TaskMetric
idle := engine.isIdle()
metricsMetadata := &ecstcs.MetricsMetadata{
Cluster: aws.String(engine.cluster),
ContainerInstance: aws.String(engine.containerInstanceArn),
Idle: aws.Bool(idle),
MessageId: aws.String(uuid.NewRandom().String()),
}

var taskMetrics []*ecstcs.TaskMetric
if idle {
seelog.Debug("Instance is idle. No task metrics to report")
fin := true
Expand All @@ -442,16 +443,24 @@ func (engine *DockerStatsEngine) GetInstanceMetrics(includeServiceConnectStats b
}
}

for taskArn := range engine.tasksToContainers {
taskStatsToCollect := engine.getTaskStatsToCollect()
for taskArn := range taskStatsToCollect {
_, isServiceConnectTask := engine.taskToServiceConnectStats[taskArn]
containerMetrics, err := engine.taskContainerMetricsUnsafe(taskArn)
if err != nil {
seelog.Debugf("Error getting container metrics for task: %s, err: %v", taskArn, err)
continue
// skip collecting service connect related metrics, if task is not service connect enabled
if !isServiceConnectTask {
continue
}
}

if len(containerMetrics) == 0 {
seelog.Debugf("Empty containerMetrics for task, ignoring, task: %s", taskArn)
continue
// skip collecting service connect related metrics, if task is not service connect enabled
if !isServiceConnectTask {
continue
}
}

taskDef, exists := engine.tasksToDefinitions[taskArn]
Expand Down Expand Up @@ -523,7 +532,7 @@ func (engine *DockerStatsEngine) isIdle() bool {
engine.lock.RLock()
defer engine.lock.RUnlock()

return len(engine.tasksToContainers) == 0
return len(engine.tasksToContainers) == 0 && len(engine.taskToServiceConnectStats) == 0
}

func (engine *DockerStatsEngine) containerHealthsToMonitor() bool {
Expand Down Expand Up @@ -878,6 +887,22 @@ func (engine *DockerStatsEngine) ContainerDockerStats(taskARN string, containerI
return containerStats, containerNetworkRateStats, nil
}

// getTaskStatsToCollect returns a map of taskArns for which task metrics needs to collected
func (engine *DockerStatsEngine) getTaskStatsToCollect() map[string]bool {
taskStatsToCollect := make(map[string]bool)
for taskArn := range engine.tasksToContainers {
if _, taskArnExists := taskStatsToCollect[taskArn]; !taskArnExists {
taskStatsToCollect[taskArn] = true
}
}
for taskArn := range engine.taskToServiceConnectStats {
if _, taskArnExists := taskStatsToCollect[taskArn]; !taskArnExists {
taskStatsToCollect[taskArn] = true
}
}
return taskStatsToCollect
}

// getServiceConnectStats invokes the workflow to retrieve all service connect
// related metrics for all service connect enabled tasks
func (engine *DockerStatsEngine) getServiceConnectStats() error {
Expand Down
206 changes: 116 additions & 90 deletions agent/stats/engine_unix_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,96 +68,122 @@ func TestStatsEngineWithNetworkStatsDifferentModes(t *testing.T) {
}

func TestStatsEngineWithServiceConnectMetrics(t *testing.T) {
testUDSPath := filepath.Join(t.TempDir(), "test_stats_metrics.sock")

// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithServiceConnectMetrics"))
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

// Assign ContainerStop timeout to addressable variable
timeout := defaultDockerTimeoutSeconds

// Create a container to get the container id.
container, err := createGremlin(client, "default")
require.NoError(t, err, "creating container failed")
defer client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true})

engine.cluster = defaultCluster
engine.containerInstanceArn = defaultContainerInstance

err = client.ContainerStart(ctx, container.ID, types.ContainerStartOptions{})
require.NoError(t, err, "starting container failed")
defer client.ContainerStop(ctx, container.ID, &timeout)

containerChangeEventStream := eventStream("TestStatsEngineWithServiceConnectMetrics")
taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream,
nil, dockerstate.NewTaskEngineState(), nil, nil, nil)
testTask := createRunningTask()
testTask.ServiceConnectConfig = &serviceconnect.Config{
ContainerName: serviceConnectContainerName,
RuntimeConfig: serviceconnect.RuntimeConfig{
AdminSocketPath: testUDSPath,
StatsRequest: testStatsRestURL,
},
}
// Populate Tasks and Container map in the engine.
dockerTaskEngine := taskEngine.(*ecsengine.DockerTaskEngine)
dockerTaskEngine.State().AddTask(testTask)
dockerTaskEngine.State().AddContainer(
&apicontainer.DockerContainer{
DockerID: container.ID,
DockerName: "gremlin",
Container: testTask.Containers[0],
testcases := []struct {
name string
shouldDisableMetrics bool
}{
{
name: "Test Stats engine for Service Connect task with metrics enabled",
},
testTask)

// Simulate container start prior to listener initialization.
time.Sleep(checkPointSleep)
err = engine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance)
require.NoError(t, err, "initializing stats engine failed")
serviceConnectStats, err := newServiceConnectStats()
require.NoError(t, err, "expected no error")
engine.taskToServiceConnectStats[taskArn] = serviceConnectStats
assert.Equal(t, 1, len(engine.taskToServiceConnectStats))

defer engine.containerChangeEventStream.Unsubscribe(containerChangeHandler)

// simulate appnet server providing service connect metrics
r := mux.NewRouter()
r.HandleFunc(testStatsRestPath, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%v", stats)
}))
ts := httptest.NewUnstartedServer(r)
l, err := net.Listen("unix", testUDSPath)
require.NoError(t, err)

ts.Listener.Close()
ts.Listener = l
ts.Start()
defer ts.Close()

// Wait for the stats collection go routine to start.
time.Sleep(checkPointSleep)
validateInstanceMetrics(t, engine, true)
scStats := engine.taskToServiceConnectStats[taskArn]
require.True(t, scStats.sent, "expected service connect metrics sent flag to be set")
validateEmptyTaskHealthMetrics(t, engine)

err = client.ContainerStop(ctx, container.ID, &timeout)
require.NoError(t, err, "stopping container failed")

err = engine.containerChangeEventStream.WriteToEventStream(dockerapi.DockerContainerChangeEvent{
Status: apicontainerstatus.ContainerStopped,
DockerContainerMetadata: dockerapi.DockerContainerMetadata{
DockerID: container.ID,
{
name: "Test Stats engine for Service Connect task with metrics disabled",
shouldDisableMetrics: true,
},
})
assert.NoError(t, err, "failed to write to container change event stream")

time.Sleep(waitForCleanupSleep)

// Should not contain any metrics after cleanup.
validateIdleContainerMetrics(t, engine)
validateEmptyTaskHealthMetrics(t, engine)
}
testUDSPath := filepath.Join(t.TempDir(), "test_stats_metrics.sock")
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
testConfig := cfg
if tc.shouldDisableMetrics {
testConfig.DisableMetrics = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}
}

// Create a new docker stats engine
engine := NewDockerStatsEngine(&testConfig, dockerClient, eventStream("TestStatsEngineWithServiceConnectMetrics"))
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

// Assign ContainerStop timeout to addressable variable
timeout := defaultDockerTimeoutSeconds

// Create a container to get the container id.
container, err := createGremlin(client, "default")
require.NoError(t, err, "creating container failed")
defer client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true})

engine.cluster = defaultCluster
engine.containerInstanceArn = defaultContainerInstance

err = client.ContainerStart(ctx, container.ID, types.ContainerStartOptions{})
require.NoError(t, err, "starting container failed")
defer client.ContainerStop(ctx, container.ID, &timeout)

containerChangeEventStream := eventStream("TestStatsEngineWithServiceConnectMetrics")
taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream,
nil, dockerstate.NewTaskEngineState(), nil, nil, nil)
testTask := createRunningTask()
testTask.ServiceConnectConfig = &serviceconnect.Config{
ContainerName: serviceConnectContainerName,
RuntimeConfig: serviceconnect.RuntimeConfig{
AdminSocketPath: testUDSPath,
StatsRequest: testStatsRestURL,
},
}
// Populate Tasks and Container map in the engine.
dockerTaskEngine := taskEngine.(*ecsengine.DockerTaskEngine)
dockerTaskEngine.State().AddTask(testTask)
dockerTaskEngine.State().AddContainer(
&apicontainer.DockerContainer{
DockerID: container.ID,
DockerName: "gremlin",
Container: testTask.Containers[0],
},
testTask)

// Simulate container start prior to listener initialization.
time.Sleep(checkPointSleep)
err = engine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance)
require.NoError(t, err, "initializing stats engine failed")
serviceConnectStats, err := newServiceConnectStats()
require.NoError(t, err, "expected no error")
engine.taskToServiceConnectStats[taskArn] = serviceConnectStats
assert.Equal(t, 1, len(engine.taskToServiceConnectStats))

defer engine.containerChangeEventStream.Unsubscribe(containerChangeHandler)

// simulate appnet server providing service connect metrics
r := mux.NewRouter()
r.HandleFunc(testStatsRestPath, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%v", stats)
}))
ts := httptest.NewUnstartedServer(r)
l, err := net.Listen("unix", testUDSPath)
require.NoError(t, err)

ts.Listener.Close()
ts.Listener = l
ts.Start()
defer ts.Close()

// Wait for the stats collection go routine to start.
time.Sleep(checkPointSleep)
if tc.shouldDisableMetrics {
validateInstanceMetricsWithDisabledMetrics(t, engine, true)
} else {
validateInstanceMetrics(t, engine, true)
}
scStats := engine.taskToServiceConnectStats[taskArn]
require.True(t, scStats.sent, "expected service connect metrics sent flag to be set")
validateEmptyTaskHealthMetrics(t, engine)

err = client.ContainerStop(ctx, container.ID, &timeout)
require.NoError(t, err, "stopping container failed")

err = engine.containerChangeEventStream.WriteToEventStream(dockerapi.DockerContainerChangeEvent{
Status: apicontainerstatus.ContainerStopped,
DockerContainerMetadata: dockerapi.DockerContainerMetadata{
DockerID: container.ID,
},
})
assert.NoError(t, err, "failed to write to container change event stream")

time.Sleep(waitForCleanupSleep)

// Should not contain any metrics after cleanup.
if !tc.shouldDisableMetrics {
validateIdleContainerMetrics(t, engine)
}
validateEmptyTaskHealthMetrics(t, engine)
})
}
}
40 changes: 40 additions & 0 deletions agent/stats/engine_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (

apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
apieni "github.com/aws/amazon-ecs-agent/agent/api/eni"
"github.com/aws/amazon-ecs-agent/agent/api/serviceconnect"
apitask "github.com/aws/amazon-ecs-agent/agent/api/task"
apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status"
"github.com/aws/amazon-ecs-agent/agent/config"
mock_dockerapi "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi/mocks"
mock_resolver "github.com/aws/amazon-ecs-agent/agent/stats/resolver/mock"
"github.com/docker/docker/api/types"
Expand Down Expand Up @@ -125,3 +127,41 @@ func TestNetworkModeStatsAWSVPCMode(t *testing.T) {
}
}
}

func TestServiceConnectWithDisabledMetrics(t *testing.T) {
disableMetricsConfig := cfg
disableMetricsConfig.DisableMetrics = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}
containerID := "containerID"
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

container := apicontainer.Container{
Name: "service-connect",
HealthCheckType: "docker",
}
resolver := mock_resolver.NewMockContainerMetadataResolver(mockCtrl)
resolver.EXPECT().ResolveTask(containerID).Return(&apitask.Task{
Arn: "t1",
KnownStatusUnsafe: apitaskstatus.TaskRunning,
Family: "f1",
ServiceConnectConfig: &serviceconnect.Config{
ContainerName: "service-connect",
},
Containers: []*apicontainer.Container{&container},
}, nil)
resolver.EXPECT().ResolveContainer(containerID).Return(&apicontainer.DockerContainer{
DockerID: containerID,
Container: &container,
}, nil).Times(2)

engine := NewDockerStatsEngine(&disableMetricsConfig, nil, eventStream("TestServiceConnectWithDisabledMetrics"))
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
engine.ctx = ctx
engine.resolver = resolver
engine.addAndStartStatsContainer(containerID)

assert.Len(t, engine.tasksToContainers, 0, "No containers should be tracked if metrics is disabled")
assert.Len(t, engine.tasksToHealthCheckContainers, 1)
assert.Len(t, engine.taskToServiceConnectStats, 1)
}
13 changes: 7 additions & 6 deletions agent/tcs/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,8 @@ func (cs *clientServer) Serve() error {
cs.publishHealthTicker = time.NewTicker(cs.publishMetricsInterval)
cs.pullInstanceStatusTicker = time.NewTicker(cs.publishMetricsInterval)

if !cs.disableResourceMetrics {
go cs.publishMetrics()
}
go cs.publishMetrics()

go cs.publishHealthMetrics()

go cs.publishInstanceStatus()
Expand Down Expand Up @@ -188,9 +187,11 @@ func (cs *clientServer) publishMetrics() {
metricCounter = 0
}
cs.statsEngine.SetPublishServiceConnectTickerInterval(metricCounter)
err := cs.publishMetricsOnce(includeServiceConnectStats)
if err != nil {
seelog.Warnf("Error publishing metrics: %v", err)
if !cs.disableResourceMetrics || includeServiceConnectStats {
err := cs.publishMetricsOnce(includeServiceConnectStats)
if err != nil {
seelog.Warnf("Error publishing metrics: %v", err)
}
}
case <-cs.ctx.Done():
return
Expand Down

0 comments on commit 85fcaf9

Please sign in to comment.