Skip to content

Commit

Permalink
[extension/ecsobserver] Init service client and mock (#3284)
Browse files Browse the repository at this point in the history
* ext: ecsobserver Init service client and mock

- Implement get all tasks
- Pacakge `ecsmock` provides ECS service API for a single cluster.

* ext: ecsobserver Move ecsmock to internal

It should get moved into internal/aws in the future. Keep it under
extension package for now
  • Loading branch information
pingleig authored May 11, 2021
1 parent cd16f93 commit ba57a13
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 1 deletion.
88 changes: 88 additions & 0 deletions extension/observer/ecsobserver/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecsobserver

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ecs"
"go.uber.org/zap"
)

// ecsClient includes API required by taskFetcher.
type ecsClient interface {
ListTasksWithContext(ctx context.Context, input *ecs.ListTasksInput, opts ...request.Option) (*ecs.ListTasksOutput, error)
DescribeTasksWithContext(ctx context.Context, input *ecs.DescribeTasksInput, opts ...request.Option) (*ecs.DescribeTasksOutput, error)
}

type taskFetcher struct {
logger *zap.Logger
ecs ecsClient
cluster string
}

type taskFetcherOptions struct {
Logger *zap.Logger
Cluster string
Region string

// test overrides
ecsOverride ecsClient
}

func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) {
fetcher := taskFetcher{
logger: opts.Logger,
ecs: opts.ecsOverride,
cluster: opts.Cluster,
}
// Return early if clients are mocked
if fetcher.ecs != nil {
return &fetcher, nil
}
return nil, fmt.Errorf("actual aws init logic not implemented")
}

// GetAllTasks get arns of all running tasks and describe those tasks.
// There is no API to list task detail without arn so we need to call two APIs.
func (f *taskFetcher) GetAllTasks(ctx context.Context) ([]*ecs.Task, error) {
svc := f.ecs
cluster := aws.String(f.cluster)
req := ecs.ListTasksInput{Cluster: cluster}
var tasks []*ecs.Task
for {
listRes, err := svc.ListTasksWithContext(ctx, &req)
if err != nil {
return nil, fmt.Errorf("ecs.ListTasks failed: %w", err)
}
// NOTE: the limit for list task response and describe task request are both 100.
descRes, err := svc.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
Cluster: cluster,
Tasks: listRes.TaskArns,
})
if err != nil {
return nil, fmt.Errorf("ecs.DescribeTasks failed: %w", err)
}
tasks = append(tasks, descRes.Tasks...)
if listRes.NextToken == nil {
break
}
req.NextToken = listRes.NextToken
}
return tasks, nil
}
42 changes: 42 additions & 0 deletions extension/observer/ecsobserver/fetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecsobserver

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock"
)

func TestFetcher_GetAllTasks(t *testing.T) {
c := ecsmock.NewCluster()
f, err := newTaskFetcher(taskFetcherOptions{
Logger: zap.NewExample(),
Cluster: "not used",
Region: "not used",
ecsOverride: c,
})
require.NoError(t, err)
c.SetTasks(ecsmock.GenTasks("p", 203))
ctx := context.Background()
tasks, err := f.GetAllTasks(ctx)
require.NoError(t, err)
assert.Equal(t, 203, len(tasks))
}
17 changes: 17 additions & 0 deletions extension/observer/ecsobserver/internal/ecsmock/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package ecsmock implements mock server for ECS service API.
// Currently it is only used by ecsobserver extension for unit test.
package ecsmock
195 changes: 195 additions & 0 deletions extension/observer/ecsobserver/internal/ecsmock/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecsmock

import (
"context"
"fmt"
"reflect"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ecs"
)

// PageLimit defines number of items in a single page for different APIs.
// Those numbers can be found on the Input and and Output struct comments.
// Call DefaultPageLimit() to config the mock to use numbers same as the actual AWS API.
type PageLimit struct {
ListTaskOutput int // default 100, max 100
DescribeTaskInput int // max 100
ListServiceOutput int // default 10, max 100
DescribeServiceInput int // max 10
DescribeContainerInstanceInput int // max 100
}

func DefaultPageLimit() PageLimit {
return PageLimit{
ListTaskOutput: 100,
DescribeTaskInput: 100,
ListServiceOutput: 10,
DescribeServiceInput: 10,
DescribeContainerInstanceInput: 100,
}
}

// Cluster implements both ECS and EC2 API for a single cluster.
type Cluster struct {
taskList []*ecs.Task
taskMap map[string]*ecs.Task
limit PageLimit
}

// NewCluster creates a mock ECS cluster with default limits.
func NewCluster() *Cluster {
return &Cluster{
taskMap: make(map[string]*ecs.Task),
limit: DefaultPageLimit(),
}
}

// API Start

func (c *Cluster) ListTasksWithContext(_ context.Context, input *ecs.ListTasksInput, _ ...request.Option) (*ecs.ListTasksOutput, error) {
page, err := getPage(pageInput{
nextToken: input.NextToken,
size: len(c.taskList),
limit: c.limit.ListTaskOutput,
})
if err != nil {
return nil, err
}
res := c.taskList[page.start:page.end]
return &ecs.ListTasksOutput{
TaskArns: getArns(res, func(i int) *string {
return res[i].TaskArn
}),
NextToken: page.nextToken,
}, nil
}

func (c *Cluster) DescribeTasksWithContext(_ context.Context, input *ecs.DescribeTasksInput, _ ...request.Option) (*ecs.DescribeTasksOutput, error) {
var (
failures []*ecs.Failure
tasks []*ecs.Task
)
for i, taskArn := range input.Tasks {
arn := aws.StringValue(taskArn)
task, ok := c.taskMap[arn]
if !ok {
failures = append(failures, &ecs.Failure{
Arn: taskArn,
Detail: aws.String(fmt.Sprintf("task not found index %d arn %s total tasks %d", i, arn, len(c.taskMap))),
Reason: aws.String("task not found"),
})
continue
}
tasks = append(tasks, task)
}
return &ecs.DescribeTasksOutput{Failures: failures, Tasks: tasks}, nil
}

// API End

// Hook Start

// SetTasks update both list and map.
func (c *Cluster) SetTasks(tasks []*ecs.Task) {
c.taskList = tasks
m := make(map[string]*ecs.Task, len(tasks))
for _, t := range tasks {
m[aws.StringValue(t.TaskArn)] = t
}
c.taskMap = m
}

// Hook End

// Util Start

// GenTasks returns tasks with TaskArn set to arnPrefix+offset, where offset is [0, count).
func GenTasks(arnPrefix string, count int) []*ecs.Task {
var tasks []*ecs.Task
for i := 0; i < count; i++ {
tasks = append(tasks, &ecs.Task{
TaskArn: aws.String(arnPrefix + strconv.Itoa(i)),
})
}
return tasks
}

// Util End

// pagination Start

type pageInput struct {
nextToken *string
size int
limit int
}

type pageOutput struct {
start int
end int
nextToken *string
}

// getPage returns new page offset based on existing one.
// It is not using the actual AWS token format, it simply uses number string to keep track of offset.
func getPage(p pageInput) (*pageOutput, error) {
var err error
start := 0
if p.nextToken != nil {
token := aws.StringValue(p.nextToken)
start, err = strconv.Atoi(token)
if err != nil {
return nil, fmt.Errorf("invalid next token %q: %w", token, err)
}
}
end := minInt(p.size, start+p.limit)
var newNextToken *string
if end < p.size {
newNextToken = aws.String(strconv.Itoa(end))
}
return &pageOutput{
start: start,
end: end,
nextToken: newNextToken,
}, nil
}

// pagination Emd

// 'generic' Start

// getArns is used by both ListTasks and ListServices
func getArns(items interface{}, arnGetter func(i int) *string) []*string {
rv := reflect.ValueOf(items)
var arns []*string
for i := 0; i < rv.Len(); i++ {
arns = append(arns, arnGetter(i))
}
return arns
}

// 'generic' End

func minInt(a, b int) int {
if a < b {
return a
}
return b
}
Loading

0 comments on commit ba57a13

Please sign in to comment.