Move first ES cluster state observation out of go routine #5783
…certain. As a result, lastHealth in the observer structure is unknown, and pkg/controller/elasticsearch/pdb/reconcile.go:196 returns 0. Restarting the operator is then likely to cause the *v1beta1.PodDisruptionBudgetStatus MinAvailable to be updated from 1 (the correct value before the operator restart) to 2. After observe is executed, lastHealth is green and MinAvailable is updated from 2 back to 1. Therefore, restarting the operator updates *v1beta1.PodDisruptionBudgetStatus twice.
Since this is a community submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually?
Nice catch!
If you look at runPeriodically(), there was an intention to trigger a first observation before observing periodically, but it has to be done before the goroutine starts. So I would rather we fix runPeriodically() / Start(). I think we can merge the two, because the indirection provided by runPeriodically() makes the flow a bit harder to read and let this bug slip in, and then move the first observation out of the goroutine, like this:
diff:
diff --git a/pkg/controller/elasticsearch/observer/observer.go b/pkg/controller/elasticsearch/observer/observer.go
index 1fb8bc8aa..31fd5d415 100644
--- a/pkg/controller/elasticsearch/observer/observer.go
+++ b/pkg/controller/elasticsearch/observer/observer.go
@@ -65,9 +65,26 @@ func NewObserver(cluster types.NamespacedName, esClient client.Client, settings
 	return &observer
 }

-// Start the observer in a separate goroutine
+// Start starts the Observer in a separate goroutine.
+// The cluster state is observed periodically until the observer stop channel is closed.
 func (o *Observer) Start() {
-	go o.runPeriodically()
+	// initial synchronous observation
+	o.observe()
+	// periodic asynchronous observations
+	go func() {
+		ticker := time.NewTicker(o.settings.ObservationInterval)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				o.observe()
+			case <-o.stopChan:
+				log.Info("Stopping observer for cluster", "namespace", o.cluster.Namespace, "es_name", o.cluster.Name)
+				return
+			}
+		}
+	}()
 }

 // Stop the observer loop
@@ -85,25 +102,6 @@ func (o *Observer) LastHealth() esv1.ElasticsearchHealth {
 	return o.lastHealth
 }

-// runPeriodically triggers a state retrieval every tick,
-// until the given context is cancelled
-func (o *Observer) runPeriodically() {
-	o.observe()
-
-	ticker := time.NewTicker(o.settings.ObservationInterval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ticker.C:
-			o.observe()
-		case <-o.stopChan:
-			log.Info("Stopping observer for cluster", "namespace", o.cluster.Namespace, "es_name", o.cluster.Name)
-			return
-		}
-	}
-}
-
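The pattern in this diff can be tried in isolation. Below is a minimal, self-contained Go sketch, not the operator's actual code: the `observer` type, the `Health` values and the `fetch` callback are hypothetical stand-ins for `Observer`, `esv1.ElasticsearchHealth` and the Elasticsearch health request. It illustrates that once the first observation runs synchronously, `LastHealth()` is already populated when `Start()` returns.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Health is a hypothetical stand-in for esv1.ElasticsearchHealth.
type Health string

const (
	HealthUnknown Health = "unknown"
	HealthGreen   Health = "green"
)

// observer is a simplified, hypothetical version of the operator's Observer.
type observer struct {
	mu         sync.RWMutex
	lastHealth Health
	interval   time.Duration
	stopChan   chan struct{}
	fetch      func() Health // stands in for the Elasticsearch health request
}

func newObserver(interval time.Duration, fetch func() Health) *observer {
	return &observer{
		lastHealth: HealthUnknown,
		interval:   interval,
		stopChan:   make(chan struct{}),
		fetch:      fetch,
	}
}

// observe retrieves the health once and stores it.
func (o *observer) observe() {
	h := o.fetch()
	o.mu.Lock()
	o.lastHealth = h
	o.mu.Unlock()
}

// LastHealth returns the most recently observed health.
func (o *observer) LastHealth() Health {
	o.mu.RLock()
	defer o.mu.RUnlock()
	return o.lastHealth
}

// Start performs the first observation synchronously, then keeps
// observing on every tick in a goroutine until Stop is called.
func (o *observer) Start() {
	o.observe()
	go func() {
		ticker := time.NewTicker(o.interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				o.observe()
			case <-o.stopChan:
				return
			}
		}
	}()
}

// Stop ends the periodic observation loop.
func (o *observer) Stop() { close(o.stopChan) }

func main() {
	o := newObserver(time.Hour, func() Health { return HealthGreen })
	o.Start()
	defer o.Stop()
	fmt.Println(o.LastHealth()) // prints "green"
}
```

With the previous `go o.runPeriodically()` shape, the same `LastHealth()` call could still return the unknown value, because nothing guarantees the goroutine has run by then.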
💚 CLA has been signed
Jenkins test this please
I have made the event channel asynchronous, but I don't know whether it is consistent with the original logic. @thbkrkr
pkg/controller/elasticsearch/observer/watch.go
@@ -17,8 +17,9 @@ import (
 // whose health has changed between 2 observations.
 // Aimed to be used for triggering a reconciliation.
 func WatchClusterHealthChange(m *Manager) *source.Channel {
-	evtChan := make(chan event.GenericEvent)
+	var evtChan chan event.GenericEvent
 	m.AddObservationListener(healthChangeListener(evtChan))
+	evtChan = make(chan event.GenericEvent, len(m.listeners))
 	return &source.Channel{
 		// Each event in Source will be consumed and turned into
 		// a reconciliation request.
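One thing that may be worth checking in the snippet above: in Go a channel variable is passed by value, so a function that receives `evtChan` before `make` is called keeps a nil channel, and a send on a nil channel blocks forever. The sketch below illustrates this under the assumption that `healthChangeListener` simply captures the channel it is given. `makeListener` is a hypothetical stand-in, and the non-blocking `select` is only there so the example terminates.

```go
package main

import "fmt"

// makeListener is a hypothetical stand-in for healthChangeListener:
// it captures the channel value it receives at call time.
func makeListener(ch chan string) func(string) bool {
	return func(msg string) bool {
		select {
		case ch <- msg:
			return true
		default:
			// The send would block: nil channel or full buffer.
			return false
		}
	}
}

func main() {
	// Channel created after the listener: the listener keeps the nil value.
	var early chan string
	listenEarly := makeListener(early)
	early = make(chan string, 1)
	_ = early

	// Channel created before the listener: the send succeeds.
	late := make(chan string, 1)
	listenLate := makeListener(late)

	fmt.Println(listenEarly("green"), listenLate("green")) // prints "false true"
}
```

If this applies to the real code, creating the buffered channel before calling `m.AddObservationListener` would avoid the issue.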
I opened #5812 to discuss it because it is more involved than I thought.
I modified the E2E test, because our usage scenario does not conform to it. So the problem is with the E2E test, and the program itself runs without problems. @thbkrkr
code
// Start implements Source and should only be called by the Controller.
func (cs *Channel) Start(
	ctx context.Context,
	handler handler.EventHandler,
	queue workqueue.RateLimitingInterface,
	prct ...predicate.Predicate) error {
	// Source should have been specified by the user.
	if cs.Source == nil {
		return fmt.Errorf("must specify Channel.Source")
	}
	// stop should have been injected before Start was called
	if cs.stop == nil {
		return fmt.Errorf("must call InjectStop on Channel before calling Start")
	}
	// use default value if DestBufferSize not specified
	if cs.DestBufferSize == 0 {
		cs.DestBufferSize = defaultBufferSize
	}
	dst := make(chan event.GenericEvent, cs.DestBufferSize)
	cs.destLock.Lock()
	cs.dest = append(cs.dest, dst)
	cs.destLock.Unlock()
	cs.once.Do(func() {
		// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
		go cs.syncLoop(ctx)
	})
	go func() {
		for evt := range dst {
			shouldHandle := true
			for _, p := range prct {
				if !p.Generic(evt) {
					shouldHandle = false
					break
				}
			}
			if shouldHandle {
				handler.Generic(evt, queue)
			}
		}
	}()
	return nil
}
E2E diff
 			eventsCluster2 <- cluster
 		}
 	})
+	doneCh := make(chan struct{})
+	go func() {
+		// events should be propagated to both listeners
+		<-eventsCluster1
+		<-eventsCluster2
+		<-eventsCluster1
+		<-eventsCluster2
+		doneCh <- struct{}{}
+	}()
 	// observe 2 clusters
 	obs1 := m.Observe(cluster1, fakeEsClient200(client.BasicAuth{}))
 	defer obs1.Stop()
 	obs2 := m.Observe(cluster2, fakeEsClient200(client.BasicAuth{}))
 	defer obs2.Stop()
-	// events should be propagated to both listeners
-	<-eventsCluster1
-	<-eventsCluster2
-	<-eventsCluster1
-	<-eventsCluster2
+	<-doneCh
 }
 func esObject(n types.NamespacedName) esv1.Elasticsearch {
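A minimal sketch of why the receiving side has to be started before `Observe` in this test, assuming the first observation now notifies listeners synchronously over an unbuffered channel. The names `startWithSyncObservation` and `observeClusters` are hypothetical and only mirror the shape of the test above.

```go
package main

import "fmt"

// startWithSyncObservation mimics a Start() whose first observation
// notifies a listener over an unbuffered channel before returning.
func startWithSyncObservation(events chan<- string, name string) {
	events <- name // blocks until a receiver is ready
}

// observeClusters starts the receiver first, then triggers the
// synchronous observations, and returns the events in arrival order.
func observeClusters(names []string) []string {
	events := make(chan string)
	done := make(chan []string)

	// The receiver must already be running, otherwise the first
	// synchronous send below would block forever.
	go func() {
		got := make([]string, 0, len(names))
		for range names {
			got = append(got, <-events)
		}
		done <- got
	}()

	for _, n := range names {
		startWithSyncObservation(events, n)
	}
	return <-done
}

func main() {
	fmt.Println(observeClusters([]string{"cluster1", "cluster2"})) // prints "[cluster1 cluster2]"
}
```

Receiving after the calls, as the test did before the change, would deadlock in this sketch because the first send never finds a receiver.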
It is possible that other test cases are affected. I should do more tests.
I have run all the test functions in package observer. Please run the Jenkins tests again. @thbkrkr
review again @barkbay
godoc Co-authored-by: Thibault Richard <thbkrkr@users.noreply.github.com>
review again @thbkrkr
Jenkins test this please
LGTM
I think this PR has actually introduced some performance issues described here: #6078
Couple of improvements on the observer:
* The first synchronous observation implemented in #5783 is no longer invoked if the Elasticsearch Service is not "ready"
* observer.Start() is invoked outside of any lock-protected section
An APM span added around observer.observe(...)
Move the first observation out of the goroutine to make it synchronous, to avoid an unnecessary PDB update caused when the operator restarts and we reconcile Elasticsearch with an 'unknown' health. This occurred at the second reconciliation loop because we couldn't get the health at the first reconciliation loop due to completely asynchronous observers.
Co-authored-by: guozhi.li <guozhi.li@daocloud.io>
Co-authored-by: Thibault Richard <thbkrkr@users.noreply.github.com>
Start runs the observation in a goroutine, so the point at which it executes is uncertain. As a result, lastHealth in the observer structure is unknown, and pkg/controller/elasticsearch/pdb/reconcile.go:196 returns 0. Restarting the operator is then likely to cause the *v1beta1.PodDisruptionBudgetStatus MinAvailable to be updated from 1 (the correct value before the operator restart) to 2. After observe is executed, lastHealth is green and MinAvailable is updated from 2 back to 1. Therefore, restarting the operator updates *v1beta1.PodDisruptionBudgetStatus twice.
Repro steps:
Add fmt.Println("update!") at line 182 of pkg/controller/common/reconciler/reconciler.go:
Resolves #5812.