diff --git a/pkg/controller/elasticsearch/driver/driver.go b/pkg/controller/elasticsearch/driver/driver.go
index 57020bf0e9..69ef89cc6c 100644
--- a/pkg/controller/elasticsearch/driver/driver.go
+++ b/pkg/controller/elasticsearch/driver/driver.go
@@ -181,6 +181,12 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
 	if min == nil {
 		min = &d.Version
 	}
+
+	isServiceReady, err := services.IsServiceReady(d.Client, *internalService)
+	if err != nil {
+		return results.WithError(err)
+	}
+
 	observedState := d.Observers.ObservedStateResolver(
 		ctx,
 		d.ES,
@@ -191,6 +197,7 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
 			*min,
 			trustedHTTPCertificates,
 		),
+		isServiceReady,
 	)

 	// Always update the Elasticsearch state bits with the latest observed state.
@@ -233,11 +240,6 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
 	)
 	defer esClient.Close()

-	isServiceReady, err := services.IsServiceReady(d.Client, *internalService)
-	if err != nil {
-		return results.WithError(err)
-	}
-
 	// use unknown health as a proxy for a cluster not responding to requests
 	hasKnownHealthState := observedState() != esv1.ElasticsearchUnknownHealth
 	esReachable := isServiceReady && hasKnownHealthState
diff --git a/pkg/controller/elasticsearch/observer/manager.go b/pkg/controller/elasticsearch/observer/manager.go
index 536328ea7d..990c6d297c 100644
--- a/pkg/controller/elasticsearch/observer/manager.go
+++ b/pkg/controller/elasticsearch/observer/manager.go
@@ -14,6 +14,7 @@ import (

 	esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1"
 	"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/annotation"
+	"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing"
 	"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/client"
 	"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
 )
@@ -44,8 +45,13 @@ func NewManager(defaultInterval time.Duration, tracer *apm.Tracer) *Manager {

 // ObservedStateResolver returns a function that returns the last known state of the given cluster,
 // as expected by the main reconciliation driver
-func (m *Manager) ObservedStateResolver(ctx context.Context, cluster esv1.Elasticsearch, esClient client.Client) func() esv1.ElasticsearchHealth {
-	observer := m.Observe(ctx, cluster, esClient)
+func (m *Manager) ObservedStateResolver(
+	ctx context.Context,
+	cluster esv1.Elasticsearch,
+	esClient client.Client,
+	isServiceReady bool,
+) func() esv1.ElasticsearchHealth {
+	observer := m.Observe(ctx, cluster, esClient, isServiceReady)
 	return func() esv1.ElasticsearchHealth {
 		return observer.LastHealth()
 	}
@@ -61,7 +67,8 @@ func (m *Manager) getObserver(key types.NamespacedName) (*Observer, bool) {

 // Observe gets or create a cluster state observer for the given cluster
 // In case something has changed in the given esClient (eg. different caCert), the observer is recreated accordingly
-func (m *Manager) Observe(ctx context.Context, cluster esv1.Elasticsearch, esClient client.Client) *Observer {
+func (m *Manager) Observe(ctx context.Context, cluster esv1.Elasticsearch, esClient client.Client, isServiceReady bool) *Observer {
+	defer tracing.Span(&ctx)()
 	nsName := k8s.ExtractNamespacedName(&cluster)
 	settings := m.extractObserverSettings(ctx, cluster)

@@ -69,16 +76,27 @@ func (m *Manager) Observe(ctx context.Context, cluster esv1.Elasticsearch, esCli

 	switch {
 	case !exists:
-		return m.createOrReplaceObserver(nsName, settings, esClient)
+		// This Elasticsearch resource has not been observed yet; create the observer and maybe do a first observation.
+		observer = m.createOrReplaceObserver(ctx, nsName, settings, esClient)
 	case exists && (!observer.esClient.Equal(esClient) || observer.settings != settings):
-		return m.createOrReplaceObserver(nsName, settings, esClient)
+		// This Elasticsearch resource is already being observed asynchronously; no need to do a first observation.
+		observer = m.createOrReplaceObserver(ctx, nsName, settings, esClient)
 	case exists && settings.ObservationInterval <= 0:
 		// in case asynchronous observation has been disabled ensure at least one observation at reconciliation time.
-		return m.getAndObserveSynchronously(nsName)
+		return m.getAndObserveSynchronously(ctx, nsName)
 	default:
+		// No change, close the provided Client and return the existing observer.
 		esClient.Close()
 		return observer
 	}
+
+	if !exists && isServiceReady {
+		// there was no existing observer and the Service is ready: let's try an initial synchronous observation
+		observer.observe(ctx)
+	}
+	// start the new observer
+	observer.Start()
+	return observer
 }

 // extractObserverSettings extracts observer settings from the annotations on the Elasticsearch resource.
@@ -90,7 +108,9 @@ func (m *Manager) extractObserverSettings(ctx context.Context, cluster esv1.Elas
 }

 // createOrReplaceObserver creates a new observer and adds it to the observers map, replacing existing observers if necessary.
-func (m *Manager) createOrReplaceObserver(cluster types.NamespacedName, settings Settings, esClient client.Client) *Observer {
+// The new observer is not started; it is up to the caller to invoke observer.Start()
+func (m *Manager) createOrReplaceObserver(ctx context.Context, cluster types.NamespacedName, settings Settings, esClient client.Client) *Observer {
+	defer tracing.Span(&ctx)()
 	m.observerLock.Lock()
 	defer m.observerLock.Unlock()

@@ -100,24 +120,21 @@ func (m *Manager) createOrReplaceObserver(cluster types.NamespacedName, settings
 		observer.Stop()
 		delete(m.observers, cluster)
 	}
-
 	observer = NewObserver(cluster, esClient, settings, m.notifyListeners)
-	observer.Start()
-
 	m.observers[cluster] = observer
-
 	return observer
 }

 // getAndObserveSynchronously retrieves the currently configured observer and trigger a synchronous observation.
-func (m *Manager) getAndObserveSynchronously(cluster types.NamespacedName) *Observer {
+func (m *Manager) getAndObserveSynchronously(ctx context.Context, cluster types.NamespacedName) *Observer {
+	defer tracing.Span(&ctx)()
 	m.observerLock.RLock()
 	defer m.observerLock.RUnlock()

 	// invariant: this method must only be called when existence of observer is given
 	observer := m.observers[cluster]
 	// force a synchronous observation
-	observer.observe()
+	observer.observe(ctx)
 	return observer
 }

diff --git a/pkg/controller/elasticsearch/observer/manager_test.go b/pkg/controller/elasticsearch/observer/manager_test.go
index 6dacfb5f90..2f7c05c923 100644
--- a/pkg/controller/elasticsearch/observer/manager_test.go
+++ b/pkg/controller/elasticsearch/observer/manager_test.go
@@ -112,7 +112,7 @@ func TestManager_Observe(t *testing.T) {
 			if initial, exists := tt.initiallyObserved[tt.clusterToObserve]; exists {
 				initialCreationTime = initial.creationTime
 			}
-			observer := m.Observe(context.Background(), esObject(tt.clusterToObserve), tt.clusterToObserveClient)
+			observer := m.Observe(context.Background(), esObject(tt.clusterToObserve), tt.clusterToObserveClient, true)
 			// returned observer should be the correct one
 			require.Equal(t, tt.clusterToObserve, observer.cluster)
 			// list of observers should have been updated
@@ -181,8 +181,8 @@ func TestManager_ObserveSync(t *testing.T) {
 			name := cluster("es1")
 			cluster := esObject(name)
 			results := []esv1.ElasticsearchHealth{
-				tt.manager.ObservedStateResolver(context.Background(), cluster, esClient)(),
-				tt.manager.ObservedStateResolver(context.Background(), cluster, esClient)(),
+				tt.manager.ObservedStateResolver(context.Background(), cluster, esClient, true)(),
+				tt.manager.ObservedStateResolver(context.Background(), cluster, esClient, true)(),
 			}
 			require.Equal(t, tt.expectedHealth, results)
 			tt.manager.StopObserving(name) // let's clean up the go-routines
@@ -277,9 +277,9 @@ func TestManager_AddObservationListener(t *testing.T) {
 		close(doneCh)
 	}()
 	// observe 2 clusters
-	obs1 := m.Observe(ctx, cluster1, fakeEsClient200(client.BasicAuth{}))
+	obs1 := m.Observe(ctx, cluster1, fakeEsClient200(client.BasicAuth{}), true)
 	defer obs1.Stop()
-	obs2 := m.Observe(ctx, cluster2, fakeEsClient200(client.BasicAuth{}))
+	obs2 := m.Observe(ctx, cluster2, fakeEsClient200(client.BasicAuth{}), true)
 	defer obs2.Stop()
 	<-doneCh
 }
diff --git a/pkg/controller/elasticsearch/observer/observer.go b/pkg/controller/elasticsearch/observer/observer.go
index c3bbf286d4..26b89f0954 100644
--- a/pkg/controller/elasticsearch/observer/observer.go
+++ b/pkg/controller/elasticsearch/observer/observer.go
@@ -72,8 +72,6 @@ func NewObserver(cluster types.NamespacedName, esClient client.Client, settings
 // The first observation is synchronous to allow to retrieve the cluster state immediately after the start.
 // Then, observations are performed periodically in a separate goroutine until the observer stop channel is closed.
 func (o *Observer) Start() {
-	// initial synchronous observation
-	o.observe()
 	if o.settings.ObservationInterval <= 0 {
 		return // asynchronous observations are effectively disabled
 	}
@@ -85,7 +83,7 @@
 		for {
 			select {
 			case <-ticker.C:
-				o.observe()
+				o.observe(context.Background())
 			case <-o.stopChan:
 				log.Info("Stopping observer for cluster", "namespace", o.cluster.Namespace, "es_name", o.cluster.Name)
 				return
@@ -111,11 +109,12 @@ func (o *Observer) LastHealth() esv1.ElasticsearchHealth {

 // observe retrieves the current ES state, executes onObservation,
 // and stores the new state
-func (o *Observer) observe() {
-	ctx, cancelFunc := context.WithTimeout(context.Background(), nonNegativeTimeout(o.settings.ObservationInterval))
+func (o *Observer) observe(ctx context.Context) {
+	defer tracing.Span(&ctx)()
+	ctx, cancelFunc := context.WithTimeout(ctx, nonNegativeTimeout(o.settings.ObservationInterval))
 	defer cancelFunc()

-	if o.settings.Tracer != nil {
+	if o.settings.Tracer != nil && apm.TransactionFromContext(ctx) == nil {
 		tx := o.settings.Tracer.StartTransaction(name, string(tracing.PeriodicTxType))
 		defer tx.End()
 		ctx = apm.ContextWithTransaction(ctx, tx)
@@ -128,10 +127,13 @@
 	if o.onObservation != nil {
 		o.onObservation(o.cluster, o.LastHealth(), newHealth)
 	}
+	o.updateHealth(newHealth)
+}

+func (o *Observer) updateHealth(newHealth esv1.ElasticsearchHealth) {
 	o.mutex.Lock()
+	defer o.mutex.Unlock()
 	o.lastHealth = newHealth
-	o.mutex.Unlock()
 }

 func nonNegativeTimeout(observationInterval time.Duration) time.Duration {
diff --git a/pkg/controller/elasticsearch/observer/observer_test.go b/pkg/controller/elasticsearch/observer/observer_test.go
index db20d37ab2..7f0737aab3 100644
--- a/pkg/controller/elasticsearch/observer/observer_test.go
+++ b/pkg/controller/elasticsearch/observer/observer_test.go
@@ -55,9 +55,9 @@ func TestObserver_observe(t *testing.T) {
 		esClient:      fakeEsClient,
 		onObservation: onObservation,
 	}
-	observer.observe()
+	observer.observe(context.Background())
 	require.Equal(t, int32(1), atomic.LoadInt32(&counter))
-	observer.observe()
+	observer.observe(context.Background())
 	require.Equal(t, int32(2), atomic.LoadInt32(&counter))
 }

@@ -69,7 +69,7 @@ func TestObserver_observe_nilFunction(t *testing.T) {
 		onObservation: nilFunc,
 	}
 	// should not panic
-	observer.observe()
+	observer.observe(context.Background())
 }

 func TestNewObserver(t *testing.T) {
@@ -97,7 +97,7 @@ func TestObserver_Stop(t *testing.T) {
 	}
 	observer := createAndRunTestObserver(onObservation)
 	// force at least one observation
-	observer.observe()
+	observer.observe(context.Background())
 	// stop the observer
 	observer.Stop()
 	// should be safe to call multiple times
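
What follows is a minimal, self-contained Go sketch of the observer lifecycle this patch establishes. It is not the actual ECK implementation: every identifier below (observer, health, start) is an illustrative stand-in for the real types in the observer package. The sketch condenses the three behaviors the diff introduces: construction no longer starts anything, the caller decides whether to perform a synchronous first observation based on Service readiness, and health writes hold the mutex with a deferred unlock:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// health stands in for esv1.ElasticsearchHealth.
type health string

const unknownHealth health = "unknown"

// observer mimics the shape of the Observer type touched by this patch.
type observer struct {
	mu         sync.Mutex
	lastHealth health
	interval   time.Duration
	stopChan   chan struct{}
}

// observe runs a single observation using the caller's context, mirroring the
// new observe(ctx) signature: a surrounding APM transaction can be propagated
// instead of always starting a fresh one.
func (o *observer) observe(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, o.interval)
	defer cancel()
	_ = ctx // a real implementation would call the ES health API with ctx here
	o.updateHealth("green")
}

// updateHealth mirrors the refactor in observer.go: take the lock, then defer
// the unlock so the mutex is released on every return path.
func (o *observer) updateHealth(h health) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.lastHealth = h
}

// lastKnownHealth is the read-side counterpart, as in Observer.LastHealth().
func (o *observer) lastKnownHealth() health {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.lastHealth
}

// start only launches the periodic loop; the initial synchronous observation
// is now the caller's decision, taken before start is invoked.
func (o *observer) start() {
	if o.interval <= 0 {
		return // asynchronous observation disabled
	}
	go func() {
		ticker := time.NewTicker(o.interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				o.observe(context.Background())
			case <-o.stopChan:
				return
			}
		}
	}()
}

func main() {
	o := &observer{lastHealth: unknownHealth, interval: time.Second, stopChan: make(chan struct{})}
	// In the patch this flag comes from services.IsServiceReady, now computed
	// before the state resolver is built rather than after.
	isServiceReady := true
	if isServiceReady {
		// Only block on a first observation when the Service can actually
		// route traffic; otherwise the health simply stays "unknown".
		o.observe(context.Background())
	}
	o.start()
	fmt.Println("last health:", o.lastKnownHealth())
	close(o.stopChan)
}

The design point: the reconciliation loop never blocks on an HTTP call to a cluster whose Service has no ready endpoints, while esReachable in driver.go is still derived from the same isServiceReady value combined with the last observed health.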