From 341fc3b47e59e6e32fd83db92c0d1a7ed1381986 Mon Sep 17 00:00:00 2001
From: lamai93
Date: Mon, 5 Nov 2018 23:15:09 +0100
Subject: [PATCH] Fixing uninitialised `lastNumberOfServers`.

---
 pkg/deployment/cluster_scaling_integration.go | 35 +++++++++++++++++--
 1 file changed, 32 insertions(+), 3 deletions(-)

diff --git a/pkg/deployment/cluster_scaling_integration.go b/pkg/deployment/cluster_scaling_integration.go
index 1b2e1df5c..a3533d68e 100644
--- a/pkg/deployment/cluster_scaling_integration.go
+++ b/pkg/deployment/cluster_scaling_integration.go
@@ -65,6 +65,7 @@ func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration {
 
 // SendUpdateToCluster records the given spec to be sent to the cluster.
 func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec) {
+	ci.log.Debug().Msg("SendUpdateToCluster called")
 	ci.pendingUpdate.mutex.Lock()
 	defer ci.pendingUpdate.mutex.Unlock()
 	ci.pendingUpdate.spec = &spec
@@ -75,6 +76,7 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
 	start := time.Now()
 	goodInspections := 0
 	for {
+		ci.log.Debug().Msg("inspection loop for cluster integration")
 		delay := time.Second * 2
 
 		// Is deployment in running state
@@ -97,6 +99,8 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
 					goodInspections++
 				}
 			}
+		} else {
+			ci.log.Debug().Msg("cluster Phase not Running")
 		}
 
 		select {
@@ -112,6 +116,7 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
 // Perform a single inspection of the cluster
 func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectSuccess bool) error {
 	log := ci.log
+	log.Debug().Msg("inspect cluster for scaling integration")
 	c, err := ci.depl.clientCache.GetDatabase(ctx)
 	if err != nil {
 		return maskAny(err)
@@ -124,6 +129,7 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS
 		return maskAny(err)
 	}
 	if req.Coordinators == nil && req.DBServers == nil {
+		log.Debug().Msg("Nothing to check")
 		// Nothing to check
 		return nil
 	}
@@ -132,15 +138,32 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS
 	ci.lastNumberOfServers.mutex.Lock()
 	defer ci.lastNumberOfServers.mutex.Unlock()
 	desired := ci.lastNumberOfServers.NumberOfServers
-	if req.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() {
+	if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() {
 		// #Coordinator has changed
 		coordinatorsChanged = true
 	}
-	if req.DBServers != nil && req.GetDBServers() != desired.GetDBServers() {
+	if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() {
 		// #DBServers has changed
 		dbserversChanged = true
 	}
 	if !coordinatorsChanged && !dbserversChanged {
+		// If there is nothing to change, check whether we have never asked the
+		// cluster before. If so, fill in the values for the first time.
+		// This happens when the operator is redeployed and there have not been
+		// any update events yet.
+		if desired.Coordinators == nil || desired.DBServers == nil {
+			// Note: ci.lastNumberOfServers.mutex is already locked (with unlock
+			// deferred) above, so the stored values can be updated directly.
+			ci.log.Debug().Msg("Some of desired is nil")
+			if req.Coordinators != nil {
+				ci.lastNumberOfServers.NumberOfServers.Coordinators = req.Coordinators
+			}
+			if req.DBServers != nil {
+				ci.lastNumberOfServers.NumberOfServers.DBServers = req.DBServers
+			}
+		}
+
+		ci.log.Debug().Msg("Nothing has changed")
 		// Nothing has changed
 		return nil
 	}
@@ -165,6 +188,7 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS
 		// Restore original spec in cluster
 		ci.SendUpdateToCluster(current.Spec)
 	} else {
+		log.Debug().Msg("Updated CR spec via agency")
 		if err := ci.depl.updateCRSpec(*newSpec); err != nil {
 			log.Warn().Err(err).Msg("Failed to update current deployment")
 			return maskAny(err)
@@ -176,12 +200,14 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS
 // updateClusterServerCount updates the intended number of servers of the cluster.
 // Returns true when it is safe to ask the cluster for updates.
 func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Context, expectSuccess bool) (bool, error) {
+	ci.log.Debug().Msg("updateClusterServerCount")
 	// Any update needed?
 	ci.pendingUpdate.mutex.Lock()
 	spec := ci.pendingUpdate.spec
 	ci.pendingUpdate.mutex.Unlock()
 	if spec == nil {
 		// Nothing pending
+		ci.log.Debug().Msg("Nothing pending")
 		return true, nil
 	}
 
@@ -198,13 +224,16 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex
 	ci.lastNumberOfServers.mutex.Unlock()
 
 	// This is to prevent unnecessary updates that may override some values written by the WebUI (in the case of an update loop)
-	if coordinatorCount != lastNumberOfServers.GetCoordinators() && dbserverCount != lastNumberOfServers.GetDBServers() {
+	if coordinatorCount != lastNumberOfServers.GetCoordinators() || dbserverCount != lastNumberOfServers.GetDBServers() {
+		ci.log.Debug().Msg("Set number of servers now")
 		if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil {
 			if expectSuccess {
 				log.Debug().Err(err).Msg("Failed to set number of servers")
 			}
 			return false, maskAny(err)
 		}
+	} else {
+		ci.log.Debug().Msg("Nothing has changed")
 	}
 
 	// Success, now update internal state
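
--
Reviewer note (not part of the patch): below is a minimal, self-contained
sketch of the bug being fixed, using simplified stand-in types. The
NumberOfServers/GetCoordinators names mirror the real arangod API, but the
code is illustrative only, not the operator's sources. Before this patch, an
uninitialised lastNumberOfServers made desired.GetCoordinators() fall back to
0, so the first comparison after an operator redeploy always reported a scale
change. The patch compares only when both sides are known, records the
cluster's counts as the baseline on first contact, and (in
updateClusterServerCount) switches && to || so an update is sent when either
count differs, not only when both do.

	package main

	import "fmt"

	// NumberOfServers models optional counts: nil means "never asked yet".
	type NumberOfServers struct {
		Coordinators *int
		DBServers    *int
	}

	// GetCoordinators returns the coordinator count, or 0 when unset.
	func (n NumberOfServers) GetCoordinators() int {
		if n.Coordinators == nil {
			return 0
		}
		return *n.Coordinators
	}

	func main() {
		three := 3
		cluster := NumberOfServers{Coordinators: &three} // what the cluster reports
		var last NumberOfServers // uninitialised, e.g. right after a redeploy

		// Before the patch: nil collapses to 0, falsely signalling a change.
		buggy := cluster.Coordinators != nil &&
			cluster.GetCoordinators() != last.GetCoordinators()

		// After the patch: compare only when both sides are known...
		fixed := cluster.Coordinators != nil && last.Coordinators != nil &&
			cluster.GetCoordinators() != last.GetCoordinators()

		// ...and fill in the baseline on first contact instead.
		if last.Coordinators == nil {
			last.Coordinators = cluster.Coordinators
		}

		fmt.Println(buggy, fixed) // prints: true false
	}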