diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 11ee1f54b0e..71896837a62 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -88,6 +88,9 @@ featureGates: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "L7FlowExporter" "default" false) }} +# Enable NodeLatencyMonitor to monitor the latency between Nodes. +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "NodeLatencyMonitor" "default" false) }} + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/charts/antrea/crds/nodelatencymonitor.yaml b/build/charts/antrea/crds/nodelatencymonitor.yaml new file mode 100644 index 00000000000..3a90d6acd48 --- /dev/null +++ b/build/charts/antrea/crds/nodelatencymonitor.yaml @@ -0,0 +1,48 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index 05ca58c1ab6..25bce70e3d5 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -174,6 +174,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 5581d58a19a..73dcb502d4a 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2686,6 +2686,57 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/nodelatencymonitor.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -3624,6 +3675,9 @@ data: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. # L7FlowExporter: false + # Enable NodeLatencyMonitor to monitor the latency between Nodes. + # NodeLatencyMonitor: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4259,6 +4313,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch @@ -4920,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec labels: app: antrea component: antrea-agent @@ -5158,7 +5213,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-crds.yml b/build/yamls/antrea-crds.yml index b41fc8e8e16..c449e2ced17 100644 --- a/build/yamls/antrea-crds.yml +++ b/build/yamls/antrea-crds.yml @@ -2667,6 +2667,55 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: name: supportbundlecollections.crd.antrea.io spec: diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index e60b8f5c8f9..d0435aef30f 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2686,6 +2686,57 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/nodelatencymonitor.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -3624,6 +3675,9 @@ data: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. # L7FlowExporter: false + # Enable NodeLatencyMonitor to monitor the latency between Nodes. + # NodeLatencyMonitor: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4259,6 +4313,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch @@ -4920,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec labels: app: antrea component: antrea-agent @@ -5159,7 +5214,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 47a8888bb99a5b1a08dea61e9315bacf613d869d718712ad0eb9964bb73dc0ec labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 89bc22a6b17..8401b0a94b0 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2686,6 +2686,57 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/nodelatencymonitor.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -3624,6 +3675,9 @@ data: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. # L7FlowExporter: false + # Enable NodeLatencyMonitor to monitor the latency between Nodes. + # NodeLatencyMonitor: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4259,6 +4313,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch @@ -4920,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: d5cdb5356795c44a69c66fad1b4d67f7c00cdcbe837f3b3b50260e4d9dfd1e7e + checksum/config: 47c353c05a3d0d6da5534e0cd1202fefb175fff41421eb3517bc9f3dd084e2ce labels: app: antrea component: antrea-agent @@ -5156,7 +5211,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: d5cdb5356795c44a69c66fad1b4d67f7c00cdcbe837f3b3b50260e4d9dfd1e7e + checksum/config: 47c353c05a3d0d6da5534e0cd1202fefb175fff41421eb3517bc9f3dd084e2ce labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 8fd77921733..91964dd82a3 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2686,6 +2686,57 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/nodelatencymonitor.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -3637,6 +3688,9 @@ data: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. # L7FlowExporter: false + # Enable NodeLatencyMonitor to monitor the latency between Nodes. + # NodeLatencyMonitor: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4272,6 +4326,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch @@ -4933,7 +4988,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 50f2864cf09e4732327b963130bd59a9fc06c560784b161c94e813c000367615 + checksum/config: ce4457f4d8a5bda332dbdd877c15ad152e3bbbcb0c9626c57ccd17518e407562 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -5215,7 +5270,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 50f2864cf09e4732327b963130bd59a9fc06c560784b161c94e813c000367615 + checksum/config: ce4457f4d8a5bda332dbdd877c15ad152e3bbbcb0c9626c57ccd17518e407562 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index b7d129b0c88..53b32787a9b 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2686,6 +2686,57 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/nodelatencymonitor.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: nodelatencymonitors.crd.antrea.io +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - pingIntervalSeconds + properties: + pingIntervalSeconds: + type: integer + format: int32 + minimum: 1 + description: "Ping interval in seconds, must be at least 1." + default: 60 + metadata: + type: object + properties: + name: + type: string + pattern: '^default$' + additionalPrinterColumns: + - description: Specifies the interval between pings. + jsonPath: .spec.pingIntervalSeconds + name: PingIntervalSeconds + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + scope: Cluster + names: + plural: nodelatencymonitors + singular: nodelatencymonitor + kind: NodeLatencyMonitor + shortNames: + - nlm + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -3624,6 +3675,9 @@ data: # Enable L7FlowExporter on Pods and Namespaces to export the application layer flows such as HTTP flows. # L7FlowExporter: false + # Enable NodeLatencyMonitor to monitor the latency between Nodes. + # NodeLatencyMonitor: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4259,6 +4313,7 @@ rules: - externalippools - ippools - trafficcontrols + - nodelatencymonitors verbs: - get - watch @@ -4920,7 +4975,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ac3c14eed7ca0dc28bf2d659cd2c4e4a39d55278fb9a8759c30ea12eff89e518 + checksum/config: 7c86f86246f3b203e46613dacbd724e9ca8eae3428451acdc7bfe54123deb534 labels: app: antrea component: antrea-agent @@ -5156,7 +5211,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ac3c14eed7ca0dc28bf2d659cd2c4e4a39d55278fb9a8759c30ea12eff89e518 + checksum/config: 7c86f86246f3b203e46613dacbd724e9ca8eae3428451acdc7bfe54123deb534 labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index ea3a0e5fec5..246a1f19a0c 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -52,6 +52,7 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/metrics" + "antrea.io/antrea/pkg/agent/monitortool" "antrea.io/antrea/pkg/agent/multicast" mcroute "antrea.io/antrea/pkg/agent/multicluster" "antrea.io/antrea/pkg/agent/nodeip" @@ -123,6 +124,7 @@ func run(o *Options) error { endpointsInformer := informerFactory.Core().V1().Endpoints() endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices() namespaceInformer := informerFactory.Core().V1().Namespaces() + nodeLatencyMonitorInformer := crdInformerFactory.Crd().V1alpha1().NodeLatencyMonitors() // Create Antrea Clientset for the given config. antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient) @@ -929,6 +931,17 @@ func run(o *Options) error { go flowExporter.Run(stopCh) } + // Start the node latency monitor. + if features.DefaultFeatureGate.Enabled(features.NodeLatencyMonitor) && o.nodeType == config.K8sNode { + nodeLatencyMonitor := monitortool.NewNodeLatencyMonitor( + nodeInformer, + nodeLatencyMonitorInformer, + nodeConfig, + networkConfig.TrafficEncapMode, + ) + go nodeLatencyMonitor.Run(stopCh) + } + <-stopCh klog.Info("Stopping Antrea agent") return nil diff --git a/pkg/agent/monitortool/latency_store.go b/pkg/agent/monitortool/latency_store.go new file mode 100644 index 00000000000..78882a72798 --- /dev/null +++ b/pkg/agent/monitortool/latency_store.go @@ -0,0 +1,235 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitortool + +import ( + "errors" + "net" + "sync" + "time" + + "github.com/containernetworking/plugins/pkg/ip" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/util/k8s" +) + +// LatencyStore is a store for latency information of connections between Nodes. +type LatencyStore struct { + // Lock for the latency store + mutex sync.RWMutex + + // Whether the agent is running in networkPolicyOnly mode + isNetworkPolicyOnly bool + // The map of Node IP to latency entry, it will be changed by latency monitor + nodeIPLatencyMap map[string]*NodeIPLatencyEntry + // The map of Node name to Node IP(s), it will be changed by Node watcher + // If the agent is running in networkPolicyOnly mode, the value will be the transport IP of the Node. + // Otherwise, the value will be the gateway IP of the Node + nodeTargetIPsMap map[string][]net.IP +} + +// NodeIPLatencyEntry is the entry of the latency map. +type NodeIPLatencyEntry struct { + // The timestamp of the last sent packet + LastSendTime time.Time + // The timestamp of the last received packet + LastRecvTime time.Time + // The last valid rtt of the connection + LastMeasuredRTT time.Duration +} + +// NewLatencyStore creates a new LatencyStore. +func NewLatencyStore(isNetworkPolicyOnly bool) *LatencyStore { + store := &LatencyStore{ + nodeIPLatencyMap: make(map[string]*NodeIPLatencyEntry), + nodeTargetIPsMap: make(map[string][]net.IP), + isNetworkPolicyOnly: isNetworkPolicyOnly, + } + + return store +} + +// getNodeIPLatencyEntry returns the NodeIPLatencyEntry for the given Node IP +// For now, it is only used for testing purposes. +func (s *LatencyStore) getNodeIPLatencyEntry(nodeIP string) (NodeIPLatencyEntry, bool) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + entry, ok := s.nodeIPLatencyMap[nodeIP] + if !ok { + return NodeIPLatencyEntry{}, ok + } + + return *entry, ok +} + +// SetNodeIPLatencyEntry sets the NodeIPLatencyEntry for the given Node IP +func (s *LatencyStore) SetNodeIPLatencyEntry(nodeIP string, mutator func(entry *NodeIPLatencyEntry)) { + s.mutex.Lock() + defer s.mutex.Unlock() + + entry, ok := s.nodeIPLatencyMap[nodeIP] + if !ok { + entry = &NodeIPLatencyEntry{} + s.nodeIPLatencyMap[nodeIP] = entry + } + + mutator(entry) +} + +// addNode adds a Node to the latency store +func (s *LatencyStore) addNode(node *corev1.Node) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.updateNodeMap(node) +} + +// deleteNode deletes a Node from the latency store +func (s *LatencyStore) deleteNode(node *corev1.Node) { + s.mutex.Lock() + defer s.mutex.Unlock() + + delete(s.nodeTargetIPsMap, node.Name) +} + +// updateNode updates a Node name in the latency store +func (s *LatencyStore) updateNode(new *corev1.Node) { + s.mutex.Lock() + defer s.mutex.Unlock() + + // Node name will not be changed in the same Node update operation. + s.updateNodeMap(new) +} + +// updateNodeMap updates the nodeTargetIPsMap with the IPs of the given Node. +func (s *LatencyStore) updateNodeMap(node *corev1.Node) { + nodeIPs, err := s.getNodeIPs(node) + if err != nil { + klog.ErrorS(err, "Failed to get IPs for Node", "nodeName", node.Name) + return + } + + s.nodeTargetIPsMap[node.Name] = nodeIPs +} + +// getNodeIPs returns the target IPs of the given Node based on the agent mode. +func (s *LatencyStore) getNodeIPs(node *corev1.Node) ([]net.IP, error) { + if s.isNetworkPolicyOnly { + transportIPs, err := getTransportIPs(node) + if err != nil { + return nil, err + } + + return transportIPs, nil + } else { + gw0IPs, err := getGWIPs(node) + if err != nil { + return nil, err + } + + return gw0IPs, nil + } +} + +// getTransportIPs returns the transport IPs of the given Node. +func getTransportIPs(node *corev1.Node) ([]net.IP, error) { + var transportIPs []net.IP + ips, err := k8s.GetNodeTransportAddrs(node) + if err != nil { + return transportIPs, err + } + + if ips.IPv4 != nil { + transportIPs = append(transportIPs, ips.IPv4) + } + if ips.IPv6 != nil { + transportIPs = append(transportIPs, ips.IPv6) + } + + return transportIPs, nil +} + +// getGWIPs returns the gateway IPs of the given Node. +func getGWIPs(node *corev1.Node) ([]net.IP, error) { + var gwIPs []net.IP + + podCIDRStrs := getPodCIDRsOnNode(node) + if len(podCIDRStrs) == 0 { + return nil, errors.New("node does not have a PodCIDR") + } + + for _, podCIDR := range podCIDRStrs { + peerPodCIDRAddr, _, err := net.ParseCIDR(podCIDR) + if err != nil { + return nil, err + } + + // Add first IP in CIDR to the map + peerGatewayIP := ip.NextIP(peerPodCIDRAddr) + gwIPs = append(gwIPs, peerGatewayIP) + } + + return gwIPs, nil +} + +// getPodCIDRsOnNode returns the PodCIDRs of the given Node. +func getPodCIDRsOnNode(node *corev1.Node) []string { + if node.Spec.PodCIDRs != nil { + return node.Spec.PodCIDRs + } + + if node.Spec.PodCIDR == "" { + return nil + } + return []string{node.Spec.PodCIDR} +} + +// ListNodeIPs returns the list of all Node IPs in the latency store. +func (s *LatencyStore) ListNodeIPs() []net.IP { + s.mutex.RLock() + defer s.mutex.RUnlock() + + // Allocate a slice with a capacity equal to twice the size of the map, + // as we can have up to 2 IP addresses per Node in dual-stack case. + nodeIPs := make([]net.IP, 0, 2*len(s.nodeTargetIPsMap)) + for _, ips := range s.nodeTargetIPsMap { + nodeIPs = append(nodeIPs, ips...) + } + + return nodeIPs +} + +// DeleteStaleNodeIPs deletes the stale Node IPs from the nodeIPLatencyMap. +func (s *LatencyStore) DeleteStaleNodeIPs() { + s.mutex.Lock() + defer s.mutex.Unlock() + + nodeIPSet := sets.New[string]() + for _, ips := range s.nodeTargetIPsMap { + for _, ip := range ips { + nodeIPSet.Insert(ip.String()) + } + } + + for nodeIP := range s.nodeIPLatencyMap { + if !nodeIPSet.Has(nodeIP) { + delete(s.nodeIPLatencyMap, nodeIP) + } + } +} diff --git a/pkg/agent/monitortool/latency_store_test.go b/pkg/agent/monitortool/latency_store_test.go new file mode 100644 index 00000000000..8ffdd96a7d5 --- /dev/null +++ b/pkg/agent/monitortool/latency_store_test.go @@ -0,0 +1,175 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitortool + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + entry = NodeIPLatencyEntry{ + LastSendTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + LastRecvTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + LastMeasuredRTT: 1 * time.Second, + } + entry2 = NodeIPLatencyEntry{ + LastSendTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + LastRecvTime: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + LastMeasuredRTT: 2 * time.Second, + } +) + +func TestLatencyStore_getNodeIPLatencyEntry(t *testing.T) { + tests := []struct { + key string + expectedEntry NodeIPLatencyEntry + }{ + { + key: "10.244.2.1", + expectedEntry: entry, + }, + { + key: "10.244.2.2", + expectedEntry: NodeIPLatencyEntry{}, + }, + } + + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + latencyStore := &LatencyStore{ + isNetworkPolicyOnly: false, + nodeIPLatencyMap: map[string]*NodeIPLatencyEntry{ + "10.244.2.1": &entry, + }, + nodeTargetIPsMap: map[string][]net.IP{ + "Node1": {net.ParseIP("10.244.2.1")}, + }, + } + + entry, _ := latencyStore.getNodeIPLatencyEntry(tt.key) + assert.Equal(t, tt.expectedEntry.LastMeasuredRTT, entry.LastMeasuredRTT) + assert.Equal(t, tt.expectedEntry.LastSendTime, entry.LastSendTime) + assert.Equal(t, tt.expectedEntry.LastRecvTime, entry.LastRecvTime) + }) + } +} + +func TestLatencyStore_SetNodeIPLatencyEntry(t *testing.T) { + tests := []struct { + key string + updatedEntry NodeIPLatencyEntry + expectedEntry NodeIPLatencyEntry + }{ + { + key: "10.244.2.1", + updatedEntry: entry, + expectedEntry: entry, + }, + { + key: "10.244.2.2", + updatedEntry: entry2, + expectedEntry: entry2, + }, + } + + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + latencyStore := &LatencyStore{ + isNetworkPolicyOnly: false, + nodeIPLatencyMap: map[string]*NodeIPLatencyEntry{ + "10.244.2.1": &entry, + }, + nodeTargetIPsMap: map[string][]net.IP{ + "Node1": {net.ParseIP("10.244.2.1")}, + }, + } + + mutator := func(entry *NodeIPLatencyEntry) { + entry.LastSendTime = tt.updatedEntry.LastSendTime + entry.LastRecvTime = tt.updatedEntry.LastRecvTime + entry.LastMeasuredRTT = tt.updatedEntry.LastMeasuredRTT + } + latencyStore.SetNodeIPLatencyEntry(tt.key, mutator) + entry, ok := latencyStore.getNodeIPLatencyEntry(tt.key) + assert.Equal(t, tt.expectedEntry, entry) + assert.True(t, ok) + }) + } +} + +func TestLatencyStore_DeleteStaleNodeIPs(t *testing.T) { + testKey := "10.244.2.1" + + latencyStore := &LatencyStore{ + isNetworkPolicyOnly: false, + nodeIPLatencyMap: map[string]*NodeIPLatencyEntry{ + testKey: &entry, + }, + nodeTargetIPsMap: map[string][]net.IP{ + "Node1": {net.ParseIP(testKey)}, + }, + } + + // Remove Node + latencyStore.deleteNode(&corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "Node1", + }, + }) + + // Check that the entry is still present + _, ok := latencyStore.getNodeIPLatencyEntry(testKey) + assert.True(t, ok) + + // Check if that the entry has been deleted + latencyStore.DeleteStaleNodeIPs() + _, ok = latencyStore.getNodeIPLatencyEntry(testKey) + assert.False(t, ok) +} + +func TestLatencyStore_ListNodeIPs(t *testing.T) { + tests := []struct { + latentStore *LatencyStore + expectedList []net.IP + }{ + { + latentStore: &LatencyStore{ + isNetworkPolicyOnly: false, + nodeIPLatencyMap: map[string]*NodeIPLatencyEntry{ + "10.244.2.1": &entry, + }, + nodeTargetIPsMap: map[string][]net.IP{ + "Node1": {net.ParseIP("10.244.2.1")}, + }, + }, + expectedList: []net.IP{ + net.ParseIP("10.244.2.1"), + }, + }, + } + + for _, tt := range tests { + t.Run("List Node IPs", func(t *testing.T) { + nodeIPs := tt.latentStore.ListNodeIPs() + assert.Equal(t, tt.expectedList, nodeIPs) + }) + } +} diff --git a/pkg/agent/monitortool/monitor.go b/pkg/agent/monitortool/monitor.go new file mode 100644 index 00000000000..efd6693535d --- /dev/null +++ b/pkg/agent/monitortool/monitor.go @@ -0,0 +1,469 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitortool + +import ( + "math/rand" + "net" + "sync" + "sync/atomic" + "time" + + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" + corev1 "k8s.io/api/core/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" +) + +var ( + icmpSeq uint32 + // #nosec G404: random number generator not used for security purposes. + icmpEchoID = rand.Int31n(1 << 16) +) + +const ( + ipv4ProtocolICMPRaw = "ip4:icmp" + ipv6ProtocolICMPRaw = "ip6:ipv6-icmp" + protocolICMP = 1 + protocolICMPv6 = 58 +) + +// getICMPSeq returns the next sequence number as uint16, +// wrapping around to 0 after reaching the maximum value of uint16. +func getICMPSeq() uint16 { + // Increment the sequence number atomically and get the new value. + // We use atomic.AddUint32 and pass 1 as the increment. + // The returned value is the new value post-increment. + newVal := atomic.AddUint32(&icmpSeq, 1) + + return uint16(newVal) +} + +// NodeLatencyMonitor is a tool to monitor the latency of the Node. +type NodeLatencyMonitor struct { + // latencyStore is the cache to store the latency of each Nodes. + latencyStore *LatencyStore + // latencyConfigChanged is the channel to notify the latency config changed. + latencyConfigChanged chan latencyConfig + // isIPv4Enabled is the flag to indicate whether the IPv4 is enabled. + isIPv4Enabled bool + // isIPv6Enabled is the flag to indicate whether the IPv6 is enabled. + isIPv6Enabled bool + + // nodeName is the name of the current Node, used to filter out the current Node from the latency monitor. + nodeName string + nodeInformer coreinformers.NodeInformer + nodeLatencyMonitorInformer crdinformers.NodeLatencyMonitorInformer +} + +// latencyConfig is the config for the latency monitor. +type latencyConfig struct { + // Enable is the flag to enable the latency monitor. + Enable bool + // Interval is the interval time to ping all Nodes. + Interval time.Duration +} + +// NewNodeLatencyMonitor creates a new NodeLatencyMonitor. +func NewNodeLatencyMonitor(nodeInformer coreinformers.NodeInformer, + nlmInformer crdinformers.NodeLatencyMonitorInformer, + nodeConfig *config.NodeConfig, + trafficEncapMode config.TrafficEncapModeType) *NodeLatencyMonitor { + m := &NodeLatencyMonitor{ + latencyStore: NewLatencyStore(trafficEncapMode.IsNetworkPolicyOnly()), + latencyConfigChanged: make(chan latencyConfig), + nodeInformer: nodeInformer, + nodeLatencyMonitorInformer: nlmInformer, + nodeName: nodeConfig.Name, + } + + m.isIPv4Enabled, _ = config.IsIPv4Enabled(nodeConfig, trafficEncapMode) + m.isIPv6Enabled, _ = config.IsIPv6Enabled(nodeConfig, trafficEncapMode) + + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: m.onNodeAdd, + UpdateFunc: m.onNodeUpdate, + DeleteFunc: m.onNodeDelete, + }) + + nlmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: m.onNodeLatencyMonitorAdd, + UpdateFunc: m.onNodeLatencyMonitorUpdate, + DeleteFunc: m.onNodeLatencyMonitorDelete, + }) + + return m +} + +// Is current node +func (m *NodeLatencyMonitor) isCurrentNode(node *corev1.Node) bool { + return node.Name == m.nodeName +} + +// onNodeAdd is the event handler for adding Node. +func (m *NodeLatencyMonitor) onNodeAdd(obj interface{}) { + node := obj.(*corev1.Node) + if m.isCurrentNode(node) { + return + } + + m.latencyStore.addNode(node) + + klog.V(4).InfoS("Node added", "Node", klog.KObj(node)) +} + +// onNodeUpdate is the event handler for updating Node. +func (m *NodeLatencyMonitor) onNodeUpdate(oldObj, newObj interface{}) { + node := newObj.(*corev1.Node) + if m.isCurrentNode(node) { + return + } + + m.latencyStore.updateNode(node) + + klog.V(4).InfoS("Node updated", "Node", klog.KObj(node)) +} + +// onNodeDelete is the event handler for deleting Node. +func (m *NodeLatencyMonitor) onNodeDelete(obj interface{}) { + node, ok := obj.(*corev1.Node) + if !ok { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.ErrorS(nil, "Received unexpected object", "obj", obj) + return + } + node, ok = deletedState.Obj.(*corev1.Node) + if !ok { + klog.ErrorS(nil, "DeletedFinalStateUnknown contains non-Node object", "obj", deletedState.Obj) + return + } + } + + if m.isCurrentNode(node) { + return + } + + m.latencyStore.deleteNode(node) +} + +// onNodeLatencyMonitorAdd is the event handler for adding NodeLatencyMonitor. +func (m *NodeLatencyMonitor) onNodeLatencyMonitorAdd(obj interface{}) { + nlm := obj.(*v1alpha1.NodeLatencyMonitor) + klog.V(4).InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm)) + + m.updateLatencyConfig(nlm) +} + +// onNodeLatencyMonitorUpdate is the event handler for updating NodeLatencyMonitor. +func (m *NodeLatencyMonitor) onNodeLatencyMonitorUpdate(oldObj, newObj interface{}) { + oldNLM := oldObj.(*v1alpha1.NodeLatencyMonitor) + newNLM := newObj.(*v1alpha1.NodeLatencyMonitor) + klog.V(4).InfoS("NodeLatencyMonitor updated", "NodeLatencyMonitor", klog.KObj(newNLM)) + + if oldNLM.GetGeneration() == newNLM.GetGeneration() { + return + } + + m.updateLatencyConfig(newNLM) +} + +// updateLatencyConfig updates the latency config based on the NodeLatencyMonitor CRD. +func (m *NodeLatencyMonitor) updateLatencyConfig(nlm *v1alpha1.NodeLatencyMonitor) { + pingInterval := time.Duration(nlm.Spec.PingIntervalSeconds) * time.Second + + latencyConfig := latencyConfig{ + Enable: true, + Interval: pingInterval, + } + + m.latencyConfigChanged <- latencyConfig +} + +// onNodeLatencyMonitorDelete is the event handler for deleting NodeLatencyMonitor. +func (m *NodeLatencyMonitor) onNodeLatencyMonitorDelete(obj interface{}) { + klog.V(4).InfoS("NodeLatencyMonitor deleted") + latencyConfig := latencyConfig{Enable: false} + + m.latencyConfigChanged <- latencyConfig +} + +// sendPing sends an ICMP message to the target IP address. +func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error { + var requestType icmp.Type + + ip := &net.IPAddr{IP: addr} + + if addr.To4() == nil { + requestType = ipv6.ICMPTypeEchoRequest + } else { + requestType = ipv4.ICMPTypeEcho + } + + timeStart := time.Now() + seqID := getICMPSeq() + body := &icmp.Echo{ + ID: int(icmpEchoID), + Seq: int(seqID), + Data: []byte(timeStart.Format(time.RFC3339Nano)), + } + msg := icmp.Message{ + Type: requestType, + Code: 0, + Body: body, + } + klog.V(4).InfoS("Sending ICMP message", "IP", ip, "SeqID", seqID, "body", body) + + // Serialize the ICMP message + msgBytes, err := msg.Marshal(nil) + if err != nil { + return err + } + + // Send the ICMP message + _, err = socket.WriteTo(msgBytes, ip) + if err != nil { + return err + } + + // Create or update the latency store + mutator := func(entry *NodeIPLatencyEntry) { + entry.LastSendTime = timeStart + } + m.latencyStore.SetNodeIPLatencyEntry(addr.String(), mutator) + + return nil +} + +// recvPing receives an ICMP message from the target IP address. +func (m *NodeLatencyMonitor) recvPing(socket net.PacketConn, isIPv4 bool) { + // We only expect small packets, if we receive a larger packet, we will drop the extra data. + readBuffer := make([]byte, 128) + for { + n, peer, err := socket.ReadFrom(readBuffer) + if err != nil { + // When the socket is closed in the Run method, this error will be logged, which is not ideal. + // In the future, we may try setting a ReadDeadline on the socket before each ReadFrom and using + // a channel to signal that the loop should terminate. + klog.ErrorS(err, "Failed to read ICMP message") + return + } + + destIP := peer.String() + + // Parse the ICMP message + var msg *icmp.Message + if isIPv4 { + msg, err = icmp.ParseMessage(protocolICMP, readBuffer[:n]) + if err != nil { + klog.ErrorS(err, "Failed to parse ICMP message") + continue + } + if msg.Type != ipv4.ICMPTypeEcho && msg.Type != ipv4.ICMPTypeEchoReply { + klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) + continue + } + // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) + if msg.Type == ipv4.ICMPTypeEcho { + klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) + continue + } + } else { + msg, err = icmp.ParseMessage(protocolICMPv6, readBuffer[:n]) + if err != nil { + klog.ErrorS(err, "Failed to parse ICMP message") + continue + } + if msg.Type != ipv6.ICMPTypeEchoRequest && msg.Type != ipv6.ICMPTypeEchoReply { + klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) + continue + } + // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) + if msg.Type == ipv6.ICMPTypeEchoRequest { + klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) + continue + } + } + + echo, ok := msg.Body.(*icmp.Echo) + if !ok { + klog.ErrorS(nil, "Failed to assert type as *icmp.Echo") + continue + } + if echo.ID != int(icmpEchoID) { + klog.V(4).InfoS("Ignoring ICMP message with wrong echo ID", "msg", msg) + continue + } + + klog.V(4).InfoS("Received ICMP message", "IP", destIP, "msg", msg) + + // Parse the time from the ICMP data + sentTime, err := time.Parse(time.RFC3339Nano, string(echo.Data)) + if err != nil { + klog.ErrorS(err, "Failed to parse time from ICMP data") + continue + } + + // Calculate the round-trip time + end := time.Now() + rtt := end.Sub(sentTime) + klog.V(4).InfoS("Updating latency entry for Node IP", "IP", destIP, "lastSendTime", sentTime, "lastRecvTime", end, "RTT", rtt) + + // Update the latency store + mutator := func(entry *NodeIPLatencyEntry) { + entry.LastRecvTime = end + entry.LastMeasuredRTT = rtt + } + m.latencyStore.SetNodeIPLatencyEntry(destIP, mutator) + } +} + +// pingAll sends ICMP messages to all the Nodes. +func (m *NodeLatencyMonitor) pingAll(ipv4Socket, ipv6Socket net.PacketConn) { + klog.V(4).InfoS("Pinging all Nodes") + nodeIPs := m.latencyStore.ListNodeIPs() + for _, toIP := range nodeIPs { + if toIP.To4() != nil && ipv4Socket != nil { + if err := m.sendPing(ipv4Socket, toIP); err != nil { + klog.ErrorS(err, "Cannot send ICMP message to Node IP", "IP", toIP) + } + } else if toIP.To16() != nil && ipv6Socket != nil { + if err := m.sendPing(ipv6Socket, toIP); err != nil { + klog.ErrorS(err, "Cannot send ICMP message to Node IP", "IP", toIP) + } + } else { + klog.ErrorS(nil, "Cannot send ICMP message to Node IP because socket is not initialized for IP family", "IP", toIP) + } + } + klog.V(4).InfoS("Done pinging all Nodes") +} + +// Run starts the NodeLatencyMonitor. +func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) { + go m.monitorLoop(stopCh) + + <-stopCh +} + +// monitorLoop is the main loop to monitor the latency of the Node. +func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { + klog.InfoS("NodeLatencyMonitor is running") + // Low level goroutine to handle ping loop + var ticker *time.Ticker + var tickerCh <-chan time.Time + var ipv4Socket, ipv6Socket net.PacketConn + var err error + + defer func() { + if ipv4Socket != nil { + ipv4Socket.Close() + } + if ipv6Socket != nil { + ipv6Socket.Close() + } + if ticker != nil { + ticker.Stop() + } + }() + + // Update current ticker based on the latencyConfig + updateTicker := func(interval time.Duration) { + if ticker != nil { + ticker.Stop() // Stop the current ticker + } + ticker = time.NewTicker(interval) + tickerCh = ticker.C + } + + wg := sync.WaitGroup{} + // Start the pingAll goroutine + for { + select { + case <-tickerCh: + // Try to send pingAll signal + m.pingAll(ipv4Socket, ipv6Socket) + // We no not delete IPs from nodeIPLatencyMap as part of the Node delete event handler + // to avoid consistency issues and because it would not be sufficient to avoid stale entries completely. + // This means that we have to periodically invoke DeleteStaleNodeIPs to avoid stale entries in the map. + m.latencyStore.DeleteStaleNodeIPs() + case <-stopCh: + return + case latencyConfig := <-m.latencyConfigChanged: + // Start or stop the pingAll goroutine based on the latencyConfig + if latencyConfig.Enable { + // latencyConfig changed + updateTicker(latencyConfig.Interval) + + // If the recvPing socket is closed, + // recreate it if it is closed(CRD is deleted). + if ipv4Socket == nil && m.isIPv4Enabled { + // Create a new socket for IPv4 when it is IPv4-only + ipv4Socket, err = icmp.ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0") + if err != nil { + klog.ErrorS(err, "Failed to create ICMP socket for IPv4") + return + } + wg.Add(1) + go func() { + defer wg.Done() + m.recvPing(ipv4Socket, true) + }() + } + if ipv6Socket == nil && m.isIPv6Enabled { + // Create a new socket for IPv6 when it is IPv6-only + ipv6Socket, err = icmp.ListenPacket(ipv6ProtocolICMPRaw, "::") + if err != nil { + klog.ErrorS(err, "Failed to create ICMP socket for IPv6") + return + } + wg.Add(1) + go func() { + defer wg.Done() + m.recvPing(ipv6Socket, false) + }() + } + } else { + // latencyConfig deleted + if ticker != nil { + ticker.Stop() + ticker = nil + } + tickerCh = nil + + // We close the sockets as a signal to recvPing that it needs to stop. + // Note that at that point, we are guaranteed that there is no ongoing Write + // to the socket, because pingAll runs in the same goroutine as this code. + if ipv4Socket != nil { + ipv4Socket.Close() + } + if ipv6Socket != nil { + ipv6Socket.Close() + } + + // After closing the sockets, wait for the recvPing goroutines to return + wg.Wait() + ipv4Socket = nil + ipv6Socket = nil + } + } + } +} diff --git a/pkg/apis/crd/v1alpha1/register.go b/pkg/apis/crd/v1alpha1/register.go index 5e5b95b4983..75b6c69cde6 100644 --- a/pkg/apis/crd/v1alpha1/register.go +++ b/pkg/apis/crd/v1alpha1/register.go @@ -53,6 +53,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ExternalNodeList{}, &SupportBundleCollection{}, &SupportBundleCollectionList{}, + &NodeLatencyMonitor{}, + &NodeLatencyMonitorList{}, ) metav1.AddToGroupVersion( diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index 726c0b7c935..ac7a7557055 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -291,3 +291,37 @@ type TLSProtocol struct { // SNI (Server Name Indication) indicates the server domain name in the TLS/SSL hello message. SNI string `json:"sni,omitempty"` } + +// +genclient +// +genclient:nonNamespaced +// +genclient:noStatus +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NodeLatencyMonitor is used to monitor the latency between nodes in a Kubernetes cluster. It is a singleton resource, +// meaning only one instance of it can exist in the cluster. +type NodeLatencyMonitor struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec NodeLatencyMonitorSpec `json:"spec"` +} + +type NodeLatencyMonitorSpec struct { + // PingInterval specifies the interval in seconds between ping requests. + // Ping interval should be greater than or equal to 1s. + PingIntervalSeconds int32 `json:"pingIntervalSeconds"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NodeLatencyMonitor is only a singleton resource, so it does not use a list type. +// But current k8s client-gen does not support generating client for singleton informer resource, +// so we have to define a list type for CRD Informer. +// Maybe we will remove it in the future. +type NodeLatencyMonitorList struct { + metav1.TypeMeta `json:",inline"` + // +optional + metav1.ListMeta `json:"metadata,omitempty"` + + Items []NodeLatencyMonitor `json:"items"` +} diff --git a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go index 820d4be306a..f06755a80a3 100644 --- a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go @@ -292,6 +292,82 @@ func (in *NetworkInterface) DeepCopy() *NetworkInterface { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeLatencyMonitor) DeepCopyInto(out *NodeLatencyMonitor) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeLatencyMonitor. +func (in *NodeLatencyMonitor) DeepCopy() *NodeLatencyMonitor { + if in == nil { + return nil + } + out := new(NodeLatencyMonitor) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeLatencyMonitor) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeLatencyMonitorList) DeepCopyInto(out *NodeLatencyMonitorList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]NodeLatencyMonitor, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeLatencyMonitorList. +func (in *NodeLatencyMonitorList) DeepCopy() *NodeLatencyMonitorList { + if in == nil { + return nil + } + out := new(NodeLatencyMonitorList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeLatencyMonitorList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeLatencyMonitorSpec) DeepCopyInto(out *NodeLatencyMonitorSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeLatencyMonitorSpec. +func (in *NodeLatencyMonitorSpec) DeepCopy() *NodeLatencyMonitorSpec { + if in == nil { + return nil + } + out := new(NodeLatencyMonitorSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SupportBundleCollection) DeepCopyInto(out *SupportBundleCollection) { *out = *in diff --git a/pkg/apiserver/handlers/featuregates/handler_test.go b/pkg/apiserver/handlers/featuregates/handler_test.go index 51f49a68ff5..3a8f3d78494 100644 --- a/pkg/apiserver/handlers/featuregates/handler_test.go +++ b/pkg/apiserver/handlers/featuregates/handler_test.go @@ -68,6 +68,7 @@ func Test_getGatesResponse(t *testing.T) { {Component: "agent", Name: "Multicast", Status: multicastStatus, Version: "BETA"}, {Component: "agent", Name: "Multicluster", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "NetworkPolicyStats", Status: "Enabled", Version: "BETA"}, + {Component: "agent", Name: "NodeLatencyMonitor", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "NodeNetworkPolicy", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "NodePortLocal", Status: "Enabled", Version: "GA"}, {Component: "agent", Name: "SecondaryNetwork", Status: "Disabled", Version: "ALPHA"}, diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go index 64e4f3bd836..64b9ed11579 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go @@ -27,6 +27,7 @@ import ( type CrdV1alpha1Interface interface { RESTClient() rest.Interface ExternalNodesGetter + NodeLatencyMonitorsGetter SupportBundleCollectionsGetter } @@ -39,6 +40,10 @@ func (c *CrdV1alpha1Client) ExternalNodes(namespace string) ExternalNodeInterfac return newExternalNodes(c, namespace) } +func (c *CrdV1alpha1Client) NodeLatencyMonitors() NodeLatencyMonitorInterface { + return newNodeLatencyMonitors(c) +} + func (c *CrdV1alpha1Client) SupportBundleCollections() SupportBundleCollectionInterface { return newSupportBundleCollections(c) } diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go index e442aae39df..036a1f768af 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go @@ -30,6 +30,10 @@ func (c *FakeCrdV1alpha1) ExternalNodes(namespace string) v1alpha1.ExternalNodeI return &FakeExternalNodes{c, namespace} } +func (c *FakeCrdV1alpha1) NodeLatencyMonitors() v1alpha1.NodeLatencyMonitorInterface { + return &FakeNodeLatencyMonitors{c} +} + func (c *FakeCrdV1alpha1) SupportBundleCollections() v1alpha1.SupportBundleCollectionInterface { return &FakeSupportBundleCollections{c} } diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_nodelatencymonitor.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_nodelatencymonitor.go new file mode 100644 index 00000000000..76a415f8958 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_nodelatencymonitor.go @@ -0,0 +1,119 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeNodeLatencyMonitors implements NodeLatencyMonitorInterface +type FakeNodeLatencyMonitors struct { + Fake *FakeCrdV1alpha1 +} + +var nodelatencymonitorsResource = v1alpha1.SchemeGroupVersion.WithResource("nodelatencymonitors") + +var nodelatencymonitorsKind = v1alpha1.SchemeGroupVersion.WithKind("NodeLatencyMonitor") + +// Get takes name of the nodeLatencyMonitor, and returns the corresponding nodeLatencyMonitor object, and an error if there is any. +func (c *FakeNodeLatencyMonitors) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootGetAction(nodelatencymonitorsResource, name), &v1alpha1.NodeLatencyMonitor{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NodeLatencyMonitor), err +} + +// List takes label and field selectors, and returns the list of NodeLatencyMonitors that match those selectors. +func (c *FakeNodeLatencyMonitors) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.NodeLatencyMonitorList, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootListAction(nodelatencymonitorsResource, nodelatencymonitorsKind, opts), &v1alpha1.NodeLatencyMonitorList{}) + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.NodeLatencyMonitorList{ListMeta: obj.(*v1alpha1.NodeLatencyMonitorList).ListMeta} + for _, item := range obj.(*v1alpha1.NodeLatencyMonitorList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested nodeLatencyMonitors. +func (c *FakeNodeLatencyMonitors) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewRootWatchAction(nodelatencymonitorsResource, opts)) +} + +// Create takes the representation of a nodeLatencyMonitor and creates it. Returns the server's representation of the nodeLatencyMonitor, and an error, if there is any. +func (c *FakeNodeLatencyMonitors) Create(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.CreateOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootCreateAction(nodelatencymonitorsResource, nodeLatencyMonitor), &v1alpha1.NodeLatencyMonitor{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NodeLatencyMonitor), err +} + +// Update takes the representation of a nodeLatencyMonitor and updates it. Returns the server's representation of the nodeLatencyMonitor, and an error, if there is any. +func (c *FakeNodeLatencyMonitors) Update(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.UpdateOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateAction(nodelatencymonitorsResource, nodeLatencyMonitor), &v1alpha1.NodeLatencyMonitor{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NodeLatencyMonitor), err +} + +// Delete takes name of the nodeLatencyMonitor and deletes it. Returns an error if one occurs. +func (c *FakeNodeLatencyMonitors) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewRootDeleteActionWithOptions(nodelatencymonitorsResource, name, opts), &v1alpha1.NodeLatencyMonitor{}) + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeNodeLatencyMonitors) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewRootDeleteCollectionAction(nodelatencymonitorsResource, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.NodeLatencyMonitorList{}) + return err +} + +// Patch applies the patch and returns the patched nodeLatencyMonitor. +func (c *FakeNodeLatencyMonitors) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.NodeLatencyMonitor, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootPatchSubresourceAction(nodelatencymonitorsResource, name, pt, data, subresources...), &v1alpha1.NodeLatencyMonitor{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NodeLatencyMonitor), err +} diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go index 05db7473c1b..34fa36922dd 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go @@ -18,4 +18,6 @@ package v1alpha1 type ExternalNodeExpansion interface{} +type NodeLatencyMonitorExpansion interface{} + type SupportBundleCollectionExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/nodelatencymonitor.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/nodelatencymonitor.go new file mode 100644 index 00000000000..cfaef7eebd1 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/nodelatencymonitor.go @@ -0,0 +1,166 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + scheme "antrea.io/antrea/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// NodeLatencyMonitorsGetter has a method to return a NodeLatencyMonitorInterface. +// A group's client should implement this interface. +type NodeLatencyMonitorsGetter interface { + NodeLatencyMonitors() NodeLatencyMonitorInterface +} + +// NodeLatencyMonitorInterface has methods to work with NodeLatencyMonitor resources. +type NodeLatencyMonitorInterface interface { + Create(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.CreateOptions) (*v1alpha1.NodeLatencyMonitor, error) + Update(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.UpdateOptions) (*v1alpha1.NodeLatencyMonitor, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.NodeLatencyMonitor, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.NodeLatencyMonitorList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.NodeLatencyMonitor, err error) + NodeLatencyMonitorExpansion +} + +// nodeLatencyMonitors implements NodeLatencyMonitorInterface +type nodeLatencyMonitors struct { + client rest.Interface +} + +// newNodeLatencyMonitors returns a NodeLatencyMonitors +func newNodeLatencyMonitors(c *CrdV1alpha1Client) *nodeLatencyMonitors { + return &nodeLatencyMonitors{ + client: c.RESTClient(), + } +} + +// Get takes name of the nodeLatencyMonitor, and returns the corresponding nodeLatencyMonitor object, and an error if there is any. +func (c *nodeLatencyMonitors) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + result = &v1alpha1.NodeLatencyMonitor{} + err = c.client.Get(). + Resource("nodelatencymonitors"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of NodeLatencyMonitors that match those selectors. +func (c *nodeLatencyMonitors) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.NodeLatencyMonitorList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.NodeLatencyMonitorList{} + err = c.client.Get(). + Resource("nodelatencymonitors"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested nodeLatencyMonitors. +func (c *nodeLatencyMonitors) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Resource("nodelatencymonitors"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a nodeLatencyMonitor and creates it. Returns the server's representation of the nodeLatencyMonitor, and an error, if there is any. +func (c *nodeLatencyMonitors) Create(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.CreateOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + result = &v1alpha1.NodeLatencyMonitor{} + err = c.client.Post(). + Resource("nodelatencymonitors"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(nodeLatencyMonitor). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a nodeLatencyMonitor and updates it. Returns the server's representation of the nodeLatencyMonitor, and an error, if there is any. +func (c *nodeLatencyMonitors) Update(ctx context.Context, nodeLatencyMonitor *v1alpha1.NodeLatencyMonitor, opts v1.UpdateOptions) (result *v1alpha1.NodeLatencyMonitor, err error) { + result = &v1alpha1.NodeLatencyMonitor{} + err = c.client.Put(). + Resource("nodelatencymonitors"). + Name(nodeLatencyMonitor.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(nodeLatencyMonitor). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the nodeLatencyMonitor and deletes it. Returns an error if one occurs. +func (c *nodeLatencyMonitors) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Resource("nodelatencymonitors"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *nodeLatencyMonitors) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Resource("nodelatencymonitors"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched nodeLatencyMonitor. +func (c *nodeLatencyMonitors) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.NodeLatencyMonitor, err error) { + result = &v1alpha1.NodeLatencyMonitor{} + err = c.client.Patch(pt). + Resource("nodelatencymonitors"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/client/informers/externalversions/crd/v1alpha1/interface.go b/pkg/client/informers/externalversions/crd/v1alpha1/interface.go index 89897d70a68..c83d379324d 100644 --- a/pkg/client/informers/externalversions/crd/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/crd/v1alpha1/interface.go @@ -24,6 +24,8 @@ import ( type Interface interface { // ExternalNodes returns a ExternalNodeInformer. ExternalNodes() ExternalNodeInformer + // NodeLatencyMonitors returns a NodeLatencyMonitorInformer. + NodeLatencyMonitors() NodeLatencyMonitorInformer // SupportBundleCollections returns a SupportBundleCollectionInformer. SupportBundleCollections() SupportBundleCollectionInformer } @@ -44,6 +46,11 @@ func (v *version) ExternalNodes() ExternalNodeInformer { return &externalNodeInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// NodeLatencyMonitors returns a NodeLatencyMonitorInformer. +func (v *version) NodeLatencyMonitors() NodeLatencyMonitorInformer { + return &nodeLatencyMonitorInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} +} + // SupportBundleCollections returns a SupportBundleCollectionInformer. func (v *version) SupportBundleCollections() SupportBundleCollectionInformer { return &supportBundleCollectionInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/informers/externalversions/crd/v1alpha1/nodelatencymonitor.go b/pkg/client/informers/externalversions/crd/v1alpha1/nodelatencymonitor.go new file mode 100644 index 00000000000..69cb82ef0b3 --- /dev/null +++ b/pkg/client/informers/externalversions/crd/v1alpha1/nodelatencymonitor.go @@ -0,0 +1,87 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + versioned "antrea.io/antrea/pkg/client/clientset/versioned" + internalinterfaces "antrea.io/antrea/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// NodeLatencyMonitorInformer provides access to a shared informer and lister for +// NodeLatencyMonitors. +type NodeLatencyMonitorInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.NodeLatencyMonitorLister +} + +type nodeLatencyMonitorInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// NewNodeLatencyMonitorInformer constructs a new informer for NodeLatencyMonitor type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewNodeLatencyMonitorInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredNodeLatencyMonitorInformer(client, resyncPeriod, indexers, nil) +} + +// NewFilteredNodeLatencyMonitorInformer constructs a new informer for NodeLatencyMonitor type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredNodeLatencyMonitorInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CrdV1alpha1().NodeLatencyMonitors().List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CrdV1alpha1().NodeLatencyMonitors().Watch(context.TODO(), options) + }, + }, + &crdv1alpha1.NodeLatencyMonitor{}, + resyncPeriod, + indexers, + ) +} + +func (f *nodeLatencyMonitorInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredNodeLatencyMonitorInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *nodeLatencyMonitorInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&crdv1alpha1.NodeLatencyMonitor{}, f.defaultInformer) +} + +func (f *nodeLatencyMonitorInformer) Lister() v1alpha1.NodeLatencyMonitorLister { + return v1alpha1.NewNodeLatencyMonitorLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index f5bef826e02..e692c359dd2 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -55,6 +55,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource // Group=crd.antrea.io, Version=v1alpha1 case v1alpha1.SchemeGroupVersion.WithResource("externalnodes"): return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().ExternalNodes().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("nodelatencymonitors"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().NodeLatencyMonitors().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("supportbundlecollections"): return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().SupportBundleCollections().Informer()}, nil diff --git a/pkg/client/listers/crd/v1alpha1/expansion_generated.go b/pkg/client/listers/crd/v1alpha1/expansion_generated.go index fe8777f36ed..e3a50a984f1 100644 --- a/pkg/client/listers/crd/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/crd/v1alpha1/expansion_generated.go @@ -24,6 +24,10 @@ type ExternalNodeListerExpansion interface{} // ExternalNodeNamespaceLister. type ExternalNodeNamespaceListerExpansion interface{} +// NodeLatencyMonitorListerExpansion allows custom methods to be added to +// NodeLatencyMonitorLister. +type NodeLatencyMonitorListerExpansion interface{} + // SupportBundleCollectionListerExpansion allows custom methods to be added to // SupportBundleCollectionLister. type SupportBundleCollectionListerExpansion interface{} diff --git a/pkg/client/listers/crd/v1alpha1/nodelatencymonitor.go b/pkg/client/listers/crd/v1alpha1/nodelatencymonitor.go new file mode 100644 index 00000000000..214da973843 --- /dev/null +++ b/pkg/client/listers/crd/v1alpha1/nodelatencymonitor.go @@ -0,0 +1,66 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// NodeLatencyMonitorLister helps list NodeLatencyMonitors. +// All objects returned here must be treated as read-only. +type NodeLatencyMonitorLister interface { + // List lists all NodeLatencyMonitors in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.NodeLatencyMonitor, err error) + // Get retrieves the NodeLatencyMonitor from the index for a given name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.NodeLatencyMonitor, error) + NodeLatencyMonitorListerExpansion +} + +// nodeLatencyMonitorLister implements the NodeLatencyMonitorLister interface. +type nodeLatencyMonitorLister struct { + indexer cache.Indexer +} + +// NewNodeLatencyMonitorLister returns a new NodeLatencyMonitorLister. +func NewNodeLatencyMonitorLister(indexer cache.Indexer) NodeLatencyMonitorLister { + return &nodeLatencyMonitorLister{indexer: indexer} +} + +// List lists all NodeLatencyMonitors in the indexer. +func (s *nodeLatencyMonitorLister) List(selector labels.Selector) (ret []*v1alpha1.NodeLatencyMonitor, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.NodeLatencyMonitor)) + }) + return ret, err +} + +// Get retrieves the NodeLatencyMonitor from the index for a given name. +func (s *nodeLatencyMonitorLister) Get(name string) (*v1alpha1.NodeLatencyMonitor, error) { + obj, exists, err := s.indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("nodelatencymonitor"), name) + } + return obj.(*v1alpha1.NodeLatencyMonitor), nil +} diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 36fb997ffdc..fe673654552 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -158,6 +158,10 @@ const ( // alpha: v1.15 // Enable layer 7 flow export on Pods and Namespaces L7FlowExporter featuregate.Feature = "L7FlowExporter" + + // alpha: v2.1 + // Enable the NodeLatencyMonitor feature. + NodeLatencyMonitor featuregate.Feature = "NodeLatencyMonitor" ) var ( @@ -199,6 +203,7 @@ var ( EgressSeparateSubnet: {Default: false, PreRelease: featuregate.Alpha}, NodeNetworkPolicy: {Default: false, PreRelease: featuregate.Alpha}, L7FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, + NodeLatencyMonitor: {Default: false, PreRelease: featuregate.Alpha}, } // AgentGates consists of all known feature gates for the Antrea Agent. @@ -229,6 +234,7 @@ var ( EgressSeparateSubnet, NodeNetworkPolicy, L7FlowExporter, + NodeLatencyMonitor, ) // ControllerGates consists of all known feature gates for the Antrea Controller. @@ -276,6 +282,7 @@ var ( EgressSeparateSubnet: {}, NodeNetworkPolicy: {}, L7FlowExporter: {}, + NodeLatencyMonitor: {}, } // supportedFeaturesOnExternalNode records the features supported on an external // Node. Antrea Agent checks the enabled features if it is running on an