Skip to content

Commit

Permalink
add workload processor restart test
Browse files Browse the repository at this point in the history
Signed-off-by: Zhonghu Xu <xuzhonghu@huawei.com>
  • Loading branch information
hzxuzhonghu committed Aug 31, 2024
1 parent 95319fe commit 8862597
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 32 deletions.
20 changes: 20 additions & 0 deletions pkg/controller/workload/bpfcache/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,23 @@ func (c *Cache) RestoreEndpointKeys() {
}
}
}

// GetAllEndpointsForService returns all the endpoints for a service
// Note only used for testing
func (c *Cache) GetAllEndpointsForService(serviceId uint32) []EndpointValue {
log.Debugf("init endpoint keys")
var (
key = EndpointKey{}
value = EndpointValue{}
)

var res []EndpointValue

iter := c.bpfMap.KmeshEndpoint.Iterate()
for iter.Next(&key, &value) {
if key.ServiceId == serviceId {
res = append(res, value)
}
}
return res
}
43 changes: 21 additions & 22 deletions pkg/controller/workload/workload_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"istio.io/istio/pkg/util/sets"

"kmesh.net/kmesh/api/v2/workloadapi"
"kmesh.net/kmesh/api/v2/workloadapi/security"
Expand Down Expand Up @@ -285,24 +286,23 @@ func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValu
}

// handleWorkloadUnboundServices handles when a workload's belonging services removed
func (p *Processor) handleWorkloadUnboundServices(workload *workloadapi.Workload, services []string) error {
if len(services) == 0 {
return nil
}

log.Debugf("handleWorkloadUnboundServices %s: %v", workload.ResourceName(), services)
workloadUid := p.hashName.Hash(workload.GetUid())
allEks := p.bpf.GetEndpointKeys(workloadUid)
needRemove := make([]bpf.EndpointKey, 0, len(services))
for _, serviceName := range services {
serviceId := p.hashName.Hash(serviceName)
for ek := range allEks {
if ek.ServiceId == serviceId {
needRemove = append(needRemove, ek)
}
func (p *Processor) handleWorkloadUnboundServices(workload *workloadapi.Workload) error {
workloadUid := p.hashName.Hash(workload.Uid)
svcSets := sets.New[uint32]()
for svcKey := range workload.Services {
svcSets.Insert(p.hashName.Hash(svcKey))
}
unboundedServices := []uint32{}
eks := p.bpf.GetEndpointKeys(workloadUid)
needRemove := []bpf.EndpointKey{}
for ek := range eks {
if !svcSets.Contains(ek.ServiceId) {
unboundedServices = append(unboundedServices, ek.ServiceId)
needRemove = append(needRemove, ek)
}
}

log.Debugf("handleWorkloadUnboundServices %s: %v", workload.ResourceName(), unboundedServices)
err := p.deleteEndpointRecords(workloadUid, needRemove)
if err != nil {
log.Errorf("removeResidualServices delete endpoint failed:%v", err)
Expand Down Expand Up @@ -382,14 +382,13 @@ func (p *Processor) updateWorkload(workload *workloadapi.Workload) error {
}

func (p *Processor) handleWorkload(workload *workloadapi.Workload) error {
var deletedServices []string
var newServices []string
log.Debugf("handle workload: %s", workload.Uid)

deletedServices, newServices = p.WorkloadCache.AddOrUpdateWorkload(workload)
_, newServices = p.WorkloadCache.AddOrUpdateWorkload(workload)

// TODO: how can we know service on restart? maybe also rely on endpoint index
if err := p.handleWorkloadUnboundServices(workload, deletedServices); err != nil {
if err := p.handleWorkloadUnboundServices(workload); err != nil {
log.Errorf("handleWorkloadUnboundServices %s failed: %v", workload.ResourceName(), err)
return err
}
Expand Down Expand Up @@ -612,14 +611,14 @@ func (p *Processor) compareWorkloadAndServiceWithHashName() {
bk.BackendUid = num
sk.ServiceId = num
if err := p.bpf.BackendLookup(&bk, &bv); err == nil {
log.Debugf("Find BackendValue: [%#v] RemoveWorkloadResource", bv)
log.Debugf("found BackendValue: [%#v] and removeWorkloadFromBpfMap", bv)
if err := p.removeWorkloadFromBpfMap(str); err != nil {
log.Errorf("RemoveWorkloadResource failed: %v", err)
log.Errorf("removeWorkloadFromBpfMap failed: %v", err)
}
} else if err := p.bpf.ServiceLookup(&sk, &sv); err == nil {
log.Debugf("Find ServiceValue: [%#v] RemoveServiceResource", sv)
log.Debugf("found ServiceValue: [%#v] and removeServiceResourceFromBpfMap", sv)
if err := p.removeServiceResourceFromBpfMap(str); err != nil {
log.Errorf("RemoveServiceResource failed: %v", err)
log.Errorf("removeServiceResourceFromBpfMap failed: %v", err)
}
}
}
Expand Down
195 changes: 185 additions & 10 deletions pkg/controller/workload/workload_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (
"net/netip"
"testing"

service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"istio.io/istio/pilot/pkg/util/protoconv"
"istio.io/istio/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/rand"

"kmesh.net/kmesh/api/v2/workloadapi"
Expand Down Expand Up @@ -202,6 +205,18 @@ func checkServiceMap(t *testing.T, p *Processor, svcId uint32, fakeSvc *workload
assert.Equal(t, sv.WaypointPort, nets.ConvertPortToBigEndian(fakeSvc.Waypoint.GetHboneMtlsPort()))
}

func checkEndpointMap(t *testing.T, p *Processor, fakeSvc *workloadapi.Service, backendUid []uint32) {
endpoints := p.bpf.GetAllEndpointsForService(p.hashName.Hash(fakeSvc.ResourceName()))
assert.Equal(t, len(endpoints), len(backendUid))

all := sets.New[uint32](backendUid...)
for _, endpoint := range endpoints {
if !all.Contains(endpoint.BackendUid) {
t.Fatalf("endpoint %v, unexpected", endpoint.BackendUid)
}
}
}

func checkBackendMap(t *testing.T, p *Processor, workloadID uint32, wl *workloadapi.Workload) {
var bv bpfcache.BackendValue
err := p.bpf.BackendLookup(&bpfcache.BackendKey{BackendUid: workloadID}, &bv)
Expand Down Expand Up @@ -370,30 +385,174 @@ func createFakeService(name, ip, waypoint string) *workloadapi.Service {
}
}

func createWorkload(name, ip string, networkload workloadapi.NetworkMode, services ...string) *workloadapi.Workload {
workload := workloadapi.Workload{
Uid: "cluster0//Pod/default/" + name,
Namespace: "default",
Name: name,
Addresses: [][]byte{netip.MustParseAddr(ip).AsSlice()},
Network: "testnetwork",
CanonicalName: "foo",
CanonicalRevision: "latest",
WorkloadType: workloadapi.WorkloadType_POD,
WorkloadName: "name",
Status: workloadapi.WorkloadStatus_HEALTHY,
ClusterId: "cluster0",
NetworkMode: networkload,
}
workload.Services = make(map[string]*workloadapi.PortList, len(services))
for _, svc := range services {
workload.Services["default/"+svc+".default.svc.cluster.local"] = &workloadapi.PortList{
Ports: []*workloadapi.Port{
{
ServicePort: 80,
TargetPort: 8080,
},
{
ServicePort: 81,
TargetPort: 8180,
},
{
ServicePort: 82,
TargetPort: 82,
},
},
}
}
return &workload
}

func TestRestart(t *testing.T) {
workloadMap := bpfcache.NewFakeWorkloadMap(t)
defer bpfcache.CleanupFakeWorkloadMap(workloadMap)

p := newProcessor(workloadMap)

// 1. handle workload with service, but service not handled yet
// In this case, only frontend map and backend map should be updated.
wl := createTestWorkloadWithService(true)
err := p.handleWorkload(wl)
res := &service_discovery_v3.DeltaDiscoveryResponse{}

// 1. First simulate normal start
// 1.1 add related service
svc1 := createFakeService("svc1", "10.240.10.1", "10.240.10.200")
svc2 := createFakeService("svc2", "10.240.10.2", "10.240.10.200")
svc3 := createFakeService("svc3", "10.240.10.3", "10.240.10.200")
for _, svc := range []*workloadapi.Service{svc1, svc2, svc3} {
addr := serviceToAddress(svc)
res.Resources = append(res.Resources, &service_discovery_v3.Resource{
Resource: protoconv.MessageToAny(addr),
})
}

// 1.2 add workload
wl1 := createWorkload("wl1", "10.244.0.1", workloadapi.NetworkMode_STANDARD, "svc1", "svc2")
wl2 := createWorkload("wl2", "10.244.0.2", workloadapi.NetworkMode_STANDARD, "svc2", "svc3")
wl3 := createWorkload("wl3", "10.244.0.3", workloadapi.NetworkMode_STANDARD, "svc3")
for _, wl := range []*workloadapi.Workload{wl1, wl2, wl3} {
addr := workloadToAddress(wl)
res.Resources = append(res.Resources, &service_discovery_v3.Resource{
Resource: protoconv.MessageToAny(addr),
})
}

err := p.handleAddressTypeResponse(res)
assert.NoError(t, err)

workloadID := checkFrontEndMap(t, wl.Addresses[0], p)
checkBackendMap(t, p, workloadID, wl)
// check front end map
for _, wl := range []*workloadapi.Workload{wl1, wl2, wl3} {
checkFrontEndMap(t, wl.Addresses[0], p)
}
for _, svc := range []*workloadapi.Service{svc1, svc2, svc3} {
checkFrontEndMap(t, svc.Addresses[0].Address, p)
}
// check service map
t.Log("1. check service map")
checkServiceMap(t, p, p.hashName.Hash(svc1.ResourceName()), svc1, 1)
checkServiceMap(t, p, p.hashName.Hash(svc2.ResourceName()), svc2, 2)
checkServiceMap(t, p, p.hashName.Hash(svc3.ResourceName()), svc3, 2)
// check endpoint map
t.Log("1. check endpoint map")
checkEndpointMap(t, p, svc1, []uint32{p.hashName.Hash(wl1.ResourceName())})
checkEndpointMap(t, p, svc2, []uint32{p.hashName.Hash(wl1.ResourceName()), p.hashName.Hash(wl2.ResourceName())})
checkEndpointMap(t, p, svc3, []uint32{p.hashName.Hash(wl2.ResourceName()), p.hashName.Hash(wl3.ResourceName())})
// check backend map
for _, wl := range []*workloadapi.Workload{wl1, wl2, wl3} {
checkBackendMap(t, p, p.hashName.Hash(wl.ResourceName()), wl)
}

// 2. Second simulate restart
// Set a restart label and simulate missing data in the cache
bpf.SetStartType(bpf.Restart)
for key := range wl.GetServices() {
p.ServiceCache.DeleteService(key)
// reconstruct a new processor
p = newProcessor(workloadMap)
p.bpf.RestoreEndpointKeys()
// 2.1 simulate workload add/delete during restart
// simulate workload update during restart

// wl1 now only belong to svc1
delete(wl1.Services, "default/svc2.default.svc.cluster.local")
// wl2 now belong to svc1, svc2, svc3
wl2.Services["default/svc1.default.svc.cluster.local"] = &workloadapi.PortList{
Ports: []*workloadapi.Port{
{
ServicePort: 80,
TargetPort: 8080,
},
{
ServicePort: 81,
TargetPort: 8180,
},
{
ServicePort: 82,
TargetPort: 82,
},
},
}

p.compareWorkloadAndServiceWithHashName()
wl4 := createWorkload("wl4", "10.244.0.4", workloadapi.NetworkMode_STANDARD, "svc4")
svc4 := createFakeService("svc4", "10.240.10.4", "10.240.10.200")

// TODO: add more restart cases
res = &service_discovery_v3.DeltaDiscoveryResponse{}
// wl3 deleted during restart
for _, wl := range []*workloadapi.Workload{wl1, wl2, wl4} {
addr := workloadToAddress(wl)
res.Resources = append(res.Resources, &service_discovery_v3.Resource{
Resource: protoconv.MessageToAny(addr),
})
}

for _, svc := range []*workloadapi.Service{svc1, svc2, svc3, svc4} {
addr := serviceToAddress(svc)
res.Resources = append(res.Resources, &service_discovery_v3.Resource{
Resource: protoconv.MessageToAny(addr),
})
}

err = p.handleAddressTypeResponse(res)
assert.NoError(t, err)

// check front end map
t.Log("2. check front end map")
for _, wl := range []*workloadapi.Workload{wl1, wl2, wl4} {
checkFrontEndMap(t, wl.Addresses[0], p)
}
for _, svc := range []*workloadapi.Service{svc1, svc2, svc3, svc4} {
checkFrontEndMap(t, svc.Addresses[0].Address, p)
}
// TODO(hzxuzhonghu) check front end map elements number

// check service map
checkServiceMap(t, p, p.hashName.Hash(svc1.ResourceName()), svc1, 2) // svc1 has 2 wl1, wl2
checkServiceMap(t, p, p.hashName.Hash(svc2.ResourceName()), svc2, 1) // svc2 has 1 wl2
checkServiceMap(t, p, p.hashName.Hash(svc3.ResourceName()), svc3, 1) // svc3 has 1 wl2
checkServiceMap(t, p, p.hashName.Hash(svc4.ResourceName()), svc4, 1) // svc4 has 1 wl4
// check endpoint map
checkEndpointMap(t, p, svc1, []uint32{p.hashName.Hash(wl1.ResourceName()), p.hashName.Hash(wl2.ResourceName())})
checkEndpointMap(t, p, svc2, []uint32{p.hashName.Hash(wl2.ResourceName())})
checkEndpointMap(t, p, svc3, []uint32{p.hashName.Hash(wl2.ResourceName())})
checkEndpointMap(t, p, svc4, []uint32{p.hashName.Hash(wl4.ResourceName())})
// check backend map
for _, wl := range []*workloadapi.Workload{wl1, wl2, wl4} {
checkBackendMap(t, p, p.hashName.Hash(wl.ResourceName()), wl)
}

hashNameClean(p)
}
Expand All @@ -413,3 +572,19 @@ func hashNameClean(p *Processor) {
}
p.hashName.Reset()
}

func workloadToAddress(wl *workloadapi.Workload) *workloadapi.Address {
return &workloadapi.Address{
Type: &workloadapi.Address_Workload{
Workload: wl,
},
}
}

func serviceToAddress(service *workloadapi.Service) *workloadapi.Address {
return &workloadapi.Address{
Type: &workloadapi.Address_Service{
Service: service,
},
}
}

0 comments on commit 8862597

Please sign in to comment.