Skip to content

Commit

Permalink
Fix event processing during bootup (#240)
Browse files Browse the repository at this point in the history
  • Loading branch information
aattuluri authored Jul 5, 2022
1 parent 808976e commit 0e07ab3
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 5 deletions.
2 changes: 2 additions & 0 deletions admiral/pkg/apis/admiral/routes/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
)

func TestReturnSuccessGET(t *testing.T) {
Expand Down Expand Up @@ -48,6 +49,7 @@ func TestGetClusters(t *testing.T) {
RemoteClusters: map[string]*secret.RemoteCluster{},
},
},
StartTime: time.Now(),
},
}
testCases := []struct {
Expand Down
5 changes: 3 additions & 2 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ func addUpdateVirtualService(obj *v1alpha3.VirtualService, exist *v1alpha3.Virtu

func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEntry, namespace string, rc *RemoteController) {
var err error
var op string
var op, diff string
var skipUpdate bool
if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
Expand All @@ -589,7 +590,7 @@ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEn
exist.Labels = obj.Labels
exist.Annotations = obj.Annotations
op = "Update"
skipUpdate, diff := skipDestructiveUpdate(rc, obj, exist)
skipUpdate, diff = skipDestructiveUpdate(rc, obj, exist)
if diff != "" {
log.Infof(LogFormat+" diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff)
}
Expand Down
3 changes: 3 additions & 0 deletions admiral/pkg/clusters/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
CnameDependentClusterCache: cnameCache,
SeClusterCache: common.NewMapOfMaps(),
},
StartTime: time.Now(),
},
}

Expand All @@ -548,6 +549,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
CnameDependentClusterCache: goodCnameCache,
SeClusterCache: common.NewMapOfMaps(),
},
StartTime: time.Now(),
},
}

Expand All @@ -574,6 +576,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
CnameDependentClusterCache: goodCnameCache,
SeClusterCache: common.NewMapOfMaps(),
},
StartTime: time.Now(),
},
}

Expand Down
1 change: 1 addition & 0 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis
common.InitializeConfig(params)
w := RemoteRegistry{
ctx: ctx,
StartTime: time.Now(),
}

wd := DependencyHandler{
Expand Down
2 changes: 2 additions & 0 deletions admiral/pkg/clusters/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestDeleteCacheControllerThatDoesntExist(t *testing.T) {

w := RemoteRegistry{
RemoteControllers: make(map[string]*RemoteController),
StartTime: time.Now(),
}

err := w.deleteCacheController("I don't exit")
Expand All @@ -61,6 +62,7 @@ func TestDeleteCacheController(t *testing.T) {

w := RemoteRegistry{
RemoteControllers: make(map[string]*RemoteController),
StartTime: time.Now(),
}

r := rest.Config{
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s

defer util.LogElapsedTime("modifyServiceEntryForNewServiceOrPod", sourceIdentity, env, "")()

if IsCacheWarmupTime(remoteRegistry) {
log.Infof(LogFormat, event, env, sourceIdentity, "", "Processing skipped during cache warm up state")
return nil
}
//create a service entry, destination rule and virtual service in the local cluster
sourceServices := make(map[string]*k8sV1.Service)
sourceWeightedServices := make(map[string]map[string]*WeightedService)
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/clusters/serviceentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func TestCreateServiceEntryForNewServiceOrPod(t *testing.T) {
KubeconfigPath: "testdata/fake.config",
}
rr, _ := InitAdmiral(context.Background(), p)
rr.StartTime = time.Now().Add(-60*time.Second)

config := rest.Config{
Host: "localhost",
Expand Down Expand Up @@ -875,6 +876,8 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) {

rr, _ := InitAdmiral(context.Background(), p)

rr.StartTime = time.Now().Add(-60*time.Second)

config := rest.Config{
Host: "localhost",
}
Expand Down Expand Up @@ -1011,6 +1014,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) {
config := rest.Config{
Host: "localhost",
}
rr.StartTime = time.Now().Add(-60*time.Second)

d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300))

Expand Down
1 change: 1 addition & 0 deletions admiral/pkg/clusters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type RemoteRegistry struct {
secretClient k8s.Interface
ctx context.Context
AdmiralCache *AdmiralCache
StartTime time.Time
}

func (r *RemoteRegistry) shutdown() {
Expand Down
5 changes: 5 additions & 0 deletions admiral/pkg/clusters/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
k8sV1 "k8s.io/api/core/v1"
"strconv"
"strings"
"time"
)

func GetMeshPorts(clusterName string, destService *k8sV1.Service,
Expand Down Expand Up @@ -127,3 +128,7 @@ func ValidateConfigmapBeforePutting(cm *k8sV1.ConfigMap) error {
}
return nil
}

func IsCacheWarmupTime(remoteRegistry *RemoteRegistry) bool {
return time.Since(remoteRegistry.StartTime) < common.GetAdmiralParams().CacheRefreshDuration
}
4 changes: 2 additions & 2 deletions admiral/pkg/controller/admiral/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}

log.Infof("Informer caches synced for controller=%v, current keys=%v", c.name, c.informer.GetStore().ListKeys())
//wait for 30 seconds for the first time (for all caches to sync)
wait.Until(c.runWorker, 30 * time.Second, stopCh)

wait.Until(c.runWorker, 5 * time.Second, stopCh)
}

func (c *Controller) runWorker() {
Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/controller/admiral/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ func (s *serviceCache) Put(service *k8sV1.Service) {
Service: make(map[string]map[string]*k8sV1.Service),
Identity: s.getKey(service),
}
s.cache[identity] = existing
}
namespaceServices := existing.Service[service.Namespace]
if namespaceServices == nil {
namespaceServices = make(map[string]*k8sV1.Service)
}
namespaceServices[service.Name] = service
existing.Service[service.Namespace] = namespaceServices
s.cache[identity] = existing

}

Expand Down

0 comments on commit 0e07ab3

Please sign in to comment.