From 594462dc8c9562b5340d1d494ddf50d8ca519e86 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 16 Aug 2022 18:13:21 -0400 Subject: [PATCH 01/10] Resolve bug where TA doesn't allocate all targets --- cmd/otel-allocator/allocation/allocator.go | 29 ++++++---- .../allocation/allocator_test.go | 57 +++++++++++++++---- cmd/otel-allocator/allocation/http.go | 4 +- cmd/otel-allocator/main.go | 5 +- 4 files changed, 70 insertions(+), 25 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index 93c41b0a48..41ed6104c9 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -40,6 +40,10 @@ type TargetItem struct { Collector *collector } +func (t TargetItem) hash() string { + return t.JobName + t.TargetURL + t.Label.Fingerprint().String() +} + // Create a struct that holds collector - and jobs for that collector // This struct will be parsed into endpoint with collector and jobs info @@ -59,11 +63,15 @@ type Allocator struct { collectors map[string]*collector // all current collectors - TargetItems map[string]*TargetItem + targetItems map[string]*TargetItem log logr.Logger } +func (allocator *Allocator) TargetItems() map[string]*TargetItem { + return allocator.targetItems +} + // findNextCollector finds the next collector with less number of targets. func (allocator *Allocator) findNextCollector() *collector { var col *collector @@ -91,8 +99,9 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) { allocator.targetsWaiting = make(map[string]TargetItem, len(targets)) // Set new data for _, i := range targets { - allocator.targetsWaiting[i.JobName+i.TargetURL] = i + allocator.targetsWaiting[i.hash()] = i } + //allocator.log.Info(fmt.Sprintf("len(targetsWaiting): %d\nlen(targets): %d", len(allocator.targetsWaiting), len(targets))) } // SetCollectors sets the set of collectors with key=collectorName, value=Collector object. @@ -133,16 +142,16 @@ func (allocator *Allocator) ReallocateCollectors() { timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors")) defer timer.ObserveDuration() defer allocator.m.Unlock() - allocator.TargetItems = make(map[string]*TargetItem) + allocator.targetItems = make(map[string]*TargetItem) allocator.processWaitingTargets() } // removeOutdatedTargets removes targets that are no longer available. func (allocator *Allocator) removeOutdatedTargets() { - for k := range allocator.TargetItems { + for k := range allocator.targetItems { if _, ok := allocator.targetsWaiting[k]; !ok { - allocator.collectors[allocator.TargetItems[k].Collector.Name].NumTargets-- - delete(allocator.TargetItems, k) + allocator.collectors[allocator.targetItems[k].Collector.Name].NumTargets-- + delete(allocator.targetItems, k) } } } @@ -150,9 +159,9 @@ func (allocator *Allocator) removeOutdatedTargets() { // processWaitingTargets processes the newly set targets. func (allocator *Allocator) processWaitingTargets() { for k, v := range allocator.targetsWaiting { - if _, ok := allocator.TargetItems[k]; !ok { + if _, ok := allocator.targetItems[k]; !ok { col := allocator.findNextCollector() - allocator.TargetItems[k] = &v + allocator.targetItems[k] = &v targetItem := TargetItem{ JobName: v.JobName, Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))}, @@ -162,7 +171,7 @@ func (allocator *Allocator) processWaitingTargets() { } col.NumTargets++ targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets)) - allocator.TargetItems[v.JobName+v.TargetURL] = &targetItem + allocator.targetItems[v.hash()] = &targetItem } } } @@ -172,6 +181,6 @@ func NewAllocator(log logr.Logger) *Allocator { log: log, targetsWaiting: make(map[string]TargetItem), collectors: make(map[string]*collector), - TargetItems: make(map[string]*TargetItem), + targetItems: make(map[string]*TargetItem), } } diff --git a/cmd/otel-allocator/allocation/allocator_test.go b/cmd/otel-allocator/allocation/allocator_test.go index 893422a3b4..596ff3b628 100644 --- a/cmd/otel-allocator/allocation/allocator_test.go +++ b/cmd/otel-allocator/allocation/allocator_test.go @@ -45,11 +45,12 @@ func TestAddingAndRemovingTargets(t *testing.T) { cols := []string{"col-1", "col-2", "col-3"} s.SetCollectors(cols) + labels := model.LabelSet{} initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} var targetList []TargetItem for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) + targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } // test that targets and collectors are added properly @@ -58,13 +59,13 @@ func TestAddingAndRemovingTargets(t *testing.T) { // verify expectedTargetLen := len(initTargets) - assert.Len(t, s.TargetItems, expectedTargetLen) + assert.Len(t, s.TargetItems(), expectedTargetLen) // prepare second round of targets tar := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004"} var newTargetList []TargetItem for _, i := range tar { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) + newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } // test that less targets are found - removed @@ -73,11 +74,45 @@ func TestAddingAndRemovingTargets(t *testing.T) { // verify expectedNewTargetLen := len(tar) - assert.Len(t, s.TargetItems, expectedNewTargetLen) + assert.Len(t, s.TargetItems(), expectedNewTargetLen) // verify results map for _, i := range tar { - _, ok := s.TargetItems["sample-name"+i] + _, ok := s.TargetItems()["sample-name"+i+labels.Fingerprint().String()] + assert.True(t, ok) + } +} + +// Tests that two targets with the same target url and job name but different label set are both added +func TestAllocationCollision(t *testing.T) { + // prepare allocator with initial targets and collectors + s := NewAllocator(logger) + + cols := []string{"col-1", "col-2", "col-3"} + s.SetCollectors(cols) + firstLabels := model.LabelSet{ + "test": "test1", + } + secondLabels := model.LabelSet{ + "test": "test2", + } + + targetList := []TargetItem{ + TargetItem{JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: firstLabels}, + TargetItem{JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: secondLabels}, + } + + // test that targets and collectors are added properly + s.SetWaitingTargets(targetList) + s.AllocateTargets() + + // verify + expectedTargetLen := len(targetList) + assert.Len(t, s.TargetItems(), expectedTargetLen) + + // verify results map + for _, i := range targetList { + _, ok := s.TargetItems()[i.hash()] assert.True(t, ok) } } @@ -104,8 +139,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { // Divisor needed to get 15% divisor := 6.7 - count := len(s.TargetItems) / len(s.collectors) - percent := float64(len(s.TargetItems)) / divisor + count := len(s.TargetItems()) / len(s.collectors) + percent := float64(len(s.TargetItems())) / divisor // test for _, i := range s.collectors { @@ -123,8 +158,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { s.SetWaitingTargets(newTargetList) s.AllocateTargets() - count = len(s.TargetItems) / len(s.collectors) - percent = float64(len(s.TargetItems)) / divisor + count = len(s.TargetItems()) / len(s.collectors) + percent = float64(len(s.TargetItems())) / divisor // test for _, i := range s.collectors { @@ -141,8 +176,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { s.SetWaitingTargets(newTargetList) s.AllocateTargets() - count = len(s.TargetItems) / len(s.collectors) - percent = float64(len(s.TargetItems)) / divisor + count = len(s.TargetItems()) / len(s.collectors) + percent = float64(len(s.TargetItems())) / divisor // test for _, i := range s.collectors { diff --git a/cmd/otel-allocator/allocation/http.go b/cmd/otel-allocator/allocation/http.go index 49b6cb65bd..01f4e5c82e 100644 --- a/cmd/otel-allocator/allocation/http.go +++ b/cmd/otel-allocator/allocation/http.go @@ -23,7 +23,7 @@ type targetGroupJSON struct { func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator *Allocator) map[string]collectorJSON { displayData := make(map[string]collectorJSON) - for _, j := range allocator.TargetItems { + for _, j := range allocator.TargetItems() { if j.JobName == job { var targetList []TargetItem targetList = append(targetList, cMap[j.Collector.Name+j.JobName]...) @@ -52,7 +52,7 @@ func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[strin if col.Name == collector { for _, targetItemArr := range cMap { for _, targetItem := range targetItemArr { - if targetItem.Collector.Name == collector && targetItem.JobName == job { + if targetItem.Collector.Name == collector && targetItem.JobName == job { group[targetItem.Label.String()] = targetItem.TargetURL labelSet[targetItem.TargetURL] = targetItem.Label } diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index c1806c2e67..af63b5c551 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" ctrl "sigs.k8s.io/controller-runtime" + //_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) var ( @@ -185,7 +186,7 @@ func (s *server) Shutdown(ctx context.Context) error { func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) { displayData := make(map[string]allocation.LinkJSON) - for _, v := range s.allocator.TargetItems { + for _, v := range s.allocator.TargetItems() { displayData[v.JobName] = allocation.LinkJSON{v.Link.Link} } jsonHandler(w, r, displayData) @@ -206,7 +207,7 @@ func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) { q := r.URL.Query()["collector_id"] var compareMap = make(map[string][]allocation.TargetItem) // CollectorName+jobName -> TargetItem - for _, v := range s.allocator.TargetItems { + for _, v := range s.allocator.TargetItems() { compareMap[v.Collector.Name+v.JobName] = append(compareMap[v.Collector.Name+v.JobName], *v) } params := mux.Vars(r) From 36c22ded88428bf6b427577820b8bf18571ceea2 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 17 Aug 2022 15:43:58 -0400 Subject: [PATCH 02/10] Cleanup cleanup --- cmd/otel-allocator/allocation/allocator.go | 1 - cmd/otel-allocator/main.go | 1 - 2 files changed, 2 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index 41ed6104c9..c59e3fb503 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -101,7 +101,6 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) { for _, i := range targets { allocator.targetsWaiting[i.hash()] = i } - //allocator.log.Info(fmt.Sprintf("len(targetsWaiting): %d\nlen(targets): %d", len(allocator.targetsWaiting), len(targets))) } // SetCollectors sets the set of collectors with key=collectorName, value=Collector object. diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index af63b5c551..c467ff4bfc 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -21,7 +21,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" ctrl "sigs.k8s.io/controller-runtime" - //_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) var ( From 09e2319b00181fdac7f83acee0d0320a9557e2b9 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 17 Aug 2022 19:10:09 -0400 Subject: [PATCH 03/10] Changed how targets and collectors are allocated --- cmd/otel-allocator/allocation/allocator.go | 125 +++++++++--------- .../allocation/allocator_test.go | 6 - cmd/otel-allocator/collector/collector.go | 6 +- cmd/otel-allocator/main.go | 10 +- 4 files changed, 70 insertions(+), 77 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index c59e3fb503..68558e6322 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -59,7 +59,7 @@ type collector struct { type Allocator struct { m sync.Mutex - targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed + //targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed collectors map[string]*collector // all current collectors @@ -89,17 +89,63 @@ func (allocator *Allocator) findNextCollector() *collector { return col } +// allCollectorsPresent checks if all of the collectors provided are in the allocator's map +func (allocator *Allocator) allCollectorsPresent(collectors []string) bool { + if len(collectors) != len(allocator.collectors) { + return false + } + for _, s := range collectors { + if _, ok := allocator.collectors[s]; !ok { + return false + } + } + return true +} + // SetTargets accepts the a list of targets that will be used to make // load balancing decisions. This method should be called when where are // new targets discovered or existing targets are shutdown. func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) { + timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetWaitingTargets")) + defer timer.ObserveDuration() // Dump old data allocator.m.Lock() defer allocator.m.Unlock() - allocator.targetsWaiting = make(map[string]TargetItem, len(targets)) - // Set new data - for _, i := range targets { - allocator.targetsWaiting[i.hash()] = i + + // Make the temp map for access + tempTargetMap := make(map[string]TargetItem, len(targets)) + for _, target := range targets { + tempTargetMap[target.hash()] = target + } + + // Check for removals + for k, target := range allocator.targetItems { + // if the old target is no longer in the new list, remove it + if _, ok := tempTargetMap[k]; !ok { + allocator.collectors[target.Collector.Name].NumTargets-- + delete(allocator.targetItems, k) + } + } + + // Check for additions + for k, target := range tempTargetMap { + // Do nothing if the item is already there + if _, ok := allocator.targetItems[k]; ok { + continue + } else { + // Assign a collector to the new target + col := allocator.findNextCollector() + targetItem := TargetItem{ + JobName: target.JobName, + Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, + TargetURL: target.TargetURL, + Label: target.Label, + Collector: col, + } + col.NumTargets++ + targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets)) + allocator.targetItems[k] = &targetItem + } } } @@ -107,12 +153,17 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) { // SetCollectors is called when Collectors are added or removed func (allocator *Allocator) SetCollectors(collectors []string) { log := allocator.log.WithValues("component", "opentelemetry-targetallocator") + timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors")) + defer timer.ObserveDuration() allocator.m.Lock() defer allocator.m.Unlock() if len(collectors) == 0 { log.Info("No collector instances present") return + } else if allocator.allCollectorsPresent(collectors) { + log.Info("No changes to the collectors found") + return } for k := range allocator.collectors { delete(allocator.collectors, k) @@ -121,65 +172,19 @@ func (allocator *Allocator) SetCollectors(collectors []string) { for _, i := range collectors { allocator.collectors[i] = &collector{Name: i, NumTargets: 0} } - collectorsAllocatable.Set(float64(len(collectors))) -} - -// Reallocate needs to be called to process the new target updates. -// Until Reallocate is called, old targets will be served. -func (allocator *Allocator) AllocateTargets() { - allocator.m.Lock() - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("AllocateTargets")) - defer timer.ObserveDuration() - defer allocator.m.Unlock() - allocator.removeOutdatedTargets() - allocator.processWaitingTargets() -} - -// ReallocateCollectors reallocates the targets among the new collector instances -func (allocator *Allocator) ReallocateCollectors() { - allocator.m.Lock() - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors")) - defer timer.ObserveDuration() - defer allocator.m.Unlock() - allocator.targetItems = make(map[string]*TargetItem) - allocator.processWaitingTargets() -} - -// removeOutdatedTargets removes targets that are no longer available. -func (allocator *Allocator) removeOutdatedTargets() { - for k := range allocator.targetItems { - if _, ok := allocator.targetsWaiting[k]; !ok { - allocator.collectors[allocator.targetItems[k].Collector.Name].NumTargets-- - delete(allocator.targetItems, k) - } - } -} - -// processWaitingTargets processes the newly set targets. -func (allocator *Allocator) processWaitingTargets() { - for k, v := range allocator.targetsWaiting { - if _, ok := allocator.targetItems[k]; !ok { - col := allocator.findNextCollector() - allocator.targetItems[k] = &v - targetItem := TargetItem{ - JobName: v.JobName, - Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))}, - TargetURL: v.TargetURL, - Label: v.Label, - Collector: col, - } - col.NumTargets++ - targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets)) - allocator.targetItems[v.hash()] = &targetItem - } + for k, _ := range allocator.targetItems { + chosenCollector := allocator.findNextCollector() + allocator.targetItems[k].Collector = chosenCollector + chosenCollector.NumTargets++ + targetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets)) } + collectorsAllocatable.Set(float64(len(collectors))) } func NewAllocator(log logr.Logger) *Allocator { return &Allocator{ - log: log, - targetsWaiting: make(map[string]TargetItem), - collectors: make(map[string]*collector), - targetItems: make(map[string]*TargetItem), + log: log, + collectors: make(map[string]*collector), + targetItems: make(map[string]*TargetItem), } } diff --git a/cmd/otel-allocator/allocation/allocator_test.go b/cmd/otel-allocator/allocation/allocator_test.go index 596ff3b628..4f3e1c93bd 100644 --- a/cmd/otel-allocator/allocation/allocator_test.go +++ b/cmd/otel-allocator/allocation/allocator_test.go @@ -55,7 +55,6 @@ func TestAddingAndRemovingTargets(t *testing.T) { // test that targets and collectors are added properly s.SetWaitingTargets(targetList) - s.AllocateTargets() // verify expectedTargetLen := len(initTargets) @@ -70,7 +69,6 @@ func TestAddingAndRemovingTargets(t *testing.T) { // test that less targets are found - removed s.SetWaitingTargets(newTargetList) - s.AllocateTargets() // verify expectedNewTargetLen := len(tar) @@ -104,7 +102,6 @@ func TestAllocationCollision(t *testing.T) { // test that targets and collectors are added properly s.SetWaitingTargets(targetList) - s.AllocateTargets() // verify expectedTargetLen := len(targetList) @@ -134,7 +131,6 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } s.SetWaitingTargets(newTargetList) - s.AllocateTargets() // Divisor needed to get 15% divisor := 6.7 @@ -156,7 +152,6 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } s.SetWaitingTargets(newTargetList) - s.AllocateTargets() count = len(s.TargetItems()) / len(s.collectors) percent = float64(len(s.TargetItems())) / divisor @@ -174,7 +169,6 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } s.SetWaitingTargets(newTargetList) - s.AllocateTargets() count = len(s.TargetItems()) / len(s.collectors) percent = float64(len(s.TargetItems())) / divisor diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go index 933b61bbee..7732d42f08 100644 --- a/cmd/otel-allocator/collector/collector.go +++ b/cmd/otel-allocator/collector/collector.go @@ -22,8 +22,8 @@ const ( ) var ( - ns = os.Getenv("OTELCOL_NAMESPACE") - collectors = promauto.NewGauge(prometheus.GaugeOpts{ + ns = os.Getenv("OTELCOL_NAMESPACE") + collectorsDiscovered = promauto.NewGauge(prometheus.GaugeOpts{ Name: "opentelemetry_allocator_collectors_discovered", Help: "Number of collectors discovered.", }) @@ -95,7 +95,7 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func( func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]bool, fn func(collectors []string)) string { log := k.log.WithValues("component", "opentelemetry-targetallocator") for { - collectors.Set(float64(len(collectorMap))) + collectorsDiscovered.Set(float64(len(collectorMap))) select { case <-k.close: return "kubernetes client closed" diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index c467ff4bfc..89a4f6f2c7 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -58,10 +58,7 @@ func main() { // creates a new discovery manager discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger()) defer discoveryManager.Close() - discoveryManager.Watch(func(targets []allocation.TargetItem) { - allocator.SetWaitingTargets(targets) - allocator.AllocateTargets() - }) + discoveryManager.Watch(allocator.SetWaitingTargets) srv, err := newServer(log, allocator, discoveryManager, cliConf) if err != nil { @@ -165,10 +162,7 @@ func configureFileDiscovery(log logr.Logger, allocator *allocation.Allocator, di return nil, err } - k8sClient.Watch(ctx, cfg.LabelSelector, func(collectors []string) { - allocator.SetCollectors(collectors) - allocator.ReallocateCollectors() - }) + k8sClient.Watch(ctx, cfg.LabelSelector, allocator.SetCollectors) return k8sClient, nil } From ba866a637dd6ed31ada0217662077446ac54e319 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 18 Aug 2022 13:31:50 -0400 Subject: [PATCH 04/10] Comments --- cmd/otel-allocator/allocation/allocator.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index 68558e6322..82688e1236 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -55,12 +55,10 @@ type collector struct { // Allocator makes decisions to distribute work among // a number of OpenTelemetry collectors based on the number of targets. // Users need to call SetTargets when they have new targets in their -// clusters and call Reshard to process the new targets and reshard. +// clusters and call SetCollectors when the collectors have changed. type Allocator struct { m sync.Mutex - //targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed - collectors map[string]*collector // all current collectors targetItems map[string]*TargetItem From f5d1b598ea6d9336ff1ac17683a57442b3868909 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 18 Aug 2022 13:46:32 -0400 Subject: [PATCH 05/10] added a test --- .../allocation/allocator_test.go | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/cmd/otel-allocator/allocation/allocator_test.go b/cmd/otel-allocator/allocation/allocator_test.go index 4f3e1c93bd..4fe307a354 100644 --- a/cmd/otel-allocator/allocation/allocator_test.go +++ b/cmd/otel-allocator/allocation/allocator_test.go @@ -114,6 +114,41 @@ func TestAllocationCollision(t *testing.T) { } } +func TestNoCollectorReassignment(t *testing.T) { + s := NewAllocator(logger) + + cols := []string{"col-1", "col-2", "col-3"} + s.SetCollectors(cols) + labels := model.LabelSet{} + + excpectedColLen := len(cols) + assert.Len(t, s.collectors, excpectedColLen) + + for _, i := range cols { + assert.NotNil(t, s.collectors[i]) + } + initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} + var targetList []TargetItem + for _, i := range initTargets { + targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) + } + // test that targets and collectors are added properly + s.SetWaitingTargets(targetList) + + // verify + expectedTargetLen := len(initTargets) + targetItems := s.TargetItems() + assert.Len(t, targetItems, expectedTargetLen) + + // assign new set of collectors with the same names + newCols := []string{"col-1", "col-2", "col-3"} + s.SetCollectors(newCols) + + newTargetItems := s.TargetItems() + assert.Equal(t, targetItems, newTargetItems) + +} + // Tests that the delta in number of targets per collector is less than 15% of an even distribution func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { From caa6135bd114f0211b5806d6543d6bd458958fa6 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 18 Aug 2022 15:19:33 -0400 Subject: [PATCH 06/10] Fix comment spacing --- cmd/otel-allocator/allocation/allocator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index 289851d57c..4ee1e59bf7 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -28,7 +28,7 @@ var ( /* Load balancer will serve on an HTTP server exposing /jobs//targets - The targets are allocated using the least connection method + The targets are allocated using the least connection method Load balancer will need information about the collectors in order to set the URLs Keep a Map of what each collector currently holds and update it based on new scrape target updates */ @@ -203,7 +203,7 @@ func (allocator *Allocator) SetCollectors(collectors []string) { for _, item := range allocator.targetItems { allocator.assignTargetToNextCollector(item) } - + collectorsAllocatable.Set(float64(len(collectors))) } From f3b46cdacca320846c39d3a0a0d12301bd0c4cfc Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Fri, 19 Aug 2022 10:44:55 -0400 Subject: [PATCH 07/10] Updated based on feedback --- cmd/otel-allocator/allocation/allocator.go | 64 +++++++++++------ .../allocation/allocator_test.go | 69 +++++++++++++++---- cmd/otel-allocator/main.go | 2 +- 3 files changed, 99 insertions(+), 36 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index 4ee1e59bf7..d3b8e66ebb 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -89,7 +89,7 @@ func (allocator *Allocator) Collectors() map[string]*collector { } // findNextCollector finds the next collector with fewer number of targets. -// This method is called from within SetWaitingTargets and SetCollectors, whose caller +// This method is called from within SetTargets and SetCollectors, whose caller // acquires the needed lock. func (allocator *Allocator) findNextCollector() *collector { var col *collector @@ -107,8 +107,10 @@ func (allocator *Allocator) findNextCollector() *collector { return col } -// assignTargetToNextCollector assigns a target to the next available collector -func (allocator *Allocator) assignTargetToNextCollector(target *TargetItem) { +// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems +// This method is called from within SetTargets and SetCollectors, whose caller +// acquires the needed lock. +func (allocator *Allocator) addTargetToTargetItems(target *TargetItem) { chosenCollector := allocator.findNextCollector() targetItem := TargetItem{ JobName: target.JobName, @@ -122,26 +124,34 @@ func (allocator *Allocator) assignTargetToNextCollector(target *TargetItem) { targetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets)) } -// allCollectorsPresent checks if all the collectors provided are in the allocator's map -func (allocator *Allocator) allCollectorsPresent(collectors []string) bool { - if len(collectors) != len(allocator.collectors) { - return false - } +// getCollectorChanges returns the new and removed collectors respectively. +// This method is called from within SetCollectors, which acquires the needed lock. +func (allocator *Allocator) getCollectorChanges(collectors []string) ([]string, []string) { + var newCollectors []string + var removedCollectors []string + // Used as a set to check for removed collectors + tempCollectorMap := map[string]bool{} for _, s := range collectors { - if _, ok := allocator.collectors[s]; !ok { - return false + if _, found := allocator.collectors[s]; !found { + newCollectors = append(newCollectors, s) } + tempCollectorMap[s] = true } - return true + for k := range allocator.collectors { + if _, found := tempCollectorMap[k]; !found { + removedCollectors = append(removedCollectors, k) + } + } + return newCollectors, removedCollectors } -// SetWaitingTargets accepts a list of targets that will be used to make +// SetTargets accepts a list of targets that will be used to make // load balancing decisions. This method should be called when there are // new targets discovered or existing targets are shutdown. -func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) { - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetWaitingTargets")) +func (allocator *Allocator) SetTargets(targets []TargetItem) { + timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets")) defer timer.ObserveDuration() - // Dump old data + allocator.m.Lock() defer allocator.m.Unlock() @@ -157,6 +167,7 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) { if _, ok := tempTargetMap[k]; !ok { allocator.collectors[target.Collector.Name].NumTargets-- delete(allocator.targetItems, k) + targetsPerCollector.WithLabelValues(target.Collector.Name).Set(float64(allocator.collectors[target.Collector.Name].NumTargets)) } } @@ -167,7 +178,7 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) { continue } else { // Assign a collector to the new target - allocator.assignTargetToNextCollector(&target) + allocator.addTargetToTargetItems(&target) } } } @@ -181,27 +192,36 @@ func (allocator *Allocator) SetCollectors(collectors []string) { allocator.m.Lock() defer allocator.m.Unlock() + newCollectors, removedCollectors := allocator.getCollectorChanges(collectors) if len(collectors) == 0 { log.Info("No collector instances present") return - } else if allocator.allCollectorsPresent(collectors) { + } else if len(newCollectors) == 0 && len(removedCollectors) == 0 { log.Info("No changes to the collectors found") return } // Clear existing collectors - for k := range allocator.collectors { + for _, k := range removedCollectors { delete(allocator.collectors, k) } - // Insert the new collectors - for _, i := range collectors { + for _, i := range newCollectors { allocator.collectors[i] = &collector{Name: i, NumTargets: 0} } - // Re-Allocate the existing targets + // find targets which need to be redistributed + var redistribute []*TargetItem for _, item := range allocator.targetItems { - allocator.assignTargetToNextCollector(item) + for _, s := range removedCollectors { + if item.Collector.Name == s { + redistribute = append(redistribute, item) + } + } + } + // Re-Allocate the existing targets + for _, item := range redistribute { + allocator.addTargetToTargetItems(item) } collectorsAllocatable.Set(float64(len(collectors))) diff --git a/cmd/otel-allocator/allocation/allocator_test.go b/cmd/otel-allocator/allocation/allocator_test.go index 7f4fc0c417..0b754cb47a 100644 --- a/cmd/otel-allocator/allocation/allocator_test.go +++ b/cmd/otel-allocator/allocation/allocator_test.go @@ -11,7 +11,7 @@ import ( var logger = logf.Log.WithName("unit-tests") -// Tests least connection - The expected collector after running findNextCollector should be the collector with the least amount of workload +// Tests the least connection - The expected collector after running findNextCollector should be the collector with the least amount of workload func TestFindNextCollector(t *testing.T) { s := NewAllocator(logger) @@ -55,7 +55,7 @@ func TestAddingAndRemovingTargets(t *testing.T) { } // test that targets and collectors are added properly - s.SetWaitingTargets(targetList) + s.SetTargets(targetList) // verify expectedTargetLen := len(initTargets) @@ -68,8 +68,8 @@ func TestAddingAndRemovingTargets(t *testing.T) { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } - // test that less targets are found - removed - s.SetWaitingTargets(newTargetList) + // test that fewer targets are found - removed + s.SetTargets(newTargetList) // verify targetItems := s.TargetItems() @@ -98,12 +98,12 @@ func TestAllocationCollision(t *testing.T) { } targetList := []TargetItem{ - TargetItem{JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: firstLabels}, - TargetItem{JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: secondLabels}, + {JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: firstLabels}, + {JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: secondLabels}, } // test that targets and collectors are added properly - s.SetWaitingTargets(targetList) + s.SetTargets(targetList) // verify targetItems := s.TargetItems() @@ -124,8 +124,8 @@ func TestNoCollectorReassignment(t *testing.T) { s.SetCollectors(cols) labels := model.LabelSet{} - excpectedColLen := len(cols) - assert.Len(t, s.collectors, excpectedColLen) + expectedColLen := len(cols) + assert.Len(t, s.collectors, expectedColLen) for _, i := range cols { assert.NotNil(t, s.collectors[i]) @@ -136,7 +136,7 @@ func TestNoCollectorReassignment(t *testing.T) { targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } // test that targets and collectors are added properly - s.SetWaitingTargets(targetList) + s.SetTargets(targetList) // verify expectedTargetLen := len(initTargets) @@ -152,6 +152,49 @@ func TestNoCollectorReassignment(t *testing.T) { } +func TestSmartCollectorReassignment(t *testing.T) { + s := NewAllocator(logger) + + cols := []string{"col-1", "col-2", "col-3"} + s.SetCollectors(cols) + labels := model.LabelSet{} + + expectedColLen := len(cols) + assert.Len(t, s.collectors, expectedColLen) + + for _, i := range cols { + assert.NotNil(t, s.collectors[i]) + } + initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} + var targetList []TargetItem + for _, i := range initTargets { + targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) + } + // test that targets and collectors are added properly + s.SetTargets(targetList) + + // verify + expectedTargetLen := len(initTargets) + targetItems := s.TargetItems() + assert.Len(t, targetItems, expectedTargetLen) + + // assign new set of collectors with the same names + newCols := []string{"col-1", "col-2", "col-4"} + s.SetCollectors(newCols) + + newTargetItems := s.TargetItems() + assert.Equal(t, len(targetItems), len(newTargetItems)) + for key, targetItem := range targetItems { + item, ok := newTargetItems[key] + assert.True(t, ok, "all target items should be found in new target item list") + if targetItem.Collector.Name != "col-3" { + assert.Equal(t, targetItem.Collector.Name, item.Collector.Name) + } else { + assert.Equal(t, "col-4", item.Collector.Name) + } + } +} + // Tests that the delta in number of targets per collector is less than 15% of an even distribution func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { @@ -168,7 +211,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { for _, i := range targets { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } - s.SetWaitingTargets(newTargetList) + s.SetTargets(newTargetList) // Divisor needed to get 15% divisor := 6.7 @@ -191,7 +234,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { for _, i := range targets { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } - s.SetWaitingTargets(newTargetList) + s.SetTargets(newTargetList) targetItemLen = len(s.TargetItems()) collectors = s.Collectors() @@ -210,7 +253,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { for _, i := range targets { newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } - s.SetWaitingTargets(newTargetList) + s.SetTargets(newTargetList) targetItemLen = len(s.TargetItems()) collectors = s.Collectors() diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index 89a4f6f2c7..bd2bd0a469 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -58,7 +58,7 @@ func main() { // creates a new discovery manager discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger()) defer discoveryManager.Close() - discoveryManager.Watch(allocator.SetWaitingTargets) + discoveryManager.Watch(allocator.SetTargets) srv, err := newServer(log, allocator, discoveryManager, cliConf) if err != nil { From a47483bb72627012e0e92ca1ad47f5ac3fb815cc Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Fri, 19 Aug 2022 10:54:14 -0400 Subject: [PATCH 08/10] Metrics fix, comment --- cmd/otel-allocator/allocation/allocator.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index d3b8e66ebb..5a3d50dcf8 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -108,8 +108,8 @@ func (allocator *Allocator) findNextCollector() *collector { } // addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems -// This method is called from within SetTargets and SetCollectors, whose caller -// acquires the needed lock. +// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock. +// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap func (allocator *Allocator) addTargetToTargetItems(target *TargetItem) { chosenCollector := allocator.findNextCollector() targetItem := TargetItem{ @@ -204,6 +204,7 @@ func (allocator *Allocator) SetCollectors(collectors []string) { // Clear existing collectors for _, k := range removedCollectors { delete(allocator.collectors, k) + targetsPerCollector.WithLabelValues(k).Set(0) } // Insert the new collectors for _, i := range newCollectors { From 981d439d63b862a23dd2c30e975725436d18918b Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 22 Aug 2022 10:17:44 -0400 Subject: [PATCH 09/10] updated from feedback --- cmd/otel-allocator/allocation/allocator.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index 5a3d50dcf8..b1478c2a0f 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -58,7 +58,7 @@ type collector struct { // Users need to call SetTargets when they have new targets in their // clusters and call SetCollectors when the collectors have changed. type Allocator struct { - // m protects targetsWaiting, collectors, and targetItems for concurrent use. + // m protects collectors and targetItems for concurrent use. m sync.RWMutex collectors map[string]*collector // all current collectors targetItems map[string]*TargetItem @@ -102,7 +102,6 @@ func (allocator *Allocator) findNextCollector() *collector { col = v } } - } return col } @@ -177,7 +176,7 @@ func (allocator *Allocator) SetTargets(targets []TargetItem) { if _, ok := allocator.targetItems[k]; ok { continue } else { - // Assign a collector to the new target + // Assign new set of collectors with the one different name allocator.addTargetToTargetItems(&target) } } @@ -189,14 +188,15 @@ func (allocator *Allocator) SetCollectors(collectors []string) { log := allocator.log.WithValues("component", "opentelemetry-targetallocator") timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors")) defer timer.ObserveDuration() + if len(collectors) == 0 { + log.Info("No collector instances present") + return + } allocator.m.Lock() defer allocator.m.Unlock() newCollectors, removedCollectors := allocator.getCollectorChanges(collectors) - if len(collectors) == 0 { - log.Info("No collector instances present") - return - } else if len(newCollectors) == 0 && len(removedCollectors) == 0 { + if len(newCollectors) == 0 && len(removedCollectors) == 0 { log.Info("No changes to the collectors found") return } From a81d9cab8a8722913c15a421333aaa383c816c6e Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 22 Aug 2022 11:29:47 -0400 Subject: [PATCH 10/10] Change where metric is recorded --- cmd/otel-allocator/allocation/allocator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index b1478c2a0f..be4d82d3eb 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -188,6 +188,8 @@ func (allocator *Allocator) SetCollectors(collectors []string) { log := allocator.log.WithValues("component", "opentelemetry-targetallocator") timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors")) defer timer.ObserveDuration() + + collectorsAllocatable.Set(float64(len(collectors))) if len(collectors) == 0 { log.Info("No collector instances present") return @@ -224,8 +226,6 @@ func (allocator *Allocator) SetCollectors(collectors []string) { for _, item := range redistribute { allocator.addTargetToTargetItems(item) } - - collectorsAllocatable.Set(float64(len(collectors))) } func NewAllocator(log logr.Logger) *Allocator {