Skip to content

Commit

Permalink
Merging app and network K8s informers (grafana#956)
Browse files Browse the repository at this point in the history
* merging informers into a single one

* merged informers and compiled but not yet works

* merged informers and compiled but not yet works

* compiles and passes unit tests

* some comments and minor refactors

* Fix wrong type cast

* Fixed Kind in owner data
  • Loading branch information
mariomac authored Jun 25, 2024
1 parent 4f02bf8 commit cf71d5d
Show file tree
Hide file tree
Showing 25 changed files with 476 additions and 549 deletions.
3 changes: 2 additions & 1 deletion pkg/beyla/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grafana/beyla/pkg/internal/filter"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/infraolly/process"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/traces"
"github.com/grafana/beyla/pkg/services"
"github.com/grafana/beyla/pkg/transform"
Expand Down Expand Up @@ -104,7 +105,7 @@ var DefaultConfig = Config{
HostnameDNSResolution: true,
},
Kubernetes: transform.KubernetesDecorator{
Enable: transform.EnabledDefault,
Enable: kube.EnabledDefault,
InformersSyncTimeout: 30 * time.Second,
},
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/beyla/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/beyla/pkg/internal/export/prom"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/infraolly/process"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/netolly/transform/cidr"
"github.com/grafana/beyla/pkg/internal/traces"
"github.com/grafana/beyla/pkg/transform"
Expand Down Expand Up @@ -161,7 +162,7 @@ network:
},
Kubernetes: transform.KubernetesDecorator{
KubeconfigPath: "/foo/bar",
Enable: transform.EnabledTrue,
Enable: kube.EnabledTrue,
InformersSyncTimeout: 30 * time.Second,
},
Select: attributes.Selection{
Expand Down
9 changes: 7 additions & 2 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grafana/beyla/pkg/internal/connector"
"github.com/grafana/beyla/pkg/internal/export/attributes"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/netolly/agent"
"github.com/grafana/beyla/pkg/internal/netolly/flow"
"github.com/grafana/beyla/pkg/internal/pipe/global"
Expand Down Expand Up @@ -85,7 +86,11 @@ func buildCommonContextInfo(
promMgr := &connector.PrometheusManager{}
ctxInfo := &global.ContextInfo{
Prometheus: promMgr,
K8sEnabled: config.Attributes.Kubernetes.Enabled(),
K8sInformer: kube.NewMetadataProvider(
config.Attributes.Kubernetes.Enable,
config.Attributes.Kubernetes.KubeconfigPath,
config.Attributes.Kubernetes.InformersSyncTimeout,
),
}
if config.InternalMetrics.Prometheus.Port != 0 {
slog.Debug("reporting internal metrics as Prometheus")
Expand All @@ -106,7 +111,7 @@ func buildCommonContextInfo(
// attributeGroups specifies, based in the provided configuration, which groups of attributes
// need to be enabled by default for the diverse metrics
func attributeGroups(config *beyla.Config, ctxInfo *global.ContextInfo) {
if ctxInfo.K8sEnabled {
if ctxInfo.K8sInformer.IsKubeEnabled() {
ctxInfo.MetricAttributeGroups.Add(attributes.GroupKubernetes)
}
if config.Routes != nil {
Expand Down
34 changes: 7 additions & 27 deletions pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,12 @@ import (
"fmt"
"log/slog"

"k8s.io/client-go/kubernetes"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover"
kube2 "github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/transform/kube"
"github.com/grafana/beyla/pkg/transform"
)

func log() *slog.Logger {
Expand Down Expand Up @@ -117,43 +113,27 @@ func (i *Instrumenter) ReadAndForward() error {

func setupFeatureContextInfo(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) {
ctxInfo.AppO11y.ReportRoutes = config.Routes != nil
setupKubernetes(ctx, ctxInfo, &config.Attributes.Kubernetes)
setupKubernetes(ctx, ctxInfo)
}

// setupKubernetes sets up common Kubernetes database and API clients that need to be accessed
// from different stages in the Beyla pipeline
func setupKubernetes(ctx context.Context, ctxInfo *global.ContextInfo, k8sCfg *transform.KubernetesDecorator) {
if !ctxInfo.K8sEnabled {
return
}
config, err := kube2.LoadConfig(k8sCfg.KubeconfigPath)
if err != nil {
slog.Error("can't read kubernetes config. You can't setup Kubernetes discovery and your"+
" traces won't be decorated with Kubernetes metadata", "error", err)
ctxInfo.K8sEnabled = false
func setupKubernetes(ctx context.Context, ctxInfo *global.ContextInfo) {
if !ctxInfo.K8sInformer.IsKubeEnabled() {
return
}

kubeClient, err := kubernetes.NewForConfig(config)
informer, err := ctxInfo.K8sInformer.Get(ctx)
if err != nil {
slog.Error("can't init Kubernetes client. You can't setup Kubernetes discovery and your"+
" traces won't be decorated with Kubernetes metadata", "error", err)
ctxInfo.K8sEnabled = false
return
}

ctxInfo.AppO11y.K8sInformer = &kube2.Metadata{}
if err := ctxInfo.AppO11y.K8sInformer.InitFromClient(ctx, kubeClient, k8sCfg.InformersSyncTimeout); err != nil {
slog.Error("can't init Kubernetes informer. You can't setup Kubernetes discovery and your"+
" traces won't be decorated with Kubernetes metadata", "error", err)
ctxInfo.AppO11y.K8sInformer = nil
ctxInfo.K8sEnabled = false
ctxInfo.K8sInformer.ForceDisable()
return
}

if ctxInfo.AppO11y.K8sDatabase, err = kube.StartDatabase(ctxInfo.AppO11y.K8sInformer); err != nil {
if ctxInfo.AppO11y.K8sDatabase, err = kube.StartDatabase(informer); err != nil {
slog.Error("can't setup Kubernetes database. Your traces won't be decorated with Kubernetes metadata",
"error", err)
ctxInfo.K8sEnabled = false
ctxInfo.K8sInformer.ForceDisable()
}
}
4 changes: 2 additions & 2 deletions pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func (pf *ProcessFinder) Start() (<-chan *ebpf.ProcessTracer, <-chan *Instrument
gb := pipe.NewBuilder(&nodesMap{}, pipe.ChannelBufferLen(pf.cfg.ChannelBufferLen))
pipe.AddStart(gb, processWatcher, ProcessWatcherFunc(pf.ctx, pf.cfg))
pipe.AddMiddleProvider(gb, ptrWatcherKubeEnricher,
WatcherKubeEnricherProvider(pf.ctxInfo.K8sEnabled, pf.ctxInfo.AppO11y.K8sInformer))
WatcherKubeEnricherProvider(pf.ctx, pf.ctxInfo.K8sInformer))
pipe.AddMiddleProvider(gb, criteriaMatcher, CriteriaMatcherProvider(pf.cfg))
pipe.AddMiddleProvider(gb, execTyper, ExecTyperProvider(pf.cfg, pf.ctxInfo.Metrics))
pipe.AddMiddleProvider(gb, containerDBUpdater,
ContainerDBUpdaterProvider(pf.ctxInfo.K8sEnabled, pf.ctxInfo.AppO11y.K8sDatabase))
ContainerDBUpdaterProvider(pf.ctxInfo.K8sInformer.IsKubeEnabled(), pf.ctxInfo.AppO11y.K8sDatabase))
pipe.AddFinalProvider(gb, traceAttacher, TraceAttacherProvider(&TraceAttacher{
Cfg: pf.cfg,
Ctx: pf.ctx,
Expand Down
19 changes: 17 additions & 2 deletions pkg/internal/discover/watcher_kube.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package discover

import (
"context"
"fmt"
"log/slog"

Expand Down Expand Up @@ -51,11 +52,25 @@ type nsName struct {
name string
}

func WatcherKubeEnricherProvider(enabled bool, informer kubeMetadata) pipe.MiddleProvider[[]Event[processAttrs], []Event[processAttrs]] {
// kubeMetadataProvider abstracts kube.MetadataProvider for easier dependency
// injection in tests
type kubeMetadataProvider interface {
IsKubeEnabled() bool
Get(context.Context) (*kube.Metadata, error)
}

func WatcherKubeEnricherProvider(
ctx context.Context,
informerProvider kubeMetadataProvider,
) pipe.MiddleProvider[[]Event[processAttrs], []Event[processAttrs]] {
return func() (pipe.MiddleFunc[[]Event[processAttrs], []Event[processAttrs]], error) {
if !enabled {
if !informerProvider.IsKubeEnabled() {
return pipe.Bypass[[]Event[processAttrs]](), nil
}
informer, err := informerProvider.Get(ctx)
if err != nil {
return nil, fmt.Errorf("instantiating WatcherKubeEnricher: %w", err)
}
wk := watcherKubeEnricher{informer: informer}
if err := wk.init(); err != nil {
return nil, err
Expand Down
16 changes: 14 additions & 2 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestWatcherKubeEnricher(t *testing.T) {
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(true, &informer)()
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer})()
require.NoError(t, err)
inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10)
defer close(inputCh)
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(true, &informer)()
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer})()
require.NoError(t, err)
pipeConfig := beyla.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
Expand Down Expand Up @@ -308,3 +308,15 @@ func fakeProcessInfo(pp processAttrs) (*services.ProcessInfo, error) {
ExePath: fmt.Sprintf("/bin/process%d", pp.pid),
}, nil
}

type informerProvider struct {
informer *kube.Metadata
}

func (*informerProvider) IsKubeEnabled() bool {
return true
}

func (ip *informerProvider) Get(_ context.Context) (*kube.Metadata, error) {
return ip.informer, nil
}
12 changes: 8 additions & 4 deletions pkg/internal/export/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ type metricsReporter struct {

is instrumentations.InstrumentationSelection

kubeEnabled bool

serviceCache *expirable.LRU[svc.UID, svc.ID]
}

Expand Down Expand Up @@ -261,12 +263,14 @@ func newReporter(
}

clock := expire.NewCachedClock(timeNow)
kubeEnabled := ctxInfo.K8sInformer.IsKubeEnabled()
// If service name is not explicitly set, we take the service name as set by the
// executable inspector
mr := &metricsReporter{
bgCtx: ctx,
ctxInfo: ctxInfo,
cfg: cfg,
kubeEnabled: kubeEnabled,
clock: clock,
is: is,
promConnect: ctxInfo.Prometheus,
Expand Down Expand Up @@ -408,7 +412,7 @@ func newReporter(
return NewExpirer[prometheus.Gauge](prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: TracesTargetInfo,
Help: "target service information in trace span metric format",
}, labelNamesTargetInfo(ctxInfo)).MetricVec, clock.Time, cfg.TTL)
}, labelNamesTargetInfo(kubeEnabled)).MetricVec, clock.Time, cfg.TTL)
}),
serviceGraphClient: optionalHistogramProvider(cfg.ServiceGraphMetricsEnabled(), func() *Expirer[prometheus.Histogram] {
return NewExpirer[prometheus.Histogram](prometheus.NewHistogramVec(prometheus.HistogramOpts{
Expand Down Expand Up @@ -683,10 +687,10 @@ func (r *metricsReporter) labelValuesSpans(span *request.Span) []string {
}
}

func labelNamesTargetInfo(ctxInfo *global.ContextInfo) []string {
func labelNamesTargetInfo(kubeEnabled bool) []string {
names := []string{serviceKey, serviceNamespaceKey, serviceInstanceKey, serviceJobKey, telemetryLanguageKey, telemetrySDKKey, sourceKey}

if ctxInfo.K8sEnabled {
if kubeEnabled {
names = appendK8sLabelNames(names)
}

Expand All @@ -708,7 +712,7 @@ func (r *metricsReporter) labelValuesTargetInfo(service svc.ID) []string {
"beyla",
}

if r.ctxInfo.K8sEnabled {
if r.kubeEnabled {
values = appendK8sLabelValuesService(values, service)
}

Expand Down
File renamed without changes.
Loading

0 comments on commit cf71d5d

Please sign in to comment.