Skip to content

Commit

Permalink
Fixing uninitialised lastNumberOfServers.
Browse files Browse the repository at this point in the history
  • Loading branch information
lamai93 committed Nov 5, 2018
1 parent c2509d5 commit 341fc3b
Showing 1 changed file with 32 additions and 3 deletions.
35 changes: 32 additions & 3 deletions pkg/deployment/cluster_scaling_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration {

// SendUpdateToCluster records the given spec to be sended to the cluster.
func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec) {
ci.log.Debug().Msg("SendUpdateToCluster called")
ci.pendingUpdate.mutex.Lock()
defer ci.pendingUpdate.mutex.Unlock()
ci.pendingUpdate.spec = &spec
Expand All @@ -75,6 +76,7 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
start := time.Now()
goodInspections := 0
for {
ci.log.Debug().Msg("inspection loop for cluster int.")
delay := time.Second * 2

// Is deployment in running state
Expand All @@ -97,6 +99,8 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
goodInspections++
}
}
} else {
ci.log.Debug().Msg("cluster Phase not Running")
}

select {
Expand All @@ -112,6 +116,7 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
// Perform a single inspection of the cluster
func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectSuccess bool) error {
log := ci.log
log.Debug().Msg("inspect cluster for scaling integration")
c, err := ci.depl.clientCache.GetDatabase(ctx)
if err != nil {
return maskAny(err)
Expand All @@ -124,6 +129,7 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS
return maskAny(err)
}
if req.Coordinators == nil && req.DBServers == nil {
log.Debug().Msg("Nothing to check")
// Nothing to check
return nil
}
Expand All @@ -132,15 +138,32 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS
ci.lastNumberOfServers.mutex.Lock()
defer ci.lastNumberOfServers.mutex.Unlock()
desired := ci.lastNumberOfServers.NumberOfServers
if req.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() {
if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() {
// #Coordinator has changed
coordinatorsChanged = true
}
if req.DBServers != nil && req.GetDBServers() != desired.GetDBServers() {
if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() {
// #DBServers has changed
dbserversChanged = true
}
if !coordinatorsChanged && !dbserversChanged {
// if there is nothing to change, check if we naver have asked the cluster before
// if so, fill in the values for the first time.
// This happens, when the operator is redeployed and there has not been any
// update events yet.
if desired.Coordinators == nil || desired.DBServers == nil {
//ci.lastNumberOfServers.mutex.Lock()
//defer ci.lastNumberOfServers.mutex.Unlock()
ci.log.Debug().Msg("Some of desired is nil")
if req.Coordinators != nil {
ci.lastNumberOfServers.NumberOfServers.Coordinators = req.Coordinators
}
if req.DBServers != nil {
ci.lastNumberOfServers.NumberOfServers.DBServers = req.DBServers
}
}

ci.log.Debug().Msg("Nothing has changed")
// Nothing has changed
return nil
}
Expand All @@ -165,6 +188,7 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS
// Restore original spec in cluster
ci.SendUpdateToCluster(current.Spec)
} else {
log.Debug().Msg("UpdatedCRSpec via agency")
if err := ci.depl.updateCRSpec(*newSpec); err != nil {
log.Warn().Err(err).Msg("Failed to update current deployment")
return maskAny(err)
Expand All @@ -176,12 +200,14 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS
// updateClusterServerCount updates the intended number of servers of the cluster.
// Returns true when it is safe to ask the cluster for updates.
func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Context, expectSuccess bool) (bool, error) {
ci.log.Debug().Msg("updateClusterServerCount")
// Any update needed?
ci.pendingUpdate.mutex.Lock()
spec := ci.pendingUpdate.spec
ci.pendingUpdate.mutex.Unlock()
if spec == nil {
// Nothing pending
ci.log.Debug().Msg("Nothing pending")
return true, nil
}

Expand All @@ -198,13 +224,16 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex
ci.lastNumberOfServers.mutex.Unlock()

// This is to prevent unneseccary updates that may override some values written by the WebUI (in the case of a update loop)
if coordinatorCount != lastNumberOfServers.GetCoordinators() && dbserverCount != lastNumberOfServers.GetDBServers() {
if coordinatorCount != lastNumberOfServers.GetCoordinators() || dbserverCount != lastNumberOfServers.GetDBServers() {
ci.log.Debug().Msg("Set number of servers now")
if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil {
if expectSuccess {
log.Debug().Err(err).Msg("Failed to set number of servers")
}
return false, maskAny(err)
}
} else {
ci.log.Debug().Msg("Nothing has changed")
}

// Success, now update internal state
Expand Down

0 comments on commit 341fc3b

Please sign in to comment.