From ccbd2a21af56d6466b083a69209c641da7cdbcd0 Mon Sep 17 00:00:00 2001 From: Marc Tuduri Date: Tue, 1 Oct 2024 15:45:04 +0200 Subject: [PATCH] working version --- pkg/internal/discover/watcher_kube.go | 3 ++- pkg/internal/kube/informer.go | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/internal/discover/watcher_kube.go b/pkg/internal/discover/watcher_kube.go index b21ffe072..76e3a0b14 100644 --- a/pkg/internal/discover/watcher_kube.go +++ b/pkg/internal/discover/watcher_kube.go @@ -186,6 +186,7 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs wk.processByContainer[containerInfo.ContainerID] = procInfo if pod, ok := wk.informer.GetContainerPod(containerInfo.ContainerID); ok { + wk.log.Info("Pod found for container", "containerID", containerInfo.ContainerID, "pod", pod.Name) procInfo = withMetadata(procInfo, pod) } return procInfo, true @@ -193,7 +194,7 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs func (wk *watcherKubeEnricher) onNewPod(pod *kube.PodInfo) []Event[processAttrs] { wk.updateNewPodsByOwnerIndex(pod) - + wk.log.Info("Pod added", "namespace", pod.Namespace, "name", pod.Name, "containers", pod.ContainerIDs) var events []Event[processAttrs] for _, containerID := range pod.ContainerIDs { if procInfo, ok := wk.processByContainer[containerID]; ok { diff --git a/pkg/internal/kube/informer.go b/pkg/internal/kube/informer.go index c376e4733..1e9596af9 100644 --- a/pkg/internal/kube/informer.go +++ b/pkg/internal/kube/informer.go @@ -116,6 +116,7 @@ var nodeIndexers = cache.Indexers{ // GetContainerPod fetches metadata from a Pod given the name of one of its containers func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) { objs, err := k.pods.GetIndexer().ByIndex(IndexPodByContainerIDs, containerID) + k.log.Info("fetching pod by container ID", "containerID", containerID, "objs", objs, "len", len(objs)) if err != nil { k.log.Warn("error accessing index by container ID. Ignoring", "error", err, "containerID", containerID) return nil, false @@ -128,7 +129,6 @@ func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) { func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFactory) error { pods := informerFactory.Core().V1().Pods().Informer() - k.initContainerListeners(pods) // Transform any *v1.Pod instance into a *PodInfo instance to save space @@ -166,6 +166,7 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto if ip.IP != pod.Status.HostIP { ips = append(ips, ip.IP) } + k.log.Info("pod IPs", "pod", pod.Name, "ip", ip.IP) } owner := OwnerFrom(pod.OwnerReferences) @@ -267,16 +268,16 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac if syncTimeout <= 0 { syncTimeout = defaultSyncTimeout } - fieldSelector := fields.OneTermEqualSelector("spec.nodeName", "kind-kind").String() + fieldSelector := fields.OneTermEqualSelector("spec.nodeName", os.Getenv("HOSTNAME")).String() k.log.Info("using field selector", "selector", fieldSelector) opts := informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.FieldSelector = fieldSelector }) - informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncTime, opts) - if err := k.initPodInformer(informerFactory); err != nil { + informerFactoryPod := informers.NewSharedInformerFactoryWithOptions(client, resyncTime, opts) + if err := k.initPodInformer(informerFactoryPod); err != nil { return err } - informerFactory = informers.NewSharedInformerFactory(client, resyncTime) + informerFactory := informers.NewSharedInformerFactory(client, resyncTime) if err := k.initNodeIPInformer(informerFactory); err != nil { return err } @@ -287,15 +288,22 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac log := klog() log.Debug("starting kubernetes informers, waiting for syncronization") informerFactory.Start(ctx.Done()) + informerFactoryPod.Start(ctx.Done()) finishedCacheSync := make(chan struct{}) + finishedCacheSyncPod := make(chan struct{}) go func() { informerFactory.WaitForCacheSync(ctx.Done()) + informerFactoryPod.WaitForCacheSync(ctx.Done()) close(finishedCacheSync) + close(finishedCacheSyncPod) }() select { case <-finishedCacheSync: log.Debug("kubernetes informers started") return nil + case <-finishedCacheSyncPod: + log.Debug("kubernetes informers started") + return nil case <-time.After(syncTimeout): return fmt.Errorf("kubernetes cache has not been synced after %s timeout", syncTimeout) }