Commit
Merge pull request #783 from hzxuzhonghu/endpointIndex-optimize
Optimize endpoint index getter and also rename some methods to be mor…
hzxuzhonghu authored Sep 2, 2024
2 parents b620b4f + 50b4405 commit 3bb72e2
Showing 9 changed files with 548 additions and 433 deletions.
100 changes: 94 additions & 6 deletions pkg/controller/workload/bpfcache/endpoint.go
@@ -18,6 +18,7 @@ package bpfcache

import (
"github.com/cilium/ebpf"
"istio.io/istio/pkg/util/sets"
)

type EndpointKey struct {
@@ -31,33 +32,120 @@ type EndpointValue struct {

func (c *Cache) EndpointUpdate(key *EndpointKey, value *EndpointValue) error {
log.Debugf("EndpointUpdate [%#v], [%#v]", *key, *value)
// update endpointKeys index
if c.endpointKeys[value.BackendUid] == nil {
c.endpointKeys[value.BackendUid] = sets.New[EndpointKey](*key)
} else {
c.endpointKeys[value.BackendUid].Insert(*key)
}

return c.bpfMap.KmeshEndpoint.Update(key, value, ebpf.UpdateAny)
}

func (c *Cache) EndpointDelete(key *EndpointKey) error {
log.Debugf("EndpointDelete [%#v]", *key)
value := &EndpointValue{}
// update endpointKeys index
if err := c.bpfMap.KmeshEndpoint.Lookup(key, value); err != nil {
log.Infof("endpoint [%#v] does not exist", key)
return nil
}
c.endpointKeys[value.BackendUid].Delete(*key)
if len(c.endpointKeys[value.BackendUid]) == 0 {
delete(c.endpointKeys, value.BackendUid)
}

return c.bpfMap.KmeshEndpoint.Delete(key)
}
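
The endpointKeys map introduced here is a reverse index from backend (workload) uid to the set of endpoint map keys that reference it, and EndpointUpdate/EndpointDelete keep it in sync with the bpf map. A minimal sketch of what the index buys a caller, assuming the Cache fields from this diff and that the istio sets.Set is map-backed and can be ranged; the helper name is illustrative, not from the PR:

func endpointKeysForWorkload(c *Cache, workloadUid uint32) []EndpointKey {
	// A single map lookup replaces the full KmeshEndpoint iteration that the
	// removed EndpointIterFindKey had to do for every query.
	keys := make([]EndpointKey, 0, len(c.endpointKeys[workloadUid]))
	for key := range c.endpointKeys[workloadUid] {
		keys = append(keys, key)
	}
	return keys
}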

// EndpointSwap moves the last endpoint into the current endpoint's slot and removes the current endpoint
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32) error {
if currentIndex == lastIndex {
return c.EndpointDelete(&EndpointKey{
ServiceId: serviceId,
BackendIndex: lastIndex,
})
}
lastKey := &EndpointKey{
ServiceId: serviceId,
BackendIndex: lastIndex,
}
lastValue := &EndpointValue{}
if err := c.EndpointLookup(lastKey, lastValue); err != nil {
return err
}

currentKey := &EndpointKey{
ServiceId: serviceId,
BackendIndex: currentIndex,
}
currentValue := &EndpointValue{}
if err := c.EndpointLookup(currentKey, currentValue); err != nil {
return err
}

// update the last endpoint's index; in other words, delete the current endpoint
if err := c.bpfMap.KmeshEndpoint.Update(currentKey, lastValue, ebpf.UpdateAny); err != nil {
return err
}

// delete the duplicate last endpoint
if err := c.bpfMap.KmeshEndpoint.Delete(lastKey); err != nil {
return err
}

// delete index for the current endpoint
c.endpointKeys[currentValue.BackendUid].Delete(*currentKey)
if len(c.endpointKeys[currentValue.BackendUid]) == 0 {
delete(c.endpointKeys, currentValue.BackendUid)
}

// update the index for the last endpoint, which now occupies currentKey
c.endpointKeys[lastValue.BackendUid].Delete(*lastKey)
c.endpointKeys[lastValue.BackendUid].Insert(*currentKey)
return nil
}
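
EndpointSwap is the swap-with-last-entry deletion pattern: the slot at currentIndex is overwritten with the entry from lastIndex, the now-duplicate lastIndex slot is deleted, and the in-memory index is patched to match, so a service's BackendIndex values stay contiguous. A hedged usage sketch; the wrapper below and the assumption that the caller tracks the highest BackendIndex in use are not part of this PR:

// removeEndpointAt vacates slot idx for serviceId. lastIdx is the highest
// BackendIndex currently used by that service, tracked by the caller.
func removeEndpointAt(c *Cache, serviceId, idx, lastIdx uint32) error {
	// When idx is already the last slot, EndpointSwap deletes it directly.
	return c.EndpointSwap(idx, lastIdx, serviceId)
}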

func (c *Cache) EndpointLookup(key *EndpointKey, value *EndpointValue) error {
log.Debugf("EndpointLookup [%#v]", *key)
return c.bpfMap.KmeshEndpoint.Lookup(key, value)
}

func (c *Cache) EndpointIterFindKey(workloadUid uint32) []EndpointKey {
log.Debugf("EndpointIterFindKey [%#v]", workloadUid)
// RestoreEndpointKeys is called on restart to rebuild the endpoint indexes from the bpf map
func (c *Cache) RestoreEndpointKeys() {
log.Debugf("init endpoint keys")
var (
key = EndpointKey{}
value = EndpointValue{}
iter = c.bpfMap.KmeshEndpoint.Iterate()
)

res := make([]EndpointKey, 0)
iter := c.bpfMap.KmeshEndpoint.Iterate()
for iter.Next(&key, &value) {
if value.BackendUid == workloadUid {
res = append(res, key)
// update endpointKeys index
if c.endpointKeys[value.BackendUid] == nil {
c.endpointKeys[value.BackendUid] = sets.New[EndpointKey](key)
} else {
c.endpointKeys[value.BackendUid].Insert(key)
}
}
}

// GetAllEndpointsForService returns all the endpoints for a service
// Note: only used for testing
func (c *Cache) GetAllEndpointsForService(serviceId uint32) []EndpointValue {
log.Debugf("GetAllEndpointsForService [%#v]", serviceId)
var (
key = EndpointKey{}
value = EndpointValue{}
)

var res []EndpointValue

iter := c.bpfMap.KmeshEndpoint.Iterate()
for iter.Next(&key, &value) {
if key.ServiceId == serviceId {
res = append(res, value)
}
}
return res
}
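
RestoreEndpointKeys rebuilds the in-memory index by walking the bpf map, which matters when the daemon restarts while the pinned maps survive. A sketch of where it would plausibly be called; the restart flag and the surrounding wiring are assumptions, not code from this diff:

cache := NewCache(workloadMaps)
if restartedWithPinnedMaps {
	// Recover the backend-uid -> endpoint-key index from the surviving
	// KmeshEndpoint entries before handling any new workload events.
	cache.RestoreEndpointKeys()
}
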
15 changes: 14 additions & 1 deletion pkg/controller/workload/bpfcache/factory.go
@@ -17,6 +17,8 @@
package bpfcache

import (
"istio.io/istio/pkg/util/sets"

"kmesh.net/kmesh/bpf/kmesh/bpf2go"
"kmesh.net/kmesh/pkg/logger"
)
@@ -25,10 +27,21 @@ var log = logger.NewLoggerField("workload_bpfcache")

type Cache struct {
bpfMap bpf2go.KmeshCgroupSockWorkloadMaps
// endpointKeys indexes the endpoint map keys by backend workload uid
endpointKeys map[uint32]sets.Set[EndpointKey]
}

func NewCache(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Cache {
return &Cache{
bpfMap: workloadMap,
bpfMap: workloadMap,
endpointKeys: make(map[uint32]sets.Set[EndpointKey]),
}
}

func (c *Cache) GetEndpointKeys(workloadID uint32) sets.Set[EndpointKey] {
if c == nil {
return nil
}

return c.endpointKeys[workloadID]
}
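
GetEndpointKeys is a nil-receiver-safe, read-only accessor for the index. A sketch of how a caller outside bpfcache might use it when a workload goes away; the wrapper and its name are illustrative only:

func deleteEndpointsOfWorkload(c *bpfcache.Cache, workloadUid uint32) error {
	// Copy the keys first: EndpointDelete mutates the same index being read.
	keys := make([]bpfcache.EndpointKey, 0)
	for key := range c.GetEndpointKeys(workloadUid) {
		keys = append(keys, key)
	}
	for i := range keys {
		if err := c.EndpointDelete(&keys[i]); err != nil {
			return err
		}
	}
	return nil
}
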
120 changes: 22 additions & 98 deletions pkg/controller/workload/cache/workload_cache.go
@@ -21,6 +21,7 @@ import (
"sync"

"google.golang.org/protobuf/proto"
"istio.io/istio/pkg/util/sets"

"kmesh.net/kmesh/api/v2/workloadapi"
)
@@ -31,86 +32,23 @@ type WorkloadCache interface {
AddOrUpdateWorkload(workload *workloadapi.Workload) (deletedServices []string, newServices []string)
DeleteWorkload(uid string)
List() []*workloadapi.Workload
GetRelationShip(workloadId uint32, serviceId uint32) (uint32, bool)
UpdateRelationShip(workloadId uint32, serviceId uint32, relationId uint32)
DeleteRelationShip(serviceId uint32, relationId uint32)
}

type NetworkAddress struct {
Network string
Address netip.Addr
}

type ServiceRelationShipByWorkload struct {
workloadId uint32
serviceId uint32
}

type ServiceRelationShipById struct {
serviceId uint32
relationId uint32
}

type cache struct {
byUid map[string]*workloadapi.Workload
byAddr map[NetworkAddress]*workloadapi.Workload
relationShipByWorkload map[ServiceRelationShipByWorkload]uint32
relationShipById map[ServiceRelationShipById]uint32
mutex sync.RWMutex
byUid map[string]*workloadapi.Workload
byAddr map[NetworkAddress]*workloadapi.Workload
mutex sync.RWMutex
}

func NewWorkloadCache() *cache {
return &cache{
byUid: make(map[string]*workloadapi.Workload),
byAddr: make(map[NetworkAddress]*workloadapi.Workload),
relationShipByWorkload: make(map[ServiceRelationShipByWorkload]uint32),
relationShipById: make(map[ServiceRelationShipById]uint32),
}
}

func (w *cache) GetRelationShip(workloadId uint32, serviceId uint32) (uint32, bool) {
var relationKey = ServiceRelationShipByWorkload{
workloadId: workloadId,
serviceId: serviceId,
}

w.mutex.RLock()
defer w.mutex.RUnlock()
relationId, exist := w.relationShipByWorkload[relationKey]
return relationId, exist
}

func (w *cache) UpdateRelationShip(workloadId uint32, serviceId uint32, relationId uint32) {
var relationKey = ServiceRelationShipByWorkload{
workloadId: workloadId,
serviceId: serviceId,
}
var relationKeyById = ServiceRelationShipById{
serviceId: serviceId,
relationId: relationId,
}

w.mutex.Lock()
defer w.mutex.Unlock()
w.relationShipByWorkload[relationKey] = relationId
w.relationShipById[relationKeyById] = workloadId
}

func (w *cache) DeleteRelationShip(serviceId uint32, relationId uint32) {
var relationKeyById = ServiceRelationShipById{
serviceId: serviceId,
relationId: relationId,
}

w.mutex.Lock()
defer w.mutex.Unlock()
if workloadId, ok := w.relationShipById[relationKeyById]; ok {
delete(w.relationShipById, relationKeyById)
var relationKey = ServiceRelationShipByWorkload{
workloadId: workloadId,
serviceId: serviceId,
}
delete(w.relationShipByWorkload, relationKey)
byUid: make(map[string]*workloadapi.Workload),
byAddr: make(map[NetworkAddress]*workloadapi.Workload),
}
}

@@ -133,62 +71,48 @@ func composeNetworkAddress(network string, addr netip.Addr) NetworkAddress {
}
}

func (w *cache) getUniqueServicesOnLeftWorkload(workload1, workload2 *workloadapi.Workload) []string {
var diff []string
if workload1 == nil {
return diff
func getServicesOnWorkload(workload *workloadapi.Workload) sets.String {
if workload == nil {
return nil
}

for key := range workload1.Services {
if workload2 == nil {
diff = append(diff, key)
continue
}
if _, exist := workload2.Services[key]; !exist {
diff = append(diff, key)
}
sets := sets.New[string]()
for key := range workload.Services {
sets.Insert(key)
}
return diff
return sets
}

func (w *cache) compareWorkloadServices(workload1, workload2 *workloadapi.Workload) ([]string, []string) {
dels := w.getUniqueServicesOnLeftWorkload(workload1, workload2)
news := w.getUniqueServicesOnLeftWorkload(workload2, workload1)
return dels, news
left := getServicesOnWorkload(workload1)
right := getServicesOnWorkload(workload2)
return left.Diff(right)
}
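
compareWorkloadServices now delegates to the istio sets helper: as used above, Diff returns the keys present only in the left set (services removed from the workload) and the keys present only in the right set (services newly attached). A small worked example under that reading, reusing the sets import added in this file; the service names are made up:

func illustrateDiff() {
	oldSvcs := sets.New[string]("default/foo", "default/bar")
	newSvcs := sets.New[string]("default/bar", "default/baz")
	deleted, added := oldSvcs.Diff(newSvcs)
	// deleted holds only "default/foo", added holds only "default/baz";
	// the ordering of the returned slices is not guaranteed.
	_, _ = deleted, added
}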

func (w *cache) AddOrUpdateWorkload(workload *workloadapi.Workload) ([]string, []string) {
var deletedServices []string
var newServices []string
var deletedServices, newServices []string

if workload == nil {
return nil, nil
}
uid := workload.Uid

w.mutex.Lock()
defer w.mutex.Unlock()

oldWorkload, exist := w.byUid[uid]
oldWorkload, exist := w.byUid[workload.Uid]
if exist {
if proto.Equal(workload, oldWorkload) {
return nil, nil
}
// remove the stale address entries of the workload with the same uid, to avoid leaking workloads by address.
for _, ip := range oldWorkload.Addresses {
addr, _ := netip.AddrFromSlice(ip)
networkAddress := composeNetworkAddress(oldWorkload.Network, addr)
delete(w.byAddr, networkAddress)
}

// compare services
deletedServices, newServices = w.compareWorkloadServices(oldWorkload, workload)
} else {
deletedServices = nil
newServices = w.getUniqueServicesOnLeftWorkload(workload, oldWorkload)
for key := range workload.Services {
newServices = append(newServices, key)
}
}

w.byUid[uid] = workload
w.byUid[workload.Uid] = workload

// We should exclude workloads that use host network mode,
// since they use the host ip and cannot be identified by address.
(diff for the remaining 6 changed files not shown)
