diff --git a/extension/observer/ecsobserver/fetcher.go b/extension/observer/ecsobserver/fetcher.go new file mode 100644 index 000000000000..934740aff0c4 --- /dev/null +++ b/extension/observer/ecsobserver/fetcher.go @@ -0,0 +1,88 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecsobserver + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/ecs" + "go.uber.org/zap" +) + +// ecsClient includes API required by taskFetcher. +type ecsClient interface { + ListTasksWithContext(ctx context.Context, input *ecs.ListTasksInput, opts ...request.Option) (*ecs.ListTasksOutput, error) + DescribeTasksWithContext(ctx context.Context, input *ecs.DescribeTasksInput, opts ...request.Option) (*ecs.DescribeTasksOutput, error) +} + +type taskFetcher struct { + logger *zap.Logger + ecs ecsClient + cluster string +} + +type taskFetcherOptions struct { + Logger *zap.Logger + Cluster string + Region string + + // test overrides + ecsOverride ecsClient +} + +func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) { + fetcher := taskFetcher{ + logger: opts.Logger, + ecs: opts.ecsOverride, + cluster: opts.Cluster, + } + // Return early if clients are mocked + if fetcher.ecs != nil { + return &fetcher, nil + } + return nil, fmt.Errorf("actual aws init logic not implemented") +} + +// GetAllTasks get arns of all running tasks and describe those tasks. +// There is no API to list task detail without arn so we need to call two APIs. +func (f *taskFetcher) GetAllTasks(ctx context.Context) ([]*ecs.Task, error) { + svc := f.ecs + cluster := aws.String(f.cluster) + req := ecs.ListTasksInput{Cluster: cluster} + var tasks []*ecs.Task + for { + listRes, err := svc.ListTasksWithContext(ctx, &req) + if err != nil { + return nil, fmt.Errorf("ecs.ListTasks failed: %w", err) + } + // NOTE: the limit for list task response and describe task request are both 100. + descRes, err := svc.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{ + Cluster: cluster, + Tasks: listRes.TaskArns, + }) + if err != nil { + return nil, fmt.Errorf("ecs.DescribeTasks failed: %w", err) + } + tasks = append(tasks, descRes.Tasks...) + if listRes.NextToken == nil { + break + } + req.NextToken = listRes.NextToken + } + return tasks, nil +} diff --git a/extension/observer/ecsobserver/fetcher_test.go b/extension/observer/ecsobserver/fetcher_test.go new file mode 100644 index 000000000000..c2138dc11e8c --- /dev/null +++ b/extension/observer/ecsobserver/fetcher_test.go @@ -0,0 +1,42 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecsobserver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock" +) + +func TestFetcher_GetAllTasks(t *testing.T) { + c := ecsmock.NewCluster() + f, err := newTaskFetcher(taskFetcherOptions{ + Logger: zap.NewExample(), + Cluster: "not used", + Region: "not used", + ecsOverride: c, + }) + require.NoError(t, err) + c.SetTasks(ecsmock.GenTasks("p", 203)) + ctx := context.Background() + tasks, err := f.GetAllTasks(ctx) + require.NoError(t, err) + assert.Equal(t, 203, len(tasks)) +} diff --git a/extension/observer/ecsobserver/internal/ecsmock/doc.go b/extension/observer/ecsobserver/internal/ecsmock/doc.go new file mode 100644 index 000000000000..bc088e6b55c7 --- /dev/null +++ b/extension/observer/ecsobserver/internal/ecsmock/doc.go @@ -0,0 +1,17 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package ecsmock implements mock server for ECS service API. +// Currently it is only used by ecsobserver extension for unit test. +package ecsmock diff --git a/extension/observer/ecsobserver/internal/ecsmock/service.go b/extension/observer/ecsobserver/internal/ecsmock/service.go new file mode 100644 index 000000000000..201010101d79 --- /dev/null +++ b/extension/observer/ecsobserver/internal/ecsmock/service.go @@ -0,0 +1,195 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecsmock + +import ( + "context" + "fmt" + "reflect" + "strconv" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/ecs" +) + +// PageLimit defines number of items in a single page for different APIs. +// Those numbers can be found on the Input and and Output struct comments. +// Call DefaultPageLimit() to config the mock to use numbers same as the actual AWS API. +type PageLimit struct { + ListTaskOutput int // default 100, max 100 + DescribeTaskInput int // max 100 + ListServiceOutput int // default 10, max 100 + DescribeServiceInput int // max 10 + DescribeContainerInstanceInput int // max 100 +} + +func DefaultPageLimit() PageLimit { + return PageLimit{ + ListTaskOutput: 100, + DescribeTaskInput: 100, + ListServiceOutput: 10, + DescribeServiceInput: 10, + DescribeContainerInstanceInput: 100, + } +} + +// Cluster implements both ECS and EC2 API for a single cluster. +type Cluster struct { + taskList []*ecs.Task + taskMap map[string]*ecs.Task + limit PageLimit +} + +// NewCluster creates a mock ECS cluster with default limits. +func NewCluster() *Cluster { + return &Cluster{ + taskMap: make(map[string]*ecs.Task), + limit: DefaultPageLimit(), + } +} + +// API Start + +func (c *Cluster) ListTasksWithContext(_ context.Context, input *ecs.ListTasksInput, _ ...request.Option) (*ecs.ListTasksOutput, error) { + page, err := getPage(pageInput{ + nextToken: input.NextToken, + size: len(c.taskList), + limit: c.limit.ListTaskOutput, + }) + if err != nil { + return nil, err + } + res := c.taskList[page.start:page.end] + return &ecs.ListTasksOutput{ + TaskArns: getArns(res, func(i int) *string { + return res[i].TaskArn + }), + NextToken: page.nextToken, + }, nil +} + +func (c *Cluster) DescribeTasksWithContext(_ context.Context, input *ecs.DescribeTasksInput, _ ...request.Option) (*ecs.DescribeTasksOutput, error) { + var ( + failures []*ecs.Failure + tasks []*ecs.Task + ) + for i, taskArn := range input.Tasks { + arn := aws.StringValue(taskArn) + task, ok := c.taskMap[arn] + if !ok { + failures = append(failures, &ecs.Failure{ + Arn: taskArn, + Detail: aws.String(fmt.Sprintf("task not found index %d arn %s total tasks %d", i, arn, len(c.taskMap))), + Reason: aws.String("task not found"), + }) + continue + } + tasks = append(tasks, task) + } + return &ecs.DescribeTasksOutput{Failures: failures, Tasks: tasks}, nil +} + +// API End + +// Hook Start + +// SetTasks update both list and map. +func (c *Cluster) SetTasks(tasks []*ecs.Task) { + c.taskList = tasks + m := make(map[string]*ecs.Task, len(tasks)) + for _, t := range tasks { + m[aws.StringValue(t.TaskArn)] = t + } + c.taskMap = m +} + +// Hook End + +// Util Start + +// GenTasks returns tasks with TaskArn set to arnPrefix+offset, where offset is [0, count). +func GenTasks(arnPrefix string, count int) []*ecs.Task { + var tasks []*ecs.Task + for i := 0; i < count; i++ { + tasks = append(tasks, &ecs.Task{ + TaskArn: aws.String(arnPrefix + strconv.Itoa(i)), + }) + } + return tasks +} + +// Util End + +// pagination Start + +type pageInput struct { + nextToken *string + size int + limit int +} + +type pageOutput struct { + start int + end int + nextToken *string +} + +// getPage returns new page offset based on existing one. +// It is not using the actual AWS token format, it simply uses number string to keep track of offset. +func getPage(p pageInput) (*pageOutput, error) { + var err error + start := 0 + if p.nextToken != nil { + token := aws.StringValue(p.nextToken) + start, err = strconv.Atoi(token) + if err != nil { + return nil, fmt.Errorf("invalid next token %q: %w", token, err) + } + } + end := minInt(p.size, start+p.limit) + var newNextToken *string + if end < p.size { + newNextToken = aws.String(strconv.Itoa(end)) + } + return &pageOutput{ + start: start, + end: end, + nextToken: newNextToken, + }, nil +} + +// pagination Emd + +// 'generic' Start + +// getArns is used by both ListTasks and ListServices +func getArns(items interface{}, arnGetter func(i int) *string) []*string { + rv := reflect.ValueOf(items) + var arns []*string + for i := 0; i < rv.Len(); i++ { + arns = append(arns, arnGetter(i)) + } + return arns +} + +// 'generic' End + +func minInt(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/extension/observer/ecsobserver/internal/ecsmock/service_test.go b/extension/observer/ecsobserver/internal/ecsmock/service_test.go new file mode 100644 index 000000000000..4dd85181c7dc --- /dev/null +++ b/extension/observer/ecsobserver/internal/ecsmock/service_test.go @@ -0,0 +1,80 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecsmock + +import ( + "context" + "fmt" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ecs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCluster_ListTasksWithContext(t *testing.T) { + ctx := context.Background() + c := NewCluster() + count := DefaultPageLimit().ListTaskOutput*2 + 1 + c.SetTasks(GenTasks("p", count)) + + t.Run("get all", func(t *testing.T) { + req := &ecs.ListTasksInput{} + listedTasks := 0 + pages := 0 + for { + res, err := c.ListTasksWithContext(ctx, req) + require.NoError(t, err) + listedTasks += len(res.TaskArns) + pages++ + if res.NextToken == nil { + break + } + req.NextToken = res.NextToken + } + assert.Equal(t, count, listedTasks) + assert.Equal(t, 3, pages) + }) + + t.Run("invalid token", func(t *testing.T) { + req := &ecs.ListTasksInput{NextToken: aws.String("asd")} + _, err := c.ListTasksWithContext(ctx, req) + require.Error(t, err) + }) +} + +func TestCluster_DescribeTasksWithContext(t *testing.T) { + ctx := context.Background() + c := NewCluster() + count := 10 + c.SetTasks(GenTasks("p", count)) + + t.Run("exists", func(t *testing.T) { + req := &ecs.DescribeTasksInput{Tasks: []*string{aws.String("p0"), aws.String(fmt.Sprintf("p%d", count-1))}} + res, err := c.DescribeTasksWithContext(ctx, req) + require.NoError(t, err) + assert.Len(t, res.Tasks, 2) + assert.Len(t, res.Failures, 0) + }) + + t.Run("not found", func(t *testing.T) { + req := &ecs.DescribeTasksInput{Tasks: []*string{aws.String("p0"), aws.String(fmt.Sprintf("p%d", count))}} + res, err := c.DescribeTasksWithContext(ctx, req) + require.NoError(t, err) + assert.Len(t, res.Tasks, 1) + assert.Len(t, res.Failures, 1) + }) +} diff --git a/extension/observer/ecsobserver/task.go b/extension/observer/ecsobserver/task.go index 09cc9ab65561..be8fa2735c8f 100644 --- a/extension/observer/ecsobserver/task.go +++ b/extension/observer/ecsobserver/task.go @@ -23,7 +23,7 @@ import ( ) // Task contains both raw task info and its definition. -// It is generated from TaskFetcher. +// It is generated from taskFetcher. type Task struct { Task *ecs.Task Definition *ecs.TaskDefinition