Skip to content

Commit

Permalink
Merge pull request #1115 from zcross/zcross/concurrency/registry
Browse files Browse the repository at this point in the history
Refactor Registry internals to make concurrent access safe
  • Loading branch information
sunsingerus authored Apr 13, 2023
2 parents 41fee46 + 2994778 commit 314b8f9
Show file tree
Hide file tree
Showing 2 changed files with 369 additions and 72 deletions.
241 changes: 169 additions & 72 deletions pkg/model/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package model

import (
"fmt"
"sync"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -45,37 +46,73 @@ const (

// Registry specifies registry struct
type Registry struct {
r map[EntityType][]v1.ObjectMeta
r map[EntityType]*objectMetaSet
mu sync.RWMutex
}

// objectMetaIdentity is a simple subset of ObjectMeta used strictly within Registry for identifying
// ObjectMeta without inspecting their full contents. I.e., this is used for existence checks and comparisons.
type objectMetaIdentity struct {
name string
namespace string
}

// objectMetaSet is an internal collections type used for efficient lookups of object metadata by identity,
// defined in this context as the combination of namespace and name. A set is expected to correspond to a single
// entity type, but set additions to not validate this property to avoid the introduction of error results onto
// associated interfaces (including the exported Registry functions).
// All accesses are synchronized.
type objectMetaSet struct {
entityType EntityType
contents map[objectMetaIdentity]v1.ObjectMeta
sync.RWMutex
}

// NewRegistry creates new registry
func NewRegistry() *Registry {
return &Registry{
r: make(map[EntityType][]v1.ObjectMeta),
r: make(map[EntityType]*objectMetaSet),
}
}

// Len return len of the whole registry or specified entity types
// Note that this is unsafe to call recursively, including in calls to other synchronized Registry functions
// like Walk (and therefore in the "work" function passed into iterators like Walk and WalkEntityType).
func (r *Registry) Len(_what ...EntityType) int {
if r == nil {
return 0
}

// Avoid coarse grained locking when we will just return the number of entity types in the registry.
if len(_what) == 0 {
r.mu.RLock()
defer r.mu.RUnlock()
return len(r.r)
}
what := _what[0]
return len(r.r[what])

result := 0
for _, entityType := range _what {
os := r.ensureObjectSetForType(entityType)
result += os.len()
}
return result
}

// Walk walks over registry
// Walk walks over registry.
// Note: this is fairly expensive in the sense that it locks the entire registry from being written
// for the full duration of whatever workload is applied throughout iteration. Avoid calling when you know
// the entity type you want.
func (r *Registry) Walk(f func(entityType EntityType, meta v1.ObjectMeta)) {
if r == nil {
return
}
for et := range r.r {
for _, m := range r.r[et] {
f(et, m)
}
r.mu.RLock()
defer r.mu.RUnlock()

for et, os := range r.r {
os.walk(func(meta v1.ObjectMeta) {
f(et, meta)
})
}
}

Expand All @@ -84,13 +121,9 @@ func (r *Registry) WalkEntityType(entityType EntityType, f func(meta v1.ObjectMe
if r == nil {
return
}
for et := range r.r {
for _, m := range r.r[et] {
if et == entityType {
f(m)
}
}
}

setForType := r.ensureObjectSetForType(entityType)
setForType.walk(f)
}

// String makes string representation of the registry
Expand All @@ -111,17 +144,20 @@ func (r *Registry) registerEntity(entityType EntityType, meta v1.ObjectMeta) {
return
}

if r.hasEntity(entityType, meta) {
return
}
// Does not have such an entity
m := v1.ObjectMeta{
// Try to minimize coarse grained locking at the registry level. Immediately getOrCreate for the entity type
// and then begin operating on that (it uses a finer grained lock).
setForType := r.ensureObjectSetForType(entityType)

// Create the representation that we'll attempt to add.
newObj := v1.ObjectMeta{
Namespace: meta.Namespace,
Name: meta.Name,
Labels: util.MergeStringMapsOverwrite(nil, meta.Labels),
Annotations: util.MergeStringMapsOverwrite(nil, meta.Annotations),
}
r.r[entityType] = append(r.r[entityType], m)

// Add the object, which will only happen if no other object with the same identity is present in the set.
setForType.maybeAdd(&newObj)
}

// RegisterStatefulSet registers StatefulSet
Expand Down Expand Up @@ -264,26 +300,32 @@ func (r *Registry) WalkPDB(f func(meta v1.ObjectMeta)) {
r.WalkEntityType(PDB, f)
}

// hasEntity
func (r *Registry) hasEntity(entityType EntityType, meta v1.ObjectMeta) bool {
// Subtract subtracts specified registry from main
func (r *Registry) Subtract(sub *Registry) *Registry {
if sub.Len() == 0 {
// Nothing to subtract, return base
return r
}
if r.Len() == 0 {
return false
// Nowhere to subtract from
return r
}

for et := range r.r {
if et == entityType {
// This is searched entityType
for _, m := range r.r[et] {
if r.isEqual(m, meta) {
// This is the element which is looked for.
return true
}
}
return false
}
}
sub.Walk(func(entityType EntityType, entity v1.ObjectMeta) {
r.deleteEntity(entityType, entity)
})

return false
return r
}

// hasEntity
func (r *Registry) hasEntity(entityType EntityType, meta v1.ObjectMeta) bool {
// Try to minimize coarse grained locking at the registry level. Immediately getOrCreate for the entity type
// and then begin operating on that (it uses a finer grained lock).
setForType := r.ensureObjectSetForType(entityType)

// Having acquired the type-specific ObjectMeta set, return the result of a membership check.
return setForType.contains(&meta)
}

// isEqual
Expand All @@ -293,48 +335,103 @@ func (r *Registry) isEqual(a, b v1.ObjectMeta) bool {

// deleteEntity
func (r *Registry) deleteEntity(entityType EntityType, meta v1.ObjectMeta) bool {
if r.Len() == 0 {
return false
// Try to minimize coarse grained locking at the registry level. Immediately getOrCreate for the entity type
// and then begin operating on that (it uses a finer grained lock).
setForType := r.ensureObjectSetForType(entityType)

// Having acquired the type-specific ObjectMeta set, return the result of removal success.
return setForType.remove(&meta)
}

// ensureObjectSetForType resolves the singleton objectMetaSet for this registry, of the given entityType.
// This uses a coarse grained lock on the entire registry.
func (r *Registry) ensureObjectSetForType(entityType EntityType) *objectMetaSet {
if r == nil {
return nil
}

for et := range r.r {
if et == entityType {
// This is searched entityType
for i, m := range r.r[et] {
if r.isEqual(m, meta) {
// This is the element which is looked for.
// Remove the element at index i
//
// Copy last element to index i.
r.r[et][i] = r.r[et][len(r.r[et])-1]
// Erase last element - help GC to collect
r.r[et][len(r.r[et])-1] = v1.ObjectMeta{}
// Truncate slice.
r.r[et] = r.r[et][:len(r.r[et])-1]
return true
}
}
return false
}
// get case (optimize for the most common condition of the set already existing)
r.mu.RLock()
existing, ok := r.r[entityType]
r.mu.RUnlock()
if ok {
return existing
}

return false
// create case (requires write lock, but note that we have to double for existence within critical section)
r.mu.Lock()
defer r.mu.Unlock()
existing, ok = r.r[entityType]
if ok {
return existing
}
newSet := newObjectMetaSet(entityType)
r.r[entityType] = newSet
return newSet
}

// Subtract subtracts specified registry from main
func (r *Registry) Subtract(sub *Registry) *Registry {
if sub.Len() == 0 {
// Nothing to subtract, return base
return r
// objIdentity derives a objectMetaIdentity from an ObjectMeta
func (s *objectMetaSet) objIdentity(obj *v1.ObjectMeta) objectMetaIdentity {
return objectMetaIdentity{
name: obj.Name,
namespace: obj.Namespace,
}
if r.Len() == 0 {
// Nowhere to subtract from
return r
}

// maybeAdd adds an ObjectMeta to the set if an object with an equivalent identity is not already present
func (s *objectMetaSet) maybeAdd(o *v1.ObjectMeta) bool {
s.Lock()
defer s.Unlock()
if _, ok := s.contents[s.objIdentity(o)]; ok {
return false
}
s.contents[s.objIdentity(o)] = *o
return true
}

sub.Walk(func(entityType EntityType, entity v1.ObjectMeta) {
r.deleteEntity(entityType, entity)
})
// remove deletes an ObjectMeta from the set, matching only on identity
func (s *objectMetaSet) remove(o *v1.ObjectMeta) bool {
s.Lock()
defer s.Unlock()
if _, ok := s.contents[s.objIdentity(o)]; !ok {
return false
}
delete(s.contents, s.objIdentity(o))
return true
}

return r
// contains determines if an ObjectMeta exists in the set (based on identity only)
func (s *objectMetaSet) contains(o *v1.ObjectMeta) bool {
s.RLock()
defer s.RUnlock()
_, ok := s.contents[s.objIdentity(o)]
return ok
}

// walk provides an iterator-like access to the ObjectMetas contained in the set
// Note that this function is not safe to call recursively, due to the RWLock usage.
// This seems unlikely to be a problem.
func (s *objectMetaSet) walk(f func(meta v1.ObjectMeta)) {
s.RLock()
defer s.RUnlock()

for _, obj := range s.contents {
f(obj)
}
}

// len returns the number of ObjectMeta in the set
func (s *objectMetaSet) len() int {
s.RLock()
defer s.RUnlock()

return len(s.contents)
}

// newObjectMetaSet constructor for a set holding ObjectMeta with reader/writer synchronization
func newObjectMetaSet(entityType EntityType) *objectMetaSet {
return &objectMetaSet{
entityType: entityType,
contents: make(map[objectMetaIdentity]v1.ObjectMeta),
}
}
Loading

0 comments on commit 314b8f9

Please sign in to comment.