diff --git a/receiver/k8seventsreceiver/factory.go b/receiver/k8seventsreceiver/factory.go index d4bf18300779..5965f992bcf9 100644 --- a/receiver/k8seventsreceiver/factory.go +++ b/receiver/k8seventsreceiver/factory.go @@ -38,10 +38,5 @@ func createLogsReceiver( ) (receiver.Logs, error) { rCfg := cfg.(*Config) - k8sInterface, err := rCfg.getK8sClient() - if err != nil { - return nil, err - } - - return newReceiver(params, rCfg, consumer, k8sInterface) + return newReceiver(params, rCfg, consumer) } diff --git a/receiver/k8seventsreceiver/factory_test.go b/receiver/k8seventsreceiver/factory_test.go index 35cb96508de1..c79739ee58d9 100644 --- a/receiver/k8seventsreceiver/factory_test.go +++ b/receiver/k8seventsreceiver/factory_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" k8s "k8s.io/client-go/kubernetes" @@ -42,8 +43,9 @@ func TestCreateReceiver(t *testing.T) { context.Background(), receivertest.NewNopCreateSettings(), rCfg, consumertest.NewNop(), ) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) - assert.Nil(t, r) // Override for test. rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (k8s.Interface, error) { @@ -54,6 +56,7 @@ func TestCreateReceiver(t *testing.T) { receivertest.NewNopCreateSettings(), rCfg, consumertest.NewNop(), ) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) assert.NoError(t, err) - assert.NotNil(t, r) } diff --git a/receiver/k8seventsreceiver/generated_component_test.go b/receiver/k8seventsreceiver/generated_component_test.go new file mode 100644 index 000000000000..2a91346ba1e2 --- /dev/null +++ b/receiver/k8seventsreceiver/generated_component_test.go @@ -0,0 +1,89 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package k8seventsreceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +// assertNoErrorHost implements a component.Host that asserts that there were no errors. +type assertNoErrorHost struct { + component.Host + *testing.T +} + +var _ component.Host = (*assertNoErrorHost)(nil) + +// newAssertNoErrorHost returns a new instance of assertNoErrorHost. +func newAssertNoErrorHost(t *testing.T) component.Host { + return &assertNoErrorHost{ + componenttest.NewNopHost(), + t, + } +} + +func (aneh *assertNoErrorHost) ReportFatalError(err error) { + assert.NoError(aneh, err) +} + +func Test_ComponentLifecycle(t *testing.T) { + factory := NewFactory() + + tests := []struct { + name string + createFn func(ctx context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) + }{ + + { + name: "logs", + createFn: func(ctx context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateLogsReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + } + + cm, err := confmaptest.LoadConf("metadata.yaml") + require.NoError(t, err) + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub("tests::config") + require.NoError(t, err) + require.NoError(t, component.UnmarshalConfig(sub, cfg)) + + for _, test := range tests { + t.Run(test.name+"-shutdown", func(t *testing.T) { + c, err := test.createFn(context.Background(), receivertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + + t.Run(test.name+"-lifecycle", func(t *testing.T) { + + // TODO support lifecycle + t.SkipNow() + + firstRcvr, err := test.createFn(context.Background(), receivertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + host := newAssertNoErrorHost(t) + require.NoError(t, err) + require.NoError(t, firstRcvr.Start(context.Background(), host)) + require.NoError(t, firstRcvr.Shutdown(context.Background())) + secondRcvr, err := test.createFn(context.Background(), receivertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + require.NoError(t, secondRcvr.Start(context.Background(), host)) + require.NoError(t, secondRcvr.Shutdown(context.Background())) + }) + } +} diff --git a/receiver/k8seventsreceiver/metadata.yaml b/receiver/k8seventsreceiver/metadata.yaml index c8e1e10b1127..febae1aca702 100644 --- a/receiver/k8seventsreceiver/metadata.yaml +++ b/receiver/k8seventsreceiver/metadata.yaml @@ -7,3 +7,7 @@ status: distributions: [contrib, splunk, observiq, sumo] codeowners: active: [dmitryax, TylerHelmuth] + +tests: + config: + skip_lifecycle: true diff --git a/receiver/k8seventsreceiver/receiver.go b/receiver/k8seventsreceiver/receiver.go index acaf7d461c2b..dc0e6f2d6450 100644 --- a/receiver/k8seventsreceiver/receiver.go +++ b/receiver/k8seventsreceiver/receiver.go @@ -22,7 +22,6 @@ import ( type k8seventsReceiver struct { config *Config settings receiver.CreateSettings - client k8s.Interface logsConsumer consumer.Logs stopperChanList []chan struct{} startTime time.Time @@ -36,7 +35,6 @@ func newReceiver( set receiver.CreateSettings, config *Config, consumer consumer.Logs, - client k8s.Interface, ) (receiver.Logs, error) { transport := "http" @@ -52,7 +50,6 @@ func newReceiver( return &k8seventsReceiver{ settings: set, config: config, - client: client, logsConsumer: consumer, startTime: time.Now(), obsrecv: obsrecv, @@ -62,12 +59,17 @@ func newReceiver( func (kr *k8seventsReceiver) Start(ctx context.Context, _ component.Host) error { kr.ctx, kr.cancel = context.WithCancel(ctx) + k8sInterface, err := kr.config.getK8sClient() + if err != nil { + return err + } + kr.settings.Logger.Info("starting to watch namespaces for the events.") if len(kr.config.Namespaces) == 0 { - kr.startWatch(corev1.NamespaceAll) + kr.startWatch(corev1.NamespaceAll, k8sInterface) } else { for _, ns := range kr.config.Namespaces { - kr.startWatch(ns) + kr.startWatch(ns, k8sInterface) } } @@ -75,6 +77,9 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, _ component.Host) error } func (kr *k8seventsReceiver) Shutdown(context.Context) error { + if kr.cancel == nil { + return nil + } // Stop watching all the namespaces by closing all the stopper channels. for _, stopperChan := range kr.stopperChanList { close(stopperChan) @@ -86,10 +91,10 @@ func (kr *k8seventsReceiver) Shutdown(context.Context) error { // Add the 'Event' handler and trigger the watch for a specific namespace. // For new and updated events, the code is relying on the following k8s code implementation: // https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/record/events_cache.go#L327 -func (kr *k8seventsReceiver) startWatch(ns string) { +func (kr *k8seventsReceiver) startWatch(ns string, client k8s.Interface) { stopperChan := make(chan struct{}) kr.stopperChanList = append(kr.stopperChanList, stopperChan) - kr.startWatchingNamespace(kr.client, cache.ResourceEventHandlerFuncs{ + kr.startWatchingNamespace(client, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { ev := obj.(*corev1.Event) kr.handleEvent(ev) diff --git a/receiver/k8seventsreceiver/receiver_test.go b/receiver/k8seventsreceiver/receiver_test.go index 1f81b9082489..49cff154707f 100644 --- a/receiver/k8seventsreceiver/receiver_test.go +++ b/receiver/k8seventsreceiver/receiver_test.go @@ -16,17 +16,21 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + k8s "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" ) func TestNewReceiver(t *testing.T) { rCfg := createDefaultConfig().(*Config) - client := fake.NewSimpleClientset() + rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (k8s.Interface, error) { + return fake.NewSimpleClientset(), nil + } r, err := newReceiver( receivertest.NewNopCreateSettings(), rCfg, consumertest.NewNop(), - client, ) require.NoError(t, err) @@ -39,7 +43,6 @@ func TestNewReceiver(t *testing.T) { receivertest.NewNopCreateSettings(), rCfg, consumertest.NewNop(), - client, ) require.NoError(t, err) @@ -50,13 +53,11 @@ func TestNewReceiver(t *testing.T) { func TestHandleEvent(t *testing.T) { rCfg := createDefaultConfig().(*Config) - client := fake.NewSimpleClientset() sink := new(consumertest.LogsSink) r, err := newReceiver( receivertest.NewNopCreateSettings(), rCfg, sink, - client, ) require.NoError(t, err) require.NotNil(t, r) @@ -70,13 +71,11 @@ func TestHandleEvent(t *testing.T) { func TestDropEventsOlderThanStartupTime(t *testing.T) { rCfg := createDefaultConfig().(*Config) - client := fake.NewSimpleClientset() sink := new(consumertest.LogsSink) r, err := newReceiver( receivertest.NewNopCreateSettings(), rCfg, sink, - client, ) require.NoError(t, err) require.NotNil(t, r) @@ -108,12 +107,10 @@ func TestGetEventTimestamp(t *testing.T) { func TestAllowEvent(t *testing.T) { rCfg := createDefaultConfig().(*Config) - client := fake.NewSimpleClientset() r, err := newReceiver( receivertest.NewNopCreateSettings(), rCfg, consumertest.NewNop(), - client, ) require.NoError(t, err) require.NotNil(t, r)