diff --git a/pkg/globalnet/controllers/gateway_monitor.go b/pkg/globalnet/controllers/gateway_monitor.go index 50b0df648..8464790ae 100644 --- a/pkg/globalnet/controllers/gateway_monitor.go +++ b/pkg/globalnet/controllers/gateway_monitor.go @@ -37,6 +37,7 @@ import ( "github.com/submariner-io/submariner/pkg/netlink" routeAgent "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes/scheme" @@ -45,11 +46,12 @@ import ( func NewGatewayMonitor(spec Specification, localCIDRs []string, config *watcher.Config) (Interface, error) { // We'll panic if config is nil, this is intentional gatewayMonitor := &gatewayMonitor{ - baseController: newBaseController(), - spec: spec, - isGatewayNode: false, - localSubnets: stringset.New(localCIDRs...).Elements(), - remoteSubnets: stringset.NewSynchronized(), + baseController: newBaseController(), + spec: spec, + isGatewayNode: false, + localSubnets: stringset.New(localCIDRs...).Elements(), + remoteSubnets: stringset.NewSynchronized(), + remoteEndpointTimeStamp: map[string]metav1.Time{}, } var err error @@ -143,6 +145,14 @@ func (g *gatewayMonitor) handleCreatedOrUpdatedEndpoint(obj runtime.Object, numR logger.V(log.DEBUG).Infof("In processNextEndpoint, endpoint info: %+v", endpoint) if endpoint.Spec.ClusterID != g.spec.ClusterID { + lastProcessedTime, ok := g.remoteEndpointTimeStamp[endpoint.Spec.ClusterID] + + if ok && lastProcessedTime.After(endpoint.CreationTimestamp.Time) { + logger.Infof("Ignoring new remote %#v since a later endpoint was already"+ + "processed", endpoint) + return false + } + logger.V(log.DEBUG).Infof("Endpoint %q, host: %q belongs to a remote cluster", endpoint.Spec.ClusterID, endpoint.Spec.Hostname) @@ -166,6 +176,8 @@ func (g *gatewayMonitor) handleCreatedOrUpdatedEndpoint(obj runtime.Object, numR g.markRemoteClusterTraffic(remoteSubnet, AddRules) } + g.remoteEndpointTimeStamp[endpoint.Spec.ClusterID] = endpoint.CreationTimestamp + return false } @@ -211,6 +223,16 @@ func (g *gatewayMonitor) handleCreatedOrUpdatedEndpoint(obj runtime.Object, numR func (g *gatewayMonitor) handleRemovedEndpoint(obj runtime.Object, numRequeues int) bool { endpoint := obj.(*v1.Endpoint) + lastProcessedTime, ok := g.remoteEndpointTimeStamp[endpoint.Spec.ClusterID] + + if ok && lastProcessedTime.After(endpoint.CreationTimestamp.Time) { + logger.Infof("Ignoring deleted remote %#v since a later endpoint was already"+ + "processed", endpoint) + return false + } + + delete(g.remoteEndpointTimeStamp, endpoint.Spec.ClusterID) + logger.V(log.DEBUG).Infof("Informed of removed endpoint for gateway monitor: %v", endpoint) hostname, err := os.Hostname() diff --git a/pkg/globalnet/controllers/types.go b/pkg/globalnet/controllers/types.go index 2a65a5ed5..ad1a4fa20 100644 --- a/pkg/globalnet/controllers/types.go +++ b/pkg/globalnet/controllers/types.go @@ -90,16 +90,17 @@ type baseController struct { type gatewayMonitor struct { *baseController - syncerConfig *syncer.ResourceSyncerConfig - endpointWatcher watcher.Interface - spec Specification - ipt iptables.Interface - isGatewayNode bool - nodeName string - syncMutex sync.Mutex - localSubnets []string - remoteSubnets stringset.Interface - controllers []Interface + syncerConfig *syncer.ResourceSyncerConfig + endpointWatcher watcher.Interface + remoteEndpointTimeStamp map[string]metav1.Time + spec Specification + ipt iptables.Interface + isGatewayNode bool + nodeName string + syncMutex sync.Mutex + localSubnets []string + remoteSubnets stringset.Interface + controllers []Interface } type baseSyncerController struct {