Skip to content

Commit

Permalink
TargetAllocator - fix unnecessary and incorrect reallocation (open-te…
Browse files Browse the repository at this point in the history
…lemetry#1041)

* Resolve bug where TA doesn't allocate all targets

* Cleanup cleanup

* Changed how targets and collectors are allocated

* Comments

* added a test

* Fix comment spacing

* Updated based on feedback

* Metrics fix, comment

* updated from feedback

* Change where metric is recorded
  • Loading branch information
jaronoff97 authored Aug 22, 2022
1 parent 40d8bcf commit ca31c0c
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 118 deletions.
213 changes: 122 additions & 91 deletions cmd/otel-allocator/allocation/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var (
)

/*
Load balancer will serve on an HTTP server exposing /jobs/<job_id>/targets <- these are configured using least connection
Load balancer will serve on an HTTP server exposing /jobs/<job_id>/targets
The targets are allocated using the least connection method
Load balancer will need information about the collectors in order to set the URLs
Keep a Map of what each collector currently holds and update it based on new scrape target updates
*/
Expand Down Expand Up @@ -55,13 +56,12 @@ type collector struct {
// Allocator makes decisions to distribute work among
// a number of OpenTelemetry collectors based on the number of targets.
// Users need to call SetTargets when they have new targets in their
// clusters and call Reshard to process the new targets and reshard.
// clusters and call SetCollectors when the collectors have changed.
type Allocator struct {
// m protects targetsWaiting, collectors, and targetItems for concurrent use.
m sync.RWMutex
targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed
collectors map[string]*collector // all current collectors
targetItems map[string]*TargetItem
// m protects collectors and targetItems for concurrent use.
m sync.RWMutex
collectors map[string]*collector // all current collectors
targetItems map[string]*TargetItem

log logr.Logger
}
Expand All @@ -88,119 +88,150 @@ func (allocator *Allocator) Collectors() map[string]*collector {
return collectorsCopy
}

// SetWaitingTargets accepts a list of targets that will be used to make
// findNextCollector finds the next collector with fewer number of targets.
// This method is called from within SetTargets and SetCollectors, whose caller
// acquires the needed lock.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
}
}
}
return col
}

// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap
func (allocator *Allocator) addTargetToTargetItems(target *TargetItem) {
chosenCollector := allocator.findNextCollector()
targetItem := TargetItem{
JobName: target.JobName,
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))},
TargetURL: target.TargetURL,
Label: target.Label,
Collector: chosenCollector,
}
allocator.targetItems[targetItem.hash()] = &targetItem
chosenCollector.NumTargets++
targetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets))
}

// getCollectorChanges returns the new and removed collectors respectively.
// This method is called from within SetCollectors, which acquires the needed lock.
func (allocator *Allocator) getCollectorChanges(collectors []string) ([]string, []string) {
var newCollectors []string
var removedCollectors []string
// Used as a set to check for removed collectors
tempCollectorMap := map[string]bool{}
for _, s := range collectors {
if _, found := allocator.collectors[s]; !found {
newCollectors = append(newCollectors, s)
}
tempCollectorMap[s] = true
}
for k := range allocator.collectors {
if _, found := tempCollectorMap[k]; !found {
removedCollectors = append(removedCollectors, k)
}
}
return newCollectors, removedCollectors
}

// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
// Dump old data
func (allocator *Allocator) SetTargets(targets []TargetItem) {
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets"))
defer timer.ObserveDuration()

allocator.m.Lock()
defer allocator.m.Unlock()
allocator.targetsWaiting = make(map[string]TargetItem, len(targets))
// Set new data
for _, i := range targets {
allocator.targetsWaiting[i.hash()] = i

// Make the temp map for access
tempTargetMap := make(map[string]TargetItem, len(targets))
for _, target := range targets {
tempTargetMap[target.hash()] = target
}

// Check for removals
for k, target := range allocator.targetItems {
// if the old target is no longer in the new list, remove it
if _, ok := tempTargetMap[k]; !ok {
allocator.collectors[target.Collector.Name].NumTargets--
delete(allocator.targetItems, k)
targetsPerCollector.WithLabelValues(target.Collector.Name).Set(float64(allocator.collectors[target.Collector.Name].NumTargets))
}
}

// Check for additions
for k, target := range tempTargetMap {
// Do nothing if the item is already there
if _, ok := allocator.targetItems[k]; ok {
continue
} else {
// Assign new set of collectors with the one different name
allocator.addTargetToTargetItems(&target)
}
}
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (allocator *Allocator) SetCollectors(collectors []string) {
log := allocator.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors"))
defer timer.ObserveDuration()

allocator.m.Lock()
defer allocator.m.Unlock()
collectorsAllocatable.Set(float64(len(collectors)))
if len(collectors) == 0 {
log.Info("No collector instances present")
return
}
for k := range allocator.collectors {
delete(allocator.collectors, k)
}

for _, i := range collectors {
allocator.collectors[i] = &collector{Name: i, NumTargets: 0}
}
collectorsAllocatable.Set(float64(len(collectors)))
}

// AllocateTargets removes outdated targets and adds new ones from
// waitingTargets. This method needs to be called to process the new target
// updates. Until it is called, old targets will be served.
func (allocator *Allocator) AllocateTargets() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("AllocateTargets"))
defer timer.ObserveDuration()
defer allocator.m.Unlock()
allocator.removeOutdatedTargets()
allocator.processWaitingTargets()
}

// ReallocateCollectors reallocates the targets among the new collector instances.
func (allocator *Allocator) ReallocateCollectors() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors"))
defer timer.ObserveDuration()
defer allocator.m.Unlock()
allocator.targetItems = make(map[string]*TargetItem)
allocator.processWaitingTargets()
}

// removeOutdatedTargets removes targets that are no longer available. This
// method is called after a lock has been acquired in ReallocateCollectors().
func (allocator *Allocator) removeOutdatedTargets() {
for k := range allocator.targetItems {
if _, ok := allocator.targetsWaiting[k]; !ok {
allocator.collectors[allocator.targetItems[k].Collector.Name].NumTargets--
delete(allocator.targetItems, k)
}
newCollectors, removedCollectors := allocator.getCollectorChanges(collectors)
if len(newCollectors) == 0 && len(removedCollectors) == 0 {
log.Info("No changes to the collectors found")
return
}
}

// processWaitingTargets processes the newly set targets. This method is called
// after a lock has been acquired in AllocateTargets() or ReallocateCollectors().
func (allocator *Allocator) processWaitingTargets() {
for k, v := range allocator.targetsWaiting {
if _, ok := allocator.targetItems[k]; !ok {
col := allocator.findNextCollector()
allocator.targetItems[k] = &v
targetItem := TargetItem{
JobName: v.JobName,
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))},
TargetURL: v.TargetURL,
Label: v.Label,
Collector: col,
}
col.NumTargets++
targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets))
allocator.targetItems[v.hash()] = &targetItem
}
// Clear existing collectors
for _, k := range removedCollectors {
delete(allocator.collectors, k)
targetsPerCollector.WithLabelValues(k).Set(0)
}
// Insert the new collectors
for _, i := range newCollectors {
allocator.collectors[i] = &collector{Name: i, NumTargets: 0}
}
}

// findNextCollector finds the next collector with fewer number of targets.
// This method is called from within processWaitingTargets(), whose caller
// acquires the needed lock.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
// find targets which need to be redistributed
var redistribute []*TargetItem
for _, item := range allocator.targetItems {
for _, s := range removedCollectors {
if item.Collector.Name == s {
redistribute = append(redistribute, item)
}
}

}
return col
// Re-Allocate the existing targets
for _, item := range redistribute {
allocator.addTargetToTargetItems(item)
}
}

func NewAllocator(log logr.Logger) *Allocator {
return &Allocator{
log: log,
targetsWaiting: make(map[string]TargetItem),
collectors: make(map[string]*collector),
targetItems: make(map[string]*TargetItem),
log: log,
collectors: make(map[string]*collector),
targetItems: make(map[string]*TargetItem),
}
}
Loading

0 comments on commit ca31c0c

Please sign in to comment.