From 50b4405e7e65223e4de93a5eb1172eb683f3ff8d Mon Sep 17 00:00:00 2001
From: Zhonghu Xu
Date: Sat, 31 Aug 2024 11:15:29 +0800
Subject: [PATCH] add workload processor restart test

Signed-off-by: Zhonghu Xu
---
 pkg/controller/workload/bpfcache/endpoint.go  |  20 ++
 pkg/controller/workload/workload_processor.go |  43 ++--
 .../workload/workload_processor_test.go       | 195 +++++++++++++++++-
 3 files changed, 226 insertions(+), 32 deletions(-)

diff --git a/pkg/controller/workload/bpfcache/endpoint.go b/pkg/controller/workload/bpfcache/endpoint.go
index 22410a17a..3481e6dda 100644
--- a/pkg/controller/workload/bpfcache/endpoint.go
+++ b/pkg/controller/workload/bpfcache/endpoint.go
@@ -129,3 +129,23 @@ func (c *Cache) RestoreEndpointKeys() {
 		}
 	}
 }
+
+// GetAllEndpointsForService returns the values of all endpoints whose key belongs to serviceId.
+// Note: it walks the whole endpoint map and is only used for testing.
+func (c *Cache) GetAllEndpointsForService(serviceId uint32) []EndpointValue {
+	log.Debugf("get all endpoints for service %d", serviceId)
+	var (
+		key   = EndpointKey{}
+		value = EndpointValue{}
+	)
+
+	var res []EndpointValue
+
+	iter := c.bpfMap.KmeshEndpoint.Iterate()
+	for iter.Next(&key, &value) {
+		if key.ServiceId == serviceId {
+			res = append(res, value)
+		}
+	}
+	return res
+}
diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go
index 3c6fb5f40..a640b4d92 100644
--- a/pkg/controller/workload/workload_processor.go
+++ b/pkg/controller/workload/workload_processor.go
@@ -26,6 +26,7 @@ import (
 	service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"
+	"istio.io/istio/pkg/util/sets"
 
 	"kmesh.net/kmesh/api/v2/workloadapi"
 	"kmesh.net/kmesh/api/v2/workloadapi/security"
@@ -285,24 +286,23 @@ func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValu
 }
 
 // handleWorkloadUnboundServices handles when a workload's belonging services removed
-func (p *Processor) handleWorkloadUnboundServices(workload *workloadapi.Workload, services []string) error {
-	if len(services) == 0 {
-		return nil
-	}
-
-	log.Debugf("handleWorkloadUnboundServices %s: %v", workload.ResourceName(), services)
-	workloadUid := p.hashName.Hash(workload.GetUid())
-	allEks := p.bpf.GetEndpointKeys(workloadUid)
-	needRemove := make([]bpf.EndpointKey, 0, len(services))
-	for _, serviceName := range services {
-		serviceId := p.hashName.Hash(serviceName)
-		for ek := range allEks {
-			if ek.ServiceId == serviceId {
-				needRemove = append(needRemove, ek)
-			}
+func (p *Processor) handleWorkloadUnboundServices(workload *workloadapi.Workload) error {
+	workloadUid := p.hashName.Hash(workload.Uid)
+	svcSets := sets.New[uint32]()
+	for svcKey := range workload.Services {
+		svcSets.Insert(p.hashName.Hash(svcKey))
+	}
+	unboundedServices := []uint32{}
+	eks := p.bpf.GetEndpointKeys(workloadUid)
+	needRemove := []bpf.EndpointKey{}
+	for ek := range eks {
+		if !svcSets.Contains(ek.ServiceId) {
+			unboundedServices = append(unboundedServices, ek.ServiceId)
+			needRemove = append(needRemove, ek)
 		}
 	}
 
+	log.Debugf("handleWorkloadUnboundServices %s: %v", workload.ResourceName(), unboundedServices)
 	err := p.deleteEndpointRecords(workloadUid, needRemove)
 	if err != nil {
 		log.Errorf("removeResidualServices delete endpoint failed:%v", err)
@@ -382,14 +382,13 @@ func (p *Processor) updateWorkload(workload *workloadapi.Workload) error {
 }
 
 func (p *Processor) handleWorkload(workload *workloadapi.Workload) error {
-	var deletedServices []string
 	var newServices []string
 
 	log.Debugf("handle workload: %s", workload.Uid)
 
-	deletedServices, newServices = p.WorkloadCache.AddOrUpdateWorkload(workload)
+	_, newServices = p.WorkloadCache.AddOrUpdateWorkload(workload)
 	// TODO: how can we know service on restart? maybe also rely on endpoint index
-	if err := p.handleWorkloadUnboundServices(workload, deletedServices); err != nil {
+	if err := p.handleWorkloadUnboundServices(workload); err != nil {
 		log.Errorf("handleWorkloadUnboundServices %s failed: %v", workload.ResourceName(), err)
 		return err
 	}
@@ -612,14 +611,14 @@ func (p *Processor) compareWorkloadAndServiceWithHashName() {
 			bk.BackendUid = num
 			sk.ServiceId = num
 			if err := p.bpf.BackendLookup(&bk, &bv); err == nil {
-				log.Debugf("Find BackendValue: [%#v] RemoveWorkloadResource", bv)
+				log.Debugf("found BackendValue: [%#v] and removeWorkloadFromBpfMap", bv)
 				if err := p.removeWorkloadFromBpfMap(str); err != nil {
-					log.Errorf("RemoveWorkloadResource failed: %v", err)
+					log.Errorf("removeWorkloadFromBpfMap failed: %v", err)
 				}
 			} else if err := p.bpf.ServiceLookup(&sk, &sv); err == nil {
-				log.Debugf("Find ServiceValue: [%#v] RemoveServiceResource", sv)
+				log.Debugf("found ServiceValue: [%#v] and removeServiceResourceFromBpfMap", sv)
 				if err := p.removeServiceResourceFromBpfMap(str); err != nil {
-					log.Errorf("RemoveServiceResource failed: %v", err)
+					log.Errorf("removeServiceResourceFromBpfMap failed: %v", err)
 				}
 			}
 		}
diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go
index 55c2cd62d..4e2594c0d 100644
--- a/pkg/controller/workload/workload_processor_test.go
+++ b/pkg/controller/workload/workload_processor_test.go
@@ -20,8 +20,11 @@ import (
 	"net/netip"
 	"testing"
 
+	service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/protobuf/proto"
+	"istio.io/istio/pilot/pkg/util/protoconv"
+	"istio.io/istio/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/rand"
 
 	"kmesh.net/kmesh/api/v2/workloadapi"
@@ -202,6 +205,18 @@ func checkServiceMap(t *testing.T, p *Processor, svcId uint32, fakeSvc *workload
 	assert.Equal(t, sv.WaypointPort, nets.ConvertPortToBigEndian(fakeSvc.Waypoint.GetHboneMtlsPort()))
 }
 
+func checkEndpointMap(t *testing.T, p *Processor, fakeSvc *workloadapi.Service, backendUid []uint32) {
+	endpoints := p.bpf.GetAllEndpointsForService(p.hashName.Hash(fakeSvc.ResourceName()))
+	assert.Equal(t, len(backendUid), len(endpoints))
+
+	all := sets.New[uint32](backendUid...)
+	for _, endpoint := range endpoints {
+		if !all.Contains(endpoint.BackendUid) {
+			t.Fatalf("endpoint %v, unexpected", endpoint.BackendUid)
+		}
+	}
+}
+
 func checkBackendMap(t *testing.T, p *Processor, workloadID uint32, wl *workloadapi.Workload) {
 	var bv bpfcache.BackendValue
 	err := p.bpf.BackendLookup(&bpfcache.BackendKey{BackendUid: workloadID}, &bv)
@@ -370,30 +385,174 @@ func createFakeService(name, ip, waypoint string) *workloadapi.Service {
 	}
 }
 
+func createWorkload(name, ip string, networkMode workloadapi.NetworkMode, services ...string) *workloadapi.Workload {
+	workload := workloadapi.Workload{
+		Uid:               "cluster0//Pod/default/" + name,
+		Namespace:         "default",
+		Name:              name,
+		Addresses:         [][]byte{netip.MustParseAddr(ip).AsSlice()},
+		Network:           "testnetwork",
+		CanonicalName:     "foo",
+		CanonicalRevision: "latest",
+		WorkloadType:      workloadapi.WorkloadType_POD,
+		WorkloadName:      "name",
+		Status:            workloadapi.WorkloadStatus_HEALTHY,
+		ClusterId:         "cluster0",
+		NetworkMode:       networkMode,
+	}
+	workload.Services = make(map[string]*workloadapi.PortList, len(services))
+	for _, svc := range services {
+		workload.Services["default/"+svc+".default.svc.cluster.local"] = &workloadapi.PortList{
+			Ports: []*workloadapi.Port{
+				{
+					ServicePort: 80,
+					TargetPort:  8080,
+				},
+				{
+					ServicePort: 81,
+					TargetPort:  8180,
+				},
+				{
+					ServicePort: 82,
+					TargetPort:  82,
+				},
+			},
+		}
+	}
+	return &workload
+}
+
 func TestRestart(t *testing.T) {
 	workloadMap := bpfcache.NewFakeWorkloadMap(t)
 	defer bpfcache.CleanupFakeWorkloadMap(workloadMap)
 
 	p := newProcessor(workloadMap)
 
-	// 1. handle workload with service, but service not handled yet
-	// In this case, only frontend map and backend map should be updated.
-	wl := createTestWorkloadWithService(true)
-	err := p.handleWorkload(wl)
+	res := &service_discovery_v3.DeltaDiscoveryResponse{}
+
+	// 1. First simulate normal start
+	// 1.1 add related service
+	svc1 := createFakeService("svc1", "10.240.10.1", "10.240.10.200")
+	svc2 := createFakeService("svc2", "10.240.10.2", "10.240.10.200")
+	svc3 := createFakeService("svc3", "10.240.10.3", "10.240.10.200")
+	for _, svc := range []*workloadapi.Service{svc1, svc2, svc3} {
+		addr := serviceToAddress(svc)
+		res.Resources = append(res.Resources, &service_discovery_v3.Resource{
+			Resource: protoconv.MessageToAny(addr),
+		})
+	}
+
+	// 1.2 add workload
+	wl1 := createWorkload("wl1", "10.244.0.1", workloadapi.NetworkMode_STANDARD, "svc1", "svc2")
+	wl2 := createWorkload("wl2", "10.244.0.2", workloadapi.NetworkMode_STANDARD, "svc2", "svc3")
+	wl3 := createWorkload("wl3", "10.244.0.3", workloadapi.NetworkMode_STANDARD, "svc3")
+	for _, wl := range []*workloadapi.Workload{wl1, wl2, wl3} {
+		addr := workloadToAddress(wl)
+		res.Resources = append(res.Resources, &service_discovery_v3.Resource{
+			Resource: protoconv.MessageToAny(addr),
+		})
+	}
+
+	err := p.handleAddressTypeResponse(res)
 	assert.NoError(t, err)
 
-	workloadID := checkFrontEndMap(t, wl.Addresses[0], p)
-	checkBackendMap(t, p, workloadID, wl)
+	// check front end map
+	for _, wl := range []*workloadapi.Workload{wl1, wl2, wl3} {
+		checkFrontEndMap(t, wl.Addresses[0], p)
+	}
+	for _, svc := range []*workloadapi.Service{svc1, svc2, svc3} {
+		checkFrontEndMap(t, svc.Addresses[0].Address, p)
+	}
+	// check service map
+	t.Log("1. check service map")
+	checkServiceMap(t, p, p.hashName.Hash(svc1.ResourceName()), svc1, 1)
+	checkServiceMap(t, p, p.hashName.Hash(svc2.ResourceName()), svc2, 2)
+	checkServiceMap(t, p, p.hashName.Hash(svc3.ResourceName()), svc3, 2)
+	// check endpoint map
+	t.Log("1. check endpoint map")
+	checkEndpointMap(t, p, svc1, []uint32{p.hashName.Hash(wl1.ResourceName())})
+	checkEndpointMap(t, p, svc2, []uint32{p.hashName.Hash(wl1.ResourceName()), p.hashName.Hash(wl2.ResourceName())})
+	checkEndpointMap(t, p, svc3, []uint32{p.hashName.Hash(wl2.ResourceName()), p.hashName.Hash(wl3.ResourceName())})
+	// check backend map
+	for _, wl := range []*workloadapi.Workload{wl1, wl2, wl3} {
+		checkBackendMap(t, p, p.hashName.Hash(wl.ResourceName()), wl)
+	}
+	// 2. Second simulate restart
 
 	// Set a restart label and simulate missing data in the cache
 	bpf.SetStartType(bpf.Restart)
-	for key := range wl.GetServices() {
-		p.ServiceCache.DeleteService(key)
+	// reconstruct a new processor
+	p = newProcessor(workloadMap)
+	p.bpf.RestoreEndpointKeys()
+	// 2.1 simulate workload add/delete during restart
+	// simulate workload update during restart
+
+	// wl1 now only belongs to svc1
+	delete(wl1.Services, "default/svc2.default.svc.cluster.local")
+	// wl2 now belongs to svc1, svc2, svc3
+	wl2.Services["default/svc1.default.svc.cluster.local"] = &workloadapi.PortList{
+		Ports: []*workloadapi.Port{
+			{
+				ServicePort: 80,
+				TargetPort:  8080,
+			},
+			{
+				ServicePort: 81,
+				TargetPort:  8180,
+			},
+			{
+				ServicePort: 82,
+				TargetPort:  82,
+			},
+		},
 	}
 
-	p.compareWorkloadAndServiceWithHashName()
+	wl4 := createWorkload("wl4", "10.244.0.4", workloadapi.NetworkMode_STANDARD, "svc4")
+	svc4 := createFakeService("svc4", "10.240.10.4", "10.240.10.200")
 
-	// TODO: add more restart cases
+	res = &service_discovery_v3.DeltaDiscoveryResponse{}
+	// wl3 deleted during restart
+	for _, wl := range []*workloadapi.Workload{wl1, wl2, wl4} {
+		addr := workloadToAddress(wl)
+		res.Resources = append(res.Resources, &service_discovery_v3.Resource{
+			Resource: protoconv.MessageToAny(addr),
+		})
+	}
+
+	for _, svc := range []*workloadapi.Service{svc1, svc2, svc3, svc4} {
+		addr := serviceToAddress(svc)
+		res.Resources = append(res.Resources, &service_discovery_v3.Resource{
+			Resource: protoconv.MessageToAny(addr),
+		})
+	}
+
+	err = p.handleAddressTypeResponse(res)
+	assert.NoError(t, err)
+
+	// check front end map
+	t.Log("2. check front end map")
+	for _, wl := range []*workloadapi.Workload{wl1, wl2, wl4} {
+		checkFrontEndMap(t, wl.Addresses[0], p)
+	}
+	for _, svc := range []*workloadapi.Service{svc1, svc2, svc3, svc4} {
+		checkFrontEndMap(t, svc.Addresses[0].Address, p)
+	}
+	// TODO(hzxuzhonghu) check front end map elements number
+
+	// check service map
+	checkServiceMap(t, p, p.hashName.Hash(svc1.ResourceName()), svc1, 2) // svc1 has 2 endpoints: wl1, wl2
+	checkServiceMap(t, p, p.hashName.Hash(svc2.ResourceName()), svc2, 1) // svc2 has 1 endpoint: wl2
+	checkServiceMap(t, p, p.hashName.Hash(svc3.ResourceName()), svc3, 1) // svc3 has 1 endpoint: wl2
+	checkServiceMap(t, p, p.hashName.Hash(svc4.ResourceName()), svc4, 1) // svc4 has 1 endpoint: wl4
+	// check endpoint map
+	checkEndpointMap(t, p, svc1, []uint32{p.hashName.Hash(wl1.ResourceName()), p.hashName.Hash(wl2.ResourceName())})
+	checkEndpointMap(t, p, svc2, []uint32{p.hashName.Hash(wl2.ResourceName())})
+	checkEndpointMap(t, p, svc3, []uint32{p.hashName.Hash(wl2.ResourceName())})
+	checkEndpointMap(t, p, svc4, []uint32{p.hashName.Hash(wl4.ResourceName())})
+	// check backend map
+	for _, wl := range []*workloadapi.Workload{wl1, wl2, wl4} {
+		checkBackendMap(t, p, p.hashName.Hash(wl.ResourceName()), wl)
+	}
 
 	hashNameClean(p)
 }
@@ -413,3 +572,19 @@ func hashNameClean(p *Processor) {
 	}
 	p.hashName.Reset()
 }
+
+func workloadToAddress(wl *workloadapi.Workload) *workloadapi.Address {
+	return &workloadapi.Address{
+		Type: &workloadapi.Address_Workload{
+			Workload: wl,
+		},
+	}
+}
+
+func serviceToAddress(service *workloadapi.Service) *workloadapi.Address {
+	return &workloadapi.Address{
+		Type: &workloadapi.Address_Service{
+			Service: service,
+		},
+	}
+}