From c5b6f8df60b8c8cf878332b2b4f46c75bb45413e Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Fri, 6 Sep 2024 16:33:26 +0800 Subject: [PATCH] Filter out unhealthy endpoints, otherwise there may be connection errors Signed-off-by: Zhonghu Xu --- pkg/controller/workload/workload_processor.go | 47 ++++++++++++------- .../workload/workload_processor_test.go | 37 +++++++++++---- 2 files changed, 60 insertions(+), 24 deletions(-) diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 43b27ad83..21c6b987b 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -144,18 +144,27 @@ func (p *Processor) storePodFrontendData(uid uint32, ip []byte) error { return nil } -func (p *Processor) removeWorkloadResource(removedResources []string) error { +func (p *Processor) removeWorkloadResources(removedResources []string) error { for _, uid := range removedResources { - wl := p.WorkloadCache.GetWorkloadByUid(uid) - p.WorkloadCache.DeleteWorkload(uid) - telemetry.DeleteWorkloadMetric(wl) - if err := p.removeWorkloadFromBpfMap(uid); err != nil { - return err + err := p.removeWorkload(uid) + if err != nil { + log.Warnf("removeWorkload %s failed: %v", uid, err) + continue } } return nil } +func (p *Processor) removeWorkload(uid string) error { + wl := p.WorkloadCache.GetWorkloadByUid(uid) + if wl == nil { + return nil + } + p.WorkloadCache.DeleteWorkload(uid) + telemetry.DeleteWorkloadMetric(wl) + return p.removeWorkloadFromBpfMap(uid) +} + func (p *Processor) removeWorkloadFromBpfMap(uid string) error { var ( err error @@ -220,7 +229,7 @@ func (p *Processor) deleteServiceFrontendData(service *workloadapi.Service, id u return nil } -func (p *Processor) removeServiceResource(resources []string) error { +func (p *Processor) removeServiceResources(resources []string) error { for _, name := range resources { telemetry.DeleteServiceMetric(name) svc := 
p.ServiceCache.GetService(name) @@ -368,10 +377,18 @@ func (p *Processor) updateWorkload(workload *workloadapi.Workload) error { } func (p *Processor) handleWorkload(workload *workloadapi.Workload) error { - log.Debugf("handle workload: %s", workload.Uid) + log.Debugf("handle workload: %s", workload.ResourceName()) + // Keep track of the workload no matter it is healthy, unhealthy workload is just for debugging p.WorkloadCache.AddOrUpdateWorkload(workload) + // Exclude unhealthy workload, which is not ready to serve traffic + if workload.Status == workloadapi.WorkloadStatus_UNHEALTHY { + log.Debugf("workload %s is unhealthy", workload.ResourceName()) + // If the workload is updated to unhealthy, we should remove it from the bpf map + return p.removeWorkloadFromBpfMap(workload.Uid) + } + unboundedEndpointKeys, newServices := p.compareWorkloadServices(workload) if err := p.handleWorkloadUnboundServices(workload, unboundedEndpointKeys); err != nil { log.Errorf("handleWorkloadUnboundServices %s failed: %v", workload.ResourceName(), err) @@ -527,10 +544,10 @@ func (p *Processor) handleRemovedAddresses(removed []string) { } } - if err := p.removeWorkloadResource(workloadNames); err != nil { - log.Errorf("RemoveWorkloadResource failed: %v", err) + if err := p.removeWorkloadResources(workloadNames); err != nil { + log.Errorf("removeWorkloadResources failed: %v", err) } - if err := p.removeServiceResource(serviceNames); err != nil { + if err := p.removeServiceResources(serviceNames); err != nil { log.Errorf("RemoveServiceResource failed: %v", err) } } @@ -552,21 +569,19 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis case *workloadapi.Address_Service: services = append(services, address.GetService()) default: - log.Errorf("unknown type") + log.Errorf("unknown type, should not reach here") } } for _, service := range services { - log.Debugf("handle service %v", service.ResourceName()) if err = p.handleService(service); err != nil { - 
log.Errorf("handle service failed, err: %v", err) + log.Errorf("handle service %v failed, err: %v", service.ResourceName(), err) } } for _, workload := range workloads { - log.Debugf("handle workload %v", workload.ResourceName()) if err = p.handleWorkload(workload); err != nil { - log.Errorf("handle workload failed, err: %v", err) + log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) } } diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go index d3cc65bff..b35c26709 100644 --- a/pkg/controller/workload/workload_processor_test.go +++ b/pkg/controller/workload/workload_processor_test.go @@ -53,12 +53,12 @@ func Test_handleWorkload(t *testing.T) { _ = p.handleService(fakeSvc) // 2. add workload - wl := createTestWorkloadWithService(true) - err := p.handleWorkload(wl) + workload1 := createTestWorkloadWithService(true) + err := p.handleWorkload(workload1) assert.NoError(t, err) - workloadID := checkFrontEndMap(t, wl.Addresses[0], p) - checkBackendMap(t, p, workloadID, wl) + workloadID := checkFrontEndMap(t, workload1.Addresses[0], p) + checkBackendMap(t, p, workloadID, workload1) // 2.1 check front end map contains service svcID := checkFrontEndMap(t, fakeSvc.Addresses[0].Address, p) @@ -118,9 +118,9 @@ func Test_handleWorkload(t *testing.T) { checkBackendMap(t, p, workload2ID, workload2) // 5 update workload to remove the bound services - wl3 := proto.Clone(wl).(*workloadapi.Workload) - wl3.Services = nil - err = p.handleWorkload(wl3) + workload1Updated := proto.Clone(workload1).(*workloadapi.Workload) + workload1Updated.Services = nil + err = p.handleWorkload(workload1Updated) assert.NoError(t, err) // 5.1 check service map @@ -142,7 +142,28 @@ func Test_handleWorkload(t *testing.T) { // 6.2 check service map contains service, but no waypoint address checkServiceMap(t, p, svcID, wpSvc, 0) - // 7. delete service + // 7. 
test add unhealthy workload + workload3 := createFakeWorkload("1.2.3.7", workloadapi.NetworkMode_STANDARD) + workload3.Status = workloadapi.WorkloadStatus_UNHEALTHY + _ = p.handleWorkload(workload3) + + addr, _ := netip.AddrFromSlice(workload3.Addresses[0]) + networkAddress := cache.NetworkAddress{ + Network: workload3.Network, + Address: addr, + } + got := p.WorkloadCache.GetWorkloadByAddr(networkAddress) + t.Logf("workload3: %v", got) + assert.NotNil(t, got) + assert.Equal(t, got.Status, workloadapi.WorkloadStatus_UNHEALTHY) + checkNotExistInFrontEndMap(t, workload3.Addresses[0], p) + + // 8. update workload from healthy to unhealthy, should remove it from bpf map + workload2.Status = workloadapi.WorkloadStatus_UNHEALTHY + _ = p.handleWorkload(workload2) + checkNotExistInFrontEndMap(t, workload2.Addresses[0], p) + + // 9. delete service p.handleRemovedAddresses([]string{fakeSvc.ResourceName()}) checkNotExistInFrontEndMap(t, fakeSvc.Addresses[0].Address, p)