From 8327b5b5c8de94a209e1947a3685141d7e8daa93 Mon Sep 17 00:00:00 2001 From: Pradeep Thota Date: Tue, 30 Jul 2024 11:42:55 +0000 Subject: [PATCH 1/6] feature:lru cache implementation --- .../opsrampk8sattributesprocessor/go.mod | 1 + .../opsrampk8sattributesprocessor/go.sum | 2 + .../internal/lru/lru.go | 44 +++++++++++++++++++ .../internal/lru/lru_test.go | 28 ++++++++++++ .../internal/redis/client.go | 44 +++++++++++++------ 5 files changed, 105 insertions(+), 14 deletions(-) create mode 100644 processor/opsrampk8sattributesprocessor/internal/lru/lru.go create mode 100644 processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go diff --git a/processor/opsrampk8sattributesprocessor/go.mod b/processor/opsrampk8sattributesprocessor/go.mod index ebecc72a9a72..a3c4d751d8d6 100644 --- a/processor/opsrampk8sattributesprocessor/go.mod +++ b/processor/opsrampk8sattributesprocessor/go.mod @@ -13,6 +13,7 @@ require ( go.opentelemetry.io/collector/processor v0.104.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 ) require ( diff --git a/processor/opsrampk8sattributesprocessor/go.sum b/processor/opsrampk8sattributesprocessor/go.sum index 9a23b82244a3..d9ec6dd683ec 100644 --- a/processor/opsrampk8sattributesprocessor/go.sum +++ b/processor/opsrampk8sattributesprocessor/go.sum @@ -27,6 +27,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/processor/opsrampk8sattributesprocessor/internal/lru/lru.go b/processor/opsrampk8sattributesprocessor/internal/lru/lru.go new file mode 100644 index 000000000000..47c3b47074d2 --- /dev/null +++ b/processor/opsrampk8sattributesprocessor/internal/lru/lru.go @@ -0,0 +1,44 @@ +package lru + +import ( + "sync" + + cache "github.com/hashicorp/golang-lru/v2" +) + +type Cache struct { + lru *cache.Cache[string, string] +} + +var ( + once sync.Once + instance *Cache + err error +) + +func New(size int) (*Cache, error) { + cache, err := cache.New[string, string](size) + if err != nil { + return nil, err + } + return &Cache{lru: cache}, nil +} + +func GetInstance(size int) (*Cache, error) { + once.Do(func() { + instance, err = New(size) + }) + return instance, err +} + +func (c *Cache) Get(key string) (string, bool) { + return c.lru.Get(key) +} + +func (c *Cache) Add(key string, value string) { + c.lru.Add(key, value) +} + +func (c *Cache) AddEvicted(key string, value string) (evicted bool) { + return c.lru.Add(key, value) +} diff --git a/processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go b/processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go new file mode 100644 index 000000000000..aa4d88dd2efb --- /dev/null +++ b/processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go @@ -0,0 +1,28 @@ +package lru + +import ( + "testing" +) + +func TestLRU(t *testing.T) { + cache, err := New(2) + if err != nil { + t.Fatalf("Error creating cache: %v", err) + } + + // Test cases + cache.Add("key1", "value1") + cache.Add("key2", "value2") + + value, ok := cache.Get("key1") + if !ok || value != "value1" { + t.Errorf("Expected value1, got %v", value) + } + + cache.Add("key3", "value3") + + _, ok = cache.Get("key4") + if ok { + t.Errorf("Expected key1 to be evicted") + } +} diff --git a/processor/opsrampk8sattributesprocessor/internal/redis/client.go b/processor/opsrampk8sattributesprocessor/internal/redis/client.go index c720d1eaa036..4f4e29398d0b 100644 --- a/processor/opsrampk8sattributesprocessor/internal/redis/client.go +++ b/processor/opsrampk8sattributesprocessor/internal/redis/client.go @@ -3,10 +3,15 @@ package redis import ( "context" + lru "github.com/open-telemetry/opentelemetry-collector-contrib/processor/opsrampk8sattributesprocessor/internal/lru" goredis "github.com/redis/go-redis/v9" "go.uber.org/zap" ) +const ( + CACHE_SIZE = 16 +) + type Client struct { Host string Port string @@ -70,25 +75,36 @@ func (c *Client) TestConnection(ctx context.Context) error { return nil } -func (c *Client) GetValueInString(ctx context.Context, key string) (value string) { - +func (c *Client) GetValueInString(ctx context.Context, key string) string { logger := c.logger - if c.Enabled { - if c.Connected { - val, err := c.GoClient.Get(ctx, key).Result() - if err == goredis.Nil { - logger.Debug("key does not exist", zap.Any("key", key)) - } else if err != nil { + // Try to init the cache if it is firt time + cache, err := lru.GetInstance(CACHE_SIZE) + if err != nil { + logger.Debug("Failed to initilize the cache for ", zap.Any("size", CACHE_SIZE), zap.Any(" error found was ", err)) + //TODO: Need to check whether I shuould return from here or not + } + value, ok := cache.Get(key) + if !ok { + logger.Debug("Failed to fetch the key from the cache ", zap.Any("value : ", key)) + if c.Enabled { + if c.Connected { + val, err := c.GoClient.Get(ctx, key).Result() + if err == goredis.Nil { + logger.Debug("key does not exist", zap.Any("key", key)) + } else if err != nil { + logger.Info("Trying to reconnect") + c.Init() + } else { + value = val + } + } else { logger.Info("Trying to reconnect") c.Init() - } else { - value = val } - } else { - logger.Info("Trying to reconnect") - c.Init() } + //Before returning ; it should update the cache + cache.Add(key, value) } - return + return value } From 63f9b52fc19729222dffd0321933d87ed8d24935 Mon Sep 17 00:00:00 2001 From: Pradeep Thota Date: Fri, 2 Aug 2024 06:42:09 +0000 Subject: [PATCH 2/6] enhancement:LRU expirable cache implementation with k8s attribute processor --- cmd/otelcontribcol/go.mod | 2 +- cmd/otelcontribcol/go.sum | 4 +- processor/k8sattributesprocessor/config.go | 23 +++ processor/k8sattributesprocessor/factory.go | 2 + processor/k8sattributesprocessor/go.mod | 3 + processor/k8sattributesprocessor/go.sum | 10 ++ .../internal/kube/kube.go | 9 ++ .../internal/lru/lru.go | 53 +++++++ .../internal/lru/lru_test.go | 25 ++++ .../internal/moid/moid.go | 141 ++++++++++++++++++ .../internal/redis/client.go | 107 +++++++++++++ processor/k8sattributesprocessor/options.go | 10 ++ processor/k8sattributesprocessor/processor.go | 105 ++++++++++++- .../internal/lru/lru.go | 39 +++-- .../internal/lru/lru_test.go | 5 +- .../internal/redis/client.go | 13 +- 16 files changed, 520 insertions(+), 31 deletions(-) create mode 100644 processor/k8sattributesprocessor/internal/lru/lru.go create mode 100644 processor/k8sattributesprocessor/internal/lru/lru_test.go create mode 100644 processor/k8sattributesprocessor/internal/moid/moid.go create mode 100644 processor/k8sattributesprocessor/internal/redis/client.go diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index b8050215c1bc..ce2a5c8c8592 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -691,7 +691,7 @@ require ( github.com/prometheus/procfs v0.15.1 // indirect github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - github.com/redis/go-redis/v9 v9.5.3 // indirect + github.com/redis/go-redis/v9 v9.6.1 // indirect github.com/relvacode/iso8601 v1.4.0 // indirect github.com/rs/cors v1.11.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index e3713f1cf9bc..d261cd18e3cb 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -2084,8 +2084,8 @@ github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzuk github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= -github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/relvacode/iso8601 v1.4.0 h1:GsInVSEJfkYuirYFxa80nMLbH2aydgZpIf52gYZXUJs= github.com/relvacode/iso8601 v1.4.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= diff --git a/processor/k8sattributesprocessor/config.go b/processor/k8sattributesprocessor/config.go index db88bc18a0b0..7e225993c132 100644 --- a/processor/k8sattributesprocessor/config.go +++ b/processor/k8sattributesprocessor/config.go @@ -41,6 +41,8 @@ type Config struct { //Opsramp Metadata Addons Section MetadataAddOn []AddOnMetadata `mapstructure:"metadata_addon"` + + RedisConfig OpsrampRedisConfig `mapstructure:"redis_config"` } func (cfg *Config) Validate() error { @@ -119,6 +121,18 @@ func (cfg *Config) Validate() error { } } + if cfg.RedisConfig.RedisHost == "" || cfg.RedisConfig.RedisPort == "" || cfg.RedisConfig.RedisPass == "" { + return fmt.Errorf("redis host, redis port and redis pass is mandatory") + } + + if cfg.RedisConfig.ClusterName == "" || cfg.RedisConfig.ClusterUid == "" { + return fmt.Errorf("cluster name and cluster uid is mandatory") + } + + if cfg.RedisConfig.NodeName == "" { + return fmt.Errorf("node name is mandatory") + } + return nil } @@ -323,3 +337,12 @@ type AddOnMetadata struct { Value string `mapstructure:"value"` } + +type OpsrampRedisConfig struct { + RedisHost string `mapstructure:"redisHost"` + RedisPort string `mapstructure:"redisPort"` + RedisPass string `mapstructure:"redisPass"` + ClusterName string `mapstructure:"clusterName"` + ClusterUid string `mapstructure:"clusterUid"` + NodeName string `mapstructure:"nodeName"` +} diff --git a/processor/k8sattributesprocessor/factory.go b/processor/k8sattributesprocessor/factory.go index 1e60ab2a9383..8ddb2717bbfd 100644 --- a/processor/k8sattributesprocessor/factory.go +++ b/processor/k8sattributesprocessor/factory.go @@ -166,6 +166,8 @@ func createProcessorOpts(cfg component.Config) []option { //Opsramp Metadata Addons opts = append(opts, withAddOnFields(oCfg.MetadataAddOn...)) + opts = append(opts, withRedisConfigFields(oCfg.RedisConfig)) + opts = append(opts, withExcludes(oCfg.Exclude)) return opts diff --git a/processor/k8sattributesprocessor/go.mod b/processor/k8sattributesprocessor/go.mod index 4f1ed9132ee9..0bce8b7971ae 100644 --- a/processor/k8sattributesprocessor/go.mod +++ b/processor/k8sattributesprocessor/go.mod @@ -5,8 +5,10 @@ go 1.21.0 require ( github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.104.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest v0.104.0 + github.com/redis/go-redis/v9 v9.6.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.104.0 go.opentelemetry.io/collector/component v0.104.0 @@ -36,6 +38,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/docker v25.0.5+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect diff --git a/processor/k8sattributesprocessor/go.sum b/processor/k8sattributesprocessor/go.sum index 2b32857a3a36..bc83503a3249 100644 --- a/processor/k8sattributesprocessor/go.sum +++ b/processor/k8sattributesprocessor/go.sum @@ -788,6 +788,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -823,6 +827,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/docker/docker v25.0.5+incompatible h1:UmQydMduGkrD5nQde1mecF/YnSbTOaPeFIeP5C4W+DE= @@ -1046,6 +1052,8 @@ github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKe github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -1174,6 +1182,8 @@ github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek= github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 7c94528490ab..167c065a391d 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -364,3 +364,12 @@ type AddOnMetadata struct { Value string `mapstructure:"value"` } + +type OpsrampRedisConfig struct { + RedisHost string `mapstructure:"redisHost"` + RedisPort string `mapstructure:"redisPort"` + RedisPass string `mapstructure:"redisPass"` + ClusterName string `mapstructure:"clusterName"` + ClusterUid string `mapstructure:"clusterUid"` + NodeName string `mapstructure:"nodeName"` +} diff --git a/processor/k8sattributesprocessor/internal/lru/lru.go b/processor/k8sattributesprocessor/internal/lru/lru.go new file mode 100644 index 000000000000..4d560027155e --- /dev/null +++ b/processor/k8sattributesprocessor/internal/lru/lru.go @@ -0,0 +1,53 @@ +package lru + +import ( + "fmt" + "sync" + "time" + + cache "github.com/hashicorp/golang-lru/v2/expirable" +) + +type Cache struct { + lrucache *cache.LRU[string, string] +} + +const ( + DEFAULT_CACHE_SIZE = 16 + DEFAULT_CACHE_EXPIRATION_INTERVAL = time.Minute +) + +var ( + once sync.Once + instance *Cache +) + +func New(size int, expirateInterval time.Duration) *Cache { + return &Cache{lrucache: cache.NewLRU[string, string](size, nil, expirateInterval)} +} + +func GetInstance() *Cache { + once.Do(func() { + instance = New(DEFAULT_CACHE_SIZE, DEFAULT_CACHE_EXPIRATION_INTERVAL) + }) + return instance +} + +func (c *Cache) Get(key string) (string, bool) { + return c.lrucache.Get(key) +} + +func (c *Cache) Add(key string, value string) { + c.lrucache.Add(key, value) +} + +func (c *Cache) AddEvicted(key string, value string) (evicted bool) { + return c.lrucache.Add(key, value) +} + +func (c *Cache) PrintKeys() { + fmt.Println(c.lrucache.Keys()) +} +func (c *Cache) PrintValues() { + fmt.Println(c.lrucache.Values()) +} diff --git a/processor/k8sattributesprocessor/internal/lru/lru_test.go b/processor/k8sattributesprocessor/internal/lru/lru_test.go new file mode 100644 index 000000000000..96a64db7445a --- /dev/null +++ b/processor/k8sattributesprocessor/internal/lru/lru_test.go @@ -0,0 +1,25 @@ +package lru + +import ( + "testing" +) + +func TestLRU(t *testing.T) { + cache := New(2, 0) + + // Test cases + cache.Add("key1", "value1") + cache.Add("key2", "value2") + + value, ok := cache.Get("key1") + if !ok || value != "value1" { + t.Errorf("Expected value1, got %v", value) + } + + cache.Add("key3", "value3") + + _, ok = cache.Get("key4") + if ok { + t.Errorf("Expected key1 to be evicted") + } +} diff --git a/processor/k8sattributesprocessor/internal/moid/moid.go b/processor/k8sattributesprocessor/internal/moid/moid.go new file mode 100644 index 000000000000..ff9bd2314c8b --- /dev/null +++ b/processor/k8sattributesprocessor/internal/moid/moid.go @@ -0,0 +1,141 @@ +package moid + +const ( + CLUSTER = "Cluster" + NAMESPACE = "Namespace" + NODE = "Node" + SERVICE = "Service" + DEPLOYMENT = "Deployment" + DAEMONSET = "DaemonSet" + REPLICASET = "ReplicaSet" + STATEFULSET = "StatefulSet" + POD = "Pod" + SEPARATOR = "_" +) + +type Moid struct { + clusterName string + clusterUuid string + namespaceName string + nodeName string + serviceName string + deploymentName string + replicasetName string + daemonsetName string + statefulsetName string + podName string +} + +func NewMoid(clusterName string) *Moid { + return &Moid{clusterName: clusterName} +} + +func (m *Moid) WithClusterUuid(clusterUuid string) *Moid { + m.clusterUuid = clusterUuid + return m +} + +func (m *Moid) WithNamespaceName(namespaceName string) *Moid { + m.namespaceName = namespaceName + return m +} + +func (m *Moid) WithNodeName(nodeName string) *Moid { + m.nodeName = nodeName + return m +} + +func (m *Moid) WithServiceName(serviceName string) *Moid { + m.serviceName = serviceName + return m +} + +func (m *Moid) WithDeploymentName(deploymentName string) *Moid { + m.deploymentName = deploymentName + return m +} + +func (m *Moid) WithReplicasetName(replicasetName string) *Moid { + m.replicasetName = replicasetName + return m +} + +func (m *Moid) WithDaemonsetName(daemonsetName string) *Moid { + m.daemonsetName = daemonsetName + return m +} + +func (m *Moid) WithStatefulsetName(statefulsetName string) *Moid { + m.statefulsetName = statefulsetName + return m +} + +func (m *Moid) WithPodName(podName string) *Moid { + m.podName = podName + return m +} + +func (m *Moid) PodMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + if m.replicasetName != "" { + moid += REPLICASET + SEPARATOR + m.replicasetName + SEPARATOR + } else if m.daemonsetName != "" { + moid += DAEMONSET + SEPARATOR + m.daemonsetName + SEPARATOR + } else if m.statefulsetName != "" { + moid += STATEFULSET + SEPARATOR + m.statefulsetName + SEPARATOR + } + + moid += POD + SEPARATOR + m.podName + return +} + +func (m *Moid) NodeMoid() (moid string) { + moid = m.clusterName + SEPARATOR + NODE + SEPARATOR + m.nodeName + return +} + +func (m *Moid) NamespaceMoid() (moid string) { + moid = m.clusterName + SEPARATOR + NAMESPACE + SEPARATOR + m.namespaceName + return +} + +func (m *Moid) DaemonSetMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + moid += DAEMONSET + SEPARATOR + m.daemonsetName + return +} + +func (m *Moid) ReplicaSetMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + moid += REPLICASET + SEPARATOR + m.replicasetName + return +} + +func (m *Moid) StatefulSetMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + moid += STATEFULSET + SEPARATOR + m.statefulsetName + return +} + +func (m *Moid) DeploymentMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + moid += DEPLOYMENT + SEPARATOR + m.deploymentName + return +} + +func (m *Moid) ServiceMoid() (moid string) { + moid = m.clusterName + SEPARATOR + m.namespaceName + SEPARATOR + + moid += SERVICE + SEPARATOR + m.serviceName + return +} + +func (m *Moid) ClusterMoid() (moid string) { + moid = m.clusterName + SEPARATOR + CLUSTER + SEPARATOR + m.clusterUuid + return +} diff --git a/processor/k8sattributesprocessor/internal/redis/client.go b/processor/k8sattributesprocessor/internal/redis/client.go new file mode 100644 index 000000000000..72011322cac5 --- /dev/null +++ b/processor/k8sattributesprocessor/internal/redis/client.go @@ -0,0 +1,107 @@ +package redis + +import ( + "context" + + lru "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/lru" + goredis "github.com/redis/go-redis/v9" + "go.uber.org/zap" +) + +type Client struct { + Host string + Port string + Password string + GoClient *goredis.Client + Enabled bool + Connected bool + logger *zap.Logger +} + +func NewClient(logger *zap.Logger, rHost, rPort, rPass string) *Client { + client := Client{ + Host: rHost, + Port: rPort, + Password: rPass, + Enabled: true, + Connected: false, + logger: logger, + } + + if client.Host == "" { + client.Enabled = false + return &client + } + + if client.Port == "" { + client.Port = "6379" + } + + client.Init() + + return &client +} + +func (c *Client) Init() error { + c.GoClient = goredis.NewClient(&goredis.Options{ + Addr: c.Host + ":" + c.Port, + Password: c.Password, + DB: 0, + }) + + if err := c.TestConnection(context.Background()); err != nil { + c.Connected = false + return err + } + + c.Connected = true + + return nil +} + +func (c *Client) TestConnection(ctx context.Context) error { + logger := c.logger + _, err := c.GoClient.Ping(ctx).Result() + if err != nil { + logger.Error("Could not connect/ping to Redis", zap.Any("error", err.Error())) + return err + } + logger.Info("Connected to Redis") + + return nil +} + +func (c *Client) GetValueInString(ctx context.Context, key string) string { + logger := c.logger + + // Try to init the cache if it is firt time + cache := lru.GetInstance() + + if cache != nil { + logger.Debug("Failed to initilize the cache with GetInstance()") + return "" + } + value, ok := cache.Get(key) + if !ok { + logger.Debug("Failed to fetch the key from the cache ", zap.Any("value : ", key)) + if c.Enabled { + if c.Connected { + val, err := c.GoClient.Get(ctx, key).Result() + if err == goredis.Nil { + logger.Debug("key does not exist", zap.Any("key", key)) + } else if err != nil { + logger.Info("Trying to reconnect") + c.Init() + } else { + value = val + } + } else { + logger.Info("Trying to reconnect") + c.Init() + } + } + //Before returning ; it should update the cache + cache.Add(key, value) + } + return value +} diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index 13592e16f36d..20158547f465 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -389,3 +389,13 @@ func withAddOnFields(filters ...AddOnMetadata) option { return nil } } + +func withRedisConfigFields(filters OpsrampRedisConfig) option { + fmt.Println("The value of filters received : ", filters) + + return func(p *kubernetesprocessor) error { + p.redisConfig = kube.OpsrampRedisConfig(filters) + return nil + } + +} diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 5515e8f805d1..accba67f8e1b 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -18,6 +18,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/moid" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/redis" ) const ( @@ -35,8 +37,10 @@ type kubernetesprocessor struct { rules kube.ExtractionRules filters kube.Filters addons []kube.AddOnMetadata + redisConfig kube.OpsrampRedisConfig podAssociations []kube.Association podIgnore kube.Excludes + redisClient *redis.Client } func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, kubeClient kube.ClientProvider) error { @@ -54,6 +58,13 @@ func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, k } func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error { + + kp.logger.Info("ops k8s attr processor start", zap.Any("redisHost", kp.redisConfig.RedisHost), + zap.Any("redisPort", kp.redisConfig.RedisPort), + zap.Any("redisPass", kp.redisConfig.RedisPass)) + + kp.redisClient = redis.NewClient(kp.logger, kp.redisConfig.RedisHost, kp.redisConfig.RedisPort, kp.redisConfig.RedisPass) + allOptions := append(createProcessorOpts(kp.cfg), kp.options...) for _, opt := range allOptions { @@ -62,7 +73,6 @@ func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error return nil } } - // This might have been set by an option already if kp.kc == nil { err := kp.initKubeClient(kp.telemetrySettings, kubeClientProvider) @@ -179,6 +189,99 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco } } } + kp.processopsrampResources(ctx, resource) +} + +// processResource adds Pod metadata tags to resource based on pod association configuration +func (op *kubernetesprocessor) processopsrampResources(ctx context.Context, resource pcommon.Resource) { + var found bool + var resourceUuid string + + resource.Attributes().PutStr("opsramp.k8s.cluster.name", op.redisConfig.ClusterName) + resource.Attributes().PutStr("opsramp.k8s.cluster.uid", op.redisConfig.ClusterUid) + if _, found = resource.Attributes().Get("k8s.pod.uid"); found { + if resourceUuid = op.GetResourceUuidUsingPodMoid(ctx, resource); resourceUuid == "" { + if resourceUuid = op.GetResourceUuidUsingResourceNodeMoid(ctx, resource); resourceUuid == "" { + if resourceUuid = op.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { + resourceUuid = op.GetResourceUuidUsingClusterMoid(ctx, resource) + } + } + } + } else if _, found = resource.Attributes().Get("k8s.node.name"); found { + if resourceUuid = op.GetResourceUuidUsingResourceNodeMoid(ctx, resource); resourceUuid == "" { + if resourceUuid = op.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { + resourceUuid = op.GetResourceUuidUsingClusterMoid(ctx, resource) + } + } + } else { + if resourceUuid = op.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { + resourceUuid = op.GetResourceUuidUsingClusterMoid(ctx, resource) + } + } + + if resourceUuid != "" { + resource.Attributes().PutStr("uuid", resourceUuid) + } +} + +func (op *kubernetesprocessor) GetResourceUuidUsingPodMoid(ctx context.Context, resource pcommon.Resource) (resourceUuid string) { + var namespace, podname, rsname, dsname, ssname pcommon.Value + var found bool + + if namespace, found = resource.Attributes().Get("k8s.namespace.name"); !found { + return + } + if podname, found = resource.Attributes().Get("k8s.pod.name"); !found { + return + } + + podMoid := moid.NewMoid(op.redisConfig.ClusterName).WithNamespaceName(namespace.Str()).WithPodName(podname.Str()) + + if rsname, found = resource.Attributes().Get("k8s.replicaset.name"); found { + podMoid.WithReplicasetName(rsname.Str()) + } else if dsname, found = resource.Attributes().Get("k8s.daemonset.name"); found { + podMoid.WithDaemonsetName(dsname.Str()) + } else if ssname, found = resource.Attributes().Get("k8s.statefulset.name"); found { + podMoid.WithStatefulsetName(ssname.Str()) + } + + podMoidKey := podMoid.PodMoid() + + resourceUuid = op.redisClient.GetValueInString(ctx, podMoidKey) + op.logger.Debug("redis KV ", zap.Any("key", podMoidKey), zap.Any("value", resourceUuid)) + return +} + +func (op *kubernetesprocessor) GetResourceUuidUsingResourceNodeMoid(ctx context.Context, resource pcommon.Resource) (resourceUuid string) { + var nodename pcommon.Value + var found bool + if nodename, found = resource.Attributes().Get("k8s.node.name"); !found { + return + } + + nodeMoidKey := moid.NewMoid(op.redisConfig.ClusterName).WithNodeName(nodename.Str()).NodeMoid() + + resourceUuid = op.redisClient.GetValueInString(ctx, nodeMoidKey) + op.logger.Debug("redis KV ", zap.Any("key", nodeMoidKey), zap.Any("value", resourceUuid)) + return +} + +func (op *kubernetesprocessor) GetResourceUuidUsingCurrentNodeMoid(ctx context.Context, resource pcommon.Resource) (resourceUuid string) { + + nodeMoidKey := moid.NewMoid(op.redisConfig.ClusterName).WithNodeName(op.redisConfig.NodeName).NodeMoid() + + resourceUuid = op.redisClient.GetValueInString(ctx, nodeMoidKey) + op.logger.Debug("redis KV ", zap.Any("key", nodeMoidKey), zap.Any("value", resourceUuid)) + return +} + +func (op *kubernetesprocessor) GetResourceUuidUsingClusterMoid(ctx context.Context, resource pcommon.Resource) (resourceUuid string) { + + nodeMoidKey := moid.NewMoid(op.redisConfig.ClusterName).WithNodeName(op.redisConfig.NodeName).NodeMoid() + + resourceUuid = op.redisClient.GetValueInString(ctx, nodeMoidKey) + op.logger.Debug("redis KV ", zap.Any("key", nodeMoidKey), zap.Any("value", resourceUuid)) + return } func getNamespace(pod *kube.Pod, resAttrs pcommon.Map) string { diff --git a/processor/opsrampk8sattributesprocessor/internal/lru/lru.go b/processor/opsrampk8sattributesprocessor/internal/lru/lru.go index 47c3b47074d2..4d560027155e 100644 --- a/processor/opsrampk8sattributesprocessor/internal/lru/lru.go +++ b/processor/opsrampk8sattributesprocessor/internal/lru/lru.go @@ -1,44 +1,53 @@ package lru import ( + "fmt" "sync" + "time" - cache "github.com/hashicorp/golang-lru/v2" + cache "github.com/hashicorp/golang-lru/v2/expirable" ) type Cache struct { - lru *cache.Cache[string, string] + lrucache *cache.LRU[string, string] } +const ( + DEFAULT_CACHE_SIZE = 16 + DEFAULT_CACHE_EXPIRATION_INTERVAL = time.Minute +) + var ( once sync.Once instance *Cache - err error ) -func New(size int) (*Cache, error) { - cache, err := cache.New[string, string](size) - if err != nil { - return nil, err - } - return &Cache{lru: cache}, nil +func New(size int, expirateInterval time.Duration) *Cache { + return &Cache{lrucache: cache.NewLRU[string, string](size, nil, expirateInterval)} } -func GetInstance(size int) (*Cache, error) { +func GetInstance() *Cache { once.Do(func() { - instance, err = New(size) + instance = New(DEFAULT_CACHE_SIZE, DEFAULT_CACHE_EXPIRATION_INTERVAL) }) - return instance, err + return instance } func (c *Cache) Get(key string) (string, bool) { - return c.lru.Get(key) + return c.lrucache.Get(key) } func (c *Cache) Add(key string, value string) { - c.lru.Add(key, value) + c.lrucache.Add(key, value) } func (c *Cache) AddEvicted(key string, value string) (evicted bool) { - return c.lru.Add(key, value) + return c.lrucache.Add(key, value) +} + +func (c *Cache) PrintKeys() { + fmt.Println(c.lrucache.Keys()) +} +func (c *Cache) PrintValues() { + fmt.Println(c.lrucache.Values()) } diff --git a/processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go b/processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go index aa4d88dd2efb..96a64db7445a 100644 --- a/processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go +++ b/processor/opsrampk8sattributesprocessor/internal/lru/lru_test.go @@ -5,10 +5,7 @@ import ( ) func TestLRU(t *testing.T) { - cache, err := New(2) - if err != nil { - t.Fatalf("Error creating cache: %v", err) - } + cache := New(2, 0) // Test cases cache.Add("key1", "value1") diff --git a/processor/opsrampk8sattributesprocessor/internal/redis/client.go b/processor/opsrampk8sattributesprocessor/internal/redis/client.go index 4f4e29398d0b..fa1c7e4ceacc 100644 --- a/processor/opsrampk8sattributesprocessor/internal/redis/client.go +++ b/processor/opsrampk8sattributesprocessor/internal/redis/client.go @@ -8,10 +8,6 @@ import ( "go.uber.org/zap" ) -const ( - CACHE_SIZE = 16 -) - type Client struct { Host string Port string @@ -79,10 +75,11 @@ func (c *Client) GetValueInString(ctx context.Context, key string) string { logger := c.logger // Try to init the cache if it is firt time - cache, err := lru.GetInstance(CACHE_SIZE) - if err != nil { - logger.Debug("Failed to initilize the cache for ", zap.Any("size", CACHE_SIZE), zap.Any(" error found was ", err)) - //TODO: Need to check whether I shuould return from here or not + cache := lru.GetInstance() + + if cache != nil { + logger.Debug("Failed to initilize the cache with GetInstance()") + return "" } value, ok := cache.Get(key) if !ok { From 2ae41a9810890628e4b8aa427fc5fd4bcf8458c7 Mon Sep 17 00:00:00 2001 From: mithunbelur Date: Fri, 2 Aug 2024 09:25:16 +0000 Subject: [PATCH 3/6] redis client code changes and testing lru --- .../internal/redis/client.go | 86 ++++++++++--------- processor/k8sattributesprocessor/processor.go | 57 ++++++------ 2 files changed, 78 insertions(+), 65 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/redis/client.go b/processor/k8sattributesprocessor/internal/redis/client.go index 72011322cac5..5cffba228b16 100644 --- a/processor/k8sattributesprocessor/internal/redis/client.go +++ b/processor/k8sattributesprocessor/internal/redis/client.go @@ -2,6 +2,7 @@ package redis import ( "context" + "time" lru "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/lru" goredis "github.com/redis/go-redis/v9" @@ -9,26 +10,25 @@ import ( ) type Client struct { - Host string - Port string - Password string - GoClient *goredis.Client - Enabled bool - Connected bool - logger *zap.Logger + Host string + Port string + Password string + GoClient *goredis.Client + Enabled bool + logger *zap.Logger } func NewClient(logger *zap.Logger, rHost, rPort, rPass string) *Client { client := Client{ - Host: rHost, - Port: rPort, - Password: rPass, - Enabled: true, - Connected: false, - logger: logger, + Host: rHost, + Port: rPort, + Password: rPass, + Enabled: true, + logger: logger, } if client.Host == "" { + logger.Info("Redis Host is empty, hence no lookup for moid/resourceuuid cache") client.Enabled = false return &client } @@ -44,31 +44,42 @@ func NewClient(logger *zap.Logger, rHost, rPort, rPass string) *Client { func (c *Client) Init() error { c.GoClient = goredis.NewClient(&goredis.Options{ - Addr: c.Host + ":" + c.Port, - Password: c.Password, - DB: 0, + Addr: c.Host + ":" + c.Port, + Password: c.Password, + MaxRetries: -1, + MinRetryBackoff: 55 * time.Millisecond, + MaxRetryBackoff: 2 * time.Second, }) if err := c.TestConnection(context.Background()); err != nil { - c.Connected = false return err } - c.Connected = true - return nil } func (c *Client) TestConnection(ctx context.Context) error { logger := c.logger - _, err := c.GoClient.Ping(ctx).Result() + + var err error + err = nil + + for i := 0; i < 15; i++ { + _, err = c.GoClient.Ping(ctx).Result() + if err != nil { + logger.Info("Could not connect/ping to Redis", zap.Any("error", err.Error())) + } else { + logger.Info("Connected to Redis") + break + } + time.Sleep(1 * time.Second) + } + if err != nil { logger.Error("Could not connect/ping to Redis", zap.Any("error", err.Error())) - return err } - logger.Info("Connected to Redis") - return nil + return err } func (c *Client) GetValueInString(ctx context.Context, key string) string { @@ -77,31 +88,28 @@ func (c *Client) GetValueInString(ctx context.Context, key string) string { // Try to init the cache if it is firt time cache := lru.GetInstance() - if cache != nil { - logger.Debug("Failed to initilize the cache with GetInstance()") + if cache == nil { + logger.Error("Failed to initilize the cache with GetInstance()") return "" } + value, ok := cache.Get(key) if !ok { logger.Debug("Failed to fetch the key from the cache ", zap.Any("value : ", key)) if c.Enabled { - if c.Connected { - val, err := c.GoClient.Get(ctx, key).Result() - if err == goredis.Nil { - logger.Debug("key does not exist", zap.Any("key", key)) - } else if err != nil { - logger.Info("Trying to reconnect") - c.Init() - } else { - value = val - } + val, err := c.GoClient.Get(ctx, key).Result() + if err == goredis.Nil { + logger.Debug("key does not exist ", zap.Any("key", key)) + } else if err != nil { + logger.Error("Failed to fetch the key from redis ", zap.Error(err)) } else { - logger.Info("Trying to reconnect") - c.Init() + value = val } } - //Before returning ; it should update the cache - cache.Add(key, value) + + if value != "" { + cache.Add(key, value) + } } return value } diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index accba67f8e1b..6eae710107f5 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -58,13 +58,6 @@ func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, k } func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error { - - kp.logger.Info("ops k8s attr processor start", zap.Any("redisHost", kp.redisConfig.RedisHost), - zap.Any("redisPort", kp.redisConfig.RedisPort), - zap.Any("redisPass", kp.redisConfig.RedisPass)) - - kp.redisClient = redis.NewClient(kp.logger, kp.redisConfig.RedisHost, kp.redisConfig.RedisPort, kp.redisConfig.RedisPass) - allOptions := append(createProcessorOpts(kp.cfg), kp.options...) for _, opt := range allOptions { @@ -73,6 +66,13 @@ func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error return nil } } + + kp.logger.Info("ops k8s attr processor start", zap.Any("redisHost", kp.redisConfig.RedisHost), + zap.Any("redisPort", kp.redisConfig.RedisPort), + zap.Any("redisPass", kp.redisConfig.RedisPass)) + + kp.redisClient = redis.NewClient(kp.logger, kp.redisConfig.RedisHost, kp.redisConfig.RedisPort, kp.redisConfig.RedisPass) + // This might have been set by an option already if kp.kc == nil { err := kp.initKubeClient(kp.telemetrySettings, kubeClientProvider) @@ -132,11 +132,6 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco podIdentifierValue := extractPodID(ctx, resource.Attributes(), kp.podAssociations) kp.logger.Debug("evaluating pod identifier", zap.Any("value", podIdentifierValue)) - for _, addon := range kp.addons { - //fmt.Println(">>>>>> Addons Added key : ", addon.Key, " Value ", addon.Value) - resource.Attributes().PutStr(addon.Key, addon.Value) - } - for i := range podIdentifierValue { if podIdentifierValue[i].Source.From == kube.ConnectionSource && podIdentifierValue[i].Value != "" { if _, found := resource.Attributes().Get(kube.K8sIPLabelName); !found { @@ -193,30 +188,40 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco } // processResource adds Pod metadata tags to resource based on pod association configuration -func (op *kubernetesprocessor) processopsrampResources(ctx context.Context, resource pcommon.Resource) { +func (kp *kubernetesprocessor) processopsrampResources(ctx context.Context, resource pcommon.Resource) { var found bool var resourceUuid string - resource.Attributes().PutStr("opsramp.k8s.cluster.name", op.redisConfig.ClusterName) - resource.Attributes().PutStr("opsramp.k8s.cluster.uid", op.redisConfig.ClusterUid) + for _, addon := range kp.addons { + //fmt.Println(">>>>>> Addons Added key : ", addon.Key, " Value ", addon.Value) + resource.Attributes().PutStr(addon.Key, addon.Value) + } + if _, found = resource.Attributes().Get("k8s.pod.uid"); found { - if resourceUuid = op.GetResourceUuidUsingPodMoid(ctx, resource); resourceUuid == "" { - if resourceUuid = op.GetResourceUuidUsingResourceNodeMoid(ctx, resource); resourceUuid == "" { - if resourceUuid = op.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { - resourceUuid = op.GetResourceUuidUsingClusterMoid(ctx, resource) + if resourceUuid = kp.GetResourceUuidUsingPodMoid(ctx, resource); resourceUuid == "" { + resourceUuid = kp.GetResourceUuidUsingClusterMoid(ctx, resource) + /* + if resourceUuid = kp.GetResourceUuidUsingResourceNodeMoid(ctx, resource); resourceUuid == "" { + if resourceUuid = kp.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { + resourceUuid = kp.GetResourceUuidUsingClusterMoid(ctx, resource) + } } - } + */ } } else if _, found = resource.Attributes().Get("k8s.node.name"); found { - if resourceUuid = op.GetResourceUuidUsingResourceNodeMoid(ctx, resource); resourceUuid == "" { - if resourceUuid = op.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { - resourceUuid = op.GetResourceUuidUsingClusterMoid(ctx, resource) + if resourceUuid = kp.GetResourceUuidUsingResourceNodeMoid(ctx, resource); resourceUuid == "" { + if resourceUuid = kp.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { + resourceUuid = kp.GetResourceUuidUsingClusterMoid(ctx, resource) } } } else { - if resourceUuid = op.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { - resourceUuid = op.GetResourceUuidUsingClusterMoid(ctx, resource) - } + resourceUuid = kp.GetResourceUuidUsingClusterMoid(ctx, resource) + + /* + if resourceUuid = kp.GetResourceUuidUsingCurrentNodeMoid(ctx, resource); resourceUuid == "" { + resourceUuid = kp.GetResourceUuidUsingClusterMoid(ctx, resource) + } + */ } if resourceUuid != "" { From ed9b2bc6304f121bc0c496d82da1743ad1429bea Mon Sep 17 00:00:00 2001 From: Pradeep Thota Date: Fri, 2 Aug 2024 10:00:31 +0000 Subject: [PATCH 4/6] enhance:moving opsramp config from internal/kube to internal/redis --- processor/k8sattributesprocessor/config.go | 12 ++---------- .../k8sattributesprocessor/internal/kube/kube.go | 9 --------- .../k8sattributesprocessor/internal/redis/client.go | 9 +++++++++ processor/k8sattributesprocessor/options.go | 5 +++-- processor/k8sattributesprocessor/processor.go | 2 +- 5 files changed, 15 insertions(+), 22 deletions(-) diff --git a/processor/k8sattributesprocessor/config.go b/processor/k8sattributesprocessor/config.go index 7e225993c132..1bf8063748ed 100644 --- a/processor/k8sattributesprocessor/config.go +++ b/processor/k8sattributesprocessor/config.go @@ -11,6 +11,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/redis" ) // Config defines configuration for k8s attributes processor. @@ -42,7 +43,7 @@ type Config struct { //Opsramp Metadata Addons Section MetadataAddOn []AddOnMetadata `mapstructure:"metadata_addon"` - RedisConfig OpsrampRedisConfig `mapstructure:"redis_config"` + RedisConfig redis.OpsrampRedisConfig `mapstructure:"redis_config"` } func (cfg *Config) Validate() error { @@ -337,12 +338,3 @@ type AddOnMetadata struct { Value string `mapstructure:"value"` } - -type OpsrampRedisConfig struct { - RedisHost string `mapstructure:"redisHost"` - RedisPort string `mapstructure:"redisPort"` - RedisPass string `mapstructure:"redisPass"` - ClusterName string `mapstructure:"clusterName"` - ClusterUid string `mapstructure:"clusterUid"` - NodeName string `mapstructure:"nodeName"` -} diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 167c065a391d..7c94528490ab 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -364,12 +364,3 @@ type AddOnMetadata struct { Value string `mapstructure:"value"` } - -type OpsrampRedisConfig struct { - RedisHost string `mapstructure:"redisHost"` - RedisPort string `mapstructure:"redisPort"` - RedisPass string `mapstructure:"redisPass"` - ClusterName string `mapstructure:"clusterName"` - ClusterUid string `mapstructure:"clusterUid"` - NodeName string `mapstructure:"nodeName"` -} diff --git a/processor/k8sattributesprocessor/internal/redis/client.go b/processor/k8sattributesprocessor/internal/redis/client.go index 5cffba228b16..8bf555bc58fa 100644 --- a/processor/k8sattributesprocessor/internal/redis/client.go +++ b/processor/k8sattributesprocessor/internal/redis/client.go @@ -18,6 +18,15 @@ type Client struct { logger *zap.Logger } +type OpsrampRedisConfig struct { + RedisHost string `mapstructure:"redisHost"` + RedisPort string `mapstructure:"redisPort"` + RedisPass string `mapstructure:"redisPass"` + ClusterName string `mapstructure:"clusterName"` + ClusterUid string `mapstructure:"clusterUid"` + NodeName string `mapstructure:"nodeName"` +} + func NewClient(logger *zap.Logger, rHost, rPort, rPass string) *Client { client := Client{ Host: rHost, diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index 20158547f465..80431959022e 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -14,6 +14,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/redis" ) const ( @@ -390,11 +391,11 @@ func withAddOnFields(filters ...AddOnMetadata) option { } } -func withRedisConfigFields(filters OpsrampRedisConfig) option { +func withRedisConfigFields(filters redis.OpsrampRedisConfig) option { fmt.Println("The value of filters received : ", filters) return func(p *kubernetesprocessor) error { - p.redisConfig = kube.OpsrampRedisConfig(filters) + p.redisConfig = filters return nil } diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 6eae710107f5..b9dd83721119 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -37,7 +37,7 @@ type kubernetesprocessor struct { rules kube.ExtractionRules filters kube.Filters addons []kube.AddOnMetadata - redisConfig kube.OpsrampRedisConfig + redisConfig redis.OpsrampRedisConfig podAssociations []kube.Association podIgnore kube.Excludes redisClient *redis.Client From fda93198193a3378fa7ad0070783c28f478b9778 Mon Sep 17 00:00:00 2001 From: mithunbelur Date: Fri, 2 Aug 2024 15:23:18 +0000 Subject: [PATCH 5/6] configurable lru cache size and expirty --- processor/k8sattributesprocessor/config.go | 9 ++++++ .../internal/lru/lru.go | 12 ++++---- .../internal/redis/client.go | 29 ++++++++++--------- processor/k8sattributesprocessor/processor.go | 10 ++++++- 4 files changed, 40 insertions(+), 20 deletions(-) diff --git a/processor/k8sattributesprocessor/config.go b/processor/k8sattributesprocessor/config.go index 1bf8063748ed..0831f7c58099 100644 --- a/processor/k8sattributesprocessor/config.go +++ b/processor/k8sattributesprocessor/config.go @@ -11,6 +11,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/lru" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/redis" ) @@ -134,6 +135,14 @@ func (cfg *Config) Validate() error { return fmt.Errorf("node name is mandatory") } + if cfg.RedisConfig.LruExpirationTime == 0 { + cfg.RedisConfig.LruExpirationTime = lru.DEFAULT_CACHE_EXPIRATION_INTERVAL + } + + if cfg.RedisConfig.LruCacheSize == 0 { + cfg.RedisConfig.LruCacheSize = lru.DEFAULT_CACHE_SIZE + } + return nil } diff --git a/processor/k8sattributesprocessor/internal/lru/lru.go b/processor/k8sattributesprocessor/internal/lru/lru.go index 4d560027155e..5c081282a010 100644 --- a/processor/k8sattributesprocessor/internal/lru/lru.go +++ b/processor/k8sattributesprocessor/internal/lru/lru.go @@ -13,8 +13,8 @@ type Cache struct { } const ( - DEFAULT_CACHE_SIZE = 16 - DEFAULT_CACHE_EXPIRATION_INTERVAL = time.Minute + DEFAULT_CACHE_SIZE = 256 + DEFAULT_CACHE_EXPIRATION_INTERVAL = 10 * time.Minute ) var ( @@ -22,13 +22,13 @@ var ( instance *Cache ) -func New(size int, expirateInterval time.Duration) *Cache { - return &Cache{lrucache: cache.NewLRU[string, string](size, nil, expirateInterval)} +func New(size int, expirationInterval time.Duration) *Cache { + return &Cache{lrucache: cache.NewLRU[string, string](size, nil, expirationInterval)} } -func GetInstance() *Cache { +func GetInstance(size int, expirationInterval time.Duration) *Cache { once.Do(func() { - instance = New(DEFAULT_CACHE_SIZE, DEFAULT_CACHE_EXPIRATION_INTERVAL) + instance = New(size, expirationInterval) }) return instance } diff --git a/processor/k8sattributesprocessor/internal/redis/client.go b/processor/k8sattributesprocessor/internal/redis/client.go index 8bf555bc58fa..041b66abcdc4 100644 --- a/processor/k8sattributesprocessor/internal/redis/client.go +++ b/processor/k8sattributesprocessor/internal/redis/client.go @@ -14,26 +14,30 @@ type Client struct { Port string Password string GoClient *goredis.Client + lruCache *lru.Cache Enabled bool logger *zap.Logger } type OpsrampRedisConfig struct { - RedisHost string `mapstructure:"redisHost"` - RedisPort string `mapstructure:"redisPort"` - RedisPass string `mapstructure:"redisPass"` - ClusterName string `mapstructure:"clusterName"` - ClusterUid string `mapstructure:"clusterUid"` - NodeName string `mapstructure:"nodeName"` + RedisHost string `mapstructure:"redisHost"` + RedisPort string `mapstructure:"redisPort"` + RedisPass string `mapstructure:"redisPass"` + ClusterName string `mapstructure:"clusterName"` + ClusterUid string `mapstructure:"clusterUid"` + NodeName string `mapstructure:"nodeName"` + LruCacheSize int `mapstructure:"lruCacheSize"` + LruExpirationTime time.Duration `mapstructure:"lruExpirationTime"` } -func NewClient(logger *zap.Logger, rHost, rPort, rPass string) *Client { +func NewClient(logger *zap.Logger, lruCache *lru.Cache, rHost, rPort, rPass string) *Client { client := Client{ Host: rHost, Port: rPort, Password: rPass, Enabled: true, logger: logger, + lruCache: lruCache, } if client.Host == "" { @@ -95,16 +99,12 @@ func (c *Client) GetValueInString(ctx context.Context, key string) string { logger := c.logger // Try to init the cache if it is firt time - cache := lru.GetInstance() - if cache == nil { - logger.Error("Failed to initilize the cache with GetInstance()") - return "" - } + cache := c.lruCache value, ok := cache.Get(key) if !ok { - logger.Debug("Failed to fetch the key from the cache ", zap.Any("value : ", key)) + logger.Debug("Failed to fetch the key from lru cache ", zap.Any("key", key)) if c.Enabled { val, err := c.GoClient.Get(ctx, key).Result() if err == goredis.Nil { @@ -112,6 +112,7 @@ func (c *Client) GetValueInString(ctx context.Context, key string) string { } else if err != nil { logger.Error("Failed to fetch the key from redis ", zap.Error(err)) } else { + logger.Debug("Got value from redis ", zap.Any("key", key), zap.Any("value", value)) value = val } } @@ -119,6 +120,8 @@ func (c *Client) GetValueInString(ctx context.Context, key string) string { if value != "" { cache.Add(key, value) } + } else { + logger.Debug("Got value from lru cache ", zap.Any("key", key), zap.Any("value", value)) } return value } diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index b9dd83721119..c292a50bafe6 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -18,6 +18,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/lru" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/moid" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/redis" ) @@ -71,7 +72,14 @@ func (kp *kubernetesprocessor) Start(_ context.Context, _ component.Host) error zap.Any("redisPort", kp.redisConfig.RedisPort), zap.Any("redisPass", kp.redisConfig.RedisPass)) - kp.redisClient = redis.NewClient(kp.logger, kp.redisConfig.RedisHost, kp.redisConfig.RedisPort, kp.redisConfig.RedisPass) + cache := lru.GetInstance(kp.redisConfig.LruCacheSize, kp.redisConfig.LruExpirationTime) + + if cache == nil { + kp.logger.Error("Failed to initilize the cache with GetInstance()") + return nil + } + + kp.redisClient = redis.NewClient(kp.logger, cache, kp.redisConfig.RedisHost, kp.redisConfig.RedisPort, kp.redisConfig.RedisPass) // This might have been set by an option already if kp.kc == nil { From 37136846e7ec9453808394b896b42fd8b5e6669d Mon Sep 17 00:00:00 2001 From: mithunbelur Date: Fri, 23 Aug 2024 09:41:19 +0000 Subject: [PATCH 6/6] gomod changes after merge --- cmd/otelcontribcol/go.mod | 17 ++++++++++------- cmd/otelcontribcol/go.sum | 28 ++++++++++++++-------------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index b4ab448f7853..f1be2fdc27f8 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -50,6 +50,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/mezmoexporter v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter v0.106.1 + github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opsrampdebugexporter v0.0.0-00010101000000-000000000000 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter v0.106.1 @@ -108,6 +109,8 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricsgenerationprocessor v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor v0.106.1 + github.com/open-telemetry/opentelemetry-collector-contrib/processor/opsrampk8sattributesprocessor v0.0.0-00010101000000-000000000000 + github.com/open-telemetry/opentelemetry-collector-contrib/processor/opsrampk8sobjectsprocessor v0.0.0-00010101000000-000000000000 github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/processor/redactionprocessor v0.106.1 github.com/open-telemetry/opentelemetry-collector-contrib/processor/remotetapprocessor v0.106.1 @@ -238,7 +241,7 @@ require ( go.opentelemetry.io/collector/receiver v0.106.1 go.opentelemetry.io/collector/receiver/nopreceiver v0.106.1 go.opentelemetry.io/collector/receiver/otlpreceiver v0.106.1 - golang.org/x/sys v0.22.0 + golang.org/x/sys v0.24.0 ) require ( @@ -806,14 +809,14 @@ require ( go.uber.org/fx v1.18.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.25.0 // indirect + golang.org/x/crypto v0.26.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.18.0 // indirect - golang.org/x/net v0.27.0 // indirect + golang.org/x/net v0.28.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/term v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/term v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.22.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect @@ -821,7 +824,7 @@ require ( google.golang.org/api v0.189.0 // indirect google.golang.org/genproto v0.0.0-20240722135656-d784300faade // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240709173604-40e1e62336c5 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a // indirect google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index 1bf012bab007..9cb603e1828f 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -2544,8 +2544,8 @@ golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58 golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= -golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= -golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2689,8 +2689,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2739,8 +2739,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -2861,8 +2861,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -2878,8 +2878,8 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= -golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= -golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2899,8 +2899,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -3213,8 +3213,8 @@ google.golang.org/genproto v0.0.0-20240722135656-d784300faade h1:lKFsS7wpngDgSCe google.golang.org/genproto v0.0.0-20240722135656-d784300faade/go.mod h1:FfBgJBJg9GcpPvKIuHSZ/aE1g2ecGL74upMzGZjiGEY= google.golang.org/genproto/googleapis/api v0.0.0-20240709173604-40e1e62336c5 h1:a/Z0jgw03aJ2rQnp5PlPpznJqJft0HyvyrcUcxgzPwY= google.golang.org/genproto/googleapis/api v0.0.0-20240709173604-40e1e62336c5/go.mod h1:mw8MG/Qz5wfgYr6VqVCiZcHe/GJEfI+oGGDCohaVgB0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade h1:oCRSWfwGXQsqlVdErcyTt4A93Y8fo0/9D4b1gnI++qo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240722135656-d784300faade/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a h1:EKiZZXueP9/T68B8Nl0GAx9cjbQnCId0yP3qPMgaaHs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=