From a6a172072282c1846d7e97f5bf72e6ba5acf1803 Mon Sep 17 00:00:00 2001 From: zhanghonghaobj Date: Thu, 18 Apr 2024 11:40:59 +0800 Subject: [PATCH] feat (processor/k8sattributes): wait for synced when starting --- .chloggen/k8sattributes-block.yaml | 27 +++++++++++++++++ .../k8sattributesprocessor/client_test.go | 5 ++-- .../internal/kube/client.go | 30 +++++++++++++------ .../internal/kube/client_test.go | 2 +- .../internal/kube/fake_informer.go | 4 +-- .../internal/kube/kube.go | 2 +- processor/k8sattributesprocessor/processor.go | 6 +++- 7 files changed, 60 insertions(+), 16 deletions(-) create mode 100644 .chloggen/k8sattributes-block.yaml diff --git a/.chloggen/k8sattributes-block.yaml b/.chloggen/k8sattributes-block.yaml new file mode 100644 index 000000000000..f07469326c4d --- /dev/null +++ b/.chloggen/k8sattributes-block.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/k8sattributes + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Block when starting util the metadata have been synced, to fix that some data couldn't be associated with metadata when the agent was just started. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go index 2a893e52790c..78903fbeeda1 100644 --- a/processor/k8sattributesprocessor/client_test.go +++ b/processor/k8sattributesprocessor/client_test.go @@ -70,10 +70,11 @@ func (f *fakeClient) GetNode(nodeName string) (*kube.Node, bool) { } // Start is a noop for FakeClient. -func (f *fakeClient) Start() { +func (f *fakeClient) Start() error { if f.Informer != nil { - f.Informer.Run(f.StopCh) + go f.Informer.Run(f.StopCh) } + return nil } // Stop is a noop for FakeClient. diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index a43049f09ebf..4fdcd3edc7db 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -5,6 +5,7 @@ package kube // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "context" + "errors" "fmt" "regexp" "strings" @@ -189,50 +190,61 @@ func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules Extr } // Start registers pod event handlers and starts watching the kubernetes cluster for pod changes. -func (c *WatchClient) Start() { - _, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ +func (c *WatchClient) Start() error { + synced := make([]cache.InformerSynced, 0) + reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handlePodAdd, UpdateFunc: c.handlePodUpdate, DeleteFunc: c.handlePodDelete, }) if err != nil { - c.logger.Error("error adding event handler to pod informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.informer.Run(c.stopCh) - _, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNamespaceAdd, UpdateFunc: c.handleNamespaceUpdate, DeleteFunc: c.handleNamespaceDelete, }) if err != nil { - c.logger.Error("error adding event handler to namespace informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.namespaceInformer.Run(c.stopCh) if c.Rules.DeploymentName || c.Rules.DeploymentUID { - _, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleReplicaSetAdd, UpdateFunc: c.handleReplicaSetUpdate, DeleteFunc: c.handleReplicaSetDelete, }) if err != nil { - c.logger.Error("error adding event handler to replicaset informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.replicasetInformer.Run(c.stopCh) } if c.nodeInformer != nil { - _, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNodeAdd, UpdateFunc: c.handleNodeUpdate, DeleteFunc: c.handleNodeDelete, }) if err != nil { - c.logger.Error("error adding event handler to node informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.nodeInformer.Run(c.stopCh) } + + if !cache.WaitForCacheSync(c.stopCh, synced...) { + return errors.New("failed to wait for caches to sync") + } + + return nil } // Stop signals the the k8s watcher/informer to stop watching for new events. diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index de6f1dace5d6..f6fcd8eeeba9 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -181,7 +181,7 @@ func TestClientStartStop(t *testing.T) { done := make(chan struct{}) assert.False(t, fctr.HasStopped()) go func() { - c.Start() + assert.NoError(t, c.Start()) close(done) }() c.Stop() diff --git a/processor/k8sattributesprocessor/internal/kube/fake_informer.go b/processor/k8sattributesprocessor/internal/kube/fake_informer.go index 20a9ca879afc..d25d0f513e0c 100644 --- a/processor/k8sattributesprocessor/internal/kube/fake_informer.go +++ b/processor/k8sattributesprocessor/internal/kube/fake_informer.go @@ -40,7 +40,7 @@ func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cach } func (f *FakeInformer) AddEventHandlerWithResyncPeriod(_ cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) { - return nil, nil + return f, nil } func (f *FakeInformer) RemoveEventHandler(_ cache.ResourceEventHandlerRegistration) error { @@ -164,7 +164,7 @@ func (f *NoOpInformer) AddEventHandler(handler cache.ResourceEventHandler) (cach return f.AddEventHandlerWithResyncPeriod(handler, time.Second) } func (f *NoOpInformer) AddEventHandlerWithResyncPeriod(_ cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) { - return nil, nil + return f, nil } func (f *NoOpInformer) RemoveEventHandler(_ cache.ResourceEventHandlerRegistration) error { diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 6145cb972235..fc4b775edd00 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -91,7 +91,7 @@ type Client interface { GetPod(PodIdentifier) (*Pod, bool) GetNamespace(string) (*Namespace, bool) GetNode(string) (*Node, bool) - Start() + Start() error Stop() } diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 669006690a3b..a6daeaa66906 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -71,7 +71,11 @@ func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error } } if !kp.passthroughMode { - go kp.kc.Start() + err := kp.kc.Start() + if err != nil { + kp.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) + return nil + } } return nil }