Skip to content

Commit

Permalink
feat (processor/k8sattributes): wait for synced when starting
Browse files Browse the repository at this point in the history
  • Loading branch information
h0cheung committed Aug 8, 2024
1 parent 5f4d198 commit a6a1720
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 16 deletions.
27 changes: 27 additions & 0 deletions .chloggen/k8sattributes-block.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/k8sattributes

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Block when starting util the metadata have been synced, to fix that some data couldn't be associated with metadata when the agent was just started.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: []

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
5 changes: 3 additions & 2 deletions processor/k8sattributesprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ func (f *fakeClient) GetNode(nodeName string) (*kube.Node, bool) {
}

// Start is a noop for FakeClient.
func (f *fakeClient) Start() {
func (f *fakeClient) Start() error {
if f.Informer != nil {
f.Informer.Run(f.StopCh)
go f.Informer.Run(f.StopCh)
}
return nil
}

// Stop is a noop for FakeClient.
Expand Down
30 changes: 21 additions & 9 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kube // import "github.com/open-telemetry/opentelemetry-collector-contri

import (
"context"
"errors"
"fmt"
"regexp"
"strings"
Expand Down Expand Up @@ -189,50 +190,61 @@ func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules Extr
}

// Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
func (c *WatchClient) Start() {
_, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
func (c *WatchClient) Start() error {
synced := make([]cache.InformerSynced, 0)
reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePodAdd,
UpdateFunc: c.handlePodUpdate,
DeleteFunc: c.handlePodDelete,
})
if err != nil {
c.logger.Error("error adding event handler to pod informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.informer.Run(c.stopCh)

_, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleNamespaceAdd,
UpdateFunc: c.handleNamespaceUpdate,
DeleteFunc: c.handleNamespaceDelete,
})
if err != nil {
c.logger.Error("error adding event handler to namespace informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.namespaceInformer.Run(c.stopCh)

if c.Rules.DeploymentName || c.Rules.DeploymentUID {
_, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
reg, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleReplicaSetAdd,
UpdateFunc: c.handleReplicaSetUpdate,
DeleteFunc: c.handleReplicaSetDelete,
})
if err != nil {
c.logger.Error("error adding event handler to replicaset informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.replicasetInformer.Run(c.stopCh)
}

if c.nodeInformer != nil {
_, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleNodeAdd,
UpdateFunc: c.handleNodeUpdate,
DeleteFunc: c.handleNodeDelete,
})
if err != nil {
c.logger.Error("error adding event handler to node informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.nodeInformer.Run(c.stopCh)
}

if !cache.WaitForCacheSync(c.stopCh, synced...) {
return errors.New("failed to wait for caches to sync")
}

return nil
}

// Stop signals the the k8s watcher/informer to stop watching for new events.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestClientStartStop(t *testing.T) {
done := make(chan struct{})
assert.False(t, fctr.HasStopped())
go func() {
c.Start()
assert.NoError(t, c.Start())
close(done)
}()
c.Stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cach
}

func (f *FakeInformer) AddEventHandlerWithResyncPeriod(_ cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) {
return nil, nil
return f, nil
}

func (f *FakeInformer) RemoveEventHandler(_ cache.ResourceEventHandlerRegistration) error {
Expand Down Expand Up @@ -164,7 +164,7 @@ func (f *NoOpInformer) AddEventHandler(handler cache.ResourceEventHandler) (cach
return f.AddEventHandlerWithResyncPeriod(handler, time.Second)
}
func (f *NoOpInformer) AddEventHandlerWithResyncPeriod(_ cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) {
return nil, nil
return f, nil
}

func (f *NoOpInformer) RemoveEventHandler(_ cache.ResourceEventHandlerRegistration) error {
Expand Down
2 changes: 1 addition & 1 deletion processor/k8sattributesprocessor/internal/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Client interface {
GetPod(PodIdentifier) (*Pod, bool)
GetNamespace(string) (*Namespace, bool)
GetNode(string) (*Node, bool)
Start()
Start() error
Stop()
}

Expand Down
6 changes: 5 additions & 1 deletion processor/k8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error
}
}
if !kp.passthroughMode {
go kp.kc.Start()
err := kp.kc.Start()
if err != nil {
kp.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
return nil
}
}
return nil
}
Expand Down

0 comments on commit a6a1720

Please sign in to comment.