Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for concurrent map iteration and map write #197

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions admiral/pkg/apis/admiral/routes/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package routes
import (
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"github.com/istio-ecosystem/admiral/admiral/pkg/clusters"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
"log"
"net/http"
"strings"

"github.com/gorilla/mux"
"github.com/istio-ecosystem/admiral/admiral/pkg/clusters"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
)

type RouteOpts struct {
Expand Down Expand Up @@ -126,16 +128,18 @@ func (opts *RouteOpts) GetServiceEntriesByIdentity(w http.ResponseWriter, r *htt

if identity != "" {

for cname, serviceCluster := range opts.RemoteRegistry.AdmiralCache.SeClusterCache.Map() {
m := opts.RemoteRegistry.AdmiralCache.SeClusterCache

m.Range(func(cname string, serviceCluster *common.Map) {
if strings.Contains(cname, identity) {
var identityServiceEntry IdentityServiceEntry
identityServiceEntry.Cname = cname
for _, clusterId := range serviceCluster.Map() {
identityServiceEntry.ClusterNames = append(identityServiceEntry.ClusterNames, clusterId)
}
serviceCluster.Range(func(k string, clusterID string) {
identityServiceEntry.ClusterNames = append(identityServiceEntry.ClusterNames, clusterID)
})
response = append(response, identityServiceEntry)
}
}
})
out, err := json.Marshal(response)
if err != nil {
log.Printf("Failed to marshall response GetServiceEntriesByIdentity call")
Expand Down
106 changes: 54 additions & 52 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package clusters
import (
"bytes"
"fmt"
"reflect"
"sort"
"strings"
"time"

argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/gogo/protobuf/types"
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/model"
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/util"
Expand All @@ -16,10 +21,6 @@ import (
k8sAppsV1 "k8s.io/api/apps/v1"
k8sV1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
"sort"
"strings"
"time"
)

const ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash"
Expand All @@ -45,8 +46,8 @@ type SidecarHandler struct {
}

type WeightedService struct {
Weight int32
Service *k8sV1.Service
Weight int32
Service *k8sV1.Service
}

func updateIdentityDependencyCache(sourceIdentity string, identityDependencyCache *common.MapOfMaps, dr *v1.Dependency) {
Expand All @@ -70,9 +71,9 @@ func getDestinationRule(host string, locality string, gtpTrafficPolicy *model.Tr
processGtp = false
}
outlierDetection := &v1alpha32.OutlierDetection{
BaseEjectionTime: &types.Duration{Seconds: 300},
BaseEjectionTime: &types.Duration{Seconds: 300},
Consecutive_5XxErrors: &types.UInt32Value{Value: uint32(10)},
Interval: &types.Duration{Seconds: 60},
Interval: &types.Duration{Seconds: 60},
}
if gtpTrafficPolicy != nil && processGtp {
var loadBalancerSettings = &v1alpha32.LoadBalancerSettings{
Expand Down Expand Up @@ -108,52 +109,52 @@ func getDestinationRule(host string, locality string, gtpTrafficPolicy *model.Tr

func (se *ServiceEntryHandler) Added(obj *v1alpha3.ServiceEntry) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Add", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Add", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
}

func (se *ServiceEntryHandler) Updated(obj *v1alpha3.ServiceEntry) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Update", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Update", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
}

func (se *ServiceEntryHandler) Deleted(obj *v1alpha3.ServiceEntry) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Delete", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Delete", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
}

func (dh *DestinationRuleHandler) Added(obj *v1alpha3.DestinationRule) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Add", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Add", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
handleDestinationRuleEvent(obj, dh, common.Add, common.DestinationRule)
}

func (dh *DestinationRuleHandler) Updated(obj *v1alpha3.DestinationRule) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Update", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Update", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
handleDestinationRuleEvent(obj, dh, common.Update, common.DestinationRule)
}

func (dh *DestinationRuleHandler) Deleted(obj *v1alpha3.DestinationRule) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
handleDestinationRuleEvent(obj, dh, common.Delete, common.DestinationRule)
}

func (vh *VirtualServiceHandler) Added(obj *v1alpha3.VirtualService) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Add", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Add", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
err := handleVirtualServiceEvent(obj, vh, common.Add, common.VirtualService)
Expand All @@ -164,7 +165,7 @@ func (vh *VirtualServiceHandler) Added(obj *v1alpha3.VirtualService) {

func (vh *VirtualServiceHandler) Updated(obj *v1alpha3.VirtualService) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Update", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Update", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
err := handleVirtualServiceEvent(obj, vh, common.Update, common.VirtualService)
Expand All @@ -175,7 +176,7 @@ func (vh *VirtualServiceHandler) Updated(obj *v1alpha3.VirtualService) {

func (vh *VirtualServiceHandler) Deleted(obj *v1alpha3.VirtualService) {
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
return
}
err := handleVirtualServiceEvent(obj, vh, common.Delete, common.VirtualService)
Expand All @@ -192,7 +193,7 @@ func (dh *SidecarHandler) Deleted(obj *v1alpha3.Sidecar) {}

func IgnoreIstioResource(exportTo []string, annotations map[string]string, namespace string) bool {

if len(annotations) > 0 && annotations[common.AdmiralIgnoreAnnotation] == "true" {
if len(annotations) > 0 && annotations[common.AdmiralIgnoreAnnotation] == "true" {
return true
}

Expand Down Expand Up @@ -225,9 +226,9 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu

r := dh.RemoteRegistry

dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(destinationRule.Host)
dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(destinationRule.Host).Copy()

if dependentClusters != nil && len(dependentClusters.Map()) > 0 {
if len(dependentClusters) > 0 {

log.Infof(LogFormat, "Event", "DestinationRule", obj.Name, clusterId, "Processing")

Expand All @@ -240,7 +241,7 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu

allDependentClusters := make(map[string]string)

util.MapCopy(allDependentClusters, dependentClusters.Map())
util.MapCopy(allDependentClusters, dependentClusters)

allDependentClusters[clusterId] = clusterId

Expand Down Expand Up @@ -416,7 +417,7 @@ func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceH
}

//check if this virtual service is used by Argo rollouts for canary strategy, if so, update the corresponding SE with appropriate weights
if common.GetAdmiralParams().ArgoRolloutsEnabled {
if common.GetAdmiralParams().ArgoRolloutsEnabled {
rollouts, err := vh.RemoteRegistry.RemoteControllers[clusterId].RolloutController.RolloutClient.Rollouts(obj.Namespace).List(v12.ListOptions{})

if err != nil {
Expand All @@ -432,11 +433,11 @@ func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceH
}
}

dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(virtualService.Hosts[0])
dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(virtualService.Hosts[0]).Copy()

if dependentClusters != nil && len(dependentClusters.Map()) > 0 {
if len(dependentClusters) > 0 {

for _, dependentCluster := range dependentClusters.Map() {
for _, dependentCluster := range dependentClusters {

rc := r.RemoteControllers[dependentCluster]

Expand Down Expand Up @@ -480,7 +481,7 @@ func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceH
}
return nil
} else {
log.Infof(LogFormat,"Event", "VirtualService", obj.Name, clusterId, "No dependent clusters found")
log.Infof(LogFormat, "Event", "VirtualService", obj.Name, clusterId, "No dependent clusters found")
}

//copy the VirtualService `as is` if they are not generated by Admiral (not in CnameDependentClusterCache)
Expand Down Expand Up @@ -510,7 +511,7 @@ func addUpdateVirtualService(obj *v1alpha3.VirtualService, exist *v1alpha3.Virtu
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
if exist == nil || len(exist.Spec.Hosts) == 0 {
obj.Namespace = namespace
obj.ResourceVersion = ""
Expand All @@ -537,20 +538,20 @@ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEn
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
if exist == nil || exist.Spec.Hosts == nil {
obj.Namespace = namespace
obj.ResourceVersion = ""
_, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Create(obj)
op = "Add"
log.Infof(LogFormat + " SE=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "New SE", obj.Spec.String())
log.Infof(LogFormat+" SE=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "New SE", obj.Spec.String())
} else {
exist.Labels = obj.Labels
exist.Annotations = obj.Annotations
op = "Update"
skipUpdate, diff := skipDestructiveUpdate(rc, obj, exist)
if diff != "" {
log.Infof(LogFormat + " diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff)
log.Infof(LogFormat+" diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff)
}
if skipUpdate {
log.Infof(LogFormat, op, "ServiceEntry", obj.Name, rc.ClusterID, "Update skipped as it was destructive during Admiral's bootup phase")
Expand All @@ -573,7 +574,7 @@ func skipDestructiveUpdate(rc *RemoteController, new *v1alpha3.ServiceEntry, old
skipDestructive = false
destructive, diff := getServiceEntryDiff(new, old)
//do not update SEs during bootup phase if they are destructive
if time.Since(rc.StartTime) < (2 * common.GetAdmiralParams().CacheRefreshDuration) && destructive {
if time.Since(rc.StartTime) < (2*common.GetAdmiralParams().CacheRefreshDuration) && destructive {
skipDestructive = true
}

Expand Down Expand Up @@ -603,17 +604,17 @@ func getServiceEntryDiff(new *v1alpha3.ServiceEntry, old *v1alpha3.ServiceEntry)
found[nEndpoint.Address] = "1"
if !reflect.DeepEqual(val, nEndpoint) {
destructive = true
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Update", val.String(), nEndpoint.String()))
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Update", val.String(), nEndpoint.String()))
}
} else {
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Add", "", nEndpoint.String()))
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Add", "", nEndpoint.String()))
}
}

for key := range oldEndpointMap {
if _, ok := found[key]; !ok {
destructive = true
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Delete", oldEndpointMap[key].String(), ""))
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Delete", oldEndpointMap[key].String(), ""))
}
}

Expand All @@ -638,7 +639,7 @@ func addUpdateDestinationRule(obj *v1alpha3.DestinationRule, exist *v1alpha3.Des
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
if exist == nil || exist.Name == "" || exist.Spec.Host == "" {
obj.Namespace = namespace
obj.ResourceVersion = ""
Expand Down Expand Up @@ -714,22 +715,24 @@ func getServiceForDeployment(rc *RemoteController, deployment *k8sAppsV1.Deploym
return matchedService
}

func getDependentClusters(dependents *common.Map, identityClusterCache *common.MapOfMaps, sourceServices map[string]*k8sV1.Service) map[string]string {
func getDependentClusters(dependents map[string]string, identityClusterCache *common.MapOfMaps, sourceServices map[string]*k8sV1.Service) map[string]string {
var dependentClusters = make(map[string]string)
//TODO optimize this map construction
if dependents != nil {
for identity, clusters := range identityClusterCache.Map() {
for depIdentity := range dependents.Map() {
if identity == depIdentity {
for _, clusterId := range clusters.Map() {
_, ok := sourceServices[clusterId]
if !ok {
dependentClusters[clusterId] = clusterId
}
}
}
}

if dependents == nil {
return dependentClusters
}

for depIdentity := range dependents {
clusters := identityClusterCache.Get(depIdentity)
if clusters == nil {
continue
}
clusters.Range(func(k string, clusterID string) {
_, ok := sourceServices[clusterID]
if !ok {
dependentClusters[clusterID] = clusterID
}
})
}
return dependentClusters
}
Expand Down Expand Up @@ -761,7 +764,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
return nil
}

var canaryService, stableService, virtualServiceRouteName string
var canaryService, stableService, virtualServiceRouteName string

var istioCanaryWeights = make(map[string]int32)

Expand Down Expand Up @@ -827,7 +830,6 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin

var matchedServices = make(map[string]*WeightedService)


//if we have more than one matching service we will pick the first one, for this to be deterministic we sort services
var servicesInNamespace = cachedService.Service[rollout.Namespace]

Expand Down
Loading