Skip to content

Commit

Permalink
Merge pull request #1963 from sawsa307/filter-pod-using-podcidr
Browse files Browse the repository at this point in the history
Filter pods that have out of range IP
  • Loading branch information
k8s-ci-robot authored Apr 27, 2023
2 parents 0380d0b + e00401e commit 743ff5b
Show file tree
Hide file tree
Showing 5 changed files with 504 additions and 138 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _

// CalculateEndpointsDegradedMode determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType)
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG)
return result.NetworkEndpointSet, result.EndpointPodMap, nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,12 +1803,16 @@ func TestEnableDegradedMode(t *testing.T) {
_, s := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false)
s.NegSyncerKey.NegName = tc.negName
s.needInit = false
addPodsToLister(s.podLister)
addPodsToLister(s.podLister, getDefaultEndpointSlices())
for i := 1; i <= 4; i++ {
s.nodeLister.Add(&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("instance%v", i),
},
Spec: corev1.NodeSpec{
PodCIDR: fmt.Sprintf("10.100.%v.0/24", i),
PodCIDRs: []string{fmt.Sprintf("200%v:db8::/48", i), fmt.Sprintf("10.100.%v.0/24", i)},
},
})
}
for _, eps := range tc.testEndpointSlices {
Expand Down
137 changes: 107 additions & 30 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,15 +371,11 @@ func getEndpointPod(

// toZoneNetworkEndpointMap translates addresses in endpoints object into zone and endpoints map, and also return the count for duplicated endpoints
// we will not raise error in degraded mode for misconfigured endpoints, instead they will be filtered directly
func toZoneNetworkEndpointMapDegradedMode(
eds []negtypes.EndpointsData,
zoneGetter negtypes.ZoneGetter,
podLister, nodeLister cache.Indexer,
servicePortName string,
networkEndpointType negtypes.NetworkEndpointType,
) ZoneNetworkEndpointMapResult {
func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister cache.Indexer, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, enableDualStackNEG bool) ZoneNetworkEndpointMapResult {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
dupCount := 0
ipsForPod := ipsForPod(eds)
for _, ed := range eds {
matchPort := ""
for _, port := range ed.Ports {
Expand All @@ -392,7 +388,7 @@ func toZoneNetworkEndpointMapDegradedMode(
continue
}
for _, endpointAddress := range ed.Addresses {
if endpointAddress.AddressType != discovery.AddressTypeIPv4 {
if !enableDualStackNEG && endpointAddress.AddressType != discovery.AddressTypeIPv4 {
klog.Infof("Skipping non IPv4 address in degraded mode: %q, in endpoint slice %s/%s", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
continue
}
Expand All @@ -401,10 +397,6 @@ func toZoneNetworkEndpointMapDegradedMode(
klog.Errorf("Endpoint %q in Endpoints %s/%s receives error when getting pod, err: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, getPodErr)
continue
}
if err := validatePod(pod, nodeLister); err != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s correponds to an invalid pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, err)
continue
}
nodeName := pod.Spec.NodeName
zone, err := zoneGetter.GetZoneForNode(nodeName)
if err != nil {
Expand All @@ -414,25 +406,46 @@ func toZoneNetworkEndpointMapDegradedMode(
if zoneNetworkEndpointMap[zone] == nil {
zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet()
}
for _, address := range endpointAddress.Addresses {
networkEndpoint := negtypes.NetworkEndpoint{IP: address, Port: matchPort, Node: nodeName}
if networkEndpointType == negtypes.NonGCPPrivateEndpointType {
// Non-GCP network endpoints don't have associated nodes.
networkEndpoint.Node = ""
}
zoneNetworkEndpointMap[zone].Insert(networkEndpoint)

if existingPod, contains := networkEndpointPodMap[networkEndpoint]; contains {
// if existing name is alphabetically lower than current one, continue and don't replace
if existingPod.Name < endpointAddress.TargetRef.Name {
klog.Infof("Found duplicate endpoints for %q, save the pod information from the alphabetically higher pod", address)
continue
}

podIPs := ipsForPod[types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name}]
// TODO(cheungdavid): Remove this validation when single stack ipv6 endpoint is supported
if parseIPAddress(podIPs.IP) == "" {
klog.Errorf("For endpoint %q in pod %q, it has an invalid IPv4 address, err: %v, skipping", endpointAddress.Addresses, pod.ObjectMeta.Name, negtypes.ErrEPIPNotFromPod)
continue
}
networkEndpoint := negtypes.NetworkEndpoint{IP: podIPs.IP, Port: matchPort, Node: nodeName}
if enableDualStackNEG {
// Convert all addresses to a standard form as per rfc5952 to prevent
// accidental diffs resulting from different formats.
networkEndpoint.IPv6 = parseIPAddress(podIPs.IPv6)
}
// endpoint address should match to the IP of its pod
if err = podContainsEndpointAddress(networkEndpoint, pod); err != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s has IP(s) not match to its pod %s: %w, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, pod.Name, err)
continue
}
if err := validatePod(pod, nodeLister, networkEndpoint); err != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s correponds to an invalid pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, err)
continue
}
if networkEndpointType == negtypes.NonGCPPrivateEndpointType {
// Non-GCP network endpoints don't have associated nodes.
networkEndpoint.Node = ""
}
zoneNetworkEndpointMap[zone].Insert(networkEndpoint)

// if existing name is alphabetically lower than current one, continue and don't replace
if existingPod, contains := networkEndpointPodMap[networkEndpoint]; contains {
dupCount += 1
if existingPod.Name < endpointAddress.TargetRef.Name {
klog.Infof("Found duplicate endpoints for %v, save the pod information from the alphabetically higher pod", networkEndpoint)
continue // if existing name is alphabetically lower than current one, continue and don't replace
}
networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name}
}
networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name}
}
}

return ZoneNetworkEndpointMapResult{
NetworkEndpointSet: zoneNetworkEndpointMap,
EndpointPodMap: networkEndpointPodMap,
Expand All @@ -443,7 +456,8 @@ func toZoneNetworkEndpointMapDegradedMode(
// it returns error if the pod:
// 1. is in terminal state
// 2. corresponds to a non-existent node
func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) error {
// 3. has an IP that matches a podIP, but is outside of the node's allocated IP range
func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer, networkEndpoint negtypes.NetworkEndpoint) error {
// Terminal Pod means a pod is in PodFailed or PodSucceeded phase
phase := pod.Status.Phase
if phase == apiv1.PodFailed || phase == apiv1.PodSucceeded {
Expand All @@ -453,10 +467,13 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) error {
if err != nil || !exists {
return negtypes.ErrEPNodeNotFound
}
_, isNode := obj.(*apiv1.Node)
if !isNode {
node, ok := obj.(*apiv1.Node)
if !ok {
return negtypes.ErrEPNodeTypeAssertionFailed
}
if err = nodeContainsPodIP(node, networkEndpoint); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -486,6 +503,66 @@ func ipsForPod(eds []negtypes.EndpointsData) map[types.NamespacedName]negtypes.N
return result
}

// podContainsEndpointAddress checks the pod's existing PodIP(s)
// and returns an error if any of the given endpoint's IP addresses does not
// match one of them. The endpoint's IP is always checked; its IPv6 address is
// checked as well when it is non-empty (dual stack endpoint).
//
// Addresses are compared as plain strings, so the endpoint and the pod status
// must use the same textual form of an address for it to count as a match.
//
// The returned error wraps negtypes.ErrEPIPNotFromPod.
func podContainsEndpointAddress(networkEndpoint negtypes.NetworkEndpoint, pod *apiv1.Pod) error {
	// TODO(cheungdavid): update ipv4 check when single stack ipv6 is supported
	endpointIPs := []string{networkEndpoint.IP}
	if networkEndpoint.IPv6 != "" {
		endpointIPs = append(endpointIPs, networkEndpoint.IPv6)
	}

	matching := 0
	for _, endpointIP := range endpointIPs {
		// a pod can have at most two PodIPs, one for ipv4 and one for ipv6
		for _, podIP := range pod.Status.PodIPs {
			if endpointIP == podIP.IP {
				matching++
				// Count each endpoint IP at most once so that a repeated
				// entry in PodIPs cannot make up for another endpoint IP
				// that is missing from the pod.
				break
			}
		}
	}
	if matching != len(endpointIPs) {
		return fmt.Errorf("%w: endpoint %v has IP(s) not match to its pod's IP(s)", negtypes.ErrEPIPNotFromPod, endpointIPs)
	}
	return nil
}

// nodeContainsPodIP checks the node's existing PodCIDR(s),
// and returns an error if a pod IP used by the endpoint is not within one of
// the podCIDR ranges. The endpoint's IP is always checked; its IPv6 address is
// checked as well when it is non-empty (dual stack endpoint).
//
// Entries of node.Spec.PodCIDRs that cannot be parsed are skipped silently, so
// a node with no parsable PodCIDR rejects every endpoint.
//
// The returned error wraps negtypes.ErrEPIPOutOfPodCIDR.
func nodeContainsPodIP(node *apiv1.Node, networkEndpoint negtypes.NetworkEndpoint) error {
	var podCIDRs []*net.IPNet
	// a node can have at most two PodCIDRs, one for ipv4 and one for ipv6
	for _, cidr := range node.Spec.PodCIDRs {
		_, ipnet, err := net.ParseCIDR(strings.TrimSpace(cidr))
		if err != nil {
			// swallow errors for CIDRs that are invalid
			continue
		}
		podCIDRs = append(podCIDRs, ipnet)
	}

	// net.ParseIP yields nil for an unparsable address; a nil IP is contained
	// in no range, so such an endpoint falls through to the error below.
	podIPs := []net.IP{net.ParseIP(networkEndpoint.IP)}
	if networkEndpoint.IPv6 != "" {
		podIPs = append(podIPs, net.ParseIP(networkEndpoint.IPv6))
	}

	matching := 0
	for _, podIP := range podIPs {
		// The range variable is deliberately not called "net" so the net
		// package stays accessible inside the loop.
		for _, podCIDR := range podCIDRs {
			if podCIDR.Contains(podIP) {
				matching++
				break
			}
		}
	}
	if matching != len(podIPs) {
		return fmt.Errorf("%w: podIP(s) used by endpoint %v not match to the node's PodCIDR range(s)", negtypes.ErrEPIPOutOfPodCIDR, podIPs)
	}
	return nil
}

// retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map
func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version, mode negtypes.EndpointsCalculatorMode) (map[string]negtypes.NetworkEndpointSet, labels.EndpointPodLabelMap, error) {
// Include zones that have non-candidate nodes currently. It is possible that NEGs were created in those zones previously and the endpoints now became non-candidates.
Expand Down
Loading

0 comments on commit 743ff5b

Please sign in to comment.