From bde28a951521affa45305fa4b6ef3f41319350e8 Mon Sep 17 00:00:00 2001
From: Tomas Hruby
Date: Wed, 16 Nov 2022 16:40:24 -0800
Subject: [PATCH 1/5] [BPF] cleaning up maps is for both TC and XDP

Move it from the tc package to the bpf package, with some code
reorganization for simplicity. Also take the cleanup lock when updating
XDP; it wasn't synchronized until now.
---
 felix/bpf/bpf.go                    | 173 ++++++++++++++++++++++++++++
 felix/bpf/tc/attach.go              | 167 ---------------------------
 felix/bpf/ut/attach_test.go         |   8 +-
 felix/dataplane/linux/bpf_ep_mgr.go |  34 ++++--
 4 files changed, 203 insertions(+), 179 deletions(-)

diff --git a/felix/bpf/bpf.go b/felix/bpf/bpf.go
index dae57a07c62..6e1c4166753 100644
--- a/felix/bpf/bpf.go
+++ b/felix/bpf/bpf.go
@@ -22,6 +22,7 @@ package bpf

 import (
 	"bufio"
+	"bytes"
 	"encoding/binary"
 	"encoding/json"
 	"errors"
@@ -45,6 +46,7 @@ import (
 	"github.com/projectcalico/calico/felix/environment"
 	"github.com/projectcalico/calico/felix/labelindex"
 	"github.com/projectcalico/calico/felix/proto"
+	"github.com/projectcalico/calico/libcalico-go/lib/set"
 )

 // Hook is the hook to which a BPF program should be attached. This is relative to
@@ -2312,3 +2314,174 @@ func MapPinPath(typ int, name, iface string, hook Hook) string {
 	}
 	return path.Join(PinBaseDir, subDir, name)
 }
+
+type TcList []struct {
+	DevName string `json:"devname"`
+	ID      int    `json:"id"`
+}
+
+type XDPList []struct {
+	DevName string `json:"devname"`
+	IfIndex int    `json:"ifindex"`
+	Mode    string `json:"mode"`
+	ID      int    `json:"id"`
+}
+
+// ListTcXDPAttachedProgs returns all programs attached to TC or XDP hooks.
+func ListTcXDPAttachedProgs() (TcList, XDPList, error) {
+	// Find all the programs that are attached to interfaces.
+	out, err := exec.Command("bpftool", "net", "-j").Output()
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to list attached bpf programs: %w", err)
+	}
+
+	var attached []struct {
+		TC  TcList  `json:"tc"`
+		XDP XDPList `json:"xdp"`
+	}
+
+	err = json.Unmarshal(out, &attached)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to parse list of attached BPF programs: %w\n%s", err, out)
+	}
+
+	return attached[0].TC, attached[0].XDP, nil
+}
+
+func ListPerEPMaps() (map[int]string, error) {
+	mapIDToPath := make(map[int]string)
+	err := filepath.Walk("/sys/fs/bpf/tc", func(p string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if strings.HasPrefix(info.Name(), JumpMapName()) ||
+			strings.HasPrefix(info.Name(), CountersMapName()) {
+			log.WithField("path", p).Debug("Examining map")
+
+			out, err := exec.Command("bpftool", "map", "show", "pinned", p).Output()
+			if err != nil {
+				log.WithError(err).Panic("Failed to show map")
+			}
+			log.WithField("dump", string(out)).Debug("Map show before deletion")
+			idStr := string(bytes.Split(out, []byte(":"))[0])
+			id, err := strconv.Atoi(idStr)
+			if err != nil {
+				log.WithError(err).WithField("dump", string(out)).Error("Failed to parse bpftool output.")
+				return err
+			}
+			mapIDToPath[id] = p
+		}
+		return nil
+	})

+	return mapIDToPath, err
+}
+
+// pinDirRegex matches tc's and xdp's auto-created directory names (directories created when using
+// libbpf) so we can clean them up when removing maps without accidentally removing other
+// user-created dirs.
+var pinDirRegex = regexp.MustCompile(`([0-9a-f]{40})|(.*_(igr|egr|xdp))`)
+
+// CleanUpMaps scans for cali_jump maps that are still pinned to the filesystem but no longer referenced by
+// our BPF programs.
+func CleanUpMaps() {
+	// Find the maps we care about by walking the BPF filesystem.
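+	// ListPerEPMaps gives us map ID -> pin path for our jump and counters
+	// maps; every ID that is still referenced by an attached program is
+	// dropped from this map below, leaving only the stale pins.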
+	mapIDToPath, err := ListPerEPMaps()
+	if os.IsNotExist(err) {
+		log.WithError(err).Warn("tc directory missing from BPF file system?")
+		return
+	}
+	if err != nil {
+		log.WithError(err).Error("Error while looking for maps.")
+		return
+	}
+
+	aTc, aXdp, err := ListTcXDPAttachedProgs()
+	if err != nil {
+		log.WithError(err).Warn("Failed to list attached programs.")
+		return
+	}
+	log.WithFields(log.Fields{"tc": aTc, "xdp": aXdp}).Debug("Attached BPF programs")
+
+	attachedProgs := set.New[int]()
+	for _, prog := range aTc {
+		log.WithField("prog", prog).Debug("Adding TC prog to attached set")
+		attachedProgs.Add(prog.ID)
+	}
+	for _, prog := range aXdp {
+		log.WithField("prog", prog).Debug("Adding XDP prog to attached set")
+		attachedProgs.Add(prog.ID)
+	}
+
+	// Find all the maps that the attached programs refer to and remove them from consideration.
+	progsJSON, err := exec.Command("bpftool", "prog", "list", "--json").Output()
+	if err != nil {
+		log.WithError(err).Info("Failed to list BPF programs, assuming there's nothing to clean up.")
+		return
+	}
+	var progs []struct {
+		ID   int    `json:"id"`
+		Name string `json:"name"`
+		Maps []int  `json:"map_ids"`
+	}
+	err = json.Unmarshal(progsJSON, &progs)
+	if err != nil {
+		log.WithError(err).Info("Failed to parse bpftool output. Assuming nothing to clean up.")
+		return
+	}
+	for _, p := range progs {
+		if !attachedProgs.Contains(p.ID) {
+			log.WithField("prog", p).Debug("Prog is not in the attached set, skipping")
+			continue
+		}
+		for _, id := range p.Maps {
+			log.WithField("mapID", id).WithField("prog", p).Debugf("Map is still in use: %v", mapIDToPath[id])
+			delete(mapIDToPath, id)
+		}
+	}
+
+	// Remove the pins.
+	for id, p := range mapIDToPath {
+		log.WithFields(log.Fields{"id": id, "path": p}).Debug("Removing stale BPF map pin.")
+		err := os.Remove(p)
+		if err != nil {
+			log.WithError(err).Warn("Failed to remove stale BPF map pin.")
+		}
+		log.WithFields(log.Fields{"id": id, "path": p}).Info("Removed stale BPF map pin.")
+	}
+
+	// Look for empty dirs.
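+	// A dir counts as empty until the walk visits an entry inside it: every
+	// auto-created dir is added to the set and discarded again as soon as a
+	// file is seen in it.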
+	emptyAutoDirs := set.New[string]()
+	err = filepath.Walk("/sys/fs/bpf/tc", func(p string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if info.IsDir() && pinDirRegex.MatchString(info.Name()) {
+			p := path.Clean(p)
+			log.WithField("path", p).Debug("Found tc auto-created dir.")
+			emptyAutoDirs.Add(p)
+		} else {
+			dirPath := path.Clean(path.Dir(p))
+			if emptyAutoDirs.Contains(dirPath) {
+				log.WithField("path", dirPath).Debug("tc dir is not empty.")
+				emptyAutoDirs.Discard(dirPath)
+			}
+		}
+		return nil
+	})
+	if os.IsNotExist(err) {
+		log.WithError(err).Warn("tc directory missing from BPF file system?")
+		return
+	}
+	if err != nil {
+		log.WithError(err).Error("Error while looking for empty dirs.")
+	}
+
+	emptyAutoDirs.Iter(func(p string) error {
+		log.WithField("path", p).Debug("Removing empty dir.")
+		err := os.Remove(p)
+		if err != nil {
+			log.WithError(err).Error("Error while removing empty dir.")
+		}
+		return nil
+	})
+}
diff --git a/felix/bpf/tc/attach.go b/felix/bpf/tc/attach.go
index 9d0cb9f9d56..1911ec2d369 100644
--- a/felix/bpf/tc/attach.go
+++ b/felix/bpf/tc/attach.go
@@ -15,9 +15,7 @@
 package tc

 import (
-	"bytes"
 	"encoding/binary"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"io/ioutil"
@@ -25,16 +23,12 @@ import (
 	"os"
 	"os/exec"
 	"path"
-	"path/filepath"
 	"regexp"
 	"strconv"
 	"strings"
-	"sync"

 	log "github.com/sirupsen/logrus"

-	"github.com/projectcalico/calico/libcalico-go/lib/set"
-
 	"github.com/projectcalico/calico/felix/bpf"
 	"github.com/projectcalico/calico/felix/bpf/bpfutils"
 	"github.com/projectcalico/calico/felix/bpf/libbpf"
@@ -66,7 +60,6 @@ type AttachPoint struct {
 	NATout  uint32
 }

-var tcLock sync.RWMutex
 var ErrDeviceNotFound = errors.New("device not found")
 var ErrInterrupted = errors.New("dump interrupted")
 var prefHandleRe = regexp.MustCompile(`pref ([^ ]+) .* handle ([^ ]+)`)
@@ -137,13 +130,6 @@ func (ap *AttachPoint) AttachProgram() (int, error) {
 		binaryToLoad = tempBinary
 	}

-	// Using the RLock allows multiple attach calls to proceed in parallel unless
-	// CleanUpMaps() (which takes the writer lock) is running.
-	logCxt.Debug("AttachProgram waiting for lock...")
-	tcLock.RLock()
-	defer tcLock.RUnlock()
-	logCxt.Debug("AttachProgram got lock.")
-
 	progsToClean, err := ap.listAttachedPrograms()
 	if err != nil {
 		return -1, err
@@ -388,159 +374,6 @@ func (ap *AttachPoint) IsAttached() (bool, error) {
 	return len(progs) > 0, nil
 }

-// tcDirRegex matches tc's and xdp's auto-created directory names, directories created when using libbpf
-// so we can clean them up when removing maps without accidentally removing other user-created dirs..
-var tcDirRegex = regexp.MustCompile(`([0-9a-f]{40})|(.*_(igr|egr|xdp))`)
-
-// CleanUpMaps scans for cali_jump maps that are still pinned to the filesystem but no longer referenced by
-// our BPF programs.
-func CleanUpMaps() {
-	// So that we serialise with AttachProgram()
-	log.Debug("CleanUpMaps waiting for lock...")
-	tcLock.Lock()
-	defer tcLock.Unlock()
-	log.Debug("CleanUpMaps got lock, cleaning up...")
-
-	// Find the maps we care about by walking the BPF filesystem.
-	mapIDToPath := make(map[int]string)
-	err := filepath.Walk("/sys/fs/bpf/tc", func(p string, info os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-		if strings.HasPrefix(info.Name(), bpf.JumpMapName()) ||
-			strings.HasPrefix(info.Name(), bpf.CountersMapName()) {
-			log.WithField("path", p).Debug("Examining map")
-
-			out, err := exec.Command("bpftool", "map", "show", "pinned", p).Output()
-			if err != nil {
-				log.WithError(err).Panic("Failed to show map")
-			}
-			log.WithField("dump", string(out)).Debug("Map show before deletion")
-			idStr := string(bytes.Split(out, []byte(":"))[0])
-			id, err := strconv.Atoi(idStr)
-			if err != nil {
-				log.WithError(err).WithField("dump", string(out)).Error("Failed to parse bpftool output.")
-				return err
-			}
-			mapIDToPath[id] = p
-		}
-		return nil
-	})
-	if os.IsNotExist(err) {
-		log.WithError(err).Warn("tc directory missing from BPF file system?")
-		return
-	}
-	if err != nil {
-		log.WithError(err).Error("Error while looking for maps.")
-	}
-
-	// Find all the programs that are attached to interfaces.
-	out, err := exec.Command("bpftool", "net", "-j").Output()
-	if err != nil {
-		log.WithError(err).Panic("Failed to list attached bpf programs")
-	}
-	log.WithField("dump", string(out)).Debug("Attached BPF programs")
-
-	var attached []struct {
-		TC []struct {
-			DevName string `json:"devname"`
-			ID      int    `json:"id"`
-		} `json:"tc"`
-		XDP []struct {
-			DevName string `json:"devname"`
-			IfIndex int    `json:"ifindex"`
-			Mode    string `json:"mode"`
-			ID      int    `json:"id"`
-		} `json:"xdp"`
-	}
-	err = json.Unmarshal(out, &attached)
-	if err != nil {
-		log.WithError(err).WithField("dump", string(out)).Error("Failed to parse list of attached BPF programs")
-	}
-	attachedProgs := set.New[int]()
-	for _, prog := range attached[0].TC {
-		log.WithField("prog", prog).Debug("Adding TC prog to attached set")
-		attachedProgs.Add(prog.ID)
-	}
-	for _, prog := range attached[0].XDP {
-		log.WithField("prog", prog).Debug("Adding XDP prog to attached set")
-		attachedProgs.Add(prog.ID)
-	}
-
-	// Find all the maps that the attached programs refer to and remove them from consideration.
-	progsJSON, err := exec.Command("bpftool", "prog", "list", "--json").Output()
-	if err != nil {
-		log.WithError(err).Info("Failed to list BPF programs, assuming there's nothing to clean up.")
-		return
-	}
-	var progs []struct {
-		ID   int    `json:"id"`
-		Name string `json:"name"`
-		Maps []int  `json:"map_ids"`
-	}
-	err = json.Unmarshal(progsJSON, &progs)
-	if err != nil {
-		log.WithError(err).Info("Failed to parse bpftool output. Assuming nothing to clean up.")
-		return
-	}
-	for _, p := range progs {
-		if !attachedProgs.Contains(p.ID) {
-			log.WithField("prog", p).Debug("Prog is not in the attached set, skipping")
-			continue
-		}
-		for _, id := range p.Maps {
-			log.WithField("mapID", id).WithField("prog", p).Debugf("Map is still in use: %v", mapIDToPath[id])
-			delete(mapIDToPath, id)
-		}
-	}
-
-	// Remove the pins.
-	for id, p := range mapIDToPath {
-		log.WithFields(log.Fields{"id": id, "path": p}).Debug("Removing stale BPF map pin.")
-		err := os.Remove(p)
-		if err != nil {
-			log.WithError(err).Warn("Removed stale BPF map pin.")
-		}
-		log.WithFields(log.Fields{"id": id, "path": p}).Info("Removed stale BPF map pin.")
-	}
-
-	// Look for empty dirs.
-	emptyAutoDirs := set.New[string]()
-	err = filepath.Walk("/sys/fs/bpf/tc", func(p string, info os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-		if info.IsDir() && tcDirRegex.MatchString(info.Name()) {
-			p := path.Clean(p)
-			log.WithField("path", p).Debug("Found tc auto-created dir.")
-			emptyAutoDirs.Add(p)
-		} else {
-			dirPath := path.Clean(path.Dir(p))
-			if emptyAutoDirs.Contains(dirPath) {
-				log.WithField("path", dirPath).Debug("tc dir is not empty.")
-				emptyAutoDirs.Discard(dirPath)
-			}
-		}
-		return nil
-	})
-	if os.IsNotExist(err) {
-		log.WithError(err).Warn("tc directory missing from BPF file system?")
-		return
-	}
-	if err != nil {
-		log.WithError(err).Error("Error while looking for maps.")
-	}
-
-	emptyAutoDirs.Iter(func(p string) error {
-		log.WithField("path", p).Debug("Removing empty dir.")
-		err := os.Remove(p)
-		if err != nil {
-			log.WithError(err).Error("Error while removing empty dir.")
-		}
-		return nil
-	})
-}
-
 // EnsureQdisc makes sure that qdisc is attached to the given interface
 func EnsureQdisc(ifaceName string) error {
 	hasQdisc, err := HasQdisc(ifaceName)
diff --git a/felix/bpf/ut/attach_test.go b/felix/bpf/ut/attach_test.go
index 7c71eab3d30..93d1a2369f4 100644
--- a/felix/bpf/ut/attach_test.go
+++ b/felix/bpf/ut/attach_test.go
@@ -74,7 +74,7 @@ func TestReattachPrograms(t *testing.T) {

 	// Start with a clean base state in case another test left something behind.
 	t.Log("Doing initial clean up")
-	tc.CleanUpMaps()
+	bpf.CleanUpMaps()
 	bpf.CleanAttachedProgDir()

 	startingJumpMaps := countJumpMaps()
@@ -141,7 +141,7 @@ func TestReattachPrograms(t *testing.T) {

 	// Clean up maps, but nothing should change
 	t.Log("Cleaning up, should remove the first map")
-	tc.CleanUpMaps()
+	bpf.CleanUpMaps()
 	Expect(countJumpMaps()).To(BeNumerically("==", startingJumpMaps+3), "unexpected number of jump maps")
 	Expect(countTCDirs()).To(BeNumerically("==", startingTCDirs+3), "unexpected number of TC dirs")
 	Expect(countHashFiles()).To(BeNumerically("==", startingHashFiles+3), "unexpected number of hash files")
@@ -155,7 +155,7 @@ func TestReattachPrograms(t *testing.T) {
 	Expect(err).NotTo(HaveOccurred())
 	err = tc.RemoveQdisc(vethName2)
 	Expect(err).NotTo(HaveOccurred())
-	tc.CleanUpMaps()
+	bpf.CleanUpMaps()
 	Expect(countJumpMaps()).To(BeNumerically("==", startingJumpMaps+1), "unexpected number of jump maps")
 	Expect(countTCDirs()).To(BeNumerically("==", startingTCDirs+1), "unexpected number of TC dirs")
 	Expect(countHashFiles()).To(BeNumerically("==", startingHashFiles+1), "unexpected number of hash files")
@@ -180,7 +180,7 @@ func TestReattachPrograms(t *testing.T) {
 	t.Log("Removing the XDP program and cleaning up its jump map, should return to base state")
 	err = ap3.DetachProgram()
 	Expect(err).NotTo(HaveOccurred())
-	tc.CleanUpMaps()
+	bpf.CleanUpMaps()
 	Expect(countJumpMaps()).To(BeNumerically("==", startingJumpMaps), "unexpected number of jump maps")
 	Expect(countTCDirs()).To(BeNumerically("==", startingTCDirs), "unexpected number of TC dirs")
 	Expect(countHashFiles()).To(BeNumerically("==", startingHashFiles), "unexpected number of hash files")
diff --git a/felix/dataplane/linux/bpf_ep_mgr.go b/felix/dataplane/linux/bpf_ep_mgr.go
index e07888cb9f3..a474273a3e2 100644
--- a/felix/dataplane/linux/bpf_ep_mgr.go
+++ b/felix/dataplane/linux/bpf_ep_mgr.go
@@ -170,6 +170,10 @@ type bpfEndpointManager struct {
 	ifacesLock  sync.Mutex
 	nameToIface map[string]bpfInterface

+	// Using the RLock allows multiple attach calls to proceed in parallel unless
+	// CleanUpMaps() (which takes the writer lock) is running.
+	cleanupLock sync.RWMutex
+
 	allWEPs        map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint
 	happyWEPs      map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint
 	happyWEPsDirty bool
@@ -311,14 +315,10 @@ func newBPFEndpointManager(
 		)),
 		ruleRenderer:        iptablesRuleRenderer,
 		iptablesFilterTable: iptablesFilterTable,
-		mapCleanupRunner: ratelimited.NewRunner(mapCleanupInterval, func(ctx context.Context) {
-			log.Debug("TC maps cleanup triggered.")
-			tc.CleanUpMaps()
-		}),
-		onStillAlive:     livenessCallback,
-		hostIfaceToEpMap: map[string]proto.HostEndpoint{},
-		ifaceToIpMap:     map[string]net.IP{},
-		opReporter:       opReporter,
+		onStillAlive:        livenessCallback,
+		hostIfaceToEpMap:    map[string]proto.HostEndpoint{},
+		ifaceToIpMap:        map[string]net.IP{},
+		opReporter:          opReporter,
 		// ipv6Enabled Should be set to config.Ipv6Enabled, but for now it is better
 		// to set it to BPFIpv6Enabled which is a dedicated flag for development of IPv6.
 		// TODO: set ipv6Enabled to config.Ipv6Enabled when IPv6 support is complete
@@ -351,6 +351,17 @@ func newBPFEndpointManager(
 		m.dp = m
 	}

+	m.mapCleanupRunner = ratelimited.NewRunner(mapCleanupInterval, func(ctx context.Context) {
+		log.Debug("TC maps cleanup triggered.")
+		// So that we serialise with AttachProgram()
+		log.Debug("CleanUpMaps waiting for lock...")
+		m.cleanupLock.Lock()
+		defer m.cleanupLock.Unlock()
+		log.Debug("CleanUpMaps got lock, cleaning up...")
+
+		bpf.CleanUpMaps()
+	})
+
 	if config.FeatureGates != nil {
 		switch config.FeatureGates["BPFConnectTimeLoadBalancingWorkaround"] {
 		case "enabled":
@@ -1793,7 +1804,14 @@ func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) (bpf.MapFD, e
 	if jumpMapFD == 0 {
 		ap.Log().Info("Need to attach program")
 		// We don't have a program attached to this interface yet, attach one now.
+
+		ap.Log().Debug("AttachProgram waiting for lock...")
+		m.cleanupLock.RLock()
+		ap.Log().Debug("AttachProgram got lock.")
+
 		progID, err := ap.AttachProgram()
+		m.cleanupLock.RUnlock()
+
 		if err != nil {
 			return 0, err
 		}

From 1794d1509b68ff650f9c49dc7d1d57733572df4e Mon Sep 17 00:00:00 2001
From: Tomas Hruby
Date: Wed, 16 Nov 2022 16:53:28 -0800
Subject: [PATCH 2/5] [BPF] bpfEndpointManager.ensureStarted should exec once
 in tests too.

---
 felix/dataplane/linux/bpf_ep_mgr.go | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/felix/dataplane/linux/bpf_ep_mgr.go b/felix/dataplane/linux/bpf_ep_mgr.go
index a474273a3e2..0d8cde0fb89 100644
--- a/felix/dataplane/linux/bpf_ep_mgr.go
+++ b/felix/dataplane/linux/bpf_ep_mgr.go
@@ -752,7 +752,7 @@ func (m *bpfEndpointManager) markExistingWEPDirty(wlID proto.WorkloadEndpointID,
 func (m *bpfEndpointManager) CompleteDeferredWork() error {
 	// Do one-off initialisation.
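+	// Wrapped in startupOnce so the startup work runs exactly once, even when
+	// tests drive CompleteDeferredWork() directly.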
-	m.dp.ensureStarted()
+	m.startupOnce.Do(m.dp.ensureStarted)

 	m.applyProgramsToDirtyDataInterfaces()
 	m.updateWEPsInDataplane()
@@ -1679,10 +1679,8 @@ func (m *bpfEndpointManager) setRPFilter(iface string, val int) error {
 }

 func (m *bpfEndpointManager) ensureStarted() {
-	m.startupOnce.Do(func() {
-		log.Info("Starting map cleanup runner.")
-		m.mapCleanupRunner.Start(context.Background())
-	})
+	log.Info("Starting map cleanup runner.")
+	m.mapCleanupRunner.Start(context.Background())
 }

 func (m *bpfEndpointManager) ensureBPFDevices() error {

From a3dee7df2cddaa41ebccf91c1d059b16622740ae Mon Sep 17 00:00:00 2001
From: Tomas Hruby
Date: Thu, 17 Nov 2022 11:39:57 -0800
Subject: [PATCH 3/5] [BPF] add ListCalicoAttached() to list calico programs

We need it to list only the programs attached by Calico, not programs
attached to other ifaces by someone else. We may want to clean up some
of these if we are no longer interested in a device.
---
 felix/bpf/attach.go         | 71 +++++++++++++++++++++++++++++++
 felix/bpf/bpf.go            | 16 +++++----
 felix/bpf/ut/attach_test.go | 13 +++++++
 3 files changed, 94 insertions(+), 6 deletions(-)

diff --git a/felix/bpf/attach.go b/felix/bpf/attach.go
index 7f2cc9b90bf..fac1bb760ae 100644
--- a/felix/bpf/attach.go
+++ b/felix/bpf/attach.go
@@ -201,3 +201,74 @@ func sha256OfFile(name string) (string, error) {
 	}
 	return hex.EncodeToString(hasher.Sum(nil)), nil
 }
+
+// EPAttachInfo tells what programs are attached to an endpoint.
+type EPAttachInfo struct {
+	TCId    int
+	XDPId   int
+	XDPMode string
+}
+
+// ListCalicoAttached lists all programs that are calico attached to TC or XDP
+// and are related to Calico. That is, they have a jump map pinned in our dir
+// hierarchy.
+func ListCalicoAttached() (map[string]EPAttachInfo, error) {
+	aTC, aXDP, err := ListTcXDPAttachedProgs()
+	if err != nil {
+		return nil, err
+	}
+
+	attachedProgIDs := set.New[int]()
+
+	for _, p := range aTC {
+		attachedProgIDs.Add(p.ID)
+	}
+
+	for _, p := range aXDP {
+		attachedProgIDs.Add(p.ID)
+	}
+
+	maps, err := ListPerEPMaps()
+	if err != nil {
+		return nil, err
+	}
+
+	allProgs, err := GetAllProgs()
+	if err != nil {
+		return nil, err
+	}
+
+	caliProgs := set.New[int]()
+
+	for _, p := range allProgs {
+		if !attachedProgIDs.Contains(p.Id) {
+			continue
+		}
+
+		for _, m := range p.MapIds {
+			if _, ok := maps[m]; ok {
+				caliProgs.Add(p.Id)
+				break
+			}
+		}
+	}
+
+	ai := make(map[string]EPAttachInfo)
+
+	for _, p := range aTC {
+		if caliProgs.Contains(p.ID) {
+			ai[p.DevName] = EPAttachInfo{TCId: p.ID}
+		}
+	}
+
+	for _, p := range aXDP {
+		if caliProgs.Contains(p.ID) {
+			info := ai[p.DevName]
+			info.XDPId = p.ID
+			info.XDPMode = p.Mode
+			ai[p.DevName] = info
+		}
+	}
+
+	return ai, nil
+}
diff --git a/felix/bpf/bpf.go b/felix/bpf/bpf.go
index 6e1c4166753..8dd2f2cb7f0 100644
--- a/felix/bpf/bpf.go
+++ b/felix/bpf/bpf.go
@@ -530,7 +530,7 @@ type perCpuMapEntry []struct {
 	} `json:"values"`
 }

-type progInfo struct {
+type ProgInfo struct {
 	Id   int    `json:"id"`
 	Type string `json:"type"`
 	Tag  string `json:"tag"`
@@ -1098,7 +1098,7 @@ func (b *BPFLib) GetXDPTag(ifName string) (string, error) {
 		return "", fmt.Errorf("failed to show XDP program (%s): %s\n%s", progPath, err, output)
 	}

-	p := progInfo{}
+	p := ProgInfo{}
 	err = json.Unmarshal(output, &p)
 	if err != nil {
 		return "", fmt.Errorf("cannot parse json output: %v\n%s", err, output)
@@ -1188,7 +1188,7 @@ func (b *BPFLib) GetMapsFromXDP(ifName string) ([]int, error) {
 	if err != nil {
 		return nil, fmt.Errorf("failed to show XDP program (%s): %s\n%s", progPath, err, output)
 	}
-	p := progInfo{}
+	p := ProgInfo{}
 	err = json.Unmarshal(output, &p)
 	if err != nil {
 		return nil, fmt.Errorf("cannot parse json output: %v\n%s", err, output)
@@ -1604,7 +1604,11 @@ func (b *BPFLib) getSkMsgID() (int, error) {
 	return -1, nil
 }

-func getAllProgs() ([]progInfo, error) {
+func GetAllProgs() ([]ProgInfo, error) {
+	return getAllProgs()
+}
+
+func getAllProgs() ([]ProgInfo, error) {
 	prog := "bpftool"
 	args := []string{
 		"--json",
@@ -1619,7 +1623,7 @@ func getAllProgs() ([]progInfo, error) {
 		return nil, fmt.Errorf("failed to get progs: %s\n%s", err, output)
 	}

-	var progs []progInfo
+	var progs []ProgInfo
 	err = json.Unmarshal(output, &progs)
 	if err != nil {
 		return nil, fmt.Errorf("cannot parse json output: %v\n%s", err, output)
@@ -1674,7 +1678,7 @@ func (b *BPFLib) getSockMapID(progID int) (int, error) {
 		return -1, fmt.Errorf("failed to get sockmap ID for prog %d: %s\n%s", progID, err, output)
 	}

-	p := progInfo{}
+	p := ProgInfo{}
 	err = json.Unmarshal(output, &p)
 	if err != nil {
 		return -1, fmt.Errorf("cannot parse json output: %v\n%s", err, output)
diff --git a/felix/bpf/ut/attach_test.go b/felix/bpf/ut/attach_test.go
index 93d1a2369f4..e9c37d41789 100644
--- a/felix/bpf/ut/attach_test.go
+++ b/felix/bpf/ut/attach_test.go
@@ -139,6 +139,19 @@ func TestReattachPrograms(t *testing.T) {
 	Expect(bpf.RuntimeJSONFilename(ap2.IfaceName(), "egress")).To(BeARegularFile())
 	Expect(bpf.RuntimeJSONFilename(ap3.IfaceName(), "xdp")).To(BeARegularFile())

+	list, err := bpf.ListCalicoAttached()
+	Expect(err).NotTo(HaveOccurred())
+	Expect(list).To(HaveLen(3))
+	Expect(list).To(HaveKey(vethName1))
+	Expect(list[vethName1].TCId).NotTo(Equal(0))
+	Expect(list[vethName1].XDPId).To(Equal(0))
+	Expect(list).To(HaveKey(vethName2))
+	Expect(list[vethName2].TCId).NotTo(Equal(0))
+	Expect(list[vethName2].XDPId).To(Equal(0))
+	Expect(list).To(HaveKey(vethName3))
+	Expect(list[vethName3].TCId).To(Equal(0))
+	Expect(list[vethName3].XDPId).NotTo(Equal(0))
+
 	// Clean up maps, but nothing should change
 	t.Log("Cleaning up, should remove the first map")
 	bpf.CleanUpMaps()

From ff4844589375c7790baa8374cc804422767bebd0 Mon Sep 17 00:00:00 2001
From: Tomas Hruby
Date: Fri, 18 Nov 2022 09:47:51 -0800
Subject: [PATCH 4/5] [BPF] detach cali programs from devices no longer in
 regex

It can happen that the device regexp changes and calico programs then
remain attached to devices that we do not observe anymore. For
instance, a user has the default ethX in the regexp, but those devices
are bonded and the user does not have bondX in the regexp. The user
fixes the regexp to include bondX only, but then the programs stay
attached to ethX. The user has to remove the programs manually or
restart the node.

We figure out that the programs on devices not matching the regexp are
ours when those programs have a jump map pinned in the calico bpffs
hierarchy.

This is not really a problem for workload devices as they usually
disappear and the programs then get detached.
---
 felix/bpf/attach.go                      |  5 +-
 felix/bpf/tc/attach.go                   | 63 ++++++++++-------
 felix/dataplane/linux/bpf_ep_mgr.go      | 81 +++++++++++++++++++--
 felix/dataplane/linux/bpf_ep_mgr_test.go | 89 ++++++++++++++++++++++--
 4 files changed, 198 insertions(+), 40 deletions(-)

diff --git a/felix/bpf/attach.go b/felix/bpf/attach.go
index fac1bb760ae..262f250c7fc 100644
--- a/felix/bpf/attach.go
+++ b/felix/bpf/attach.go
@@ -209,9 +208,8 @@ type EPAttachInfo struct {
 	XDPMode string
 }

-// ListCalicoAttached lists all programs that are calico attached to TC or XDP
-// and are related to Calico. That is, they have a jump map pinned in our dir
-// hierarchy.
+// ListCalicoAttached lists all programs that are attached to TC or XDP and are
+// related to Calico. That is, they have a jump map pinned in our dir hierarchy.
 func ListCalicoAttached() (map[string]EPAttachInfo, error) {
 	aTC, aXDP, err := ListTcXDPAttachedProgs()
 	if err != nil {
diff --git a/felix/bpf/tc/attach.go b/felix/bpf/tc/attach.go
index 1911ec2d369..fa686336e7c 100644
--- a/felix/bpf/tc/attach.go
+++ b/felix/bpf/tc/attach.go
@@ -187,30 +187,8 @@ func (ap *AttachPoint) AttachProgram() (int, error) {
 	}
 	logCxt.Info("Program attached to TC.")

-	var progErrs []error
-	for _, p := range progsToClean {
-		log.WithField("prog", p).Debug("Cleaning up old calico program")
-		attemptCleanup := func() error {
-			_, err := ExecTC("filter", "del", "dev", ap.Iface, string(ap.Hook), "pref", p.pref, "handle", p.handle, "bpf")
-			return err
-		}
-		err = attemptCleanup()
-		if errors.Is(err, ErrInterrupted) {
-			// This happens if the interface is deleted in the middle of calling tc.
-			log.Debug("First cleanup hit 'Dump was interrupted', retrying (once).")
-			err = attemptCleanup()
-		}
-		if errors.Is(err, ErrDeviceNotFound) {
-			continue
-		}
-		if err != nil {
-			log.WithError(err).WithField("prog", p).Warn("Failed to clean up old calico program.")
-			progErrs = append(progErrs, err)
-		}
-	}
-
-	if len(progErrs) != 0 {
-		return -1, fmt.Errorf("failed to clean up one or more old calico programs: %v", progErrs)
+	if err := ap.detachPrograms(progsToClean); err != nil {
+		return -1, err
 	}

 	// Store information of object in a json file so in future we can skip reattaching it.
@@ -240,8 +218,41 @@ func (ap *AttachPoint) patchLogPrefix(logCtx *log.Entry, ifile, ofile string) er
 }

 func (ap *AttachPoint) DetachProgram() error {
-	// We never detach TC programs, so this should not be called.
-	ap.Log().Panic("DetachProgram is not implemented for TC")
+	progsToClean, err := ap.listAttachedPrograms()
+	if err != nil {
+		return err
+	}
+
+	return ap.detachPrograms(progsToClean)
+}
+
+func (ap *AttachPoint) detachPrograms(progsToClean []attachedProg) error {
+	var progErrs []error
+	for _, p := range progsToClean {
+		log.WithField("prog", p).Debug("Cleaning up old calico program")
+		attemptCleanup := func() error {
+			_, err := ExecTC("filter", "del", "dev", ap.Iface, string(ap.Hook), "pref", p.pref, "handle", p.handle, "bpf")
+			return err
+		}
+		err := attemptCleanup()
+		if errors.Is(err, ErrInterrupted) {
+			// This happens if the interface is deleted in the middle of calling tc.
+			log.Debug("First cleanup hit 'Dump was interrupted', retrying (once).")
+			err = attemptCleanup()
+		}
+		if errors.Is(err, ErrDeviceNotFound) {
+			continue
+		}
+		if err != nil {
+			log.WithError(err).WithField("prog", p).Warn("Failed to clean up old calico program.")
+			progErrs = append(progErrs, err)
+		}
+	}
+
+	if len(progErrs) != 0 {
+		return fmt.Errorf("failed to clean up one or more old calico programs: %v", progErrs)
+	}
+	return nil
 }
diff --git a/felix/dataplane/linux/bpf_ep_mgr.go b/felix/dataplane/linux/bpf_ep_mgr.go
index 0d8cde0fb89..1f29b2bf23b 100644
--- a/felix/dataplane/linux/bpf_ep_mgr.go
+++ b/felix/dataplane/linux/bpf_ep_mgr.go
@@ -166,6 +166,9 @@ const (
 )

 type bpfEndpointManager struct {
+	initAttaches      map[string]bpf.EPAttachInfo
+	initUnknownIfaces set.Set[string]
+
 	// Main store of information about interfaces; indexed on interface name.
 	ifacesLock  sync.Mutex
 	nameToIface map[string]bpfInterface

@@ -284,6 +287,7 @@ func newBPFEndpointManager(
 		livenessCallback = func() {}
 	}
 	m := &bpfEndpointManager{
+		initUnknownIfaces: set.New[string](),
 		dp:                dp,
 		allWEPs:           map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint{},
 		happyWEPs:         map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint{},
@@ -555,6 +559,38 @@ func (m *bpfEndpointManager) updateIfaceStateMap(name string, iface *bpfInterfac
 	}
 }

+func (m *bpfEndpointManager) cleanupOldAttach(iface string, ai bpf.EPAttachInfo) error {
+	if ai.XDPId != 0 {
+		ap := xdp.AttachPoint{
+			Iface: iface,
+			// Try all modes in this order
+			Modes: []bpf.XDPMode{bpf.XDPGeneric, bpf.XDPDriver, bpf.XDPOffload},
+		}
+
+		if err := m.dp.ensureNoProgram(&ap); err != nil {
+			return fmt.Errorf("xdp: %w", err)
+		}
+	}
+	if ai.TCId != 0 {
+		ap := tc.AttachPoint{
+			Iface: iface,
+			Hook:  bpf.HookEgress,
+		}
+
+		if err := m.dp.ensureNoProgram(&ap); err != nil {
+			return fmt.Errorf("tc egress: %w", err)
+		}
+
+		ap.Hook = bpf.HookIngress
+
+		if err := m.dp.ensureNoProgram(&ap); err != nil {
+			return fmt.Errorf("tc ingress: %w", err)
+		}
+	}
+
+	return nil
+}
+
 func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) {
 	log.Debugf("Interface update for %v, state %v", update.Name, update.State)
 	// Should be safe without the lock since there shouldn't be any active background threads
@@ -569,6 +605,18 @@ func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) {
 	}

 	if !m.isDataIface(update.Name) && !m.isWorkloadIface(update.Name) && !m.isL3Iface(update.Name) {
+		if update.State == ifacemonitor.StateUp {
+			if ai, ok := m.initAttaches[update.Name]; ok {
+				if err := m.cleanupOldAttach(update.Name, ai); err != nil {
+					log.WithError(err).Warnf("Failed to detach old programs from now unused device '%s'", update.Name)
+				} else {
+					delete(m.initAttaches, update.Name)
+				}
+			}
+		}
+		if m.initUnknownIfaces != nil {
+			m.initUnknownIfaces.Add(update.Name)
+		}
 		log.WithField("update", update).Debug("Ignoring interface that's neither data nor workload nor L3.")
 		return
 	}
@@ -579,6 +627,7 @@ func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) {
 	// For specific host endpoints OnHEPUpdate doesn't depend on iface state, and has
 	// already stored and mapped as needed.
 	if ifaceIsUp {
+		delete(m.initAttaches, update.Name)
 		// We require host interfaces to be in non-strict RPF mode so that
 		// packets can return straight to host for services bypassing CTLB.
 		switch update.Name {
@@ -752,7 +801,23 @@ func (m *bpfEndpointManager) markExistingWEPDirty(wlID proto.WorkloadEndpointID,
 func (m *bpfEndpointManager) CompleteDeferredWork() error {
 	// Do one-off initialisation.
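 	// Wrapped in startupOnce so the startup work runs exactly once, even when
 	// tests drive CompleteDeferredWork() directly.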
-	m.startupOnce.Do(m.dp.ensureStarted)
+	m.startupOnce.Do(func() {
+		m.dp.ensureStarted()
+
+		m.initUnknownIfaces.Iter(func(iface string) error {
+			if ai, ok := m.initAttaches[iface]; ok {
+				if err := m.cleanupOldAttach(iface, ai); err != nil {
+					log.WithError(err).Warnf("Failed to detach old programs from now unused device '%s'", iface)
+				} else {
+					delete(m.initAttaches, iface)
+					return set.RemoveItem
+				}
+			}
+			return nil
+		})
+
+		m.initUnknownIfaces = nil
+	})

 	m.applyProgramsToDirtyDataInterfaces()
 	m.updateWEPsInDataplane()
@@ -1681,6 +1746,13 @@ func (m *bpfEndpointManager) setRPFilter(iface string, val int) error {
 }

 func (m *bpfEndpointManager) ensureStarted() {
 	log.Info("Starting map cleanup runner.")
 	m.mapCleanupRunner.Start(context.Background())
+
+	var err error
+
+	m.initAttaches, err = bpf.ListCalicoAttached()
+	if err != nil {
+		log.WithError(err).Warn("Failed to list previously attached programs. We may not clean up some.")
+	}
 }

 func (m *bpfEndpointManager) ensureBPFDevices() error {
@@ -1824,8 +1896,7 @@ func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) (bpf.MapFD, e
 	return jumpMapFD, nil
 }

-// Ensure that the specified interface does not have our XDP program, in any mode, but avoid
-// touching anyone else's XDP program(s).
+// Ensure that the specified attach point does not have our program.
 func (m *bpfEndpointManager) ensureNoProgram(ap attachPoint) error {

 	// Clean up jump map FD if there is one.
@@ -1840,11 +1911,11 @@ func (m *bpfEndpointManager) ensureNoProgram(ap attachPoint) error {
 		}
 	}

-	// Ensure interface does not have our XDP program attached in any mode.
+	// Ensure interface does not have our program attached.
 	err := ap.DetachProgram()

 	// Forget the policy debug info
-	m.removePolicyDebugInfo(ap.IfaceName(), 4, bpf.HookXDP)
+	m.removePolicyDebugInfo(ap.IfaceName(), 4, ap.HookName())

 	return err
 }
diff --git a/felix/dataplane/linux/bpf_ep_mgr_test.go b/felix/dataplane/linux/bpf_ep_mgr_test.go
index 0afc490022e..4bd6fae724e 100644
--- a/felix/dataplane/linux/bpf_ep_mgr_test.go
+++ b/felix/dataplane/linux/bpf_ep_mgr_test.go
@@ -53,6 +53,8 @@ type mockDataplane struct {
 	fds    map[string]uint32
 	state  map[uint32]polprog.Rules
 	routes map[ip.V4CIDR]struct{}
+
+	ensureStartedFn func()
 }

 func newMockDataplane() *mockDataplane {
@@ -65,6 +67,9 @@ func newMockDataplane() *mockDataplane {
 }

 func (m *mockDataplane) ensureStarted() {
+	if m.ensureStartedFn != nil {
+		m.ensureStartedFn()
+	}
 }

 func (m *mockDataplane) ensureBPFDevices() error {
@@ -140,6 +145,12 @@ func (m *mockDataplane) setAndReturn(vari **polprog.Rules, key string) func() *p
 	}
 }

+func (m *mockDataplane) programAttached(key string) bool {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+	return m.fds[key] != 0
+}
+
 func (m *mockDataplane) setRoute(cidr ip.V4CIDR) {
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
@@ -168,7 +179,6 @@ var _ = Describe("BPF Endpoint Manager", func() {
 		fibLookupEnabled     bool
 		endpointToHostAction string
 		dataIfacePattern     string
-		l3IfacePattern       string
 		workloadIfaceRegex   string
 		ipSetIDAllocator     *idalloc.IDAllocator
 		vxlanMTU             int
@@ -183,8 +193,7 @@ var _ = Describe("BPF Endpoint Manager", func() {
 	BeforeEach(func() {
 		fibLookupEnabled = true
 		endpointToHostAction = "DROP"
-		dataIfacePattern = "^((en|wl|ww|sl|ib)[opsx].*|(eth|wlan|wwan).*|tunl0$|wireguard.cali$)"
-		l3IfacePattern = "^(wireguard.cali$)"
+		dataIfacePattern = "^eth0"
 		workloadIfaceRegex = "cali"
 		ipSetIDAllocator = idalloc.New()
 		vxlanMTU = 0
@@ -225,7 +234,6 @@ var _ = Describe("BPF Endpoint Manager", func() {
 		Hostname:              "uthost",
"uthost", BPFLogLevel: "info", BPFDataIfacePattern: regexp.MustCompile(dataIfacePattern), - BPFL3IfacePattern: regexp.MustCompile(l3IfacePattern), VXLANMTU: vxlanMTU, VXLANPort: rrConfigNormal.VXLANPort, BPFNodePortDSREnabled: nodePortDSR, @@ -255,8 +263,8 @@ var _ = Describe("BPF Endpoint Manager", func() { bpfEpMgr.OnUpdate(&ifaceUpdate{Name: name, State: state, Index: index}) err := bpfEpMgr.CompleteDeferredWork() Expect(err).NotTo(HaveOccurred()) - if state == ifacemonitor.StateUp { - Expect(ifStateMap.ContainsKey(ifstate.NewKey(uint32(index)).AsBytes())).To(BeTrue()) + if state == ifacemonitor.StateUp && (bpfEpMgr.isDataIface(name) || bpfEpMgr.isWorkloadIface(name)) { + ExpectWithOffset(1, ifStateMap.ContainsKey(ifstate.NewKey(uint32(index)).AsBytes())).To(BeTrue()) } } } @@ -415,6 +423,75 @@ var _ = Describe("BPF Endpoint Manager", func() { genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)() }) + It("should attach to eth0", func() { + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + }) + + Context("with dataIfacePattern changed to eth1", func() { + JustBeforeEach(func() { + dataIfacePattern = "^eth1" + newBpfEpMgr() + + dp.ensureStartedFn = func() { + bpfEpMgr.initAttaches = map[string]bpf.EPAttachInfo{ + "eth0": {TCId: 12345}, + } + } + + }) + + It("should detach from eth0 when eth0 up before first CompleteDeferredWork()", func() { + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + + genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)() + genIfaceUpdate("eth1", ifacemonitor.StateUp, 11)() + + err := bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + // We inherited dp from the previous bpfEpMgr and it has eth0 + // attached. This should clean it up. + Expect(dp.programAttached("eth0:ingress")).To(BeFalse()) + Expect(dp.programAttached("eth0:egress")).To(BeFalse()) + + Expect(dp.programAttached("eth1:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth1:egress")).To(BeTrue()) + }) + + It("should detach from eth0 when eth0 up after first CompleteDeferredWork()", func() { + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + + genIfaceUpdate("eth1", ifacemonitor.StateUp, 11)() + + err := bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + // We inherited dp from the previous bpfEpMgr and it has eth0 + // attached. We should see it. + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + + Expect(dp.programAttached("eth1:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth1:egress")).To(BeTrue()) + + genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)() + + err = bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + // We inherited dp from the previous bpfEpMgr and it has eth0 + // attached. This should clean it up. 
+				Expect(dp.programAttached("eth0:ingress")).To(BeFalse())
+				Expect(dp.programAttached("eth0:egress")).To(BeFalse())
+
+				Expect(dp.programAttached("eth1:ingress")).To(BeTrue())
+				Expect(dp.programAttached("eth1:egress")).To(BeTrue())
+			})
+		})
+
 		Context("with eth0 host endpoint", func() {
 			JustBeforeEach(genHEPUpdate("eth0", hostEp))

From 05ea9a94c767e60c3a30c35362a3047e21db6132 Mon Sep 17 00:00:00 2001
From: Tomas Hruby
Date: Fri, 18 Nov 2022 11:49:38 -0800
Subject: [PATCH 5/5] [BPF] FV test for program cleanups

---
 felix/docker-image/calico-felix-wrapper |  2 ++
 felix/fv/bpf_attach_test.go             | 37 +++++++++++++++++++------
 felix/fv/infrastructure/felix.go        | 20 +++++++++++++
 3 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/felix/docker-image/calico-felix-wrapper b/felix/docker-image/calico-felix-wrapper
index 3c678fe08aa..fb94af42a9e 100755
--- a/felix/docker-image/calico-felix-wrapper
+++ b/felix/docker-image/calico-felix-wrapper
@@ -42,6 +42,8 @@ while true; do
     done
   fi

+  if [ -f /extra-env.sh ]; then source /extra-env.sh; fi
+
   echo "calico-felix-wrapper: Starting calico-felix"
   calico-felix &
   pid=$!
diff --git a/felix/fv/bpf_attach_test.go b/felix/fv/bpf_attach_test.go
index 09ff9fecd68..43b6c394562 100644
--- a/felix/fv/bpf_attach_test.go
+++ b/felix/fv/bpf_attach_test.go
@@ -35,8 +35,8 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ Felix bpf reattach object",
 	}

 	var (
-		infra   infrastructure.DatastoreInfra
-		felixes []*infrastructure.Felix
+		infra infrastructure.DatastoreInfra
+		felix *infrastructure.Felix
 	)

 	BeforeEach(func() {
@@ -51,7 +51,8 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ Felix bpf reattach object",
 			},
 		}

-		felixes, _ = infrastructure.StartNNodeTopology(1, opts, infra)
+		felixes, _ := infrastructure.StartNNodeTopology(1, opts, infra)
+		felix = felixes[0]

 		err := infra.AddAllowToDatastore("host-endpoint=='true'")
 		Expect(err).NotTo(HaveOccurred())
@@ -62,15 +63,11 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ Felix bpf reattach object",
 			infra.DumpErrorData()
 		}

-		for _, felix := range felixes {
-			felix.Stop()
-		}
-
+		felix.Stop()
 		infra.Stop()
 	})

 	It("should not reattach bpf programs", func() {
-		felix := felixes[0]

 		// This should not happen at initial execution of felix, since there is no program attached
 		firstRunBase := felix.WatchStdoutFor(regexp.MustCompile("Program already attached, skip reattaching"))
@@ -94,4 +91,28 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ Felix bpf reattach object",
 		Eventually(secondRunProg2, "10s", "100ms").Should(BeClosed())
 		Expect(secondRunBase).NotTo(BeClosed())
 	})
+
+	It("should clean up programs when BPFDataIfacePattern changes", func() {
+		By("Starting Felix")
+		felix.TriggerDelayedStart()
+
+		By("Checking that eth0 has a program")
+
+		Eventually(func() string {
+			out, _ := felix.ExecOutput("bpftool", "-jp", "net")
+			return out
+		}, "15s", "1s").Should(ContainSubstring("eth0"))
+
+		By("Changing env and restarting felix")
+
+		felix.SetEnv(map[string]string{"FELIX_BPFDataIfacePattern": "eth1"})
+		felix.Restart()
+
+		By("Checking that eth0 does not have a program anymore")
+
+		Eventually(func() string {
+			out, _ := felix.ExecOutput("bpftool", "-jp", "net")
+			return out
+		}, "15s", "1s").ShouldNot(ContainSubstring("eth0"))
+	})
 })
diff --git a/felix/fv/infrastructure/felix.go b/felix/fv/infrastructure/felix.go
index 40574629224..703b8d1f91a 100644
--- a/felix/fv/infrastructure/felix.go
+++ b/felix/fv/infrastructure/felix.go
@@ -15,6 +15,7 @@
 package infrastructure

 import (
+	"bufio"
 	"fmt"
 	"os"
 	"path"
@@ -229,6 +230,25 @@ func (f *Felix) Restart() {
 	Eventually(f.GetFelixPID, "10s", "100ms").ShouldNot(Equal(oldPID))
 }

+func (f *Felix) SetEnv(env map[string]string) {
+	fn := "extra-env.sh"
+
+	file, err := os.OpenFile("./"+fn, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
+	Expect(err).NotTo(HaveOccurred())
+
+	fw := bufio.NewWriter(file)
+
+	for k, v := range env {
+		fmt.Fprintf(fw, "export %s=%v\n", k, v)
+	}
+
+	fw.Flush()
+	file.Close()
+
+	err = f.CopyFileIntoContainer("./"+fn, "/"+fn)
+	Expect(err).NotTo(HaveOccurred())
+}
+
 // AttachTCPDump returns tcpdump attached to the container
 func (f *Felix) AttachTCPDump(iface string) *tcpdump.TCPDump {
 	return tcpdump.Attach(f.Container.Name, "", iface)