Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/ecsobserver] Init service client and mock #3284

Merged
merged 2 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions extension/observer/ecsobserver/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecsobserver

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ecs"
"go.uber.org/zap"
)

// ecsClient includes API required by taskFetcher.
type ecsClient interface {
ListTasksWithContext(ctx context.Context, input *ecs.ListTasksInput, opts ...request.Option) (*ecs.ListTasksOutput, error)
DescribeTasksWithContext(ctx context.Context, input *ecs.DescribeTasksInput, opts ...request.Option) (*ecs.DescribeTasksOutput, error)
}

type taskFetcher struct {
logger *zap.Logger
ecs ecsClient
cluster string
}

type taskFetcherOptions struct {
Logger *zap.Logger
Cluster string
Region string

// test overrides
ecsOverride ecsClient
}

func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) {
fetcher := taskFetcher{
logger: opts.Logger,
ecs: opts.ecsOverride,
cluster: opts.Cluster,
}
// Return early if clients are mocked
if fetcher.ecs != nil {
return &fetcher, nil
}
return nil, fmt.Errorf("actual aws init logic not implemented")
}

// GetAllTasks get arns of all running tasks and describe those tasks.
// There is no API to list task detail without arn so we need to call two APIs.
func (f *taskFetcher) GetAllTasks(ctx context.Context) ([]*ecs.Task, error) {
svc := f.ecs
cluster := aws.String(f.cluster)
req := ecs.ListTasksInput{Cluster: cluster}
var tasks []*ecs.Task
for {
listRes, err := svc.ListTasksWithContext(ctx, &req)
if err != nil {
return nil, fmt.Errorf("ecs.ListTasks failed: %w", err)
}
// NOTE: the limit for list task response and describe task request are both 100.
descRes, err := svc.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
Cluster: cluster,
Tasks: listRes.TaskArns,
})
if err != nil {
return nil, fmt.Errorf("ecs.DescribeTasks failed: %w", err)
}
tasks = append(tasks, descRes.Tasks...)
if listRes.NextToken == nil {
break
}
req.NextToken = listRes.NextToken
}
return tasks, nil
}
42 changes: 42 additions & 0 deletions extension/observer/ecsobserver/fetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecsobserver

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock"
)

func TestFetcher_GetAllTasks(t *testing.T) {
c := ecsmock.NewCluster()
f, err := newTaskFetcher(taskFetcherOptions{
Logger: zap.NewExample(),
Cluster: "not used",
Region: "not used",
ecsOverride: c,
})
require.NoError(t, err)
c.SetTasks(ecsmock.GenTasks("p", 203))
ctx := context.Background()
tasks, err := f.GetAllTasks(ctx)
require.NoError(t, err)
assert.Equal(t, 203, len(tasks))
}
17 changes: 17 additions & 0 deletions extension/observer/ecsobserver/internal/ecsmock/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package ecsmock implements mock server for ECS service API.
// Currently it is only used by ecsobserver extension for unit test.
package ecsmock
195 changes: 195 additions & 0 deletions extension/observer/ecsobserver/internal/ecsmock/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecsmock

import (
"context"
"fmt"
"reflect"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ecs"
)

// PageLimit defines number of items in a single page for different APIs.
// Those numbers can be found on the Input and and Output struct comments.
// Call DefaultPageLimit() to config the mock to use numbers same as the actual AWS API.
type PageLimit struct {
ListTaskOutput int // default 100, max 100
DescribeTaskInput int // max 100
ListServiceOutput int // default 10, max 100
DescribeServiceInput int // max 10
DescribeContainerInstanceInput int // max 100
}

func DefaultPageLimit() PageLimit {
return PageLimit{
ListTaskOutput: 100,
DescribeTaskInput: 100,
ListServiceOutput: 10,
DescribeServiceInput: 10,
DescribeContainerInstanceInput: 100,
}
}

// Cluster implements both ECS and EC2 API for a single cluster.
type Cluster struct {
taskList []*ecs.Task
taskMap map[string]*ecs.Task
limit PageLimit
}

// NewCluster creates a mock ECS cluster with default limits.
func NewCluster() *Cluster {
return &Cluster{
taskMap: make(map[string]*ecs.Task),
limit: DefaultPageLimit(),
}
}

// API Start

func (c *Cluster) ListTasksWithContext(_ context.Context, input *ecs.ListTasksInput, _ ...request.Option) (*ecs.ListTasksOutput, error) {
page, err := getPage(pageInput{
nextToken: input.NextToken,
size: len(c.taskList),
limit: c.limit.ListTaskOutput,
})
if err != nil {
return nil, err
}
res := c.taskList[page.start:page.end]
return &ecs.ListTasksOutput{
TaskArns: getArns(res, func(i int) *string {
return res[i].TaskArn
}),
NextToken: page.nextToken,
}, nil
}

func (c *Cluster) DescribeTasksWithContext(_ context.Context, input *ecs.DescribeTasksInput, _ ...request.Option) (*ecs.DescribeTasksOutput, error) {
var (
failures []*ecs.Failure
tasks []*ecs.Task
)
for i, taskArn := range input.Tasks {
arn := aws.StringValue(taskArn)
task, ok := c.taskMap[arn]
if !ok {
failures = append(failures, &ecs.Failure{
Arn: taskArn,
Detail: aws.String(fmt.Sprintf("task not found index %d arn %s total tasks %d", i, arn, len(c.taskMap))),
Reason: aws.String("task not found"),
})
continue
}
tasks = append(tasks, task)
}
return &ecs.DescribeTasksOutput{Failures: failures, Tasks: tasks}, nil
}

// API End

// Hook Start

// SetTasks update both list and map.
func (c *Cluster) SetTasks(tasks []*ecs.Task) {
c.taskList = tasks
m := make(map[string]*ecs.Task, len(tasks))
for _, t := range tasks {
m[aws.StringValue(t.TaskArn)] = t
}
c.taskMap = m
}

// Hook End

// Util Start

// GenTasks returns tasks with TaskArn set to arnPrefix+offset, where offset is [0, count).
func GenTasks(arnPrefix string, count int) []*ecs.Task {
var tasks []*ecs.Task
for i := 0; i < count; i++ {
tasks = append(tasks, &ecs.Task{
TaskArn: aws.String(arnPrefix + strconv.Itoa(i)),
})
}
return tasks
}

// Util End

// pagination Start

type pageInput struct {
nextToken *string
size int
limit int
}

type pageOutput struct {
start int
end int
nextToken *string
}

// getPage returns new page offset based on existing one.
// It is not using the actual AWS token format, it simply uses number string to keep track of offset.
func getPage(p pageInput) (*pageOutput, error) {
var err error
start := 0
if p.nextToken != nil {
token := aws.StringValue(p.nextToken)
start, err = strconv.Atoi(token)
if err != nil {
return nil, fmt.Errorf("invalid next token %q: %w", token, err)
}
}
end := minInt(p.size, start+p.limit)
var newNextToken *string
if end < p.size {
newNextToken = aws.String(strconv.Itoa(end))
}
return &pageOutput{
start: start,
end: end,
nextToken: newNextToken,
}, nil
}

// pagination Emd

// 'generic' Start

// getArns is used by both ListTasks and ListServices
func getArns(items interface{}, arnGetter func(i int) *string) []*string {
rv := reflect.ValueOf(items)
var arns []*string
for i := 0; i < rv.Len(); i++ {
arns = append(arns, arnGetter(i))
}
return arns
}

// 'generic' End

func minInt(a, b int) int {
if a < b {
return a
}
return b
}
Loading