Skip to content

Commit

Permalink
Move first ES cluster state observation out of go routine (elastic#5783)
Browse files Browse the repository at this point in the history
Move the first observation out of the goroutine to make it synchronous to avoid
an unnecessary PDB update caused when the operator restarts and we reconcile
Elasticsearch with an 'unknown' health. This occured at the second reconciliation loop
because we couldn't get the health at the first reconcile loop due to completely asynchronous
observers.

Co-authored-by: guozhi.li <guozhi.li@daocloud.io>
Co-authored-by: Thibault Richard <thbkrkr@users.noreply.github.com>
  • Loading branch information
3 people authored and fantapsody committed Jan 3, 2023
1 parent 9623ef9 commit 215ae8b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 32 deletions.
17 changes: 10 additions & 7 deletions pkg/controller/elasticsearch/observer/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,21 @@ func TestManager_AddObservationListener(t *testing.T) {
eventsCluster2 <- cluster
}
})

doneCh := make(chan struct{})
go func() {
// events should be propagated to both listeners
<-eventsCluster1
<-eventsCluster2
<-eventsCluster1
<-eventsCluster2
close(doneCh)
}()
// observe 2 clusters
obs1 := m.Observe(cluster1, fakeEsClient200(client.BasicAuth{}))
defer obs1.Stop()
obs2 := m.Observe(cluster2, fakeEsClient200(client.BasicAuth{}))
defer obs2.Stop()

// events should be propagated to both listeners
<-eventsCluster1
<-eventsCluster2
<-eventsCluster1
<-eventsCluster2
<-doneCh
}

func esObject(n types.NamespacedName) esv1.Elasticsearch {
Expand Down
41 changes: 20 additions & 21 deletions pkg/controller/elasticsearch/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,27 @@ func NewObserver(cluster types.NamespacedName, esClient client.Client, settings
return &observer
}

// Start the observer in a separate goroutine
// Start starts the Observer in a separate goroutine after a first synchronous observation.
// The first observation is synchronous to allow to retrieve the cluster state immediately after the start.
// Then, observations are performed periodically in a separate goroutine until the observer stop channel is closed.
func (o *Observer) Start() {
go o.runPeriodically()
// initial synchronous observation
o.observe()
// periodic asynchronous observations
go func() {
ticker := time.NewTicker(o.settings.ObservationInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
o.observe()
case <-o.stopChan:
log.Info("Stopping observer for cluster", "namespace", o.cluster.Namespace, "es_name", o.cluster.Name)
return
}
}
}()
}

// Stop the observer loop
Expand All @@ -85,25 +103,6 @@ func (o *Observer) LastHealth() esv1.ElasticsearchHealth {
return o.lastHealth
}

// runPeriodically triggers a state retrieval every tick,
// until the given context is cancelled
func (o *Observer) runPeriodically() {
o.observe()

ticker := time.NewTicker(o.settings.ObservationInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
o.observe()
case <-o.stopChan:
log.Info("Stopping observer for cluster", "namespace", o.cluster.Namespace, "es_name", o.cluster.Name)
return
}
}
}

// observe retrieves the current ES state, executes onObservation,
// and stores the new state
func (o *Observer) observe() {
Expand Down
13 changes: 9 additions & 4 deletions pkg/controller/elasticsearch/observer/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,17 @@ func TestNewObserver(t *testing.T) {
onObservation := func(cluster types.NamespacedName, previousHealth, newHealth esv1.ElasticsearchHealth) {
events <- cluster
}
doneCh := make(chan struct{})
go func() {
// let it observe at least 3 times
require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events)
require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events)
require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events)
close(doneCh)
}()
observer := createAndRunTestObserver(onObservation)
defer observer.Stop()
// let it observe at least 3 times
require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events)
require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events)
require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events)
<-doneCh
}

func TestObserver_Stop(t *testing.T) {
Expand Down

0 comments on commit 215ae8b

Please sign in to comment.