From 40377711bcb804eb50306adfd559d98c8f3364b4 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 12 Mar 2020 12:59:46 +0100 Subject: [PATCH 1/2] Remove unneeded locks in add_kubernetes_metadata Read/write mutexes in matchers and indexers are only locked for read, making them unnecesary. These locks are acquired for any event that is enriched by the processor, so unneeded locking may affect performance. Also, refactor locking in indexing registry so it is used in all cases. --- .../add_kubernetes_metadata/indexers.go | 8 ---- .../add_kubernetes_metadata/indexing.go | 44 +++++++++++++++---- .../add_kubernetes_metadata/kubernetes.go | 12 +---- .../add_kubernetes_metadata/matchers.go | 6 --- 4 files changed, 38 insertions(+), 32 deletions(-) diff --git a/libbeat/processors/add_kubernetes_metadata/indexers.go b/libbeat/processors/add_kubernetes_metadata/indexers.go index 673337fa793..76c9c002c11 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers.go @@ -19,7 +19,6 @@ package add_kubernetes_metadata import ( "fmt" - "sync" "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" @@ -55,7 +54,6 @@ type MetadataIndex struct { } type Indexers struct { - sync.RWMutex indexers []Indexer } @@ -91,8 +89,6 @@ func NewIndexers(configs PluginConfig, metaGen metadata.MetaGen) *Indexers { // GetIndexes returns the composed index list from all registered indexers func (i *Indexers) GetIndexes(pod *kubernetes.Pod) []string { var indexes []string - i.RLock() - defer i.RUnlock() for _, indexer := range i.indexers { for _, i := range indexer.GetIndexes(pod) { indexes = append(indexes, i) @@ -104,8 +100,6 @@ func (i *Indexers) GetIndexes(pod *kubernetes.Pod) []string { // GetMetadata returns the composed metadata list from all registered indexers func (i *Indexers) GetMetadata(pod *kubernetes.Pod) []MetadataIndex { var metadata []MetadataIndex - i.RLock() - defer i.RUnlock() for _, indexer := range i.indexers { for _, m := range indexer.GetMetadata(pod) { metadata = append(metadata, m) @@ -116,8 +110,6 @@ func (i *Indexers) GetMetadata(pod *kubernetes.Pod) []MetadataIndex { // Empty returns true if indexers list is empty func (i *Indexers) Empty() bool { - i.RLock() - defer i.RUnlock() if len(i.indexers) == 0 { return true } diff --git a/libbeat/processors/add_kubernetes_metadata/indexing.go b/libbeat/processors/add_kubernetes_metadata/indexing.go index d011d0c7a97..0afe7ad9b71 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexing.go +++ b/libbeat/processors/add_kubernetes_metadata/indexing.go @@ -50,30 +50,36 @@ func NewRegister() *Register { // AddIndexer to the register func (r *Register) AddIndexer(name string, indexer IndexerConstructor) { - r.RWMutex.Lock() - defer r.RWMutex.Unlock() + r.Lock() + defer r.Unlock() r.indexers[name] = indexer } // AddMatcher to the register func (r *Register) AddMatcher(name string, matcher MatcherConstructor) { - r.RWMutex.Lock() - defer r.RWMutex.Unlock() + r.Lock() + defer r.Unlock() r.matchers[name] = matcher } // AddIndexer to the register func (r *Register) AddDefaultIndexerConfig(name string, config common.Config) { + r.Lock() + defer r.Unlock() r.defaultIndexerConfigs[name] = config } // AddMatcher to the register func (r *Register) AddDefaultMatcherConfig(name string, config common.Config) { + r.Lock() + defer r.Unlock() r.defaultMatcherConfigs[name] = config } // AddIndexer to the register func (r *Register) GetIndexer(name string) IndexerConstructor { + r.RLock() + defer r.RUnlock() indexer, ok := r.indexers[name] if ok { return indexer @@ -84,6 +90,8 @@ func (r *Register) GetIndexer(name string) IndexerConstructor { // AddMatcher to the register func (r *Register) GetMatcher(name string) MatcherConstructor { + r.RLock() + defer r.RUnlock() matcher, ok := r.matchers[name] if ok { return matcher @@ -92,10 +100,30 @@ func (r *Register) GetMatcher(name string) MatcherConstructor { } } -func (r *Register) GetDefaultIndexerConfigs() map[string]common.Config { - return r.defaultIndexerConfigs +// GetDefaultIndexerConfigs obtains the plugin configuration for the default indexer +// configurations registered +func (r *Register) GetDefaultIndexerConfigs() PluginConfig { + r.RLock() + defer r.RUnlock() + + configs := make(PluginConfig, 0, len(r.indexers)) + for key, cfg := range r.defaultIndexerConfigs { + configs = append(configs, map[string]common.Config{key: cfg}) + } + + return configs } -func (r *Register) GetDefaultMatcherConfigs() map[string]common.Config { - return r.defaultMatcherConfigs +// GetDefaultMatcherConfigs obtains the plugin configuration for the default matcher +// configurations registered +func (r *Register) GetDefaultMatcherConfigs() PluginConfig { + r.RLock() + defer r.RUnlock() + + configs := make(PluginConfig, 0, len(r.indexers)) + for key, cfg := range r.defaultMatcherConfigs { + configs = append(configs, map[string]common.Config{key: cfg}) + } + + return configs } diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 97bb88da02d..9b7bc5653f8 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -100,20 +100,12 @@ func New(cfg *common.Config) (processors.Processor, error) { //Load default indexer configs if config.DefaultIndexers.Enabled == true { - Indexing.RLock() - for key, cfg := range Indexing.GetDefaultIndexerConfigs() { - config.Indexers = append(config.Indexers, map[string]common.Config{key: cfg}) - } - Indexing.RUnlock() + config.Indexers = Indexing.GetDefaultIndexerConfigs() } //Load default matcher configs if config.DefaultMatchers.Enabled == true { - Indexing.RLock() - for key, cfg := range Indexing.GetDefaultMatcherConfigs() { - config.Matchers = append(config.Matchers, map[string]common.Config{key: cfg}) - } - Indexing.RUnlock() + config.Matchers = Indexing.GetDefaultMatcherConfigs() } processor := &kubernetesAnnotator{ diff --git a/libbeat/processors/add_kubernetes_metadata/matchers.go b/libbeat/processors/add_kubernetes_metadata/matchers.go index ca14746f130..291afafbd2a 100644 --- a/libbeat/processors/add_kubernetes_metadata/matchers.go +++ b/libbeat/processors/add_kubernetes_metadata/matchers.go @@ -19,7 +19,6 @@ package add_kubernetes_metadata import ( "fmt" - "sync" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -43,7 +42,6 @@ type Matcher interface { } type Matchers struct { - sync.RWMutex matchers []Matcher } @@ -76,8 +74,6 @@ func NewMatchers(configs PluginConfig) *Matchers { // MetadataIndex returns the index string for the first matcher from the Registry returning one func (m *Matchers) MetadataIndex(event common.MapStr) string { - m.RLock() - defer m.RUnlock() for _, matcher := range m.matchers { index := matcher.MetadataIndex(event) if index != "" { @@ -90,8 +86,6 @@ func (m *Matchers) MetadataIndex(event common.MapStr) string { } func (m *Matchers) Empty() bool { - m.RLock() - defer m.RUnlock() if len(m.matchers) == 0 { return true } From dbca3a3d3d069f667cdfe850fc1d8603e75bb266 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 12 Mar 2020 13:47:11 +0100 Subject: [PATCH 2/2] Fix initialization of plugin config list --- libbeat/processors/add_kubernetes_metadata/indexing.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/processors/add_kubernetes_metadata/indexing.go b/libbeat/processors/add_kubernetes_metadata/indexing.go index 0afe7ad9b71..19f66ea1212 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexing.go +++ b/libbeat/processors/add_kubernetes_metadata/indexing.go @@ -106,7 +106,7 @@ func (r *Register) GetDefaultIndexerConfigs() PluginConfig { r.RLock() defer r.RUnlock() - configs := make(PluginConfig, 0, len(r.indexers)) + configs := make(PluginConfig, 0, len(r.defaultIndexerConfigs)) for key, cfg := range r.defaultIndexerConfigs { configs = append(configs, map[string]common.Config{key: cfg}) } @@ -120,7 +120,7 @@ func (r *Register) GetDefaultMatcherConfigs() PluginConfig { r.RLock() defer r.RUnlock() - configs := make(PluginConfig, 0, len(r.indexers)) + configs := make(PluginConfig, 0, len(r.defaultMatcherConfigs)) for key, cfg := range r.defaultMatcherConfigs { configs = append(configs, map[string]common.Config{key: cfg}) }