Skip to content

Commit

Permalink
Add a task attachment handler to manage attachments from ACS
Browse files Browse the repository at this point in the history
  • Loading branch information
chienhanlin authored and yinyic committed Oct 3, 2022
1 parent 85fcaf9 commit d047662
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 47 deletions.
50 changes: 3 additions & 47 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,58 +323,14 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task,
//initialize resources map for task
task.ResourcesMapUnsafe = make(map[string][]taskresource.TaskResource)

// get the service connect attachment from acsTask.Attachments
if acsTask.Attachments != nil {
scConfig, err := getServiceConnectConfig(acsTask)
if err != nil {
return nil, err
}
task.ServiceConnectConfig = scConfig
// extract and validate attachments
if err := handleTaskAttachments(acsTask, task); err != nil {
return nil, err
}

return task, nil
}

// getServiceConnectConfig returns service connect config from the service connect type attachment if it exists.
func getServiceConnectConfig(acsTask *ecsacs.Task) (*serviceconnect.Config, error) {
var scAttachment *ecsacs.Attachment
for _, attachment := range acsTask.Attachments {
if aws.StringValue(attachment.AttachmentType) == serviceConnectAttachmentType {
scAttachment = attachment
break
}
}

// extract the service connect container name and service connect config value from the service connect attachment
if scAttachment != nil {
networkMode := aws.StringValue(acsTask.NetworkMode)
ipv6Enabled := false
if acsTask.ElasticNetworkInterfaces != nil {
for _, eni := range acsTask.ElasticNetworkInterfaces {
if len(eni.Ipv6Addresses) != 0 {
ipv6Enabled = true
break
}
}
}

// parse service connect from the attachment value
scConfig, err := serviceconnect.ParseServiceConnectAttachment(scAttachment)
if err != nil {
return nil, fmt.Errorf("error parsing service connect config value from the service connect attachment: %w", err)
}

// validate service connect config
if err := serviceconnect.ValidateServiceConnectConfig(scConfig, acsTask.Containers, networkMode, ipv6Enabled); err != nil {
return nil, fmt.Errorf("service connect config validation failed: %w", err)
}

return scConfig, nil
}

return nil, nil
}

func (task *Task) initializeVolumes(cfg *config.Config, dockerClient dockerapi.DockerClient, ctx context.Context) error {
err := task.initializeDockerLocalVolumes(dockerClient, ctx)
if err != nil {
Expand Down
109 changes: 109 additions & 0 deletions agent/api/task/task_attachment_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package task

import (
"fmt"

"github.com/aws/amazon-ecs-agent/agent/acs/model/ecsacs"
"github.com/aws/amazon-ecs-agent/agent/api/serviceconnect"
"github.com/aws/amazon-ecs-agent/agent/logger"
"github.com/aws/aws-sdk-go/aws"
)

// AttachmentHandler defines an interface to handel attachment received from ACS.
type AttachmentHandler interface {
parseAttachment(acsAttachment *ecsacs.Attachment) error
validateAttachment(acsTask *ecsacs.Task) error
}

// ServiceConnectAttachmentHandler defines a service connect type attachment handler.
type ServiceConnectAttachmentHandler struct {
scConfig *serviceconnect.Config
}

// NewAttachmentHandlers returns all type of handlers to handle different types of attachment.
func NewAttachmentHandlers() map[string]AttachmentHandler {
attachmentHandlers := make(map[string]AttachmentHandler)
attachmentHandlers[serviceConnectAttachmentType] = &ServiceConnectAttachmentHandler{}
return attachmentHandlers
}

// getHandlerByType returns the attachment handler based on the given type, and returns error if no matching hander can be found.
func getHandlerByType(handlerType string, handlers map[string]AttachmentHandler) (AttachmentHandler, error) {
if handler, ok := handlers[handlerType]; ok {
return handler, nil
}
return nil, fmt.Errorf("error to find an attachment handler for %s attachment type", handlerType)
}

// attachment parser of service connect attachment handler.
func (scAttachment *ServiceConnectAttachmentHandler) parseAttachment(acsAttachment *ecsacs.Attachment) error {
config, err := serviceconnect.ParseServiceConnectAttachment(acsAttachment)
scAttachment.scConfig = config
return err
}

// attachment validator of service connect attachment handler.
func (scAttachment *ServiceConnectAttachmentHandler) validateAttachment(acsTask *ecsacs.Task) error {
config := scAttachment.scConfig
taskContainers := acsTask.Containers
networkMode := aws.StringValue(acsTask.NetworkMode)
ipv6Enabled := false
if acsTask.ElasticNetworkInterfaces != nil {
for _, eni := range acsTask.ElasticNetworkInterfaces {
if len(eni.Ipv6Addresses) != 0 {
ipv6Enabled = true
break
}
}
}
return serviceconnect.ValidateServiceConnectConfig(config, taskContainers, networkMode, ipv6Enabled)
}

// handleTaskAttachments parses and validates attachments based on attachment type.
func handleTaskAttachments(acsTask *ecsacs.Task, task *Task) error {
if acsTask.Attachments != nil {
var serviceConnectAttachment *ecsacs.Attachment
for _, attachment := range acsTask.Attachments {
switch aws.StringValue(attachment.AttachmentType) {
case serviceConnectAttachmentType:
serviceConnectAttachment = attachment
default:
logger.Debug("Received an attachment type", logger.Fields{
"attachmentType": attachment.AttachmentType,
})
}
}

handlers := NewAttachmentHandlers()
if serviceConnectAttachment != nil {
scHandler, err := getHandlerByType(serviceConnectAttachmentType, handlers)
if err != nil {
return err
}

if err := scHandler.(*ServiceConnectAttachmentHandler).parseAttachment(serviceConnectAttachment); err != nil {
return fmt.Errorf("error parsing service connect config value from the service connect attachment: %w", err)
}

// validate the service connect config parsed from the service connect attachment
if err := scHandler.(*ServiceConnectAttachmentHandler).validateAttachment(acsTask); err != nil {
return fmt.Errorf("service connect config validation failed: %w", err)
}
task.ServiceConnectConfig = scHandler.(*ServiceConnectAttachmentHandler).scConfig
}
}
return nil
}
202 changes: 202 additions & 0 deletions agent/api/task/task_attachment_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package task

import (
"fmt"
"strconv"
"strings"
"testing"

"github.com/aws/amazon-ecs-agent/agent/acs/model/ecsacs"
"github.com/aws/amazon-ecs-agent/agent/api/serviceconnect"
"github.com/aws/aws-sdk-go/aws"
"github.com/stretchr/testify/assert"
)

var (
testSCContainerName = "ecs-service-connect"
testInboundListener = "testInboundListener"
testOutboundListener = "testOutboundListenerName"
testHost = "testHostName"
testIngressPort = "9090"
testIPv4 = "172.31.21.40"
testIPv4CIDR = "127.255.0.0/16"
testIPv6 = "abcd:dcba:1234:4321::"
testIPv6CIDR = "2002::1234:abcd:ffff:c0a8:101/64"
testIpv4ElasticNetworkInterface = &ecsacs.ElasticNetworkInterface{
Ipv4Addresses: []*ecsacs.IPv4AddressAssignment{
{
Primary: aws.Bool(true),
PrivateAddress: aws.String(testIPv4),
},
},
}
testIpv6ElasticNetworkInterface = &ecsacs.ElasticNetworkInterface{
Ipv6Addresses: []*ecsacs.IPv6AddressAssignment{
{
Address: aws.String(testIPv6),
},
},
}
)

func stringToPointer(s string) *string { return &s }

func getTestcontainerFromACS(containerName, networkMode string) *ecsacs.Container {
return &ecsacs.Container{
Name: aws.String(containerName),
DockerConfig: &ecsacs.DockerConfig{
HostConfig: aws.String(fmt.Sprintf(
`{"NetworkMode":"%s"}`, networkMode)),
},
}
}

func constructTestServiceConnectConfig(
ingressPort,
ingressListenerName,
egressListenerName,
egressIPv4Cidr,
egressIPv6Cidr,
dnsHostName,
dnsAddress string) string {
testIngressConfig := fmt.Sprintf(`\"ingressConfig\":[{\"interceptPort\":%s,\"listenerName\":\"%s\"}]`, ingressPort, ingressListenerName)
testEgressConfig := fmt.Sprintf(`\"egressConfig\":{\"listenerName\":\"%s\",\"vip\":{\"ipv4Cidr\":\"%s\",\"ipv6Cidr\":\"%s\"}}`, egressListenerName, egressIPv4Cidr, egressIPv6Cidr)
testDnsConfig := fmt.Sprintf(`\"dnsConfig\":[{\"hostname\":\"%s\",\"address\":\"%s\"}]`, dnsHostName, dnsAddress)
testServiceConnectConfig := strings.Join([]string{`"{`,
testEgressConfig + `,`,
testDnsConfig + `,`,
testIngressConfig,
`}"`,
}, "")
unquotedSCConfig, _ := strconv.Unquote(testServiceConnectConfig)
return unquotedSCConfig
}

func TestNewAttachmentHandlers(t *testing.T) {
handlers := NewAttachmentHandlers()
scHandler, err := getHandlerByType(serviceConnectAttachmentType, handlers)
assert.Nil(t, err, "Should not return error")
assert.NotNil(t, scHandler, "Should find service connect attachment type handler")
}

func TestHandleTaskAttachmentsWithServiceConnectAttachment(t *testing.T) {
tt := []struct {
testName string
testServiceConnectConfig string
shouldReturnError bool
}{
{
testName: "AWSVPC IPv6 enabled without error",
testServiceConnectConfig: constructTestServiceConnectConfig(
testIngressPort,
testInboundListener,
testOutboundListener,
testIPv4CIDR,
testIPv6CIDR,
testHost,
testIPv6,
),
shouldReturnError: false,
},
{
testName: "AWSVPC IPv6 enabled with error",
testServiceConnectConfig: constructTestServiceConnectConfig(
testIngressPort,
testInboundListener,
testOutboundListener,
"",
testIPv6CIDR,
testHost,
testIPv6,
),
shouldReturnError: true,
},
}

testExpectedSCConfig := &serviceconnect.Config{
ContainerName: testSCContainerName,
IngressConfig: []serviceconnect.IngressConfigEntry{
{
InterceptPort: aws.Uint16(9090),
ListenerName: testInboundListener,
},
},
EgressConfig: &serviceconnect.EgressConfig{
ListenerName: testOutboundListener,
VIP: serviceconnect.VIP{
IPV4CIDR: testIPv4CIDR,
IPV6CIDR: testIPv6CIDR,
},
},
DNSConfig: []serviceconnect.DNSConfigEntry{
{
HostName: testHost,
Address: testIPv6,
},
},
}

for _, tc := range tt {
t.Run(tc.testName, func(t *testing.T) {
testAcsTask := &ecsacs.Task{
ElasticNetworkInterfaces: []*ecsacs.ElasticNetworkInterface{testIpv6ElasticNetworkInterface},
Containers: []*ecsacs.Container{
getTestcontainerFromACS(testSCContainerName, AWSVPCNetworkMode),
},
Attachments: []*ecsacs.Attachment{
{
AttachmentArn: stringToPointer("attachmentArn"),
AttachmentProperties: []*ecsacs.AttachmentProperty{
{
Name: stringToPointer(serviceconnect.GetServiceConnectConfigKey()),
Value: stringToPointer(tc.testServiceConnectConfig),
},
{
Name: stringToPointer(serviceconnect.GetServiceConnectContainerNameKey()),
Value: stringToPointer(testSCContainerName),
},
},
AttachmentType: stringToPointer(serviceConnectAttachmentType),
},
},
NetworkMode: stringToPointer(AWSVPCNetworkMode),
}
testTask := &Task{}
err := handleTaskAttachments(testAcsTask, testTask)
if tc.shouldReturnError {
assert.NotNil(t, err, "Should return error")
} else {
assert.Nil(t, err, "Should not return error")
assert.NotNil(t, testTask.ServiceConnectConfig, "Should get valid service connect config from attachments")
assert.Equal(t, testExpectedSCConfig, testTask.ServiceConnectConfig)
}
})
}
}

func TestHandleTaskAttachmentsWithoutAttachment(t *testing.T) {
testAcsTask := &ecsacs.Task{
ElasticNetworkInterfaces: []*ecsacs.ElasticNetworkInterface{testIpv4ElasticNetworkInterface},
Containers: []*ecsacs.Container{
getTestcontainerFromACS("C1", BridgeNetworkMode),
},
NetworkMode: stringToPointer(BridgeNetworkMode),
}
testTask := &Task{}
err := handleTaskAttachments(testAcsTask, testTask)
assert.Nil(t, err, "Should not return error")
assert.Nil(t, testTask.ServiceConnectConfig, "Should not return service connect config from attachments")
}

0 comments on commit d047662

Please sign in to comment.