Sync one ingress #106

Merged
merged 10 commits on Jan 29, 2018
2 changes: 1 addition & 1 deletion cmd/glbc/main.go
@@ -28,7 +28,7 @@ import (

"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller"
neg "k8s.io/ingress-gce/pkg/networkendpointgroup"
neg "k8s.io/ingress-gce/pkg/neg"

"k8s.io/ingress-gce/cmd/glbc/app"
"k8s.io/ingress-gce/pkg/flags"
8 changes: 4 additions & 4 deletions pkg/backends/backends_test.go
@@ -33,7 +33,7 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/healthchecks"
"k8s.io/ingress-gce/pkg/instances"
"k8s.io/ingress-gce/pkg/networkendpointgroup"
"k8s.io/ingress-gce/pkg/neg"
"k8s.io/ingress-gce/pkg/storage"
"k8s.io/ingress-gce/pkg/utils"
)
@@ -58,7 +58,7 @@ var (
)

func newTestJig(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) (*Backends, healthchecks.HealthCheckProvider) {
negGetter := networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
negGetter := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
nodePool := instances.NewNodePool(fakeIGs, defaultNamer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
healthCheckProvider := healthchecks.NewFakeHealthCheckProvider()
@@ -333,7 +333,7 @@ func TestBackendPoolSync(t *testing.T) {
func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) {
f := NewFakeBackendServices(noOpErrFunc)
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
negGetter := networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
negGetter := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
nodePool := instances.NewNodePool(fakeIGs, defaultNamer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
hcp := healthchecks.NewFakeHealthCheckProvider()
@@ -507,7 +507,7 @@ func TestLinkBackendServiceToNEG(t *testing.T) {
namespace, name, port := "ns", "name", "port"
f := NewFakeBackendServices(noOpErrFunc)
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
fakeNEG := networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
fakeNEG := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
nodePool := instances.NewNodePool(fakeIGs, defaultNamer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
hcp := healthchecks.NewFakeHealthCheckProvider()
63 changes: 58 additions & 5 deletions pkg/context/context.go
@@ -19,35 +19,88 @@ package context
import (
"time"

"github.com/golang/glog"

apiv1 "k8s.io/api/core/v1"
informerv1 "k8s.io/client-go/informers/core/v1"
informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
scheme "k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)

// ControllerContext holds the kube client and the shared informers used by the loadbalancer controller.
type ControllerContext struct {
kubeClient kubernetes.Interface

IngressInformer cache.SharedIndexInformer
ServiceInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
EndpointInformer cache.SharedIndexInformer

// Map of namespace => record.EventRecorder.
recorders map[string]record.EventRecorder
Contributor:
EventRecorder per namespace?

@nicksardo (Contributor), Jan 17, 2018:
I'm a bit confused by this too. The service controller doesn't specify the namespace:
https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/service/service_controller.go#L118

Member Author:
Let's keep this for now, I'll remove it in a later commit once we resolve the unit test log spam.

Contributor:
SGTM

}

// NewControllerContext returns a new controller context holding a shared set of informers.
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, enableEndpointsInformer bool) *ControllerContext {
newIndexer := func() cache.Indexers {
return cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
}
context := &ControllerContext{
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
kubeClient: kubeClient,
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, newIndexer()),
ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, newIndexer()),
PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, newIndexer()),
NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, newIndexer()),
recorders: map[string]record.EventRecorder{},
}
if enableEndpointsInformer {
context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, newIndexer())
}

return context
}

// HasSynced returns true if all relevant informers have been synced.
func (ctx *ControllerContext) HasSynced() bool {

Contributor:
Extra space

Member Author:
done

funcs := []func() bool{
ctx.IngressInformer.HasSynced,
ctx.ServiceInformer.HasSynced,
ctx.PodInformer.HasSynced,
ctx.NodeInformer.HasSynced,
}
if ctx.EndpointInformer != nil {
funcs = append(funcs, ctx.EndpointInformer.HasSynced)
}
for _, f := range funcs {
if !f() {
return false
}
}
return true
}
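
For orientation, here is a minimal sketch (not part of this PR) of how a controller's startup path might wire the new context together; the function name runInformers and the 30-second resync period are illustrative assumptions:

```go
package example

import (
	"time"

	"github.com/golang/glog"
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"

	"k8s.io/ingress-gce/pkg/context"
)

// runInformers is a hypothetical startup helper: build the shared informer
// set, start it, and block until every cache has synced.
func runInformers(kubeClient kubernetes.Interface, stopCh chan struct{}) *context.ControllerContext {
	ctx := context.NewControllerContext(kubeClient, apiv1.NamespaceAll, 30*time.Second, true /* enableEndpointsInformer */)
	ctx.Start(stopCh)
	if !cache.WaitForCacheSync(stopCh, ctx.HasSynced) {
		glog.Fatalf("informer caches never synced")
	}
	return ctx
}
```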

// Recorder returns an event recorder scoped to the given namespace, creating and caching one on first use.
func (ctx *ControllerContext) Recorder(ns string) record.EventRecorder {
if rec, ok := ctx.recorders[ns]; ok {
return rec
}

broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{
Interface: ctx.kubeClient.Core().Events(ns),
})
rec := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"})
ctx.recorders[ns] = rec

return rec
}
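
And a minimal sketch of how a caller could use the per-namespace recorder (again not part of this diff; the helper name, event reason, and message are hypothetical):

```go
package example

import (
	apiv1 "k8s.io/api/core/v1"
	extensions "k8s.io/api/extensions/v1beta1"

	"k8s.io/ingress-gce/pkg/context"
)

// recordSyncEvent shows how a controller might emit an event scoped to the
// ingress's own namespace via the shared ControllerContext.
func recordSyncEvent(ctx *context.ControllerContext, ing *extensions.Ingress) {
	ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, "Sync",
		"Scheduled sync of ingress %s/%s", ing.Namespace, ing.Name)
}
```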

// Start starts all of the informers.
func (ctx *ControllerContext) Start(stopCh chan struct{}) {
go ctx.IngressInformer.Run(stopCh)
12 changes: 8 additions & 4 deletions pkg/controller/cluster_manager.go
@@ -105,6 +105,8 @@ func (c *ClusterManager) shutdown() error {
// If in performing the checkpoint the cluster manager runs out of quota, a
// googleapi 403 is returned.
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, firewallPorts []int64) ([]*compute.InstanceGroup, error) {
glog.V(4).Infof("Checkpoint %q, len(lbs)=%v, len(nodeNames)=%v, lne(backendServicePorts)=%v, len(namedPorts)=%v, len(firewallPorts)=%v", len(lbs), len(nodeNames), len(backendServicePorts), len(namedPorts), len(firewallPorts))
Contributor:
Provided only 5 values - expected 6.
Typo: "lne(backendServicePorts)"

Member Author:
done
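
One possible corrected form of that log line, matching five verbs to five values and fixing the typo (the actual follow-up commit is not shown in this hunk):

```go
glog.V(4).Infof("Checkpoint: len(lbs)=%v, len(nodeNames)=%v, len(backendServicePorts)=%v, len(namedPorts)=%v, len(firewallPorts)=%v",
	len(lbs), len(nodeNames), len(backendServicePorts), len(namedPorts), len(firewallPorts))
```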


if len(namedPorts) != 0 {
// Add the default backend node port to the list of named ports for instance groups.
namedPorts = append(namedPorts, c.defaultBackendNodePort)
@@ -172,15 +174,17 @@ func (c *ClusterManager) GC(lbNames []string, nodePorts []backends.ServicePort)
}
Contributor:
"// This method ignores googleapi 404 errors (StatusNotFound)." (line 155)
Is this true? If so, where does it happen in this function?


// TODO(ingress#120): Move this to the backend pool so it mirrors creation
var igErr error
if len(lbNames) == 0 {
igName := c.ClusterNamer.InstanceGroup()
glog.Infof("Deleting instance group %v", igName)
igErr = c.instancePool.DeleteInstanceGroup(igName)
if err := c.instancePool.DeleteInstanceGroup(igName); err != nil {
return err
}
}
if igErr != nil {
return igErr
if len(lbNames) == 0 {
Contributor:
Merge it with the previous if statement? Also add a comment saying that when there are no lbs, we remove the instance group and the firewall.

Member Author:
done

c.firewallPool.Shutdown()
}

return nil
}
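
A minimal sketch of the merged form the reviewer is asking for (the follow-up commit itself is not shown in this hunk); it keeps the surrounding code's names and error handling:

```go
// If no load balancers remain, delete the shared instance group and tear
// down the firewall in a single block.
if len(lbNames) == 0 {
	igName := c.ClusterNamer.InstanceGroup()
	glog.Infof("Deleting instance group %v", igName)
	if err := c.instancePool.DeleteInstanceGroup(igName); err != nil {
		return err
	}
	c.firewallPool.Shutdown()
}

return nil
```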
