diff --git a/pkg/controller/elasticsearch/observer/manager_test.go b/pkg/controller/elasticsearch/observer/manager_test.go index 7b40d05f9b..d4e35080f7 100644 --- a/pkg/controller/elasticsearch/observer/manager_test.go +++ b/pkg/controller/elasticsearch/observer/manager_test.go @@ -197,18 +197,21 @@ func TestManager_AddObservationListener(t *testing.T) { eventsCluster2 <- cluster } }) - + doneCh := make(chan struct{}) + go func() { + // events should be propagated to both listeners + <-eventsCluster1 + <-eventsCluster2 + <-eventsCluster1 + <-eventsCluster2 + close(doneCh) + }() // observe 2 clusters obs1 := m.Observe(cluster1, fakeEsClient200(client.BasicAuth{})) defer obs1.Stop() obs2 := m.Observe(cluster2, fakeEsClient200(client.BasicAuth{})) defer obs2.Stop() - - // events should be propagated to both listeners - <-eventsCluster1 - <-eventsCluster2 - <-eventsCluster1 - <-eventsCluster2 + <-doneCh } func esObject(n types.NamespacedName) esv1.Elasticsearch { diff --git a/pkg/controller/elasticsearch/observer/observer.go b/pkg/controller/elasticsearch/observer/observer.go index f2a8e98d21..b51bc13dfd 100644 --- a/pkg/controller/elasticsearch/observer/observer.go +++ b/pkg/controller/elasticsearch/observer/observer.go @@ -65,9 +65,27 @@ func NewObserver(cluster types.NamespacedName, esClient client.Client, settings return &observer } -// Start the observer in a separate goroutine +// Start starts the Observer in a separate goroutine after a first synchronous observation. +// The first observation is synchronous to allow to retrieve the cluster state immediately after the start. +// Then, observations are performed periodically in a separate goroutine until the observer stop channel is closed. func (o *Observer) Start() { - go o.runPeriodically() + // initial synchronous observation + o.observe() + // periodic asynchronous observations + go func() { + ticker := time.NewTicker(o.settings.ObservationInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + o.observe() + case <-o.stopChan: + log.Info("Stopping observer for cluster", "namespace", o.cluster.Namespace, "es_name", o.cluster.Name) + return + } + } + }() } // Stop the observer loop @@ -85,25 +103,6 @@ func (o *Observer) LastHealth() esv1.ElasticsearchHealth { return o.lastHealth } -// runPeriodically triggers a state retrieval every tick, -// until the given context is cancelled -func (o *Observer) runPeriodically() { - o.observe() - - ticker := time.NewTicker(o.settings.ObservationInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - o.observe() - case <-o.stopChan: - log.Info("Stopping observer for cluster", "namespace", o.cluster.Namespace, "es_name", o.cluster.Name) - return - } - } -} - // observe retrieves the current ES state, executes onObservation, // and stores the new state func (o *Observer) observe() { diff --git a/pkg/controller/elasticsearch/observer/observer_test.go b/pkg/controller/elasticsearch/observer/observer_test.go index 5fd742d826..7b306fc2be 100644 --- a/pkg/controller/elasticsearch/observer/observer_test.go +++ b/pkg/controller/elasticsearch/observer/observer_test.go @@ -78,12 +78,17 @@ func TestNewObserver(t *testing.T) { onObservation := func(cluster types.NamespacedName, previousHealth, newHealth esv1.ElasticsearchHealth) { events <- cluster } + doneCh := make(chan struct{}) + go func() { + // let it observe at least 3 times + require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events) + require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events) + require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events) + close(doneCh) + }() observer := createAndRunTestObserver(onObservation) defer observer.Stop() - // let it observe at least 3 times - require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events) - require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events) - require.Equal(t, types.NamespacedName{Namespace: "ns", Name: "cluster"}, <-events) + <-doneCh } func TestObserver_Stop(t *testing.T) {