From f02856c08b33af6ee890c5d1e249a4d8d156d4d7 Mon Sep 17 00:00:00 2001 From: Richard Kovacs Date: Wed, 21 Mar 2018 10:45:28 +0100 Subject: [PATCH] Refactor cloud inits and filter in operation --- Makefile | 16 ++++- action/termination.go | 2 +- aws/aws.go | 133 ++++++++++++++++++++------------------- azure/azure.go | 63 ++++++++++--------- context/context.go | 24 ++++++- gcp/gcp.go | 64 +++++++++---------- operation/common.go | 23 ++++--- operation/longrunning.go | 21 +++---- operation/ownerless.go | 19 +++++- types/cloud.go | 1 - utils/utils.go | 4 +- 11 files changed, 211 insertions(+), 159 deletions(-) diff --git a/Makefile b/Makefile index f5bfd21d..1f2a237b 100644 --- a/Makefile +++ b/Makefile @@ -6,10 +6,20 @@ PKG_BASE=github.com/hortonworks/cloud-cost-reducer BUILD_TIME=$(shell date +%FT%T) LDFLAGS=-X $(PKG_BASE)/context.Version=${VERSION} -X $(PKG_BASE)/context.BuildTime=${BUILD_TIME} -GCP_OWNER_LABEL?=owner -LDFLAGS+= -X $(PKG_BASE)/gcp.OwnerLabel=$(GCP_OWNER_LABEL) +AWS_IGNORE_LABEL?=cloud-cost-reducer-ignore +LDFLAGS+= -X $(PKG_BASE)/context.AwsIgnoreLabel=$(AWS_IGNORE_LABEL) +AWS_OWNER_LABEL?=Owner +LDFLAGS+= -X $(PKG_BASE)/contextgcp.AwsOwnerLabel=$(AWS_OWNER_LABEL) + +AZURE_IGNORE_LABEL?=cloud-cost-reducer-ignore +LDFLAGS+= -X $(PKG_BASE)/context.AzureIgnoreLabel=$(AZURE_IGNORE_LABEL) +AZURE_OWNER_LABEL?=Owner +LDFLAGS+= -X $(PKG_BASE)/contextgcp.AzureOwnerLabel=$(AZURE_OWNER_LABEL) + GCP_IGNORE_LABEL?=cloud-cost-reducer-ignore -LDFLAGS+= -X $(PKG_BASE)/gcp.IgnoreLabel=$(GCP_IGNORE_LABEL) +LDFLAGS+= -X $(PKG_BASE)/context.GcpIgnoreLabel=$(GCP_IGNORE_LABEL) +GCP_OWNER_LABEL?=owner +LDFLAGS+= -X $(PKG_BASE)/contextgcp.GcpOwnerLabel=$(GCP_OWNER_LABEL) GOFILES_NOVENDOR = $(shell find . -type f -name '*.go' -not -path "./vendor/*" -not -path "./.git/*") diff --git a/action/termination.go b/action/termination.go index 2631d0cb..682822f9 100644 --- a/action/termination.go +++ b/action/termination.go @@ -33,7 +33,7 @@ func (a TerminationAction) Execute(allInstances []*types.Instance) { log.Errorf("[TERMINATION] Failed to terminate instances on %s, err: %s", cType.String(), err.Error()) } } - }(t, p) + }(t, p()) } wg.Wait() } diff --git a/aws/aws.go b/aws/aws.go index d59e393c..4d04a8fb 100644 --- a/aws/aws.go +++ b/aws/aws.go @@ -3,6 +3,7 @@ package aws import ( "crypto/tls" "errors" + "fmt" "net/http" "sync" @@ -20,28 +21,74 @@ var ( ) func init() { - log.Infof("[AWS] Trying to register as provider") - var err error - regions, err = getRegions() - if err != nil { - log.Errorf("[AWS] Failed to authenticate, err: %s", err.Error()) - return + context.CloudProviders[types.AWS] = func() types.CloudProvider { + prepare() + return new(AwsProvider) } - for _, region := range regions { - if client, err := newEc2Client(®ion); err != nil { - log.Errorf("[AWS] Failed to create client in region %s, err: %s", region, err.Error()) - return - } else { - regionClients[region] = client +} + +func prepare() { + if len(regionClients) == 0 { + log.Infof("[AWS] Trying to prepare") + var err error + regions, err = getRegions() + if err != nil { + panic("[AWS] Failed to authenticate, err: " + err.Error()) } + for _, region := range regions { + if client, err := newEc2Client(®ion); err != nil { + panic(fmt.Sprintf("[AWS] Failed to create client in region %s, err: %s", region, err.Error())) + } else { + regionClients[region] = client + } + } + log.Infof("[AWS] Successfully prepared") } - context.CloudProviders[types.AWS] = new(AwsProvider) - log.Infof("[AWS] Successfully registered as provider") } type AwsProvider struct { } +func (p *AwsProvider) GetRunningInstances() ([]*types.Instance, error) { + instChan := make(chan *types.Instance, 5) + wg := sync.WaitGroup{} + wg.Add(len(regions)) + for _, region := range regions { + go func(region string) { + defer wg.Done() + + filterName := "instance-state-name" + filterValue := ec2.InstanceStateNameRunning + runningFilter := []*ec2.Filter{{Name: &filterName, Values: []*string{&filterValue}}} + instanceResult, e := regionClients[region].DescribeInstances(&ec2.DescribeInstancesInput{ + Filters: runningFilter, + }) + if e != nil { + log.Errorf("[AWS] Failed to fetch the running instances in region: %s, err: %s", region, e) + return + } + for _, res := range instanceResult.Reservations { + for _, inst := range res.Instances { + instChan <- newInstance(inst) + } + } + }(region) + } + go func() { + wg.Wait() + close(instChan) + }() + instances := []*types.Instance{} + for inst := range instChan { + instances = append(instances, inst) + } + return instances, nil +} + +func (a AwsProvider) TerminateInstances([]*types.Instance) error { + return errors.New("[AWS] Termination not supported") +} + func getRegions() ([]string, error) { client, err := newEc2Client(nil) if err != nil { @@ -86,49 +133,6 @@ func newEc2Client(region *string) (*ec2.EC2, error) { return ec2.New(awsSession), nil } -func (p *AwsProvider) GetRunningInstances() ([]*types.Instance, error) { - instChan := make(chan *types.Instance, 5) - wg := sync.WaitGroup{} - wg.Add(len(regions)) - for _, region := range regions { - go func(region string) { - defer wg.Done() - - filterName := "instance-state-name" - filterValue := ec2.InstanceStateNameRunning - runningFilter := []*ec2.Filter{{Name: &filterName, Values: []*string{&filterValue}}} - instanceResult, e := regionClients[region].DescribeInstances(&ec2.DescribeInstancesInput{ - Filters: runningFilter, - }) - if e != nil { - log.Errorf("[AWS] Failed to fetch the running instances in region: %s, err: %s", region, e) - return - } - for _, res := range instanceResult.Reservations { - for _, inst := range res.Instances { // TODO filter by ignore tag - tags := getTags(inst.Tags) - instChan <- &types.Instance{ - Id: *inst.InstanceId, - Name: tags["Name"], - Created: *inst.LaunchTime, - CloudType: types.AWS, - Tags: tags, - } - } - } - }(region) - } - go func() { - wg.Wait() - close(instChan) - }() - instances := []*types.Instance{} - for inst := range instChan { - instances = append(instances, inst) - } - return instances, nil -} - func getTags(ec2Tags []*ec2.Tag) types.Tags { tags := make(types.Tags, 0) for _, t := range ec2Tags { @@ -137,10 +141,13 @@ func getTags(ec2Tags []*ec2.Tag) types.Tags { return tags } -func (a AwsProvider) GetOwnerLessInstances() ([]*types.Instance, error) { - return nil, errors.New("[AWS] Ownerless operation not supported") -} - -func (a AwsProvider) TerminateInstances([]*types.Instance) error { - return errors.New("[AWS] Termination not supported") +func newInstance(inst *ec2.Instance) *types.Instance { + tags := getTags(inst.Tags) + return &types.Instance{ + Id: *inst.InstanceId, + Name: tags["Name"], + Created: *inst.LaunchTime, + CloudType: types.AWS, + Tags: tags, + } } diff --git a/azure/azure.go b/azure/azure.go index aac4669f..a6d0b6a5 100644 --- a/azure/azure.go +++ b/azure/azure.go @@ -8,7 +8,6 @@ import ( "github.com/hortonworks/cloud-cost-reducer/utils" "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-12-01/compute" - "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources" "github.com/Azure/go-autorest/autorest/azure/auth" log "github.com/Sirupsen/logrus" "github.com/hortonworks/cloud-cost-reducer/context" @@ -16,34 +15,36 @@ import ( ) var ( + IgnoreLabel string = "cloud-cost-reducer-ignore" subscriptionId string vmClient compute.VirtualMachinesClient - rgClient resources.GroupsClient - resClient resources.Client + // rgClient resources.GroupsClient + // resClient resources.Client + typesToCollect = map[string]bool{"Microsoft.Compute/virtualMachines": true} ) -var typesToCollect = map[string]bool{"Microsoft.Compute/virtualMachines": true} - func init() { - subscriptionId = os.Getenv("AZURE_SUBSCRIPTION_ID") - if len(subscriptionId) > 0 { - log.Infof("[AZURE] Trying to register as provider") + context.CloudProviders[types.AZURE] = func() types.CloudProvider { + prepare() + return new(AzureProvider) + } +} + +func prepare() { + if len(vmClient.SubscriptionID) == 0 { + subscriptionId = os.Getenv("AZURE_SUBSCRIPTION_ID") + if len(subscriptionId) == 0 { + panic("[AZURE] AZURE_SUBSCRIPTION_ID environment variable is missing") + } + log.Infof("[AZURE] Trying to prepare") authorization, err := auth.NewAuthorizerFromEnvironment() if err != nil { - log.Errorf("[AZURE] Failed to authenticate, err: %s", err.Error()) - return + panic("[AZURE] Failed to authenticate, err: " + err.Error()) } vmClient = compute.NewVirtualMachinesClient(subscriptionId) vmClient.Authorizer = authorization - rgClient = resources.NewGroupsClient(subscriptionId) - rgClient.Authorizer = authorization - resClient = resources.NewClient(subscriptionId) - resClient.Authorizer = authorization - context.CloudProviders[types.AZURE] = new(AzureProvider) - log.Info("[AZURE] Successfully registered as provider") - } else { - log.Warn("[AZURE] AZURE_SUBSCRIPTION_ID environment variable is missing") + log.Info("[AZURE] Successfully prepared") } } @@ -57,24 +58,19 @@ func (p *AzureProvider) GetRunningInstances() ([]*types.Instance, error) { log.Errorf("[AZURE] Failed to fetch the running instances, err: %s", err.Error()) return nil, err } - for _, inst := range result.Values() { // TODO filter by ignore tag - instances = append(instances, &types.Instance{ - Name: *inst.Name, - Id: *inst.ID, - CloudType: types.AZURE, - Tags: utils.ConvertTags(inst.Tags), - }) + for _, inst := range result.Values() { + instances = append(instances, newInstance(inst)) } return instances, nil } -func (a AzureProvider) GetOwnerLessInstances() ([]*types.Instance, error) { - return nil, errors.New("[AZURE] Ownerless operation not supported") -} - func (a AzureProvider) TerminateInstances([]*types.Instance) error { return errors.New("[AZURE] Termination not supported") // AZURE + // rgClient = resources.NewGroupsClient(subscriptionId) + // rgClient.Authorizer = authorization + // resClient = resources.NewClient(subscriptionId) + // resClient.Authorizer = authorization // instances := make([]*types.Instance, 0) // groups, err := rgClient.List(ctx.Background(), "", nil) // if err != nil { @@ -103,3 +99,12 @@ func (a AzureProvider) TerminateInstances([]*types.Instance) error { // return instances, nil } + +func newInstance(inst compute.VirtualMachine) *types.Instance { + return &types.Instance{ + Name: *inst.Name, + Id: *inst.ID, + CloudType: types.AZURE, + Tags: utils.ConvertTags(inst.Tags), + } +} diff --git a/context/context.go b/context/context.go index 4fb329e2..d6c66514 100644 --- a/context/context.go +++ b/context/context.go @@ -7,14 +7,36 @@ import ( var ( Version string BuildTime string + + AwsIgnoreLabel string = "cloud-cost-reducer-ignore" + AzureIgnoreLabel string = "cloud-cost-reducer-ignore" + GcpIgnoreLabel string = "cloud-cost-reducer-ignore" + + AwsOwnerLabel string = "Owner" + AzureOwnerLabel string = "Owner" + GcpOwnerLabel string = "owner" ) var DryRun = false var Operations = make(map[types.OpType]types.Operation) -var CloudProviders = make(map[types.CloudType]types.CloudProvider) +var CloudProviders = make(map[types.CloudType]func() types.CloudProvider) + +var IgnoreLabels = make(map[types.CloudType]string) + +var OwnerLabels = make(map[types.CloudType]string) var Dispatchers = make(map[string]types.Dispatcher) var Actions = make(map[types.ActionType]types.Action) + +func init() { + IgnoreLabels[types.AWS] = AwsIgnoreLabel + IgnoreLabels[types.AZURE] = AzureIgnoreLabel + IgnoreLabels[types.GCP] = GcpIgnoreLabel + + OwnerLabels[types.AWS] = AwsOwnerLabel + OwnerLabels[types.AZURE] = AzureOwnerLabel + OwnerLabels[types.GCP] = GcpOwnerLabel +} diff --git a/gcp/gcp.go b/gcp/gcp.go index 1135eef6..e85f9861 100644 --- a/gcp/gcp.go +++ b/gcp/gcp.go @@ -18,30 +18,34 @@ import ( ) var ( - IgnoreLabel string = "cloud-cost-reducer-ignore" - OwnerLabel string = "owner" projectId string computeClient *compute.Service ) func init() { - projectId = os.Getenv("GOOGLE_PROJECT_ID") - if len(projectId) > 0 { - log.Infof("[GCP] Trying to register as provider") + context.CloudProviders[types.GCP] = func() types.CloudProvider { + prepare() + return new(GcpProvider) + } +} + +func prepare() { + if computeClient == nil { + projectId = os.Getenv("GOOGLE_PROJECT_ID") + if len(projectId) == 0 { + panic("[GCP] GOOGLE_PROJECT_ID environment variable is missing") + } + log.Infof("[GCP] Trying to prepare") httpClient, err := google.DefaultClient(ctx.Background(), compute.CloudPlatformScope) if err != nil { - log.Errorf("[GCP] Failed to authenticate, err: %s", err.Error()) - return + panic("[GCP] Failed to authenticate, err: " + err.Error()) } computeClient, err = compute.New(httpClient) if err != nil { - log.Errorf("[GCP] Failed to authenticate, err: %s", err.Error()) - return + panic("[GCP] Failed to authenticate, err: " + err.Error()) } - context.CloudProviders[types.GCP] = new(GcpProvider) - log.Info("[GCP] Successfully registered as provider") - } else { - log.Warn("[GCP] GOOGLE_PROJECT_ID environment variable is missing") + + log.Info("[GCP] Successfully prepared") } } @@ -49,11 +53,18 @@ type GcpProvider struct { } func (p *GcpProvider) GetRunningInstances() ([]*types.Instance, error) { - return getRunningInstancesFilterByLabel(IgnoreLabel) -} - -func (a GcpProvider) GetOwnerLessInstances() ([]*types.Instance, error) { - return getRunningInstancesFilterByLabel(OwnerLabel, IgnoreLabel) + instances := make([]*types.Instance, 0) + instanceList, err := computeClient.Instances.AggregatedList(projectId).Filter("status eq RUNNING").Do() + if err != nil { + log.Errorf("[GCP] Failed to fetch the running instances, err: %s", err.Error()) + return nil, err + } + for _, items := range instanceList.Items { + for _, inst := range items.Instances { + instances = append(instances, newInstance(inst)) + } + } + return instances, nil } func (a GcpProvider) TerminateInstances(instances []*types.Instance) error { @@ -115,23 +126,6 @@ func (a GcpProvider) TerminateInstances(instances []*types.Instance) error { return nil } -func getRunningInstancesFilterByLabel(ignoreLabels ...string) ([]*types.Instance, error) { - instances := make([]*types.Instance, 0) - instanceList, err := computeClient.Instances.AggregatedList(projectId).Filter("status eq RUNNING").Do() - if err != nil { - log.Errorf("[GCP] Failed to fetch the running instances, err: %s", err.Error()) - return nil, err - } - for _, items := range instanceList.Items { - for _, inst := range items.Instances { - if !utils.IsAnyMatch(inst.Labels, ignoreLabels...) { - instances = append(instances, newInstance(inst)) - } - } - } - return instances, nil -} - func getRegions() ([]string, error) { regionList, err := computeClient.Regions.List(projectId).Do() if err != nil { diff --git a/operation/common.go b/operation/common.go index d15155e7..5d17f89a 100644 --- a/operation/common.go +++ b/operation/common.go @@ -8,8 +8,7 @@ import ( "github.com/hortonworks/cloud-cost-reducer/types" ) -func collectInstances(clouds []types.CloudType, - getInstances func(types.CloudProvider) ([]*types.Instance, error)) (chan []*types.Instance, chan error) { +func collectRunningInstances(clouds []types.CloudType) (chan []*types.Instance, chan error) { instsChan := make(chan []*types.Instance, 10) errChan := make(chan error, 5) wg := sync.WaitGroup{} @@ -18,8 +17,8 @@ func collectInstances(clouds []types.CloudType, go func(cloud types.CloudType) { defer wg.Done() - provider := context.CloudProviders[cloud] - instances, err := getInstances(provider) + provider := context.CloudProviders[cloud]() + instances, err := provider.GetRunningInstances() if err != nil { errChan <- err } @@ -34,8 +33,7 @@ func collectInstances(clouds []types.CloudType, return instsChan, errChan } -func waitForInstances(instsChan chan []*types.Instance, errChan chan error, errorMsg string, - filter func([]*types.Instance) []*types.Instance) []*types.Instance { +func waitForInstances(instsChan chan []*types.Instance, errChan chan error, errorMsg string) []*types.Instance { var allInstances = make([]*types.Instance, 0) exit := false for !exit { @@ -45,9 +43,6 @@ func waitForInstances(instsChan chan []*types.Instance, errChan chan error, erro exit = true break } - if filter != nil { - instances = filter(instances) - } allInstances = append(allInstances, instances...) case err, ok := <-errChan: if !ok { @@ -59,3 +54,13 @@ func waitForInstances(instsChan chan []*types.Instance, errChan chan error, erro } return allInstances } + +func filter(instances []*types.Instance, isNeeded func(*types.Instance) bool) []*types.Instance { + filtered := []*types.Instance{} + for _, inst := range instances { + if isNeeded(inst) { + filtered = append(filtered, inst) + } + } + return filtered +} diff --git a/operation/longrunning.go b/operation/longrunning.go index 5361f717..63d3fa97 100644 --- a/operation/longrunning.go +++ b/operation/longrunning.go @@ -8,6 +8,7 @@ import ( log "github.com/Sirupsen/logrus" ctx "github.com/hortonworks/cloud-cost-reducer/context" "github.com/hortonworks/cloud-cost-reducer/types" + "github.com/hortonworks/cloud-cost-reducer/utils" ) var runningPeriod = 24 * time.Hour @@ -29,18 +30,14 @@ type LongRunning struct { } func (o LongRunning) Execute(clouds []types.CloudType) []*types.Instance { - instsChan, errChan := collectInstances(clouds, func(provider types.CloudProvider) ([]*types.Instance, error) { - return provider.GetRunningInstances() - }) - return waitForInstances(instsChan, errChan, "[LONGRUNNING] Failed to collect long running instances", getInstancesRunningLongerThan) + instsChan, errChan := collectRunningInstances(clouds) + instances := waitForInstances(instsChan, errChan, "[LONGRUNNING] Failed to collect long running instances") + return filterInstancesRunningLongerThan(instances) } -func getInstancesRunningLongerThan(instances []*types.Instance) []*types.Instance { - filtered := make([]*types.Instance, 0) - for _, instance := range instances { - if instance.Created.Add(runningPeriod).Before(time.Now()) { - filtered = append(filtered, instance) - } - } - return filtered +func filterInstancesRunningLongerThan(instances []*types.Instance) []*types.Instance { + return filter(instances, func(inst *types.Instance) bool { + ignoreLabel, ok := ctx.IgnoreLabels[inst.CloudType] + return (!ok || !utils.IsAnyMatch(inst.Tags, ignoreLabel)) && inst.Created.Add(runningPeriod).Before(time.Now()) + }) } diff --git a/operation/ownerless.go b/operation/ownerless.go index c1d7c6af..32408a9b 100644 --- a/operation/ownerless.go +++ b/operation/ownerless.go @@ -3,6 +3,7 @@ package operation import ( "github.com/hortonworks/cloud-cost-reducer/context" "github.com/hortonworks/cloud-cost-reducer/types" + "github.com/hortonworks/cloud-cost-reducer/utils" ) func init() { @@ -13,8 +14,20 @@ type Ownerless struct { } func (o Ownerless) Execute(clouds []types.CloudType) []*types.Instance { - instsChan, errChan := collectInstances(clouds, func(provider types.CloudProvider) ([]*types.Instance, error) { - return provider.GetOwnerLessInstances() + instsChan, errChan := collectRunningInstances(clouds) + instances := waitForInstances(instsChan, errChan, "[OWNERLESS] Failed to collect owner less instances") + return filterInstancesWithoutOwner(instances) +} + +func filterInstancesWithoutOwner(instances []*types.Instance) []*types.Instance { + return filter(instances, func(inst *types.Instance) bool { + labels := []string{} + if label, ok := context.IgnoreLabels[inst.CloudType]; ok { + labels = append(labels, label) + } + if label, ok := context.OwnerLabels[inst.CloudType]; ok { + labels = append(labels, label) + } + return !utils.IsAnyMatch(inst.Tags, labels...) }) - return waitForInstances(instsChan, errChan, "[OWNERLESS] Failed to collect owner less instances", nil) } diff --git a/types/cloud.go b/types/cloud.go index 4014a56c..d0ab3a65 100644 --- a/types/cloud.go +++ b/types/cloud.go @@ -14,6 +14,5 @@ const ( type CloudProvider interface { GetRunningInstances() ([]*Instance, error) - GetOwnerLessInstances() ([]*Instance, error) TerminateInstances([]*Instance) error } diff --git a/utils/utils.go b/utils/utils.go index 4b23d36b..481ffdbc 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -6,8 +6,8 @@ import ( "github.com/hortonworks/cloud-cost-reducer/types" ) -func IsAnyMatch(haystack map[string]string, needle ...string) bool { - for _, k := range needle { +func IsAnyMatch(haystack types.Tags, needles ...string) bool { + for _, k := range needles { if _, ok := haystack[k]; ok { return true }