Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter pods that have out of range IP #1963

Merged
merged 2 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _

// CalculateEndpointsDegradedMode determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType)
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG)
return result.NetworkEndpointSet, result.EndpointPodMap, nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,12 +1803,16 @@ func TestEnableDegradedMode(t *testing.T) {
_, s := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false)
s.NegSyncerKey.NegName = tc.negName
s.needInit = false
addPodsToLister(s.podLister)
addPodsToLister(s.podLister, getDefaultEndpointSlices())
for i := 1; i <= 4; i++ {
s.nodeLister.Add(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("instance%v", i),
},
Spec: corev1.NodeSpec{
PodCIDR: fmt.Sprintf("10.100.%v.0/24", i),
PodCIDRs: []string{fmt.Sprintf("200%v:db8::/48", i), fmt.Sprintf("10.100.%v.0/24", i)},
},
})
}
for _, eps := range tc.testEndpointSlices {
Expand Down
137 changes: 107 additions & 30 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,15 +371,11 @@ func getEndpointPod(

// toZoneNetworkEndpointMapDegradedMode translates addresses in endpoints object into zone and endpoints map, and also returns the count for duplicated endpoints.
// In degraded mode we do not raise errors for misconfigured endpoints; instead, they are filtered out directly.
func toZoneNetworkEndpointMapDegradedMode(
eds []negtypes.EndpointsData,
zoneGetter negtypes.ZoneGetter,
podLister, nodeLister cache.Indexer,
servicePortName string,
networkEndpointType negtypes.NetworkEndpointType,
) ZoneNetworkEndpointMapResult {
func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister cache.Indexer, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, enableDualStackNEG bool) ZoneNetworkEndpointMapResult {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
dupCount := 0
ipsForPod := ipsForPod(eds)
for _, ed := range eds {
matchPort := ""
for _, port := range ed.Ports {
Expand All @@ -392,7 +388,7 @@ func toZoneNetworkEndpointMapDegradedMode(
continue
}
for _, endpointAddress := range ed.Addresses {
if endpointAddress.AddressType != discovery.AddressTypeIPv4 {
if !enableDualStackNEG && endpointAddress.AddressType != discovery.AddressTypeIPv4 {
klog.Infof("Skipping non IPv4 address in degraded mode: %q, in endpoint slice %s/%s", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
continue
}
Expand All @@ -401,10 +397,6 @@ func toZoneNetworkEndpointMapDegradedMode(
klog.Errorf("Endpoint %q in Endpoints %s/%s receives error when getting pod, err: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, getPodErr)
continue
}
if err := validatePod(pod, nodeLister); err != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s correponds to an invalid pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, err)
continue
}
nodeName := pod.Spec.NodeName
zone, err := zoneGetter.GetZoneForNode(nodeName)
if err != nil {
Expand All @@ -414,25 +406,46 @@ func toZoneNetworkEndpointMapDegradedMode(
if zoneNetworkEndpointMap[zone] == nil {
zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet()
}
for _, address := range endpointAddress.Addresses {
networkEndpoint := negtypes.NetworkEndpoint{IP: address, Port: matchPort, Node: nodeName}
if networkEndpointType == negtypes.NonGCPPrivateEndpointType {
// Non-GCP network endpoints don't have associated nodes.
networkEndpoint.Node = ""
}
zoneNetworkEndpointMap[zone].Insert(networkEndpoint)

if existingPod, contains := networkEndpointPodMap[networkEndpoint]; contains {
// if existing name is alphabetically lower than current one, continue and don't replace
if existingPod.Name < endpointAddress.TargetRef.Name {
klog.Infof("Found duplicate endpoints for %q, save the pod information from the alphabetically higher pod", address)
continue
}

podIPs := ipsForPod[types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name}]
// TODO(cheungdavid): Remove this validation when single stack ipv6 endpoint is supported
if parseIPAddress(podIPs.IP) == "" {
klog.Errorf("For endpoint %q in pod %q, it has an invalid IPv4 address, err: %v, skipping", endpointAddress.Addresses, pod.ObjectMeta.Name, negtypes.ErrEPIPNotFromPod)
continue
}
networkEndpoint := negtypes.NetworkEndpoint{IP: podIPs.IP, Port: matchPort, Node: nodeName}
if enableDualStackNEG {
// Convert all addresses to a standard form as per rfc5952 to prevent
// accidental diffs resulting from different formats.
networkEndpoint.IPv6 = parseIPAddress(podIPs.IPv6)
}
// endpoint address should match to the IP of its pod
if err = podContainsEndpointAddress(networkEndpoint, pod); err != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s has IP(s) not match to its pod %s: %w, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, pod.Name, err)
continue
}
if err := validatePod(pod, nodeLister, networkEndpoint); err != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s correponds to an invalid pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, err)
continue
}
if networkEndpointType == negtypes.NonGCPPrivateEndpointType {
// Non-GCP network endpoints don't have associated nodes.
networkEndpoint.Node = ""
}
zoneNetworkEndpointMap[zone].Insert(networkEndpoint)

// if existing name is alphabetically lower than current one, continue and don't replace
if existingPod, contains := networkEndpointPodMap[networkEndpoint]; contains {
dupCount += 1
if existingPod.Name < endpointAddress.TargetRef.Name {
klog.Infof("Found duplicate endpoints for %v, save the pod information from the alphabetically higher pod", networkEndpoint)
continue // if existing name is alphabetically lower than current one, continue and don't replace
}
networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name}
}
networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name}
}
}

return ZoneNetworkEndpointMapResult{
NetworkEndpointSet: zoneNetworkEndpointMap,
EndpointPodMap: networkEndpointPodMap,
Expand All @@ -443,7 +456,8 @@ func toZoneNetworkEndpointMapDegradedMode(
// it returns error if the pod:
// 1. is in terminal state
// 2. corresponds to a non-existent node
func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) error {
// 3. has an IP that matches a podIP but is outside of the node's allocated IP range
func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer, networkEndpoint negtypes.NetworkEndpoint) error {
// Terminal Pod means a pod is in PodFailed or PodSucceeded phase
phase := pod.Status.Phase
if phase == apiv1.PodFailed || phase == apiv1.PodSucceeded {
Expand All @@ -453,10 +467,13 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) error {
if err != nil || !exists {
return negtypes.ErrEPNodeNotFound
}
_, isNode := obj.(*apiv1.Node)
if !isNode {
node, ok := obj.(*apiv1.Node)
if !ok {
return negtypes.ErrEPNodeTypeAssertionFailed
}
if err = nodeContainsPodIP(node, networkEndpoint); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -486,6 +503,66 @@ func ipsForPod(eds []negtypes.EndpointsData) map[types.NamespacedName]negtypes.N
return result
}

// podContainsEndpointAddress checks the pod's existing PodIP(s)
// and returns an error if any of the given endpoint's IP addresses does not
// match one of them.
// If this is a dual stack endpoint (networkEndpoint.IPv6 is non-empty), both
// the IPv4 and the IPv6 address must be present in pod.Status.PodIPs.
// The comparison is an exact string match against each PodIP entry.
// The returned error wraps negtypes.ErrEPIPNotFromPod.
func podContainsEndpointAddress(networkEndpoint negtypes.NetworkEndpoint, pod *apiv1.Pod) error {
	// TODO(cheungdavid): update ipv4 check when single stack ipv6 is supported
	endpointIPs := []string{networkEndpoint.IP}
	if networkEndpoint.IPv6 != "" {
		endpointIPs = append(endpointIPs, networkEndpoint.IPv6)
	}

	matching := 0
	for _, endpointIP := range endpointIPs {
		// a pod can have at most two PodIPs, one for ipv4 and one for ipv6
		for _, podIP := range pod.Status.PodIPs {
			if endpointIP == podIP.IP {
				matching++
				// Count each endpoint IP at most once, so that a repeated
				// entry in PodIPs cannot inflate the count and turn a valid
				// endpoint into a false mismatch.
				break
			}
		}
	}
	if matching != len(endpointIPs) {
		return fmt.Errorf("%w: endpoint %v has IP(s) not match to its pod's IP(s)", negtypes.ErrEPIPNotFromPod, endpointIPs)
	}
	return nil
}

// nodeContainsPodIP checks the node's existing PodCIDR(s),
// and returns an error if a pod IP used by the endpoint is not within one of
// the podCIDR ranges.
// If this is a dual stack endpoint (networkEndpoint.IPv6 is non-empty), both
// pod IPs are validated and each must fall inside some PodCIDR of the node.
// Entries of node.Spec.PodCIDRs that fail to parse are ignored.
// The returned error wraps negtypes.ErrEPIPOutOfPodCIDR.
func nodeContainsPodIP(node *apiv1.Node, networkEndpoint negtypes.NetworkEndpoint) error {
	// a node can have at most two PodCIDRs, one for ipv4 and one for ipv6
	ipnets := make([]*net.IPNet, 0, len(node.Spec.PodCIDRs))
	for _, podCIDR := range node.Spec.PodCIDRs {
		podCIDR = strings.TrimSpace(podCIDR)
		_, ipnet, err := net.ParseCIDR(podCIDR)
		if err != nil {
			// swallow errors for CIDRs that are invalid
			continue
		}
		ipnets = append(ipnets, ipnet)
	}
	// net.ParseIP yields nil for an unparsable address; IPNet.Contains then
	// reports false, so such an address is treated as out of range.
	podIPs := []net.IP{net.ParseIP(networkEndpoint.IP)}
	if networkEndpoint.IPv6 != "" {
		podIPs = append(podIPs, net.ParseIP(networkEndpoint.IPv6))
	}

	matching := 0
	for _, podIP := range podIPs {
		// The loop variable is deliberately not named "net" so that it does
		// not shadow the imported net package.
		for _, cidr := range ipnets {
			if cidr.Contains(podIP) {
				matching++
				break
			}
		}
	}
	if matching != len(podIPs) {
		return fmt.Errorf("%w: podIP(s) used by endpoint %v not match to the node's PodCIDR range(s)", negtypes.ErrEPIPOutOfPodCIDR, podIPs)
	}
	return nil
}

// retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map
func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version, mode negtypes.EndpointsCalculatorMode) (map[string]negtypes.NetworkEndpointSet, labels.EndpointPodLabelMap, error) {
// Include zones that have non-candidate nodes currently. It is possible that NEGs were created in those zones previously and the endpoints now became non-candidates.
Expand Down
Loading