Skip to content

Commit

Permalink
[receiver/k8sobjects] improving watch reliability on recoverable issu…
Browse files Browse the repository at this point in the history
…es (#18828)

When using k8sobjects, the receiver did not recover when the API server hung or timed out.
For more information about RetryWatcher please check the official docs.
  • Loading branch information
nicolastakashi authored Feb 22, 2023
1 parent 1eb7a27 commit 9ddd683
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 19 deletions.
16 changes: 16 additions & 0 deletions .chloggen/k8sobjectsreceiver-improve-watch-reliability.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobjectsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Improve watch mode reliability to handle recoverable issues such as API timeouts.

# One or more tracking issues related to the change
issues: [18078]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions receiver/k8sobjectsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ the K8s API server. This can be one of `none` (for no auth), `serviceAccount`
- `label_selector`: select objects by label(s)
- `field_selector`: select objects by field(s)
- `interval`: the interval at which objects are pulled, default 60 minutes. Only useful for `pull` mode.
- `resource_version`: allows watching resources starting from a specific version (default = `1`). Only available for `watch` mode.
- `namespaces`: An array of `namespaces` to collect events from. (default = `all`)
- `group`: API group name. It is an optional config. When the given resource object is present in multiple groups,
use this config to specify the group to select. By default, it will select the first group.
Expand Down
30 changes: 20 additions & 10 deletions receiver/k8sobjectsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ const (
PullMode mode = "pull"
WatchMode mode = "watch"

defaultPullInterval time.Duration = time.Hour
defaultMode mode = PullMode
defaultPullInterval time.Duration = time.Hour
defaultMode mode = PullMode
defaultResourceVersion = "1"
)

var modeMap = map[mode]bool{
Expand All @@ -42,14 +43,15 @@ var modeMap = map[mode]bool{
}

type K8sObjectsConfig struct {
Name string `mapstructure:"name"`
Group string `mapstructure:"group"`
Namespaces []string `mapstructure:"namespaces"`
Mode mode `mapstructure:"mode"`
LabelSelector string `mapstructure:"label_selector"`
FieldSelector string `mapstructure:"field_selector"`
Interval time.Duration `mapstructure:"interval"`
gvr *schema.GroupVersionResource
Name string `mapstructure:"name"`
Group string `mapstructure:"group"`
Namespaces []string `mapstructure:"namespaces"`
Mode mode `mapstructure:"mode"`
LabelSelector string `mapstructure:"label_selector"`
FieldSelector string `mapstructure:"field_selector"`
Interval time.Duration `mapstructure:"interval"`
ResourceVersion string `mapstructure:"resource_version"`
gvr *schema.GroupVersionResource
}

type Config struct {
Expand Down Expand Up @@ -96,6 +98,14 @@ func (c *Config) Validate() error {
object.Interval = defaultPullInterval
}

if object.Mode == PullMode && object.ResourceVersion != "" {
return fmt.Errorf("resource version is invalid for mode: %v", object.Mode)
}

if object.Mode == WatchMode && object.ResourceVersion == "" {
object.ResourceVersion = defaultResourceVersion
}

object.gvr = gvr
}
return nil
Expand Down
85 changes: 81 additions & 4 deletions receiver/k8sobjectsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ func TestLoadConfig(t *testing.T) {
},
},
{
Name: "events",
Mode: WatchMode,
Namespaces: []string{"default"},
Group: "events.k8s.io",
Name: "events",
Mode: WatchMode,
Namespaces: []string{"default"},
Group: "events.k8s.io",
ResourceVersion: "1",
gvr: &schema.GroupVersionResource{
Group: "events.k8s.io",
Version: "v1",
Expand Down Expand Up @@ -130,3 +131,79 @@ func TestValidateResourceConflict(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, "group2", rCfg.Objects[0].gvr.Group)
}

// TestInvalidPullConfig checks that validation rejects an object configured
// in pull mode that also sets resource_version (see
// testdata/invalid_pull_config.yaml).
func TestInvalidPullConfig(t *testing.T) {
	t.Parallel()

	cm, err := confmaptest.LoadConf(filepath.Join("testdata", "invalid_pull_config.yaml"))
	require.NoError(t, err)

	factory := NewFactory()
	cfg := factory.CreateDefaultConfig().(*Config)

	sub, err := cm.Sub("k8sobjects")
	require.NoError(t, err)
	require.NoError(t, component.UnmarshalConfig(sub, cfg))
	require.NotNil(t, cfg)

	// Validation is expected to fail before the mock discovery client is
	// installed.
	err = component.ValidateConfig(cfg)
	require.Error(t, err)

	cfg.makeDiscoveryClient = getMockDiscoveryClient

	// EqualError asserts that err is non-nil and that its message matches,
	// and reports expected/actual in the right order on failure.
	err = component.ValidateConfig(cfg)
	require.EqualError(t, err, "resource version is invalid for mode: pull")
}

// TestWatchResourceVersion loads a config with two watch-mode "events"
// objects, one without resource_version and one with resource_version "2",
// and checks the resource versions present on the objects after validation.
func TestWatchResourceVersion(t *testing.T) {
	t.Parallel()

	conf, err := confmaptest.LoadConf(filepath.Join("testdata", "config_watch_resource_version.yaml"))
	require.NoError(t, err)

	rCfg := NewFactory().CreateDefaultConfig().(*Config)

	receiverConf, err := conf.Sub("k8sobjects")
	require.NoError(t, err)
	require.NoError(t, component.UnmarshalConfig(receiverConf, rCfg))
	require.NotNil(t, rCfg)

	// Validation is expected to fail before the mock discovery client is
	// installed, and to succeed afterwards.
	require.Error(t, component.ValidateConfig(rCfg))
	rCfg.makeDiscoveryClient = getMockDiscoveryClient
	require.NoError(t, component.ValidateConfig(rCfg))

	// watchedEvents builds the expected entry for one "events" object; the
	// two entries differ only in their resource version.
	watchedEvents := func(resourceVersion string) *K8sObjectsConfig {
		return &K8sObjectsConfig{
			Name:            "events",
			Mode:            WatchMode,
			Namespaces:      []string{"default"},
			Group:           "events.k8s.io",
			ResourceVersion: resourceVersion,
			gvr: &schema.GroupVersionResource{
				Group:    "events.k8s.io",
				Version:  "v1",
				Resource: "events",
			},
		}
	}

	want := []*K8sObjectsConfig{
		watchedEvents("1"),
		watchedEvents("2"),
	}
	assert.EqualValues(t, want, rCfg.Objects)
}
1 change: 1 addition & 0 deletions receiver/k8sobjectsreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion receiver/k8sobjectsreceiver/mock_dynamic_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c mockDynamicClient) createPods(objects ...*unstructured.Unstructured) {
}

func generatePod(name, namespace string, labels map[string]interface{}) *unstructured.Unstructured {
return &unstructured.Unstructured{
pod := unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Pods",
Expand All @@ -71,4 +71,6 @@ func generatePod(name, namespace string, labels map[string]interface{}) *unstruc
},
}

pod.SetResourceVersion("1")
return &pod
}
15 changes: 11 additions & 4 deletions receiver/k8sobjectsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiWatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/watch"
)

type k8sobjectsreceiver struct {
Expand Down Expand Up @@ -145,10 +148,14 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
kr.mu.Unlock()

watch, err := resource.Watch(ctx, metav1.ListOptions{
FieldSelector: config.FieldSelector,
LabelSelector: config.LabelSelector,
})
// watchFunc opens a watch on the resource. It starts from the options handed
// in by the caller (the RetryWatcher created below), so that any resource
// version it supplies is forwarded to the API server, and only overrides the
// selectors with the values from the receiver config.
watchFunc := func(options metav1.ListOptions) (apiWatch.Interface, error) {
	options.FieldSelector = config.FieldSelector
	options.LabelSelector = config.LabelSelector
	return resource.Watch(ctx, options)
}

watch, err := watch.NewRetryWatcher(config.ResourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
if err != nil {
kr.setting.Logger.Error("error in watching object", zap.String("resource", config.gvr.String()), zap.Error(err))
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
k8sobjects:
objects:
- name: events
mode: watch
group: events.k8s.io
namespaces: [default]
- name: events
mode: watch
group: events.k8s.io
namespaces: [default]
resource_version: "2"
6 changes: 6 additions & 0 deletions receiver/k8sobjectsreceiver/testdata/invalid_pull_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
k8sobjects:
objects:
- name: pods
mode: pull
resource_version: "1"

0 comments on commit 9ddd683

Please sign in to comment.