Skip to content

Commit

Permalink
[chore] move k8seventsreceiver to generated lifecycle tests (open-tel…
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme authored Dec 17, 2023
1 parent 8d74e22 commit af4a948
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 24 deletions.
7 changes: 1 addition & 6 deletions receiver/k8seventsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,5 @@ func createLogsReceiver(
) (receiver.Logs, error) {
rCfg := cfg.(*Config)

k8sInterface, err := rCfg.getK8sClient()
if err != nil {
return nil, err
}

return newReceiver(params, rCfg, consumer, k8sInterface)
return newReceiver(params, rCfg, consumer)
}
7 changes: 5 additions & 2 deletions receiver/k8seventsreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
k8s "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -42,8 +43,9 @@ func TestCreateReceiver(t *testing.T) {
context.Background(), receivertest.NewNopCreateSettings(),
rCfg, consumertest.NewNop(),
)
require.NoError(t, err)
err = r.Start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Nil(t, r)

// Override for test.
rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (k8s.Interface, error) {
Expand All @@ -54,6 +56,7 @@ func TestCreateReceiver(t *testing.T) {
receivertest.NewNopCreateSettings(),
rCfg, consumertest.NewNop(),
)
require.NoError(t, err)
err = r.Start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
assert.NotNil(t, r)
}
89 changes: 89 additions & 0 deletions receiver/k8seventsreceiver/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions receiver/k8seventsreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ status:
distributions: [contrib, splunk, observiq, sumo]
codeowners:
active: [dmitryax, TylerHelmuth]

tests:
config:
skip_lifecycle: true
19 changes: 12 additions & 7 deletions receiver/k8seventsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
type k8seventsReceiver struct {
config *Config
settings receiver.CreateSettings
client k8s.Interface
logsConsumer consumer.Logs
stopperChanList []chan struct{}
startTime time.Time
Expand All @@ -36,7 +35,6 @@ func newReceiver(
set receiver.CreateSettings,
config *Config,
consumer consumer.Logs,
client k8s.Interface,
) (receiver.Logs, error) {
transport := "http"

Expand All @@ -52,7 +50,6 @@ func newReceiver(
return &k8seventsReceiver{
settings: set,
config: config,
client: client,
logsConsumer: consumer,
startTime: time.Now(),
obsrecv: obsrecv,
Expand All @@ -62,19 +59,27 @@ func newReceiver(
func (kr *k8seventsReceiver) Start(ctx context.Context, _ component.Host) error {
kr.ctx, kr.cancel = context.WithCancel(ctx)

k8sInterface, err := kr.config.getK8sClient()
if err != nil {
return err
}

kr.settings.Logger.Info("starting to watch namespaces for the events.")
if len(kr.config.Namespaces) == 0 {
kr.startWatch(corev1.NamespaceAll)
kr.startWatch(corev1.NamespaceAll, k8sInterface)
} else {
for _, ns := range kr.config.Namespaces {
kr.startWatch(ns)
kr.startWatch(ns, k8sInterface)
}
}

return nil
}

func (kr *k8seventsReceiver) Shutdown(context.Context) error {
if kr.cancel == nil {
return nil
}
// Stop watching all the namespaces by closing all the stopper channels.
for _, stopperChan := range kr.stopperChanList {
close(stopperChan)
Expand All @@ -86,10 +91,10 @@ func (kr *k8seventsReceiver) Shutdown(context.Context) error {
// Add the 'Event' handler and trigger the watch for a specific namespace.
// For new and updated events, the code is relying on the following k8s code implementation:
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/record/events_cache.go#L327
func (kr *k8seventsReceiver) startWatch(ns string) {
func (kr *k8seventsReceiver) startWatch(ns string, client k8s.Interface) {
stopperChan := make(chan struct{})
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
kr.startWatchingNamespace(kr.client, cache.ResourceEventHandlerFuncs{
kr.startWatchingNamespace(client, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
ev := obj.(*corev1.Event)
kr.handleEvent(ev)
Expand Down
15 changes: 6 additions & 9 deletions receiver/k8seventsreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)

func TestNewReceiver(t *testing.T) {
rCfg := createDefaultConfig().(*Config)
client := fake.NewSimpleClientset()
rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (k8s.Interface, error) {
return fake.NewSimpleClientset(), nil
}
r, err := newReceiver(
receivertest.NewNopCreateSettings(),
rCfg,
consumertest.NewNop(),
client,
)

require.NoError(t, err)
Expand All @@ -39,7 +43,6 @@ func TestNewReceiver(t *testing.T) {
receivertest.NewNopCreateSettings(),
rCfg,
consumertest.NewNop(),
client,
)

require.NoError(t, err)
Expand All @@ -50,13 +53,11 @@ func TestNewReceiver(t *testing.T) {

func TestHandleEvent(t *testing.T) {
rCfg := createDefaultConfig().(*Config)
client := fake.NewSimpleClientset()
sink := new(consumertest.LogsSink)
r, err := newReceiver(
receivertest.NewNopCreateSettings(),
rCfg,
sink,
client,
)
require.NoError(t, err)
require.NotNil(t, r)
Expand All @@ -70,13 +71,11 @@ func TestHandleEvent(t *testing.T) {

func TestDropEventsOlderThanStartupTime(t *testing.T) {
rCfg := createDefaultConfig().(*Config)
client := fake.NewSimpleClientset()
sink := new(consumertest.LogsSink)
r, err := newReceiver(
receivertest.NewNopCreateSettings(),
rCfg,
sink,
client,
)
require.NoError(t, err)
require.NotNil(t, r)
Expand Down Expand Up @@ -108,12 +107,10 @@ func TestGetEventTimestamp(t *testing.T) {

func TestAllowEvent(t *testing.T) {
rCfg := createDefaultConfig().(*Config)
client := fake.NewSimpleClientset()
r, err := newReceiver(
receivertest.NewNopCreateSettings(),
rCfg,
consumertest.NewNop(),
client,
)
require.NoError(t, err)
require.NotNil(t, r)
Expand Down

0 comments on commit af4a948

Please sign in to comment.