From 006adc690af2234cfe9068ad82c619ba394ac303 Mon Sep 17 00:00:00 2001 From: aattuluri <44482891+aattuluri@users.noreply.github.com> Date: Tue, 5 Jul 2022 16:50:00 -0700 Subject: [PATCH] Fix event processing during bootup (#240) Signed-off-by: sa --- admiral/pkg/apis/admiral/routes/handler_test.go | 2 ++ admiral/pkg/clusters/handler_test.go | 3 +++ admiral/pkg/clusters/registry_test.go | 2 ++ admiral/pkg/clusters/serviceentry.go | 4 ++++ admiral/pkg/clusters/serviceentry_test.go | 4 ++++ admiral/pkg/clusters/util.go | 5 +++++ admiral/pkg/controller/admiral/controller.go | 4 ++-- admiral/pkg/controller/admiral/service.go | 2 +- 8 files changed, 23 insertions(+), 3 deletions(-) diff --git a/admiral/pkg/apis/admiral/routes/handler_test.go b/admiral/pkg/apis/admiral/routes/handler_test.go index f008004b1..82d0e75bd 100644 --- a/admiral/pkg/apis/admiral/routes/handler_test.go +++ b/admiral/pkg/apis/admiral/routes/handler_test.go @@ -15,6 +15,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" ) func TestReturnSuccessGET(t *testing.T) { @@ -48,6 +49,7 @@ func TestGetClusters(t *testing.T) { RemoteClusters: map[string]*secret.RemoteCluster{}, }, }, + StartTime: time.Now(), }, } testCases := []struct { diff --git a/admiral/pkg/clusters/handler_test.go b/admiral/pkg/clusters/handler_test.go index 199dd8706..5151010ac 100644 --- a/admiral/pkg/clusters/handler_test.go +++ b/admiral/pkg/clusters/handler_test.go @@ -529,6 +529,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) { CnameDependentClusterCache: cnameCache, SeClusterCache: common.NewMapOfMaps(), }, + StartTime: time.Now(), }, } @@ -548,6 +549,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) { CnameDependentClusterCache: goodCnameCache, SeClusterCache: common.NewMapOfMaps(), }, + StartTime: time.Now(), }, } @@ -574,6 +576,7 @@ func TestHandleVirtualServiceEvent(t *testing.T) { CnameDependentClusterCache: goodCnameCache, SeClusterCache: common.NewMapOfMaps(), }, + StartTime: time.Now(), }, } diff --git a/admiral/pkg/clusters/registry_test.go b/admiral/pkg/clusters/registry_test.go index e263fb6ba..a6fdf9ccf 100644 --- a/admiral/pkg/clusters/registry_test.go +++ b/admiral/pkg/clusters/registry_test.go @@ -48,6 +48,7 @@ func TestDeleteCacheControllerThatDoesntExist(t *testing.T) { w := RemoteRegistry{ RemoteControllers: make(map[string]*RemoteController), + StartTime: time.Now(), } err := w.deleteCacheController("I don't exit") @@ -61,6 +62,7 @@ func TestDeleteCacheController(t *testing.T) { w := RemoteRegistry{ RemoteControllers: make(map[string]*RemoteController), + StartTime: time.Now(), } r := rest.Config{ diff --git a/admiral/pkg/clusters/serviceentry.go b/admiral/pkg/clusters/serviceentry.go index a1cdfc68c..fcedf9805 100644 --- a/admiral/pkg/clusters/serviceentry.go +++ b/admiral/pkg/clusters/serviceentry.go @@ -58,6 +58,10 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s defer util.LogElapsedTime("modifyServiceEntryForNewServiceOrPod", sourceIdentity, env, "")() + if IsCacheWarmupTime(remoteRegistry) { + log.Infof(LogFormat, event, env, sourceIdentity, "", "Processing skipped during cache warm up state") + return nil + } //create a service entry, destination rule and virtual service in the local cluster sourceServices := make(map[string]*k8sV1.Service) sourceWeightedServices := make(map[string]map[string]*WeightedService) diff --git a/admiral/pkg/clusters/serviceentry_test.go b/admiral/pkg/clusters/serviceentry_test.go index 7b8e0c186..a8ed9189d 100644 --- a/admiral/pkg/clusters/serviceentry_test.go +++ b/admiral/pkg/clusters/serviceentry_test.go @@ -289,6 +289,7 @@ func TestCreateServiceEntryForNewServiceOrPod(t *testing.T) { KubeconfigPath: "testdata/fake.config", } rr, _ := InitAdmiral(context.Background(), p) + rr.StartTime = time.Now().Add(-60*time.Second) config := rest.Config{ Host: "localhost", @@ -875,6 +876,8 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) { rr, _ := InitAdmiral(context.Background(), p) + rr.StartTime = time.Now().Add(-60*time.Second) + config := rest.Config{ Host: "localhost", } @@ -1011,6 +1014,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) { config := rest.Config{ Host: "localhost", } + rr.StartTime = time.Now().Add(-60*time.Second) d, e := admiral.NewDeploymentController("", make(chan struct{}), &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300)) diff --git a/admiral/pkg/clusters/util.go b/admiral/pkg/clusters/util.go index eff4069e5..e4f1550b1 100644 --- a/admiral/pkg/clusters/util.go +++ b/admiral/pkg/clusters/util.go @@ -10,6 +10,7 @@ import ( k8sV1 "k8s.io/api/core/v1" "strconv" "strings" + "time" ) func GetMeshPorts(clusterName string, destService *k8sV1.Service, @@ -127,3 +128,7 @@ func ValidateConfigmapBeforePutting(cm *k8sV1.ConfigMap) error { } return nil } + +func IsCacheWarmupTime(remoteRegistry *RemoteRegistry) bool { + return time.Since(remoteRegistry.StartTime) < common.GetAdmiralParams().CacheRefreshDuration +} diff --git a/admiral/pkg/controller/admiral/controller.go b/admiral/pkg/controller/admiral/controller.go index c0c43f9d5..8457394fe 100644 --- a/admiral/pkg/controller/admiral/controller.go +++ b/admiral/pkg/controller/admiral/controller.go @@ -102,8 +102,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } log.Infof("Informer caches synced for controller=%v, current keys=%v", c.name, c.informer.GetStore().ListKeys()) - //wait for 30 seconds for the first time (for all caches to sync) - wait.Until(c.runWorker, 30 * time.Second, stopCh) + + wait.Until(c.runWorker, 5 * time.Second, stopCh) } func (c *Controller) runWorker() { diff --git a/admiral/pkg/controller/admiral/service.go b/admiral/pkg/controller/admiral/service.go index 5d39b732a..0f998c521 100644 --- a/admiral/pkg/controller/admiral/service.go +++ b/admiral/pkg/controller/admiral/service.go @@ -60,7 +60,6 @@ func (s *serviceCache) Put(service *k8sV1.Service) { Service: make(map[string]map[string]*k8sV1.Service), Identity: s.getKey(service), } - s.cache[identity] = existing } namespaceServices := existing.Service[service.Namespace] if namespaceServices == nil { @@ -68,6 +67,7 @@ func (s *serviceCache) Put(service *k8sV1.Service) { } namespaceServices[service.Name] = service existing.Service[service.Namespace] = namespaceServices + s.cache[identity] = existing }