Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unnecessary and incorrect reallocation #1041

Merged
merged 14 commits into from
Aug 22, 2022
Merged
213 changes: 122 additions & 91 deletions cmd/otel-allocator/allocation/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var (
)

/*
Load balancer will serve on an HTTP server exposing /jobs/<job_id>/targets <- these are configured using least connection
Load balancer will serve on an HTTP server exposing /jobs/<job_id>/targets
The targets are allocated using the least connection method
Load balancer will need information about the collectors in order to set the URLs
Keep a Map of what each collector currently holds and update it based on new scrape target updates
*/
Expand Down Expand Up @@ -55,13 +56,12 @@ type collector struct {
// Allocator makes decisions to distribute work among
// a number of OpenTelemetry collectors based on the number of targets.
// Users need to call SetTargets when they have new targets in their
// clusters and call Reshard to process the new targets and reshard.
// clusters and call SetCollectors when the collectors have changed.
type Allocator struct {
// m protects targetsWaiting, collectors, and targetItems for concurrent use.
m sync.RWMutex
targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed
collectors map[string]*collector // all current collectors
targetItems map[string]*TargetItem
// m protects collectors and targetItems for concurrent use.
m sync.RWMutex
collectors map[string]*collector // all current collectors
targetItems map[string]*TargetItem

log logr.Logger
}
Expand All @@ -88,119 +88,150 @@ func (allocator *Allocator) Collectors() map[string]*collector {
return collectorsCopy
}

// SetWaitingTargets accepts a list of targets that will be used to make
// findNextCollector finds the next collector with fewer number of targets.
// This method is called from within SetTargets and SetCollectors, whose caller
// acquires the needed lock.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
}
}
}
return col
}

// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap
func (allocator *Allocator) addTargetToTargetItems(target *TargetItem) {
chosenCollector := allocator.findNextCollector()
targetItem := TargetItem{
JobName: target.JobName,
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))},
TargetURL: target.TargetURL,
Label: target.Label,
Collector: chosenCollector,
}
allocator.targetItems[targetItem.hash()] = &targetItem
chosenCollector.NumTargets++
targetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets))
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
}

// getCollectorChanges returns the new and removed collectors respectively.
// This method is called from within SetCollectors, which acquires the needed lock.
func (allocator *Allocator) getCollectorChanges(collectors []string) ([]string, []string) {
var newCollectors []string
var removedCollectors []string
// Used as a set to check for removed collectors
tempCollectorMap := map[string]bool{}
for _, s := range collectors {
if _, found := allocator.collectors[s]; !found {
newCollectors = append(newCollectors, s)
}
tempCollectorMap[s] = true
}
for k := range allocator.collectors {
if _, found := tempCollectorMap[k]; !found {
removedCollectors = append(removedCollectors, k)
}
}
return newCollectors, removedCollectors
}

// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
// Dump old data
func (allocator *Allocator) SetTargets(targets []TargetItem) {
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets"))
defer timer.ObserveDuration()

allocator.m.Lock()
defer allocator.m.Unlock()
allocator.targetsWaiting = make(map[string]TargetItem, len(targets))
// Set new data
for _, i := range targets {
allocator.targetsWaiting[i.hash()] = i

// Make the temp map for access
tempTargetMap := make(map[string]TargetItem, len(targets))
for _, target := range targets {
tempTargetMap[target.hash()] = target
}

// Check for removals
for k, target := range allocator.targetItems {
// if the old target is no longer in the new list, remove it
if _, ok := tempTargetMap[k]; !ok {
allocator.collectors[target.Collector.Name].NumTargets--
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
delete(allocator.targetItems, k)
targetsPerCollector.WithLabelValues(target.Collector.Name).Set(float64(allocator.collectors[target.Collector.Name].NumTargets))
}
}

// Check for additions
for k, target := range tempTargetMap {
// Do nothing if the item is already there
if _, ok := allocator.targetItems[k]; ok {
continue
} else {
// Assign new set of collectors with the one different name
allocator.addTargetToTargetItems(&target)
}
}
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (allocator *Allocator) SetCollectors(collectors []string) {
log := allocator.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors"))
defer timer.ObserveDuration()

allocator.m.Lock()
defer allocator.m.Unlock()
collectorsAllocatable.Set(float64(len(collectors)))
if len(collectors) == 0 {
log.Info("No collector instances present")
return
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
}
for k := range allocator.collectors {
delete(allocator.collectors, k)
}

for _, i := range collectors {
allocator.collectors[i] = &collector{Name: i, NumTargets: 0}
}
collectorsAllocatable.Set(float64(len(collectors)))
}

// AllocateTargets removes outdated targets and adds new ones from
// waitingTargets. This method needs to be called to process the new target
// updates. Until it is called, old targets will be served.
func (allocator *Allocator) AllocateTargets() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("AllocateTargets"))
defer timer.ObserveDuration()
defer allocator.m.Unlock()
allocator.removeOutdatedTargets()
allocator.processWaitingTargets()
}

// ReallocateCollectors reallocates the targets among the new collector instances.
func (allocator *Allocator) ReallocateCollectors() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors"))
defer timer.ObserveDuration()
defer allocator.m.Unlock()
allocator.targetItems = make(map[string]*TargetItem)
allocator.processWaitingTargets()
}

// removeOutdatedTargets removes targets that are no longer available. This
// method is called after a lock has been acquired in ReallocateCollectors().
func (allocator *Allocator) removeOutdatedTargets() {
for k := range allocator.targetItems {
if _, ok := allocator.targetsWaiting[k]; !ok {
allocator.collectors[allocator.targetItems[k].Collector.Name].NumTargets--
delete(allocator.targetItems, k)
}
newCollectors, removedCollectors := allocator.getCollectorChanges(collectors)
if len(newCollectors) == 0 && len(removedCollectors) == 0 {
log.Info("No changes to the collectors found")
return
}
}

// processWaitingTargets processes the newly set targets. This method is called
// after a lock has been acquired in AllocateTargets() or ReallocateCollectors().
func (allocator *Allocator) processWaitingTargets() {
for k, v := range allocator.targetsWaiting {
if _, ok := allocator.targetItems[k]; !ok {
col := allocator.findNextCollector()
allocator.targetItems[k] = &v
targetItem := TargetItem{
JobName: v.JobName,
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))},
TargetURL: v.TargetURL,
Label: v.Label,
Collector: col,
}
col.NumTargets++
targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets))
allocator.targetItems[v.hash()] = &targetItem
}
// Clear existing collectors
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
for _, k := range removedCollectors {
delete(allocator.collectors, k)
targetsPerCollector.WithLabelValues(k).Set(0)
}
// Insert the new collectors
for _, i := range newCollectors {
allocator.collectors[i] = &collector{Name: i, NumTargets: 0}
}
}

// findNextCollector finds the next collector with fewer number of targets.
// This method is called from within processWaitingTargets(), whose caller
// acquires the needed lock.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
// find targets which need to be redistributed
var redistribute []*TargetItem
for _, item := range allocator.targetItems {
for _, s := range removedCollectors {
if item.Collector.Name == s {
redistribute = append(redistribute, item)
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
}
}

}
return col
// Re-Allocate the existing targets
for _, item := range redistribute {
allocator.addTargetToTargetItems(item)
}
}

jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
func NewAllocator(log logr.Logger) *Allocator {
return &Allocator{
log: log,
targetsWaiting: make(map[string]TargetItem),
collectors: make(map[string]*collector),
targetItems: make(map[string]*TargetItem),
log: log,
collectors: make(map[string]*collector),
targetItems: make(map[string]*TargetItem),
}
}
Loading