From 29947782c2cf4ca95903bacd363939f6f1194d5b Mon Sep 17 00:00:00 2001 From: Zach Cross Date: Fri, 24 Feb 2023 11:59:53 -0500 Subject: [PATCH] Refactor Registry internals to make concurrent access safe Background: #1109 Signed-off-by: Zach Cross --- pkg/model/registry.go | 241 ++++++++++++++++++++++++++----------- pkg/model/registry_test.go | 200 ++++++++++++++++++++++++++++++ 2 files changed, 369 insertions(+), 72 deletions(-) create mode 100644 pkg/model/registry_test.go diff --git a/pkg/model/registry.go b/pkg/model/registry.go index 6517923bc..2d23726f3 100644 --- a/pkg/model/registry.go +++ b/pkg/model/registry.go @@ -16,6 +16,7 @@ package model import ( "fmt" + "sync" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -45,37 +46,73 @@ const ( // Registry specifies registry struct type Registry struct { - r map[EntityType][]v1.ObjectMeta + r map[EntityType]*objectMetaSet + mu sync.RWMutex +} + +// objectMetaIdentity is a simple subset of ObjectMeta used strictly within Registry for identifying +// ObjectMeta without inspecting their full contents. I.e., this is used for existence checks and comparisons. +type objectMetaIdentity struct { + name string + namespace string +} + +// objectMetaSet is an internal collections type used for efficient lookups of object metadata by identity, +// defined in this context as the combination of namespace and name. A set is expected to correspond to a single +// entity type, but set additions do not validate this property to avoid the introduction of error results onto +// associated interfaces (including the exported Registry functions). +// All accesses are synchronized. 
+type objectMetaSet struct { + entityType EntityType + contents map[objectMetaIdentity]v1.ObjectMeta + sync.RWMutex } // NewRegistry creates new registry func NewRegistry() *Registry { return &Registry{ - r: make(map[EntityType][]v1.ObjectMeta), + r: make(map[EntityType]*objectMetaSet), } } // Len return len of the whole registry or specified entity types +// Note that this is unsafe to call recursively, including in calls to other synchronized Registry functions +// like Walk (and therefore in the "work" function passed into iterators like Walk and WalkEntityType). func (r *Registry) Len(_what ...EntityType) int { if r == nil { return 0 } + + // Avoid coarse grained locking when we will just return the number of entity types in the registry. if len(_what) == 0 { + r.mu.RLock() + defer r.mu.RUnlock() return len(r.r) } - what := _what[0] - return len(r.r[what]) + + result := 0 + for _, entityType := range _what { + os := r.ensureObjectSetForType(entityType) + result += os.len() + } + return result } -// Walk walks over registry +// Walk walks over registry. +// Note: this is fairly expensive in the sense that it locks the entire registry from being written +// for the full duration of whatever workload is applied throughout iteration. Avoid calling when you know +// the entity type you want. 
func (r *Registry) Walk(f func(entityType EntityType, meta v1.ObjectMeta)) { if r == nil { return } - for et := range r.r { - for _, m := range r.r[et] { - f(et, m) - } + r.mu.RLock() + defer r.mu.RUnlock() + + for et, os := range r.r { + os.walk(func(meta v1.ObjectMeta) { + f(et, meta) + }) } } @@ -84,13 +121,9 @@ func (r *Registry) WalkEntityType(entityType EntityType, f func(meta v1.ObjectMe if r == nil { return } - for et := range r.r { - for _, m := range r.r[et] { - if et == entityType { - f(m) - } - } - } + + setForType := r.ensureObjectSetForType(entityType) + setForType.walk(f) } // String makes string representation of the registry @@ -111,17 +144,20 @@ func (r *Registry) registerEntity(entityType EntityType, meta v1.ObjectMeta) { return } - if r.hasEntity(entityType, meta) { - return - } - // Does not have such an entity - m := v1.ObjectMeta{ + // Try to minimize coarse grained locking at the registry level. Immediately getOrCreate for the entity type + // and then begin operating on that (it uses a finer grained lock). + setForType := r.ensureObjectSetForType(entityType) + + // Create the representation that we'll attempt to add. + newObj := v1.ObjectMeta{ Namespace: meta.Namespace, Name: meta.Name, Labels: util.MergeStringMapsOverwrite(nil, meta.Labels), Annotations: util.MergeStringMapsOverwrite(nil, meta.Annotations), } - r.r[entityType] = append(r.r[entityType], m) + + // Add the object, which will only happen if no other object with the same identity is present in the set. 
+ setForType.maybeAdd(&newObj) } // RegisterStatefulSet registers StatefulSet @@ -264,26 +300,32 @@ func (r *Registry) WalkPDB(f func(meta v1.ObjectMeta)) { r.WalkEntityType(PDB, f) } -// hasEntity -func (r *Registry) hasEntity(entityType EntityType, meta v1.ObjectMeta) bool { +// Subtract subtracts specified registry from main +func (r *Registry) Subtract(sub *Registry) *Registry { + if sub.Len() == 0 { + // Nothing to subtract, return base + return r + } if r.Len() == 0 { - return false + // Nowhere to subtract from + return r } - for et := range r.r { - if et == entityType { - // This is searched entityType - for _, m := range r.r[et] { - if r.isEqual(m, meta) { - // This is the element which is looked for. - return true - } - } - return false - } - } + sub.Walk(func(entityType EntityType, entity v1.ObjectMeta) { + r.deleteEntity(entityType, entity) + }) - return false + return r +} + +// hasEntity +func (r *Registry) hasEntity(entityType EntityType, meta v1.ObjectMeta) bool { + // Try to minimize coarse grained locking at the registry level. Immediately getOrCreate for the entity type + // and then begin operating on that (it uses a finer grained lock). + setForType := r.ensureObjectSetForType(entityType) + + // Having acquired the type-specific ObjectMeta set, return the result of a membership check. + return setForType.contains(&meta) } // isEqual @@ -293,48 +335,103 @@ func (r *Registry) isEqual(a, b v1.ObjectMeta) bool { // deleteEntity func (r *Registry) deleteEntity(entityType EntityType, meta v1.ObjectMeta) bool { - if r.Len() == 0 { - return false + // Try to minimize coarse grained locking at the registry level. Immediately getOrCreate for the entity type + // and then begin operating on that (it uses a finer grained lock). + setForType := r.ensureObjectSetForType(entityType) + + // Having acquired the type-specific ObjectMeta set, return the result of removal success. 
+ return setForType.remove(&meta) +} + +// ensureObjectSetForType resolves the singleton objectMetaSet for this registry, of the given entityType. +// This uses a coarse grained lock on the entire registry. +func (r *Registry) ensureObjectSetForType(entityType EntityType) *objectMetaSet { + if r == nil { + return nil } - for et := range r.r { - if et == entityType { - // This is searched entityType - for i, m := range r.r[et] { - if r.isEqual(m, meta) { - // This is the element which is looked for. - // Remove the element at index i - // - // Copy last element to index i. - r.r[et][i] = r.r[et][len(r.r[et])-1] - // Erase last element - help GC to collect - r.r[et][len(r.r[et])-1] = v1.ObjectMeta{} - // Truncate slice. - r.r[et] = r.r[et][:len(r.r[et])-1] - return true - } - } - return false - } + // get case (optimize for the most common condition of the set already existing) + r.mu.RLock() + existing, ok := r.r[entityType] + r.mu.RUnlock() + if ok { + return existing } - return false + // create case (requires write lock, but note that we have to double-check for existence within critical section) + r.mu.Lock() + defer r.mu.Unlock() + existing, ok = r.r[entityType] + if ok { + return existing + } + newSet := newObjectMetaSet(entityType) + r.r[entityType] = newSet + return newSet } -// Subtract subtracts specified registry from main -func (r *Registry) Subtract(sub *Registry) *Registry { - if sub.Len() == 0 { - // Nothing to subtract, return base - return r +// objIdentity derives an objectMetaIdentity from an ObjectMeta +func (s *objectMetaSet) objIdentity(obj *v1.ObjectMeta) objectMetaIdentity { + return objectMetaIdentity{ + name: obj.Name, + namespace: obj.Namespace, } - if r.Len() == 0 { - // Nowhere to subtract from - return r +} + +// maybeAdd adds an ObjectMeta to the set if an object with an equivalent identity is not already present +func (s *objectMetaSet) maybeAdd(o *v1.ObjectMeta) bool { + s.Lock() + defer s.Unlock() + if _, ok := 
s.contents[s.objIdentity(o)]; ok { + return false } + s.contents[s.objIdentity(o)] = *o + return true +} - sub.Walk(func(entityType EntityType, entity v1.ObjectMeta) { - r.deleteEntity(entityType, entity) - }) +// remove deletes an ObjectMeta from the set, matching only on identity +func (s *objectMetaSet) remove(o *v1.ObjectMeta) bool { + s.Lock() + defer s.Unlock() + if _, ok := s.contents[s.objIdentity(o)]; !ok { + return false + } + delete(s.contents, s.objIdentity(o)) + return true +} - return r +// contains determines if an ObjectMeta exists in the set (based on identity only) +func (s *objectMetaSet) contains(o *v1.ObjectMeta) bool { + s.RLock() + defer s.RUnlock() + _, ok := s.contents[s.objIdentity(o)] + return ok +} + +// walk provides an iterator-like access to the ObjectMetas contained in the set +// Note that this function is not safe to call recursively, due to the RWLock usage. +// This seems unlikely to be a problem. +func (s *objectMetaSet) walk(f func(meta v1.ObjectMeta)) { + s.RLock() + defer s.RUnlock() + + for _, obj := range s.contents { + f(obj) + } +} + +// len returns the number of ObjectMeta in the set +func (s *objectMetaSet) len() int { + s.RLock() + defer s.RUnlock() + + return len(s.contents) +} + +// newObjectMetaSet constructor for a set holding ObjectMeta with reader/writer synchronization +func newObjectMetaSet(entityType EntityType) *objectMetaSet { + return &objectMetaSet{ + entityType: entityType, + contents: make(map[objectMetaIdentity]v1.ObjectMeta), + } } diff --git a/pkg/model/registry_test.go b/pkg/model/registry_test.go new file mode 100644 index 000000000..26b29f6f6 --- /dev/null +++ b/pkg/model/registry_test.go @@ -0,0 +1,200 @@ +//go:build race + +package model + +import ( + "k8s.io/apimachinery/pkg/apis/meta/v1" + "sync" + "testing" +) + +const testNamespace = "mynamespace" +const otherNamespace = "othernamespace" + +var testCmA = v1.ObjectMeta{ + Name: "configmap-A", + Namespace: testNamespace, +} + +var testCmB = 
v1.ObjectMeta{ + Name: "configmap-B", + Namespace: testNamespace, +} + +// This fixture provides some coverage for the usage of namespace *and* name in identifying ObjectMeta entries. +// Note that the name *intentionally* collides with testCmA, but the namespace is different. +var testCmAOtherNamespace = v1.ObjectMeta{ + Name: "configmap-A", + Namespace: otherNamespace, +} + +var testPvcA = v1.ObjectMeta{ + Name: "persistentvolumeclaim-A", + Namespace: testNamespace, +} + +var testPvcB = v1.ObjectMeta{ + Name: "persistentvolumeclaim-B", + Namespace: testNamespace, +} + +var testPvcC = v1.ObjectMeta{ + Name: "persistentvolumeclaim-C", + Namespace: testNamespace, +} + +var testPvcD = v1.ObjectMeta{ + Name: "persistentvolumeclaim-D", + Namespace: testNamespace, +} + +// NB: These tests mostly exist to exercise synchronization and detect regressions related to them via the +// Golang race detector. See: https://go.dev/blog/race-detector +// In short, add -race to the go test flags when running this. +func Test_Registry_BasicOperations_ConcurrencyTest(t *testing.T) { + reg := NewRegistry() + if got := reg.Len(); got != 0 { + t.Errorf("New registry should have 0 length, got %d", got) + } + + // We'll execute two goroutines: + // Goroutine 1 will add testCmA and testPvcA. + // Goroutine 2 will add testCmA, testCmB, and testPvcB (some overlap to cover set properties of registry). + // After this, we'll verify that the post conditions match the union of those operations (easy with our set property). + startWg := sync.WaitGroup{} + doneWg := sync.WaitGroup{} + startWg.Add(2) // We will make sure both goroutines begin execution, i.e., that they don't execute sequentially. + doneWg.Add(2) // We also need to synchronize the test over the completion of both. 
+ + go func() { + startWg.Done() + startWg.Wait() // Block until the other goroutine has begun execution + reg.RegisterConfigMap(testCmA) + reg.RegisterPVC(testPvcA) + doneWg.Done() + }() + + go func() { + startWg.Done() + startWg.Wait() // Block until the other goroutine has begun execution + reg.RegisterConfigMap(testCmA) + reg.RegisterConfigMap(testCmAOtherNamespace) + reg.RegisterConfigMap(testCmB) + reg.RegisterPVC(testPvcB) + doneWg.Done() + }() + + doneWg.Wait() // Block until both goroutines have completed execution + + // Verify the state of the Registry + if got := reg.Len(ConfigMap); got != 3 { + t.Errorf("Expected registry to have 3 config maps, got %d", got) + } + if got := reg.Len(PVC); got != 2 { + t.Errorf("Expected registry to have 2 PVCs, got %d", got) + } + for expectedMetaObj, expectedEntityType := range map[*v1.ObjectMeta]EntityType{ + &testCmA: ConfigMap, + &testCmAOtherNamespace: ConfigMap, + &testCmB: ConfigMap, + &testPvcA: PVC, + &testPvcB: PVC, + } { + if got := reg.hasEntity(expectedEntityType, *expectedMetaObj); !got { + t.Errorf( + "Expected registry to contain entity type %s:{Namespace = %s, Name = %s}", + expectedEntityType, + expectedMetaObj.Namespace, + expectedMetaObj.Name, + ) + } + } + + // We'll reset both wait groups and perform some additional operations, including deletions. 
+ startWg.Add(2) + doneWg.Add(2) + + go func() { + startWg.Done() + startWg.Wait() // Block until the other goroutine has begun execution + reg.RegisterPVC(testPvcD) // Add a net-new PVC (both goroutines) + reg.deleteEntity(ConfigMap, testCmAOtherNamespace) // Delete testCmAOtherNamespace (only this goroutine) + reg.deleteEntity(ConfigMap, testCmB) // Delete testCmB (both goroutines) + doneWg.Done() + }() + + go func() { + startWg.Done() + startWg.Wait() // Block until the other goroutine has begun execution + reg.RegisterPVC(testPvcC) // Add a net-new PVC (only this goroutine) + reg.RegisterPVC(testPvcD) // Add a net-new PVC (both goroutines) + reg.deleteEntity(ConfigMap, testCmB) // Delete testCmB (both goroutines) + reg.deleteEntity(PVC, testPvcB) // Delete testPvcB (only this goroutine) + doneWg.Done() + }() + + doneWg.Wait() // Block until both goroutines have completed execution + + // Verify the state of the Registry + if got := reg.Len(ConfigMap); got != 1 { + t.Errorf("Expected registry to have 1 config maps, got %d", got) + } + if got := reg.Len(PVC); got != 3 { + t.Errorf("Expected registry to have 3 PVCs, got %d", got) + } + for expectedMetaObj, expectedEntityType := range map[*v1.ObjectMeta]EntityType{ + // We deleted testCmAOtherNamespace (one of the goroutines) + // We deleted testCmB (from both goroutines) + // We deleted testPvcB + &testCmA: ConfigMap, // We didn't touch testCmA + &testPvcA: PVC, // We didn't touch testPvcA + &testPvcC: PVC, // We added testPvcC (one of the goroutines) + &testPvcD: PVC, // We added testPvcD (both goroutines tried) + } { + if got := reg.hasEntity(expectedEntityType, *expectedMetaObj); !got { + t.Errorf( + "Expected registry to contain entity type %s:{Namespace = %s, Name = %s}", + expectedEntityType, + expectedMetaObj.Namespace, + expectedMetaObj.Name, + ) + } + } + + // Finally, let's walk through the structure in two goroutines + // We'll reset both wait groups and perform some additional operations, including 
deletions. + startWg.Add(2) + doneWg.Add(2) + + threadAObjsSeen := 0 + go func() { + startWg.Done() + startWg.Wait() // Block until the other goroutine has begun execution + reg.Walk(func(entityType EntityType, meta v1.ObjectMeta) { + threadAObjsSeen++ + }) + doneWg.Done() + }() + + threadBObjsSeen := 0 + go func() { + startWg.Done() + startWg.Wait() // Block until the other goroutine has begun execution + reg.Walk(func(entityType EntityType, meta v1.ObjectMeta) { + threadBObjsSeen++ + }) + doneWg.Done() + }() + doneWg.Wait() // Block until both goroutines have completed execution + + if threadAObjsSeen == 0 || threadBObjsSeen == 0 { + t.Errorf("Expected both goroutines to visit a nonzero number of objects via Walk") + } + if threadAObjsSeen != threadBObjsSeen { + t.Errorf( + "Expected both goroutines to see the same number of objects via Walk, got a = %d, b = %d", + threadAObjsSeen, + threadBObjsSeen, + ) + } +}