Skip to content

Commit

Permalink
feat (processor/k8sattributes): add timeout for waiting.
Browse files Browse the repository at this point in the history
  • Loading branch information
h0cheung committed Aug 8, 2024
1 parent a6a1720 commit 33db79c
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .chloggen/k8sattributes-block.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ component: processor/k8sattributes
note: Block on startup until the metadata has been synced, to fix the issue that some data could not be associated with metadata when the agent had just started.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: []
issues: [32556]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
Expand Down
15 changes: 15 additions & 0 deletions processor/k8sattributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,21 @@ the processor associates the received trace to the pod, based on the connection
}
```

By default, the processor will be ready as soon as it starts, even if no metadata has been fetched yet.
If data is sent to this processor before the metadata is synced, there will be no metadata to enrich the data with.

To wait for the metadata to be synced before the processor is ready, set the `wait_for_metadata` option to `true`.
Then the processor will not be ready until the metadata has been synced.
If a timeout is reached, the processor will fail to start and return an error, which will cause the collector to exit.
The timeout defaults to 10s and can be configured with the `wait_for_metadata_timeout` option.

Example configuration that makes the processor wait for the metadata to be synced before it is ready:

```yaml
wait_for_metadata: true
wait_for_metadata_timeout: 10s
```

## Extracting attributes from pod labels and annotations

The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods, namespaces and nodes.
Expand Down
4 changes: 3 additions & 1 deletion processor/k8sattributesprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package k8sattributesprocessor

import (
"time"

"go.opentelemetry.io/collector/component"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -35,7 +37,7 @@ func selectors() (labels.Selector, fields.Selector) {
}

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) {
cs := fake.NewSimpleClientset()

ls, fs := selectors()
Expand Down
7 changes: 7 additions & 0 deletions processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr
import (
"fmt"
"regexp"
"time"

"go.opentelemetry.io/collector/featuregate"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
Expand Down Expand Up @@ -46,6 +47,12 @@ type Config struct {
// Exclude section allows to define names of pod that should be
// ignored while tagging.
Exclude ExcludeConfig `mapstructure:"exclude"`

// WaitForMetadata is a flag that determines if the processor should wait for k8s metadata to be synced when starting.
WaitForMetadata bool `mapstructure:"wait_for_metadata"`

// WaitForMetadataTimeout is the maximum time the processor will wait for the k8s metadata to be synced.
WaitForMetadataTimeout time.Duration `mapstructure:"wait_for_metadata_timeout"`
}

func (cfg *Config) Validate() error {
Expand Down
5 changes: 5 additions & 0 deletions processor/k8sattributesprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package k8sattributesprocessor
import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -34,6 +35,7 @@ func TestLoadConfig(t *testing.T) {
Extract: ExtractConfig{
Metadata: enabledAttributes(),
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand Down Expand Up @@ -105,6 +107,7 @@ func TestLoadConfig(t *testing.T) {
{Name: "jaeger-collector"},
},
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand All @@ -127,6 +130,7 @@ func TestLoadConfig(t *testing.T) {
{Name: "jaeger-collector"},
},
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand All @@ -149,6 +153,7 @@ func TestLoadConfig(t *testing.T) {
{Name: "jaeger-collector"},
},
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand Down
7 changes: 7 additions & 0 deletions processor/k8sattributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -38,6 +39,7 @@ func createDefaultConfig() component.Config {
Extract: ExtractConfig{
Metadata: enabledAttributes(),
},
WaitForMetadataTimeout: 10 * time.Second,
}
}

Expand Down Expand Up @@ -165,5 +167,10 @@ func createProcessorOpts(cfg component.Config) []option {

opts = append(opts, withExcludes(oCfg.Exclude))

opts = append(opts, withWaitForMetadataTimeout(oCfg.WaitForMetadataTimeout))
if oCfg.WaitForMetadata {
opts = append(opts, withWaitForMetadata(true))
}

return opts
}
60 changes: 35 additions & 25 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ var enableRFC3339Timestamp = featuregate.GlobalRegistry().MustRegister(

// WatchClient is the main interface provided by this package to a kubernetes cluster.
type WatchClient struct {
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
nodeInformer cache.SharedInformer
replicasetInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
deleteQueue []deleteRequest
stopCh chan struct{}
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
nodeInformer cache.SharedInformer
replicasetInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
deleteQueue []deleteRequest
stopCh chan struct{}
waitForMetadata bool
waitForMetadataTimeout time.Duration

// A map containing Pod related data, used to associate them with resources.
// Key can be either an IP address or Pod UID
Expand Down Expand Up @@ -85,21 +87,23 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)

// New initializes a new k8s Client.
func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) {
func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet, waitForMetadata bool, waitForMetadataTimeout time.Duration) (Client, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
if err != nil {
return nil, err
}
c := &WatchClient{
logger: set.Logger,
Rules: rules,
Filters: filters,
Associations: associations,
Exclude: exclude,
replicasetRegex: rRegex,
cronJobRegex: cronJobRegex,
stopCh: make(chan struct{}),
telemetryBuilder: telemetryBuilder,
logger: set.Logger,
Rules: rules,
Filters: filters,
Associations: associations,
Exclude: exclude,
replicasetRegex: rRegex,
cronJobRegex: cronJobRegex,
stopCh: make(chan struct{}),
telemetryBuilder: telemetryBuilder,
waitForMetadata: waitForMetadata,
waitForMetadataTimeout: waitForMetadataTimeout,
}
go c.deleteLoop(time.Second*30, defaultPodDeleteGracePeriod)

Expand Down Expand Up @@ -240,10 +244,16 @@ func (c *WatchClient) Start() error {
go c.nodeInformer.Run(c.stopCh)
}

if !cache.WaitForCacheSync(c.stopCh, synced...) {
return errors.New("failed to wait for caches to sync")
if c.waitForMetadata {
timeoutCh := make(chan struct{})
t := time.AfterFunc(c.waitForMetadataTimeout, func() {
close(timeoutCh)
})
defer t.Stop()
if !cache.WaitForCacheSync(timeoutCh, synced...) {
return errors.New("failed to wait for caches to sync")
}
}

return nil
}

Expand Down
21 changes: 5 additions & 16 deletions processor/k8sattributesprocessor/internal/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,29 +144,18 @@ func nodeAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj any)) {
}

func TestDefaultClientset(t *testing.T) {
c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil)
c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil, false, 10*time.Second)
assert.Error(t, err)
assert.Equal(t, "invalid authType for kubernetes: ", err.Error())
assert.Nil(t, c)

c, err = New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil)
c, err = New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil, false, 10*time.Second)
assert.NoError(t, err)
assert.NotNil(t, c)
}

func TestBadFilters(t *testing.T) {
c, err := New(
componenttest.NewNopTelemetrySettings(),
k8sconfig.APIConfig{},
ExtractionRules{},
Filters{Fields: []FieldFilter{{Op: selection.Exists}}},
[]Association{},
Excludes{},
newFakeAPIClientset,
NewFakeInformer,
NewFakeNamespaceInformer,
NewFakeReplicaSetInformer,
)
c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{Fields: []FieldFilter{{Op: selection.Exists}}}, []Association{}, Excludes{}, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer, false, 10*time.Second)
assert.Error(t, err)
assert.Nil(t, c)
}
Expand Down Expand Up @@ -202,7 +191,7 @@ func TestConstructorErrors(t *testing.T) {
gotAPIConfig = c
return nil, fmt.Errorf("error creating k8s client")
}
c, err := New(componenttest.NewNopTelemetrySettings(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil)
c, err := New(componenttest.NewNopTelemetrySettings(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil, false, 10*time.Second)
assert.Nil(t, c)
assert.Error(t, err)
assert.Equal(t, "error creating k8s client", err.Error())
Expand Down Expand Up @@ -1883,7 +1872,7 @@ func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *o
},
},
}
c, err := New(set, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer)
c, err := New(set, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer, false, 10*time.Second)
require.NoError(t, err)
return c.(*WatchClient), logs
}
Expand Down
2 changes: 1 addition & 1 deletion processor/k8sattributesprocessor/internal/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Client interface {
}

// ClientProvider defines a func type that returns a new Client.
type ClientProvider func(component.TelemetrySettings, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet) (Client, error)
type ClientProvider func(component.TelemetrySettings, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet, bool, time.Duration) (Client, error)

// APIClientsetProvider defines a func type that initializes and return a new kubernetes
// Clientset object.
Expand Down
17 changes: 17 additions & 0 deletions processor/k8sattributesprocessor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"regexp"
"time"

conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"k8s.io/apimachinery/pkg/selection"
Expand Down Expand Up @@ -381,3 +382,19 @@ func withExcludes(podExclude ExcludeConfig) option {
return nil
}
}

// withWaitForMetadata returns an option that stores the given flag in the
// processor's waitForMetadata field, i.e. whether startup should wait for
// pod metadata to be synced. The returned option never fails.
func withWaitForMetadata(wait bool) option {
	apply := func(kp *kubernetesprocessor) error {
		kp.waitForMetadata = wait
		return nil
	}
	return apply
}

// withWaitForMetadataTimeout returns an option that stores the given duration
// in the processor's waitForMetadataTimeout field, i.e. the timeout used when
// waiting for pod metadata to be synced. The returned option never fails.
func withWaitForMetadataTimeout(timeout time.Duration) option {
	apply := func(kp *kubernetesprocessor) error {
		kp.waitForMetadataTimeout = timeout
		return nil
	}
	return apply
}
33 changes: 18 additions & 15 deletions processor/k8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"strconv"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -25,25 +26,27 @@ const (
)

type kubernetesprocessor struct {
cfg component.Config
options []option
telemetrySettings component.TelemetrySettings
logger *zap.Logger
apiConfig k8sconfig.APIConfig
kc kube.Client
passthroughMode bool
rules kube.ExtractionRules
filters kube.Filters
podAssociations []kube.Association
podIgnore kube.Excludes
cfg component.Config
options []option
telemetrySettings component.TelemetrySettings
logger *zap.Logger
apiConfig k8sconfig.APIConfig
kc kube.Client
passthroughMode bool
rules kube.ExtractionRules
filters kube.Filters
podAssociations []kube.Association
podIgnore kube.Excludes
waitForMetadata bool
waitForMetadataTimeout time.Duration
}

func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, kubeClient kube.ClientProvider) error {
if kubeClient == nil {
kubeClient = kube.New
}
if !kp.passthroughMode {
kc, err := kubeClient(set, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil)
kc, err := kubeClient(set, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil, kp.waitForMetadata, kp.waitForMetadataTimeout)
if err != nil {
return err
}
Expand All @@ -58,7 +61,7 @@ func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error
for _, opt := range allOptions {
if err := opt(kp); err != nil {
kp.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
return nil
return err
}
}

Expand All @@ -67,14 +70,14 @@ func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error
err := kp.initKubeClient(kp.telemetrySettings, kubeClientProvider)
if err != nil {
kp.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
return nil
return err
}
}
if !kp.passthroughMode {
err := kp.kc.Start()
if err != nil {
kp.telemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
return nil
return err
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion processor/k8sattributesprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func TestNewProcessor(t *testing.T) {
}

func TestProcessorBadClientProvider(t *testing.T) {
clientProvider := func(_ component.TelemetrySettings, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
clientProvider := func(_ component.TelemetrySettings, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) {
return nil, fmt.Errorf("bad client error")
}

Expand Down

0 comments on commit 33db79c

Please sign in to comment.