diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index bf5709b82..1ac59862f 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -247,6 +247,7 @@ func (c *Cluster) Ping() error { return err } +// Export exports cluster resources func (c *Cluster) Export() ([]byte, error) { var config bytes.Buffer list, err := c.client.Namespaces().List(meta_v1.ListOptions{}) diff --git a/cluster/kubernetes/resourcekinds.go b/cluster/kubernetes/resourcekinds.go index dd26a8d3d..217a2cb74 100644 --- a/cluster/kubernetes/resourcekinds.go +++ b/cluster/kubernetes/resourcekinds.go @@ -85,7 +85,7 @@ func (dk *deploymentKind) getPodControllers(c *Cluster, namespace string) ([]pod } var podControllers []podController - for i, _ := range deployments.Items { + for i := range deployments.Items { podControllers = append(podControllers, makeDeploymentPodController(&deployments.Items[i])) } @@ -95,6 +95,7 @@ func (dk *deploymentKind) getPodControllers(c *Cluster, namespace string) ([]pod func makeDeploymentPodController(deployment *apiext.Deployment) podController { var status string objectMeta, deploymentStatus := deployment.ObjectMeta, deployment.Status + if deploymentStatus.ObservedGeneration >= objectMeta.Generation { // the definition has been updated; now let's see about the replicas updated, wanted := deploymentStatus.UpdatedReplicas, *deployment.Spec.Replicas diff --git a/daemon/loop.go b/daemon/loop.go index 4f14b3cf9..cf9676a4d 100644 --- a/daemon/loop.go +++ b/daemon/loop.go @@ -11,6 +11,7 @@ import ( "sync" "context" + "github.com/weaveworks/flux" "github.com/weaveworks/flux/event" "github.com/weaveworks/flux/git" diff --git a/policy/policy.go b/policy/policy.go index 6d02dda9a..a54bb1a21 100644 --- a/policy/policy.go +++ b/policy/policy.go @@ -90,8 +90,9 @@ func clone(s Set) Set { return newMap } +// Contains method determines if a resource has a particular policy present func (s Set) Contains(needle Policy) bool { - for p, _ := range s { + for p := range s { if p == needle { return true } diff --git a/sync/mock.go b/sync/mock.go new file mode 100644 index 000000000..cd9d5b466 --- /dev/null +++ b/sync/mock.go @@ -0,0 +1,56 @@ +package sync + +import ( + "github.com/weaveworks/flux" + "github.com/weaveworks/flux/policy" +) + +type rsc struct { + bytes []byte + Kind string + Meta struct { + Namespace string + Name string + } +} + +type rscIgnorePolicy struct { + rsc +} + +func (rs rsc) Source() string { + return "" +} + +func (rs rsc) Bytes() []byte { + return []byte{} +} + +func (rs rsc) ResourceID() flux.ResourceID { + return flux.MakeResourceID(rs.Meta.Namespace, rs.Kind, rs.Meta.Name) +} + +func (rs rsc) Policy() policy.Set { + p := policy.Set{} + return p +} + +func (ri rscIgnorePolicy) Policy() policy.Set { + p := policy.Set{} + p[policy.Ignore] = "true" + return p +} + +func mockResourceWithoutIgnorePolicy(kind, namespace, name string) rsc { + r := rsc{Kind: kind} + r.Meta.Namespace = namespace + r.Meta.Name = name + return r +} + +func mockResourceWithIgnorePolicy(kind, namespace, name string) rscIgnorePolicy { + ri := rscIgnorePolicy{rsc{Kind: kind}} + ri.Meta.Namespace = namespace + ri.Meta.Name = name + return ri +} diff --git a/sync/sync.go b/sync/sync.go index 5c096e552..c7dba38d8 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -9,10 +9,11 @@ import ( "github.com/weaveworks/flux/resource" ) -// Synchronise the cluster to the files in a directory +// Sync synchronises the cluster to the files in a directory func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, deletes bool, logger log.Logger) error { // Get a map of resources defined in the cluster clusterBytes, err := clus.Export() + if err != nil { return errors.Wrap(err, "exporting resource defs from cluster") } @@ -26,38 +27,84 @@ func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus // to figuring out what's changed, and applying that. We're // relying on Kubernetes to decide for each application if it is a // no-op. - var sync cluster.SyncDef + sync := cluster.SyncDef{} + + nsClusterResources, otherClusterResources := separateResourcesByType(clusterResources) + nsRepoResources, otherRepoResources := separateResourcesByType(repoResources) + // First tackle resources that are not Namespace kind, in case we are deleting the Namespace as well + // Deleting a Namespace first, then a resource in this namespace causes an error + + // DANGER ZONE (tamara) This works and is dangerous. At the moment will delete Flux and + // other pods unless the relevant manifests are part of the user repo. Needs a lot of thought + // before this cleanup cluster feature can be unleashed on the world. if deletes { - for id, res := range clusterResources { - if res.Policy().Contains(policy.Ignore) { - logger.Log("resource", res.ResourceID(), "ignore", "delete") - continue - } - if _, ok := repoResources[id]; !ok { - sync.Actions = append(sync.Actions, cluster.SyncAction{ - ResourceID: id, - Delete: res.Bytes(), - }) - } + for id, res := range otherClusterResources { + prepareSyncDelete(logger, repoResources, id, res, &sync) + } + for id, res := range nsClusterResources { + prepareSyncDelete(logger, repoResources, id, res, &sync) } } - for id, res := range repoResources { - if res.Policy().Contains(policy.Ignore) { - logger.Log("resource", res.ResourceID(), "ignore", "apply") - continue - } - if cres, ok := clusterResources[id]; ok { - if cres.Policy().Contains(policy.Ignore) { - logger.Log("resource", res.ResourceID(), "ignore", "apply") - continue - } + // To avoid errors due to a non existent namespace if a resource in that namespace is created first, + // create Namespace objects first + for id, res := range nsRepoResources { + prepareSyncApply(logger, clusterResources, id, res, &sync) + } + for id, res := range otherRepoResources { + prepareSyncApply(logger, clusterResources, id, res, &sync) + } + + return clus.Sync(sync) +} + +func separateResourcesByType(resources map[string]resource.Resource) (map[string]resource.Resource, map[string]resource.Resource) { + if len(resources) == 0 { + return nil, nil + } + nsResources := make(map[string]resource.Resource) + otherResources := make(map[string]resource.Resource) + for id, res := range resources { + _, kind, _ := res.ResourceID().Components() + if kind == "namespace" { + nsResources[id] = res + } else { + otherResources[id] = res } + } + return nsResources, otherResources +} + +func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) { + if len(repoResources) == 0 { + return + } + if res.Policy().Contains(policy.Ignore) { + logger.Log("resource", res.ResourceID(), "ignore", "delete") + return + } + if _, ok := repoResources[id]; !ok { sync.Actions = append(sync.Actions, cluster.SyncAction{ ResourceID: id, - Apply: res.Bytes(), + Delete: res.Bytes(), }) } - return clus.Sync(sync) +} + +func prepareSyncApply(logger log.Logger, clusterResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) { + if res.Policy().Contains(policy.Ignore) { + logger.Log("resource", res.ResourceID(), "ignore", "apply") + return + } + if cres, ok := clusterResources[id]; ok { + if cres.Policy().Contains(policy.Ignore) { + logger.Log("resource", res.ResourceID(), "ignore", "apply") + return + } + } + sync.Actions = append(sync.Actions, cluster.SyncAction{ + ResourceID: id, + Apply: res.Bytes(), + }) } diff --git a/sync/sync_test.go b/sync/sync_test.go index b9cd4986c..f15c5d0d5 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -66,6 +66,189 @@ func TestSync(t *testing.T) { checkClusterMatchesFiles(t, manifests, clus, checkout.ManifestDir()) } +func TestSeparateByType(t *testing.T) { + var tests = []struct { + msg string + resMap map[string]resource.Resource + expectedNS map[string]resource.Resource + expectedOthers map[string]resource.Resource + }{ + { + msg: "No resources", + resMap: make(map[string]resource.Resource), + expectedNS: nil, + expectedOthers: nil, + }, { + msg: "Only namespace resources", + resMap: map[string]resource.Resource{ + "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), + "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), + "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), + }, + expectedNS: map[string]resource.Resource{ + "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), + "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), + "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), + }, + expectedOthers: make(map[string]resource.Resource), + }, { + msg: "Only non-namespace resources", + resMap: map[string]resource.Resource{ + "res1": mockResourceWithoutIgnorePolicy("deployment", "default", "ns1"), + "res2": mockResourceWithoutIgnorePolicy("deployment", "ns1", "ns2"), + "res3": mockResourceWithoutIgnorePolicy("deployment", "ns2", "ns3"), + }, + expectedNS: make(map[string]resource.Resource), + expectedOthers: map[string]resource.Resource{ + "res1": mockResourceWithoutIgnorePolicy("deployment", "default", "ns1"), + "res2": mockResourceWithoutIgnorePolicy("deployment", "ns1", "ns2"), + "res3": mockResourceWithoutIgnorePolicy("deployment", "ns2", "ns3"), + }, + }, { + msg: "Mixture of resources", + resMap: map[string]resource.Resource{ + "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), + "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), + "res3": mockResourceWithoutIgnorePolicy("deployment", "default", "ns1"), + "res4": mockResourceWithoutIgnorePolicy("secret", "ns1", "ns2"), + "res5": mockResourceWithoutIgnorePolicy("service", "ns2", "ns2"), + }, + expectedNS: map[string]resource.Resource{ + "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), + "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), + }, + expectedOthers: map[string]resource.Resource{ + "res3": mockResourceWithoutIgnorePolicy("deployment", "default", "ns1"), + "res4": mockResourceWithoutIgnorePolicy("secret", "ns1", "ns2"), + "res5": mockResourceWithoutIgnorePolicy("service", "ns2", "ns2"), + }, + }, + } + + for _, sc := range tests { + r1, r2 := separateResourcesByType(sc.resMap) + + if !reflect.DeepEqual(sc.expectedNS, r1) { + t.Errorf("%s: expected %+v, got %+v\n", sc.msg, sc.expectedNS, r1) + } + if !reflect.DeepEqual(sc.expectedOthers, r2) { + t.Errorf("%s: expected %+v, got %+v\n", sc.msg, sc.expectedOthers, r2) + } + } +} + +func TestPrepareSyncDelete(t *testing.T) { + var tests = []struct { + msg string + repoRes map[string]resource.Resource + id string + res resource.Resource + expected *cluster.SyncDef + }{ + { + msg: "No repo resources provided during sync delete", + repoRes: map[string]resource.Resource{}, + id: "res7", + res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), + expected: &cluster.SyncDef{}, + }, + { + msg: "No policy to ignore in place during sync delete", + repoRes: map[string]resource.Resource{ + "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), + "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), + "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), + "res4": mockResourceWithoutIgnorePolicy("deployment", "ns1", "d1"), + "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), + "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), + }, + id: "res7", + res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), + expected: &cluster.SyncDef{}, + }, + { + msg: "No policy to ignore during sync delete", + repoRes: map[string]resource.Resource{ + "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), + "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), + "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), + "res4": mockResourceWithoutIgnorePolicy("deployment", "ns1", "d1"), + "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), + "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), + }, + id: "res7", + res: mockResourceWithoutIgnorePolicy("service", "ns1", "s2"), + expected: &cluster.SyncDef{Actions: []cluster.SyncAction{cluster.SyncAction{ResourceID: "res7", Delete: cluster.ResourceDef{}, Apply: cluster.ResourceDef(nil)}}}, + }, + } + + logger := log.NewNopLogger() + for _, sc := range tests { + sync := &cluster.SyncDef{} + prepareSyncDelete(logger, sc.repoRes, sc.id, sc.res, sync) + + if !reflect.DeepEqual(sc.expected, sync) { + t.Errorf("%s: expected %+v, got %+v\n", sc.msg, sc.expected, sync) + } + } +} + +func TestPrepareSyncApply(t *testing.T) { + var tests = []struct { + msg string + clusRes map[string]resource.Resource + id string + res resource.Resource + expected *cluster.SyncDef + }{ + { + msg: "No repo resources provided during sync apply", + clusRes: map[string]resource.Resource{}, + id: "res1", + res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), + expected: &cluster.SyncDef{}, + }, + { + msg: "No policy to ignore in place during sync apply", + clusRes: map[string]resource.Resource{ + "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), + "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), + "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), + "res4": mockResourceWithoutIgnorePolicy("deployment", "ns1", "d1"), + "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), + "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), + }, + id: "res7", + res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), + expected: &cluster.SyncDef{}, + }, + { + msg: "No policy to ignore during sync apply", + clusRes: map[string]resource.Resource{ + "res1": mockResourceWithoutIgnorePolicy("namespace", "ns1", "ns1"), + "res2": mockResourceWithoutIgnorePolicy("namespace", "ns2", "ns2"), + "res3": mockResourceWithoutIgnorePolicy("namespace", "ns3", "ns3"), + "res4": mockResourceWithoutIgnorePolicy("deployment", "ns1", "d1"), + "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), + "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), + }, + id: "res7", + res: mockResourceWithoutIgnorePolicy("service", "ns1", "s2"), + expected: &cluster.SyncDef{Actions: []cluster.SyncAction{cluster.SyncAction{ResourceID: "res7", Apply: cluster.ResourceDef{}, Delete: cluster.ResourceDef(nil)}}}, + }, + } + + logger := log.NewNopLogger() + for _, sc := range tests { + sync := &cluster.SyncDef{} + prepareSyncApply(logger, sc.clusRes, sc.id, sc.res, sync) + + if !reflect.DeepEqual(sc.expected, sync) { + t.Errorf("%s: expected %+v, got %+v\n", sc.msg, sc.expected, sync) + } + } +} + // --- var gitconf = git.Config{