
Commit

Merge pull request #812 from weaveworks/756b-include-errors-in-sync-events

756b include errors in sync events
tamarakaufler authored Oct 25, 2017
2 parents 09accd1 + 0052d31 commit 65c823b
Showing 7 changed files with 317 additions and 27 deletions.
1 change: 1 addition & 0 deletions cluster/kubernetes/kubernetes.go
@@ -247,6 +247,7 @@ func (c *Cluster) Ping() error {
 	return err
 }
 
+// Export exports cluster resources
 func (c *Cluster) Export() ([]byte, error) {
 	var config bytes.Buffer
 	list, err := c.client.Namespaces().List(meta_v1.ListOptions{})
3 changes: 2 additions & 1 deletion cluster/kubernetes/resourcekinds.go
@@ -85,7 +85,7 @@ func (dk *deploymentKind) getPodControllers(c *Cluster, namespace string) ([]pod
 	}
 
 	var podControllers []podController
-	for i, _ := range deployments.Items {
+	for i := range deployments.Items {
 		podControllers = append(podControllers, makeDeploymentPodController(&deployments.Items[i]))
 	}
 
@@ -95,6 +95,7 @@ func (dk *deploymentKind) getPodControllers(c *Cluster, namespace string) ([]pod
 func makeDeploymentPodController(deployment *apiext.Deployment) podController {
 	var status string
 	objectMeta, deploymentStatus := deployment.ObjectMeta, deployment.Status
+
 	if deploymentStatus.ObservedGeneration >= objectMeta.Generation {
 		// the definition has been updated; now let's see about the replicas
 		updated, wanted := deploymentStatus.UpdatedReplicas, *deployment.Spec.Replicas
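
Why index into the slice rather than range over values? Under the loop-variable semantics of Go versions contemporary with this commit (before Go 1.22), taking the address of the range value variable would yield pointers that all alias one reused variable. A standalone sketch, not part of the flux codebase:

package main

import "fmt"

type deployment struct{ Name string }

func main() {
	items := []deployment{{"a"}, {"b"}, {"c"}}

	// Before Go 1.22 the range variable d is a single variable reused on
	// every iteration, so each appended pointer aliases the same memory,
	// which finally holds the last element.
	var aliased []*deployment
	for _, d := range items {
		aliased = append(aliased, &d)
	}

	// Indexing, as getPodControllers does with &deployments.Items[i],
	// takes the address of each slice element, so every pointer is distinct.
	var distinct []*deployment
	for i := range items {
		distinct = append(distinct, &items[i])
	}

	fmt.Println(aliased[0].Name, distinct[0].Name) // pre-Go 1.22: c a
}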
1 change: 1 addition & 0 deletions daemon/loop.go
@@ -11,6 +11,7 @@ import (
	"sync"

	"context"

	"github.com/weaveworks/flux"
	"github.com/weaveworks/flux/event"
	"github.com/weaveworks/flux/git"
3 changes: 2 additions & 1 deletion policy/policy.go
@@ -90,8 +90,9 @@ func clone(s Set) Set {
 	return newMap
 }
 
+// Contains method determines if a resource has a particular policy present
 func (s Set) Contains(needle Policy) bool {
-	for p, _ := range s {
+	for p := range s {
 		if p == needle {
 			return true
 		}
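
Judging by mock.go further down, which builds a set with p[policy.Ignore] = "true", Set is a map keyed by Policy, so Contains is a linear scan over the keys. A minimal self-contained sketch of the pattern; the type definitions here are assumptions, not the real policy package:

package main

import "fmt"

type Policy string

const Ignore Policy = "ignore"

// Set mirrors the shape implied by mock.go: a map keyed by Policy.
type Set map[Policy]string

// Contains reports whether the policy is present in the set.
func (s Set) Contains(needle Policy) bool {
	for p := range s {
		if p == needle {
			return true
		}
	}
	return false
}

func main() {
	s := Set{Ignore: "true"}
	// This is the check sync.go uses to skip resources marked ignore.
	fmt.Println(s.Contains(Ignore)) // true
}

Since Set is a map, the equivalent check _, ok := s[needle] would be constant-time; the method form simply reads better at call sites such as res.Policy().Contains(policy.Ignore) in sync.go.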
56 changes: 56 additions & 0 deletions sync/mock.go
@@ -0,0 +1,56 @@
+package sync
+
+import (
+	"github.com/weaveworks/flux"
+	"github.com/weaveworks/flux/policy"
+)
+
+type rsc struct {
+	bytes []byte
+	Kind  string
+	Meta  struct {
+		Namespace string
+		Name      string
+	}
+}
+
+type rscIgnorePolicy struct {
+	rsc
+}
+
+func (rs rsc) Source() string {
+	return ""
+}
+
+func (rs rsc) Bytes() []byte {
+	return []byte{}
+}
+
+func (rs rsc) ResourceID() flux.ResourceID {
+	return flux.MakeResourceID(rs.Meta.Namespace, rs.Kind, rs.Meta.Name)
+}
+
+func (rs rsc) Policy() policy.Set {
+	p := policy.Set{}
+	return p
+}
+
+func (ri rscIgnorePolicy) Policy() policy.Set {
+	p := policy.Set{}
+	p[policy.Ignore] = "true"
+	return p
+}
+
+func mockResourceWithoutIgnorePolicy(kind, namespace, name string) rsc {
+	r := rsc{Kind: kind}
+	r.Meta.Namespace = namespace
+	r.Meta.Name = name
+	return r
+}
+
+func mockResourceWithIgnorePolicy(kind, namespace, name string) rscIgnorePolicy {
+	ri := rscIgnorePolicy{rsc{Kind: kind}}
+	ri.Meta.Namespace = namespace
+	ri.Meta.Name = name
+	return ri
+}
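
These mocks are presumably there to drive tests of the sync logic in the next file. A hypothetical sketch of such a test (the PR's real test file is not shown in this view; the test body, the map key format, and the use of go-kit's log.NewNopLogger are all assumptions):

package sync

import (
	"testing"

	"github.com/go-kit/kit/log"
	"github.com/weaveworks/flux/cluster"
	"github.com/weaveworks/flux/resource"
)

// Hypothetical test: a resource carrying the ignore policy should not
// produce a sync action. The map key format is illustrative only.
func TestPrepareSyncApplySkipsIgnoredResources(t *testing.T) {
	sync := cluster.SyncDef{}
	repo := map[string]resource.Resource{
		"default:deployment/helloworld": mockResourceWithIgnorePolicy("deployment", "default", "helloworld"),
	}
	for id, res := range repo {
		prepareSyncApply(log.NewNopLogger(), nil, id, res, &sync)
	}
	if len(sync.Actions) != 0 {
		t.Errorf("expected no actions for ignored resource, got %d", len(sync.Actions))
	}
}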
97 changes: 72 additions & 25 deletions sync/sync.go
@@ -9,10 +9,11 @@ import (
 	"github.com/weaveworks/flux/resource"
 )
 
-// Synchronise the cluster to the files in a directory
+// Sync synchronises the cluster to the files in a directory
 func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, deletes bool, logger log.Logger) error {
 	// Get a map of resources defined in the cluster
 	clusterBytes, err := clus.Export()
+
 	if err != nil {
 		return errors.Wrap(err, "exporting resource defs from cluster")
 	}
@@ -26,38 +27,84 @@ func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, deletes bool, logger log.Logger) error {
 	// to figuring out what's changed, and applying that. We're
 	// relying on Kubernetes to decide for each application if it is a
 	// no-op.
-	var sync cluster.SyncDef
+	sync := cluster.SyncDef{}
+
+	nsClusterResources, otherClusterResources := separateResourcesByType(clusterResources)
+	nsRepoResources, otherRepoResources := separateResourcesByType(repoResources)
+
+	// First tackle resources that are not Namespace kind, in case we are deleting the Namespace as well
+	// Deleting a Namespace first, then a resource in this namespace causes an error
+
+	// DANGER ZONE (tamara) This works and is dangerous. At the moment will delete Flux and
+	// other pods unless the relevant manifests are part of the user repo. Needs a lot of thought
+	// before this cleanup cluster feature can be unleashed on the world.
 	if deletes {
-		for id, res := range clusterResources {
-			if res.Policy().Contains(policy.Ignore) {
-				logger.Log("resource", res.ResourceID(), "ignore", "delete")
-				continue
-			}
-			if _, ok := repoResources[id]; !ok {
-				sync.Actions = append(sync.Actions, cluster.SyncAction{
-					ResourceID: id,
-					Delete:     res.Bytes(),
-				})
-			}
+		for id, res := range otherClusterResources {
+			prepareSyncDelete(logger, repoResources, id, res, &sync)
+		}
+		for id, res := range nsClusterResources {
+			prepareSyncDelete(logger, repoResources, id, res, &sync)
 		}
 	}
 
-	for id, res := range repoResources {
-		if res.Policy().Contains(policy.Ignore) {
-			logger.Log("resource", res.ResourceID(), "ignore", "apply")
-			continue
-		}
-		if cres, ok := clusterResources[id]; ok {
-			if cres.Policy().Contains(policy.Ignore) {
-				logger.Log("resource", res.ResourceID(), "ignore", "apply")
-				continue
-			}
-		}
-		sync.Actions = append(sync.Actions, cluster.SyncAction{
-			ResourceID: id,
-			Apply:      res.Bytes(),
-		})
+	// To avoid errors due to a non existent namespace if a resource in that namespace is created first,
+	// create Namespace objects first
+	for id, res := range nsRepoResources {
+		prepareSyncApply(logger, clusterResources, id, res, &sync)
+	}
+	for id, res := range otherRepoResources {
+		prepareSyncApply(logger, clusterResources, id, res, &sync)
 	}
 
 	return clus.Sync(sync)
 }
+
+func separateResourcesByType(resources map[string]resource.Resource) (map[string]resource.Resource, map[string]resource.Resource) {
+	if len(resources) == 0 {
+		return nil, nil
+	}
+	nsResources := make(map[string]resource.Resource)
+	otherResources := make(map[string]resource.Resource)
+	for id, res := range resources {
+		_, kind, _ := res.ResourceID().Components()
+		if kind == "namespace" {
+			nsResources[id] = res
+		} else {
+			otherResources[id] = res
+		}
+	}
+	return nsResources, otherResources
+}
+
+func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) {
+	if len(repoResources) == 0 {
+		return
+	}
+	if res.Policy().Contains(policy.Ignore) {
+		logger.Log("resource", res.ResourceID(), "ignore", "delete")
+		return
+	}
+	if _, ok := repoResources[id]; !ok {
+		sync.Actions = append(sync.Actions, cluster.SyncAction{
+			ResourceID: id,
+			Delete:     res.Bytes(),
+		})
+	}
+}
+
+func prepareSyncApply(logger log.Logger, clusterResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) {
+	if res.Policy().Contains(policy.Ignore) {
+		logger.Log("resource", res.ResourceID(), "ignore", "apply")
+		return
+	}
+	if cres, ok := clusterResources[id]; ok {
+		if cres.Policy().Contains(policy.Ignore) {
+			logger.Log("resource", res.ResourceID(), "ignore", "apply")
			return
+		}
+	}
+	sync.Actions = append(sync.Actions, cluster.SyncAction{
+		ResourceID: id,
+		Apply:      res.Bytes(),
+	})
+}
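
The split performed by separateResourcesByType is what gives Sync its ordering guarantee: Namespace objects are applied before the resources that live in them and deleted after them. A hypothetical test-style sketch using the mocks from sync/mock.go (the map keys are illustrative):

package sync

import (
	"testing"

	"github.com/weaveworks/flux/resource"
)

// Hypothetical check of the namespace/other split that Sync relies on:
// namespace objects must be applied first (a resource cannot be created
// in a namespace that does not exist yet) and deleted last (deleting a
// namespace before its contents makes the child deletes fail).
func TestSeparateResourcesByType(t *testing.T) {
	resources := map[string]resource.Resource{
		"default:namespace/default":     mockResourceWithoutIgnorePolicy("namespace", "default", "default"),
		"default:deployment/helloworld": mockResourceWithoutIgnorePolicy("deployment", "default", "helloworld"),
	}
	ns, other := separateResourcesByType(resources)
	if len(ns) != 1 || len(other) != 1 {
		t.Fatalf("want 1 namespace and 1 other resource, got %d and %d", len(ns), len(other))
	}
}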