diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 613db7585092..f1be2fdc27f8 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -50,6 +50,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/mezmoexporter v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter v0.106.1 + github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opsrampdebugexporter v0.0.0-00010101000000-000000000000 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter v0.106.1 @@ -108,6 +109,8 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricsgenerationprocessor v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor v0.106.1 + github.com/open-telemetry/opentelemetry-collector-contrib/processor/opsrampk8sattributesprocessor v0.0.0-00010101000000-000000000000 + github.com/open-telemetry/opentelemetry-collector-contrib/processor/opsrampk8sobjectsprocessor v0.0.0-00010101000000-000000000000 github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/processor/redactionprocessor v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/processor/remotetapprocessor v0.106.1 @@ -238,7 +241,7 @@ require ( go.opentelemetry.io/collector/receiver v0.106.1 go.opentelemetry.io/collector/receiver/nopreceiver v0.106.1 go.opentelemetry.io/collector/receiver/otlpreceiver v0.106.1 - golang.org/x/sys v0.22.0 + golang.org/x/sys v0.24.0 ) require ( @@ -693,7 +696,7 @@ require ( github.com/prometheus/procfs v0.15.1 // indirect github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - github.com/redis/go-redis/v9 v9.6.0 // indirect + github.com/redis/go-redis/v9 v9.6.1 // indirect github.com/relvacode/iso8601 v1.4.0 // indirect github.com/rs/cors v1.11.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect @@ -806,14 +809,14 @@ require ( go.uber.org/fx v1.18.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.25.0 // indirect + golang.org/x/crypto v0.26.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.18.0 // indirect - golang.org/x/net v0.27.0 // indirect + golang.org/x/net v0.28.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/term v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/term v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.22.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect @@ -821,7 +824,7 @@ require ( google.golang.org/api v0.189.0 // indirect google.golang.org/genproto v0.0.0-20240722135656-d784300faade // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240709173604-40e1e62336c5 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a // indirect google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index cea8ae1e46e5..9cb603e1828f 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -2094,8 +2094,8 @@ github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzuk github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/redis/go-redis/v9 v9.6.0 h1:NLck+Rab3AOTHw21CGRpvQpgTrAU4sgdCswqGtlhGRA= -github.com/redis/go-redis/v9 v9.6.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/relvacode/iso8601 v1.4.0 h1:GsInVSEJfkYuirYFxa80nMLbH2aydgZpIf52gYZXUJs= github.com/relvacode/iso8601 v1.4.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= @@ -2544,8 +2544,8 @@ golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58 golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= -golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= -golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2689,8 +2689,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2739,8 +2739,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -2861,8 +2861,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -2878,8 +2878,8 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= -golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= -golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2899,8 +2899,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -3213,8 +3213,8 @@ google.golang.org/genproto v0.0.0-20240722135656-d784300faade h1:lKFsS7wpngDgSCe google.golang.org/genproto v0.0.0-20240722135656-d784300faade/go.mod h1:FfBgJBJg9GcpPvKIuHSZ/aE1g2ecGL74upMzGZjiGEY= google.golang.org/genproto/googleapis/api v0.0.0-20240709173604-40e1e62336c5 h1:a/Z0jgw03aJ2rQnp5PlPpznJqJft0HyvyrcUcxgzPwY= google.golang.org/genproto/googleapis/api v0.0.0-20240709173604-40e1e62336c5/go.mod h1:mw8MG/Qz5wfgYr6VqVCiZcHe/GJEfI+oGGDCohaVgB0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade h1:oCRSWfwGXQsqlVdErcyTt4A93Y8fo0/9D4b1gnI++qo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a h1:EKiZZXueP9/T68B8Nl0GAx9cjbQnCId0yP3qPMgaaHs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= diff --git a/processor/k8sattributesprocessor/config.go b/processor/k8sattributesprocessor/config.go index 6f2d92c08e45..1c9aba44cb45 100644 --- a/processor/k8sattributesprocessor/config.go +++ b/processor/k8sattributesprocessor/config.go @@ -12,6 +12,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/lru" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/redis" ) var disallowFieldExtractConfigRegex = featuregate.GlobalRegistry().MustRegister( @@ -49,6 +51,8 @@ type Config struct { //Opsramp Metadata Addons Section MetadataAddOn []AddOnMetadata `mapstructure:"metadata_addon"` + + RedisConfig redis.OpsrampRedisConfig `mapstructure:"redis_config"` } func (cfg *Config) Validate() error { @@ -130,6 +134,26 @@ func (cfg *Config) Validate() error { } } + if cfg.RedisConfig.RedisHost == "" || cfg.RedisConfig.RedisPort == "" || cfg.RedisConfig.RedisPass == "" { + return fmt.Errorf("redis host, redis port and redis pass is mandatory") + } + + if cfg.RedisConfig.ClusterName == "" || cfg.RedisConfig.ClusterUid == "" { + return fmt.Errorf("cluster name and cluster uid is mandatory") + } + + if cfg.RedisConfig.NodeName == "" { + return fmt.Errorf("node name is mandatory") + } + + if cfg.RedisConfig.LruExpirationTime == 0 { + cfg.RedisConfig.LruExpirationTime = lru.DEFAULT_CACHE_EXPIRATION_INTERVAL + } + + if cfg.RedisConfig.LruCacheSize == 0 { + cfg.RedisConfig.LruCacheSize = lru.DEFAULT_CACHE_SIZE + } + return nil } diff --git a/processor/k8sattributesprocessor/factory.go b/processor/k8sattributesprocessor/factory.go index 1e60ab2a9383..8ddb2717bbfd 100644 --- a/processor/k8sattributesprocessor/factory.go +++ b/processor/k8sattributesprocessor/factory.go @@ -166,6 +166,8 @@ func createProcessorOpts(cfg component.Config) []option { //Opsramp Metadata Addons opts = append(opts, withAddOnFields(oCfg.MetadataAddOn...)) + opts = append(opts, withRedisConfigFields(oCfg.RedisConfig)) + opts = append(opts, withExcludes(oCfg.Exclude)) return opts diff --git a/processor/k8sattributesprocessor/go.mod b/processor/k8sattributesprocessor/go.mod index a57610399ced..45442681dc12 100644 --- a/processor/k8sattributesprocessor/go.mod +++ b/processor/k8sattributesprocessor/go.mod @@ -5,9 +5,11 @@ go 1.21.0 require ( github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest v0.106.1 + github.com/redis/go-redis/v9 v9.6.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/client v0.106.1 go.opentelemetry.io/collector/component v0.106.1 @@ -38,6 +40,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/docker v26.1.4+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect diff --git a/processor/k8sattributesprocessor/go.sum b/processor/k8sattributesprocessor/go.sum index 518bca43bab4..5107dba89f4f 100644 --- a/processor/k8sattributesprocessor/go.sum +++ b/processor/k8sattributesprocessor/go.sum @@ -788,6 +788,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -823,6 +827,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/docker/docker v26.1.4+incompatible h1:vuTpXDuoga+Z38m1OZHzl7NKisKWaWlhjQk7IDPSLsU= @@ -1046,6 +1052,8 @@ github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKe github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -1176,6 +1184,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/processor/k8sattributesprocessor/internal/lru/lru.go b/processor/k8sattributesprocessor/internal/lru/lru.go new file mode 100644 index 000000000000..5c081282a010 --- /dev/null +++ b/processor/k8sattributesprocessor/internal/lru/lru.go @@ -0,0 +1,53 @@ +package lru + +import ( + "fmt" + "sync" + "time" + + cache "github.com/hashicorp/golang-lru/v2/expirable" +) + +type Cache struct { + lrucache *cache.LRU[string, string] +} + +const ( + DEFAULT_CACHE_SIZE = 256 + DEFAULT_CACHE_EXPIRATION_INTERVAL = 10 * time.Minute +) + +var ( + once sync.Once + instance *Cache +) + +func New(size int, expirationInterval time.Duration) *Cache { + return &Cache{lrucache: cache.NewLRU[string, string](size, nil, expirationInterval)} +} + +func GetInstance(size int, expirationInterval time.Duration) *Cache { + once.Do(func() { + instance = New(size, expirationInterval) + }) + return instance +} + +func (c *Cache) Get(key string) (string, bool) { + return c.lrucache.Get(key) +} + +func (c *Cache) Add(key string, value string) { + c.lrucache.Add(key, value) +} + +func (c *Cache) AddEvicted(key string, value string) (evicted bool) { + return c.lrucache.Add(key, value) +} + +func (c *Cache) PrintKeys() { + fmt.Println(c.lrucache.Keys()) +} +func (c *Cache) PrintValues() { + fmt.Println(c.lrucache.Values()) +} diff --git a/processor/k8sattributesprocessor/internal/lru/lru_test.go b/processor/k8sattributesprocessor/internal/lru/lru_test.go new file mode 100644 index 000000000000..96a64db7445a --- /dev/null +++ b/processor/k8sattributesprocessor/internal/lru/lru_test.go @@ -0,0 +1,25 @@ +package lru + +import ( + "testing" +) + +func TestLRU(t *testing.T) { + cache := New(2, 0) + + // Test cases + cache.Add("key1", "value1") + cache.Add("key2", "value2") + + value, ok := cache.Get("key1") + if !ok || value != "value1" { + t.Errorf("Expected value1, got %v", value) + } + + cache.Add("key3", "value3") + + _, ok = cache.Get("key4") + if ok { + t.Errorf("Expected key1 to be evicted") + } +} diff --git a/processor/k8sattributesprocessor/internal/moid/moid.go b/processor/k8sattributesprocessor/internal/moid/moid.go new file mode 100644 index 000000000000..ff9bd2314c8b --- /dev/null +++ b/processor/k8sattributesprocessor/internal/moid/moid.go @@ -0,0 +1,141 @@ +package moid + +const ( + CLUSTER = "Cluster" + NAMESPACE = "Namespace" + NODE = "Node" + SERVICE = "Service" + DEPLOYMENT = "Deployment" + DAEMONSET = "DaemonSet" + REPLICASET = "ReplicaSet" + STATEFULSET = "StatefulSet" + POD = "Pod" + SEPARATOR = "_" +) + +type Moid struct { + clusterName string + clusterUuid string + namespaceName string + nodeName string + serviceName string + deploymentName string + replicasetName string + daemonsetName string + statefulsetName string + podName string +} + +func NewMoid(clusterName string) *Moid { + return &Moid{clusterName: clusterName} +} + +func (m *Moid) WithClusterUuid(clusterUuid string) *Moid { + m.clusterUuid = clusterUuid + return m +} + +func (m *Moid) WithNamespaceName(namespaceName string) *Moid { + m.namespaceName = namespaceName + return m +} + +func (m *Moid) WithNodeName(nodeName string) *Moid { + m.nodeName = nodeName + return m +} + +func (m *Moid) WithServiceName(serviceName string) *Moid { + m.serviceName = serviceName + return m +} + +func (m *Moid) WithDeploymentName(deploymentName string) *Moid { + m.deploymentName = deploymentName + return m +} + +func (m *Moid) WithReplicasetName(replicasetName string) *Moid { + m.replicasetName = replicasetName + return m +} + +func (m *Moid) WithDaemonsetName(daemonsetName string) *Moid { + m.daemonsetName = daemonsetName + return m +} + +func (m *Moid) WithStatefulsetName(statefulsetName string) *Moid { + m.statefulsetName = statefulsetName + return m +} + +func (m *Moid) WithPodName(podName string) *Moid { + m.podName = podName + return m +} + +func (m *Moid) PodMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + if m.replicasetName != "" { + moid += REPLICASET + SEPARATOR + m.replicasetName + SEPARATOR + } else if m.daemonsetName != "" { + moid += DAEMONSET + SEPARATOR + m.daemonsetName + SEPARATOR + } else if m.statefulsetName != "" { + moid += STATEFULSET + SEPARATOR + m.statefulsetName + SEPARATOR + } + + moid += POD + SEPARATOR + m.podName + return +} + +func (m *Moid) NodeMoid() (moid string) { + moid = m.clusterName + SEPARATOR + NODE + SEPARATOR + m.nodeName + return +} + +func (m *Moid) NamespaceMoid() (moid string) { + moid = m.clusterName + SEPARATOR + NAMESPACE + SEPARATOR + m.namespaceName + return +} + +func (m *Moid) DaemonSetMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + moid += DAEMONSET + SEPARATOR + m.daemonsetName + return +} + +func (m *Moid) ReplicaSetMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + moid += REPLICASET + SEPARATOR + m.replicasetName + return +} + +func (m *Moid) StatefulSetMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + moid += STATEFULSET + SEPARATOR + m.statefulsetName + return +} + +func (m *Moid) DeploymentMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + moid += DEPLOYMENT + SEPARATOR + m.deploymentName + return +} + +func (m *Moid) ServiceMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + moid += SERVICE + SEPARATOR + m.serviceName + return +} + +func (m *Moid) ClusterMoid() (moid string) { + moid = m.clusterName + SEPARATOR + CLUSTER + SEPARATOR + m.clusterUuid + return +} diff --git a/processor/k8sattributesprocessor/internal/redis/client.go b/processor/k8sattributesprocessor/internal/redis/client.go new file mode 100644 index 000000000000..041b66abcdc4 --- /dev/null +++ b/processor/k8sattributesprocessor/internal/redis/client.go @@ -0,0 +1,127 @@ +package redis + +import ( + "context" + "time" + + lru "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/lru" + goredis "github.com/redis/go-redis/v9" + "go.uber.org/zap" +) + +type Client struct { + Host string + Port string + Password string + GoClient *goredis.Client + lruCache *lru.Cache + Enabled bool + logger *zap.Logger +} + +type OpsrampRedisConfig struct { + RedisHost string `mapstructure:"redisHost"` + RedisPort string `mapstructure:"redisPort"` + RedisPass string `mapstructure:"redisPass"` + ClusterName string `mapstructure:"clusterName"` + ClusterUid string `mapstructure:"clusterUid"` + NodeName string `mapstructure:"nodeName"` + LruCacheSize int `mapstructure:"lruCacheSize"` + LruExpirationTime time.Duration `mapstructure:"lruExpirationTime"` +} + +func NewClient(logger *zap.Logger, lruCache *lru.Cache, rHost, rPort, rPass string) *Client { + client := Client{ + Host: rHost, + Port: rPort, + Password: rPass, + Enabled: true, + logger: logger, + lruCache: lruCache, + } + + if client.Host == "" { + logger.Info("Redis Host is empty, hence no lookup for moid/resourceuuid cache") + client.Enabled = false + return &client + } + + if client.Port == "" { + client.Port = "6379" + } + + client.Init() + + return &client +} + +func (c *Client) Init() error { + c.GoClient = goredis.NewClient(&goredis.Options{ + Addr: c.Host + ":" + c.Port, + Password: c.Password, + MaxRetries: -1, + MinRetryBackoff: 55 * time.Millisecond, + MaxRetryBackoff: 2 * time.Second, + }) + + if err := c.TestConnection(context.Background()); err != nil { + return err + } + + return nil +} + +func (c *Client) TestConnection(ctx context.Context) error { + logger := c.logger + + var err error + err = nil + + for i := 0; i < 15; i++ { + _, err = c.GoClient.Ping(ctx).Result() + if err != nil { + logger.Info("Could not connect/ping to Redis", zap.Any("error", err.Error())) + } else { + logger.Info("Connected to Redis") + break + } + time.Sleep(1 * time.Second) + } + + if err != nil { + logger.Error("Could not connect/ping to Redis", zap.Any("error", err.Error())) + } + + return err +} + +func (c *Client) GetValueInString(ctx context.Context, key string) string { + logger := c.logger + + // Try to init the cache if it is firt time + + cache := c.lruCache + + value, ok := cache.Get(key) + if !ok { + logger.Debug("Failed to fetch the key from lru cache ", zap.Any("key", key)) + if c.Enabled { + val, err := c.GoClient.Get(ctx, key).Result() + if err == goredis.Nil { + logger.Debug("key does not exist ", zap.Any("key", key)) + } else if err != nil { + logger.Error("Failed to fetch the key from redis ", zap.Error(err)) + } else { + logger.Debug("Got value from redis ", zap.Any("key", key), zap.Any("value", value)) + value = val + } + } + + if value != "" { + cache.Add(key, value) + } + } else { + logger.Debug("Got value from lru cache ", zap.Any("key", key), zap.Any("value", value)) + } + return value +} diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index 13592e16f36d..80431959022e 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -14,6 +14,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/redis" ) const ( @@ -389,3 +390,13 @@ func withAddOnFields(filters ...AddOnMetadata) option { return nil } } + +func withRedisConfigFields(filters redis.OpsrampRedisConfig) option { + fmt.Println("The value of filters received : ", filters) + + return func(p *kubernetesprocessor) error { + p.redisConfig = filters + return nil + } + +} diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 5515e8f805d1..c292a50bafe6 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -18,6 +18,9 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/lru" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/moid" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/redis" ) const ( @@ -35,8 +38,10 @@ type kubernetesprocessor struct { rules kube.ExtractionRules filters kube.Filters addons []kube.AddOnMetadata + redisConfig redis.OpsrampRedisConfig podAssociations []kube.Association podIgnore kube.Excludes + redisClient *redis.Client } func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, kubeClient kube.ClientProvider) error { @@ -63,6 +68,19 @@ func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error } } + kp.logger.Info("ops k8s attr processor start", zap.Any("redisHost", kp.redisConfig.RedisHost), + zap.Any("redisPort", kp.redisConfig.RedisPort), + zap.Any("redisPass", kp.redisConfig.RedisPass)) + + cache := lru.GetInstance(kp.redisConfig.LruCacheSize, kp.redisConfig.LruExpirationTime) + + if cache == nil { + kp.logger.Error("Failed to initilize the cache with GetInstance()") + return nil + } + + kp.redisClient = redis.NewClient(kp.logger, cache, kp.redisConfig.RedisHost, kp.redisConfig.RedisPort, kp.redisConfig.RedisPass) + // This might have been set by an option already if kp.kc == nil { err := kp.initKubeClient(kp.telemetrySettings, kubeClientProvider) @@ -122,11 +140,6 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco podIdentifierValue := extractPodID(ctx, resource.Attributes(), kp.podAssociations) kp.logger.Debug("evaluating pod identifier", zap.Any("value", podIdentifierValue)) - for _, addon := range kp.addons { - //fmt.Println(">>>>>> Addons Added key : ", addon.Key, " Value ", addon.Value) - resource.Attributes().PutStr(addon.Key, addon.Value) - } - for i := range podIdentifierValue { if podIdentifierValue[i].Source.From == kube.ConnectionSource && podIdentifierValue[i].Value != "" { if _, found := resource.Attributes().Get(kube.K8sIPLabelName); !found { @@ -179,6 +192,109 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco } } } + kp.processopsrampResources(ctx, resource) +} + +// processResource adds Pod metadata tags to resource based on pod association configuration +func (kp *kubernetesprocessor) processopsrampResources(ctx context.Context, resource pcommon.Resource) { + var found bool + var resourceUuid string + + for _, addon := range kp.addons { + //fmt.Println(">>>>>> Addons Added key : ", addon.Key, " Value ", addon.Value) + resource.Attributes().PutStr(addon.Key, addon.Value) + } + + if _, found = resource.Attributes().Get("k8s.pod.uid"); found { + if resourceUuid = kp.GetResourceUuidUsingPodMoid(ctx, resource); resourceUuid == "" { + resourceUuid = kp.GetResourceUuidUsingClusterMoid(ctx, resource) + /* + if resourceUuid = kp.GetResourceUuidUsingResourceNodeMoid(ctx, resource); resourceUuid == "" { + if resourceUuid = kp.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { + resourceUuid = kp.GetResourceUuidUsingClusterMoid(ctx, resource) + } + } + */ + } + } else if _, found = resource.Attributes().Get("k8s.node.name"); found { + if resourceUuid = kp.GetResourceUuidUsingResourceNodeMoid(ctx, resource); resourceUuid == "" { + if resourceUuid = kp.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { + resourceUuid = kp.GetResourceUuidUsingClusterMoid(ctx, resource) + } + } + } else { + resourceUuid = kp.GetResourceUuidUsingClusterMoid(ctx, resource) + + /* + if resourceUuid = kp.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { + resourceUuid = kp.GetResourceUuidUsingClusterMoid(ctx, resource) + } + */ + } + + if resourceUuid != "" { + resource.Attributes().PutStr("uuid", resourceUuid) + } +} + +func (op *kubernetesprocessor) GetResourceUuidUsingPodMoid(ctx context.Context, resource pcommon.Resource) (resourceUuid string) { + var namespace, podname, rsname, dsname, ssname pcommon.Value + var found bool + + if namespace, found = resource.Attributes().Get("k8s.namespace.name"); !found { + return + } + if podname, found = resource.Attributes().Get("k8s.pod.name"); !found { + return + } + + podMoid := moid.NewMoid(op.redisConfig.ClusterName).WithNamespaceName(namespace.Str()).WithPodName(podname.Str()) + + if rsname, found = resource.Attributes().Get("k8s.replicaset.name"); found { + podMoid.WithReplicasetName(rsname.Str()) + } else if dsname, found = resource.Attributes().Get("k8s.daemonset.name"); found { + podMoid.WithDaemonsetName(dsname.Str()) + } else if ssname, found = resource.Attributes().Get("k8s.statefulset.name"); found { + podMoid.WithStatefulsetName(ssname.Str()) + } + + podMoidKey := podMoid.PodMoid() + + resourceUuid = op.redisClient.GetValueInString(ctx, podMoidKey) + op.logger.Debug("redis KV ", zap.Any("key", podMoidKey), zap.Any("value", resourceUuid)) + return +} + +func (op *kubernetesprocessor) GetResourceUuidUsingResourceNodeMoid(ctx context.Context, resource pcommon.Resource) (resourceUuid string) { + var nodename pcommon.Value + var found bool + if nodename, found = resource.Attributes().Get("k8s.node.name"); !found { + return + } + + nodeMoidKey := moid.NewMoid(op.redisConfig.ClusterName).WithNodeName(nodename.Str()).NodeMoid() + + resourceUuid = op.redisClient.GetValueInString(ctx, nodeMoidKey) + op.logger.Debug("redis KV ", zap.Any("key", nodeMoidKey), zap.Any("value", resourceUuid)) + return +} + +func (op *kubernetesprocessor) GetResourceUuidUsingCurrentNodeMoid(ctx context.Context, resource pcommon.Resource) (resourceUuid string) { + + nodeMoidKey := moid.NewMoid(op.redisConfig.ClusterName).WithNodeName(op.redisConfig.NodeName).NodeMoid() + + resourceUuid = op.redisClient.GetValueInString(ctx, nodeMoidKey) + op.logger.Debug("redis KV ", zap.Any("key", nodeMoidKey), zap.Any("value", resourceUuid)) + return +} + +func (op *kubernetesprocessor) GetResourceUuidUsingClusterMoid(ctx context.Context, resource pcommon.Resource) (resourceUuid string) { + + nodeMoidKey := moid.NewMoid(op.redisConfig.ClusterName).WithNodeName(op.redisConfig.NodeName).NodeMoid() + + resourceUuid = op.redisClient.GetValueInString(ctx, nodeMoidKey) + op.logger.Debug("redis KV ", zap.Any("key", nodeMoidKey), zap.Any("value", resourceUuid)) + return } func getNamespace(pod *kube.Pod, resAttrs pcommon.Map) string { diff --git a/processor/opsrampk8sattributesprocessor/go.mod b/processor/opsrampk8sattributesprocessor/go.mod index 2b93d4b197b9..e723377f22f1 100644 --- a/processor/opsrampk8sattributesprocessor/go.mod +++ b/processor/opsrampk8sattributesprocessor/go.mod @@ -14,6 +14,7 @@ require ( go.opentelemetry.io/collector/processor v0.106.1 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 ) require ( diff --git a/processor/opsrampk8sattributesprocessor/go.sum b/processor/opsrampk8sattributesprocessor/go.sum index ec7ef67b9d45..94a909c488ac 100644 --- a/processor/opsrampk8sattributesprocessor/go.sum +++ b/processor/opsrampk8sattributesprocessor/go.sum @@ -27,6 +27,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/processor/opsrampk8sattributesprocessor/internal/lru/lru.go b/processor/opsrampk8sattributesprocessor/internal/lru/lru.go new file mode 100644 index 000000000000..4d560027155e --- /dev/null +++ b/processor/opsrampk8sattributesprocessor/internal/lru/lru.go @@ -0,0 +1,53 @@ +package lru + +import ( + "fmt" + "sync" + "time" + + cache "github.com/hashicorp/golang-lru/v2/expirable" +) + +type Cache struct { + lrucache *cache.LRU[string, string] +} + +const ( + DEFAULT_CACHE_SIZE = 16 + DEFAULT_CACHE_EXPIRATION_INTERVAL = time.Minute +) + +var ( + once sync.Once + instance *Cache +) + +func New(size int, expirateInterval time.Duration) *Cache { + return &Cache{lrucache: cache.NewLRU[string, string](size, nil, expirateInterval)} +} + +func GetInstance() *Cache { + once.Do(func() { + instance = New(DEFAULT_CACHE_SIZE, DEFAULT_CACHE_EXPIRATION_INTERVAL) + }) + return instance +} + +func (c *Cache) Get(key string) (string, bool) { + return c.lrucache.Get(key) +} + +func (c *Cache) Add(key string, value string) { + c.lrucache.Add(key, value) +} + +func (c *Cache) AddEvicted(key string, value string) (evicted bool) { + return c.lrucache.Add(key, value) +} + +func (c *Cache) PrintKeys() { + fmt.Println(c.lrucache.Keys()) +} +func (c *Cache) PrintValues() { + fmt.Println(c.lrucache.Values()) +} diff --git a/processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go b/processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go new file mode 100644 index 000000000000..96a64db7445a --- /dev/null +++ b/processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go @@ -0,0 +1,25 @@ +package lru + +import ( + "testing" +) + +func TestLRU(t *testing.T) { + cache := New(2, 0) + + // Test cases + cache.Add("key1", "value1") + cache.Add("key2", "value2") + + value, ok := cache.Get("key1") + if !ok || value != "value1" { + t.Errorf("Expected value1, got %v", value) + } + + cache.Add("key3", "value3") + + _, ok = cache.Get("key4") + if ok { + t.Errorf("Expected key1 to be evicted") + } +} diff --git a/processor/opsrampk8sattributesprocessor/internal/redis/client.go b/processor/opsrampk8sattributesprocessor/internal/redis/client.go index c720d1eaa036..fa1c7e4ceacc 100644 --- a/processor/opsrampk8sattributesprocessor/internal/redis/client.go +++ b/processor/opsrampk8sattributesprocessor/internal/redis/client.go @@ -3,6 +3,7 @@ package redis import ( "context" + lru "github.com/open-telemetry/opentelemetry-collector-contrib/processor/opsrampk8sattributesprocessor/internal/lru" goredis "github.com/redis/go-redis/v9" "go.uber.org/zap" ) @@ -70,25 +71,37 @@ func (c *Client) TestConnection(ctx context.Context) error { return nil } -func (c *Client) GetValueInString(ctx context.Context, key string) (value string) { - +func (c *Client) GetValueInString(ctx context.Context, key string) string { logger := c.logger - if c.Enabled { - if c.Connected { - val, err := c.GoClient.Get(ctx, key).Result() - if err == goredis.Nil { - logger.Debug("key does not exist", zap.Any("key", key)) - } else if err != nil { + // Try to init the cache if it is firt time + cache := lru.GetInstance() + + if cache != nil { + logger.Debug("Failed to initilize the cache with GetInstance()") + return "" + } + value, ok := cache.Get(key) + if !ok { + logger.Debug("Failed to fetch the key from the cache ", zap.Any("value : ", key)) + if c.Enabled { + if c.Connected { + val, err := c.GoClient.Get(ctx, key).Result() + if err == goredis.Nil { + logger.Debug("key does not exist", zap.Any("key", key)) + } else if err != nil { + logger.Info("Trying to reconnect") + c.Init() + } else { + value = val + } + } else { logger.Info("Trying to reconnect") c.Init() - } else { - value = val } - } else { - logger.Info("Trying to reconnect") - c.Init() } + //Before returning ; it should update the cache + cache.Add(key, value) } - return + return value }