Skip to content

Commit

Permalink
working version
Browse files Browse the repository at this point in the history
  • Loading branch information
marctc committed Oct 1, 2024
1 parent 31cb9d9 commit ccbd2a2
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
3 changes: 2 additions & 1 deletion pkg/internal/discover/watcher_kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,15 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs
wk.processByContainer[containerInfo.ContainerID] = procInfo

if pod, ok := wk.informer.GetContainerPod(containerInfo.ContainerID); ok {
wk.log.Info("Pod found for container", "containerID", containerInfo.ContainerID, "pod", pod.Name)
procInfo = withMetadata(procInfo, pod)
}
return procInfo, true
}

func (wk *watcherKubeEnricher) onNewPod(pod *kube.PodInfo) []Event[processAttrs] {
wk.updateNewPodsByOwnerIndex(pod)

wk.log.Info("Pod added", "namespace", pod.Namespace, "name", pod.Name, "containers", pod.ContainerIDs)
var events []Event[processAttrs]
for _, containerID := range pod.ContainerIDs {
if procInfo, ok := wk.processByContainer[containerID]; ok {
Expand Down
18 changes: 13 additions & 5 deletions pkg/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ var nodeIndexers = cache.Indexers{
// GetContainerPod fetches metadata from a Pod given the name of one of its containers
func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) {
objs, err := k.pods.GetIndexer().ByIndex(IndexPodByContainerIDs, containerID)
k.log.Info("fetching pod by container ID", "containerID", containerID, "objs", objs, "len", len(objs))
if err != nil {
k.log.Warn("error accessing index by container ID. Ignoring", "error", err, "containerID", containerID)
return nil, false
Expand All @@ -128,7 +129,6 @@ func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) {

func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFactory) error {
pods := informerFactory.Core().V1().Pods().Informer()

k.initContainerListeners(pods)

// Transform any *v1.Pod instance into a *PodInfo instance to save space
Expand Down Expand Up @@ -166,6 +166,7 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto
if ip.IP != pod.Status.HostIP {
ips = append(ips, ip.IP)
}
k.log.Info("pod IPs", "pod", pod.Name, "ip", ip.IP)
}

owner := OwnerFrom(pod.OwnerReferences)
Expand Down Expand Up @@ -267,16 +268,16 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac
if syncTimeout <= 0 {
syncTimeout = defaultSyncTimeout
}
fieldSelector := fields.OneTermEqualSelector("spec.nodeName", "kind-kind").String()
fieldSelector := fields.OneTermEqualSelector("spec.nodeName", os.Getenv("HOSTNAME")).String()
k.log.Info("using field selector", "selector", fieldSelector)
opts := informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector
})
informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncTime, opts)
if err := k.initPodInformer(informerFactory); err != nil {
informerFactoryPod := informers.NewSharedInformerFactoryWithOptions(client, resyncTime, opts)
if err := k.initPodInformer(informerFactoryPod); err != nil {
return err
}
informerFactory = informers.NewSharedInformerFactory(client, resyncTime)
informerFactory := informers.NewSharedInformerFactory(client, resyncTime)
if err := k.initNodeIPInformer(informerFactory); err != nil {
return err
}
Expand All @@ -287,15 +288,22 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac
log := klog()
log.Debug("starting kubernetes informers, waiting for syncronization")
informerFactory.Start(ctx.Done())
informerFactoryPod.Start(ctx.Done())
finishedCacheSync := make(chan struct{})
finishedCacheSyncPod := make(chan struct{})
go func() {
informerFactory.WaitForCacheSync(ctx.Done())
informerFactoryPod.WaitForCacheSync(ctx.Done())
close(finishedCacheSync)
close(finishedCacheSyncPod)
}()
select {
case <-finishedCacheSync:
log.Debug("kubernetes informers started")
return nil
case <-finishedCacheSyncPod:
log.Debug("kubernetes informers started")
return nil
case <-time.After(syncTimeout):
return fmt.Errorf("kubernetes cache has not been synced after %s timeout", syncTimeout)
}
Expand Down

0 comments on commit ccbd2a2

Please sign in to comment.