Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reconcile ClusterIngress into VirtualService #2189

Merged
merged 5 commits into from
Oct 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/knative/pkg/signals"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
informers "github.com/knative/serving/pkg/client/informers/externalversions"
"github.com/knative/serving/pkg/reconciler/v1alpha1/clusteringress"
"github.com/knative/serving/pkg/reconciler/v1alpha1/configuration"
"github.com/knative/serving/pkg/reconciler/v1alpha1/labeler"
"github.com/knative/serving/pkg/reconciler/v1alpha1/revision"
Expand Down Expand Up @@ -132,6 +133,7 @@ func main() {
configurationInformer := servingInformerFactory.Serving().V1alpha1().Configurations()
revisionInformer := servingInformerFactory.Serving().V1alpha1().Revisions()
kpaInformer := servingInformerFactory.Autoscaling().V1alpha1().PodAutoscalers()
clusterIngressInformer := servingInformerFactory.Networking().V1alpha1().ClusterIngresses()
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
coreServiceInformer := kubeInformerFactory.Core().V1().Services()
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()
Expand Down Expand Up @@ -178,6 +180,11 @@ func main() {
configurationInformer,
routeInformer,
),
clusteringress.NewController(
opt,
clusterIngressInformer,
virtualServiceInformer,
),
}

// Watch the logging config map and dynamically update logging levels.
Expand All @@ -200,6 +207,7 @@ func main() {
configurationInformer.Informer().HasSynced,
revisionInformer.Informer().HasSynced,
kpaInformer.Informer().HasSynced,
clusterIngressInformer.Informer().HasSynced,
imageInformer.Informer().HasSynced,
deploymentInformer.Informer().HasSynced,
coreServiceInformer.Informer().HasSynced,
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/serving/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ const (

// RouteLabelKey is the label key attached to a Configuration indicating by
// which Route it is configured as traffic target.
// The key can also be attached to ClusterIngress resources to indicate
// which Route triggered their creation.
RouteLabelKey = GroupName + "/route"

// RouteNamespaceLabelKey is the label key attached to a ClusterIngress indicating by
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads a little funny. Maybe:
RouteNamespaceLabelKey is the label key attached to a ClusterIngress by a Route to indicate which namespace the Route was created in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll address this in a follow-up PR

// which namespace's Route it was created.
RouteNamespaceLabelKey = GroupName + "/routeNamespace"

// RevisionLabelKey is the label key attached to k8s resources to indicate
// which Revision triggered their creation.
RevisionLabelKey = GroupName + "/revision"
Expand Down
10 changes: 7 additions & 3 deletions pkg/reconciler/testing/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type TableRow struct {
// WithReactors is a set of functions that are installed as Reactors for the execution
// of this row of the table-driven-test.
WithReactors []clientgotesting.ReactionFunc

// For cluster-scoped resources like ClusterIngress, it does not have to be
// in the same namespace with its child resources.
SkipNamespaceValidation bool
}

type Factory func(*testing.T, *TableRow) (controller.Reconciler, ActionRecorderList)
Expand All @@ -87,7 +91,7 @@ func (r *TableRow) Test(t *testing.T, factory Factory) {
continue
}
got := actions.Creates[i]
if got.GetNamespace() != expectedNamespace {
if !r.SkipNamespaceValidation && got.GetNamespace() != expectedNamespace {
t.Errorf("unexpected action[%d]: %#v", i, got)
}
obj := got.GetObject()
Expand Down Expand Up @@ -126,7 +130,7 @@ func (r *TableRow) Test(t *testing.T, factory Factory) {
if got.GetName() != want.GetName() {
t.Errorf("unexpected delete[%d]: %#v", i, got)
}
if got.GetNamespace() != expectedNamespace {
if !r.SkipNamespaceValidation && got.GetNamespace() != expectedNamespace {
t.Errorf("unexpected delete[%d]: %#v", i, got)
}
}
Expand All @@ -146,7 +150,7 @@ func (r *TableRow) Test(t *testing.T, factory Factory) {
if got.GetName() != want.GetName() {
t.Errorf("unexpected patch[%d]: %#v", i, got)
}
if got.GetNamespace() != expectedNamespace {
if !r.SkipNamespaceValidation && got.GetNamespace() != expectedNamespace {
t.Errorf("unexpected patch[%d]: %#v", i, got)
}
if diff := cmp.Diff(string(want.GetPatch()), string(got.GetPatch())); diff != "" {
Expand Down
221 changes: 221 additions & 0 deletions pkg/reconciler/v1alpha1/clusteringress/clusteringress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package clusteringress

import (
"context"
"reflect"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"

"github.com/knative/pkg/apis/istio/v1alpha3"
istioinformers "github.com/knative/pkg/client/informers/externalversions/istio/v1alpha3"
istiolisters "github.com/knative/pkg/client/listers/istio/v1alpha3"
"github.com/knative/pkg/controller"
"github.com/knative/pkg/logging"
"github.com/knative/serving/pkg/apis/networking"
"github.com/knative/serving/pkg/apis/networking/v1alpha1"
informers "github.com/knative/serving/pkg/client/informers/externalversions/networking/v1alpha1"
listers "github.com/knative/serving/pkg/client/listers/networking/v1alpha1"
"github.com/knative/serving/pkg/reconciler"
"github.com/knative/serving/pkg/reconciler/v1alpha1/clusteringress/resources"
"github.com/knative/serving/pkg/reconciler/v1alpha1/clusteringress/resources/names"
)

// controllerAgentName is the agent name this controller registers under,
// used when emitting Kubernetes Events.
const controllerAgentName = "clusteringress-controller"

// Reconciler implements controller.Reconciler for ClusterIngress resources.
// It materializes each ClusterIngress as an Istio VirtualService and reflects
// the result back into the ClusterIngress status.
type Reconciler struct {
	*reconciler.Base

	// Listers index properties about resources from the shared informer
	// caches; all reads go through these rather than the API server.
	clusterIngressLister listers.ClusterIngressLister
	virtualServiceLister istiolisters.VirtualServiceLister
}

// Compile-time check that our Reconciler implements controller.Reconciler.
var _ controller.Reconciler = (*Reconciler)(nil)

// NewController initializes the controller and is called by the generated code
// Registers eventhandlers to enqueue events.
func NewController(
opt reconciler.Options,
clusterIngressInformer informers.ClusterIngressInformer,
virtualServiceInformer istioinformers.VirtualServiceInformer,
) *controller.Impl {

c := &Reconciler{
Base: reconciler.NewBase(opt, controllerAgentName),
clusterIngressLister: clusterIngressInformer.Lister(),
virtualServiceLister: virtualServiceInformer.Lister(),
}
impl := controller.NewImpl(c, c.Logger, "ClusterIngresses")

c.Logger.Info("Setting up event handlers")
clusterIngressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: impl.Enqueue,
})

virtualServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueOwnerIngress(impl),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use EnqueueControllerOf and follow the pattern ?
If this is feasible, then we don't need enqueueOwnerIngress function here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a discussion with @mattmoor about this above: #2189 (comment)

Would like to hear your opinion on the point as well :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah. Sorry I didn't see your discussion when I reviewed this part.
I get your point now. Agree that we probably need a generalized impl.EnqueueLabvelOf(key) method. we can do it later.

UpdateFunc: controller.PassNew(c.enqueueOwnerIngress(impl)),
})

return impl
}

// Reconcile compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the ClusterIngress resource
// with the current status of the resource.
func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// Convert the namespace/name string into a distinct namespace and name
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
c.Logger.Errorf("invalid resource key: %s", key)
return nil
}
logger := logging.FromContext(ctx)

// Get the ClusterIngress resource with this name.
original, err := c.clusterIngressLister.Get(name)
if apierrs.IsNotFound(err) {
// The resource may no longer exist, in which case we stop processing.
logger.Errorf("clusteringress %q in work queue no longer exists", key)
return nil
} else if err != nil {
return err
}
// Don't modify the informers copy
ci := original.DeepCopy()

// Reconcile this copy of the ClusterIngress and then write back any status
// updates regardless of whether the reconciliation errored out.
err = c.reconcile(ctx, ci)
if equality.Semantic.DeepEqual(original.Status, ci.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if _, err := c.updateStatus(ctx, ci); err != nil {
logger.Warn("Failed to update clusterIngress status", zap.Error(err))
c.Recorder.Eventf(ci, corev1.EventTypeWarning, "UpdateFailed",
"Failed to update status for ClusterIngress %q: %v", ci.Name, err)
return err
}
return err
}

// Update the Status of the ClusterIngress. Caller is responsible for checking
// for semantic differences before calling.
func (c *Reconciler) updateStatus(ctx context.Context, ci *v1alpha1.ClusterIngress) (*v1alpha1.ClusterIngress, error) {
existing, err := c.clusterIngressLister.Get(ci.Name)
if err != nil {
return nil, err
}
// If there's nothing to update, just return.
if reflect.DeepEqual(existing.Status, ci.Status) {
return existing, nil
}
existing.Status = ci.Status
// TODO: for CRD there's no updatestatus, so use normal update.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should (unless we already have) make a tracking bug since CRDs now have /status:
https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/

Not blocking this PR, just jotting it down here.

Copy link
Contributor Author

@lichuqiang lichuqiang Oct 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've also noticed this before, but have not got time to look deeper.
Not sure it's just cruft due to historic reasons, or currently there's still bug in UpdateStatus for CRDs that I'm not aware of .
Will open an issue for further discussion anyway.

updated, err := c.ServingClientSet.NetworkingV1alpha1().ClusterIngresses().Update(existing)
if err != nil {
return nil, err
}

c.Recorder.Eventf(ci, corev1.EventTypeNormal, "Updated", "Updated status for clusterIngress %q", ci.Name)
return updated, nil
}

// reconcile converges the actual state towards the desired state for a
// single ClusterIngress, recording the outcome in its Status conditions.
func (c *Reconciler) reconcile(ctx context.Context, ci *v1alpha1.ClusterIngress) error {
	logger := logging.FromContext(ctx)

	// Ensure the Status has all conditions initialized before we start.
	ci.Status.InitializeConditions()
	vs := resources.MakeVirtualService(ci)

	// Fix: the original format string was "Reconciling clusterIngress :%v"
	// with the colon misplaced against the verb.
	logger.Infof("Reconciling clusterIngress: %v", ci)
	logger.Info("Creating/Updating VirtualService")
	if err := c.reconcileVirtualService(ctx, ci, vs); err != nil {
		// TODO(lichuqiang): should we explicitly mark the ingress as unready
		// when error reconciling VirtualService?
		return err
	}
	// As underlying network programming (VirtualService now) is stateless,
	// here we simply mark the ingress as ready if the VirtualService
	// is successfully synced.
	ci.Status.MarkNetworkConfigured()
	ci.Status.MarkLoadBalancerReady([]v1alpha1.LoadBalancerIngressStatus{
		{DomainInternal: names.K8sGatewayServiceFullname},
	})
	logger.Info("ClusterIngress successfully synced")
	return nil
}

// reconcileVirtualService ensures the VirtualService that materializes the
// given ClusterIngress exists and matches the desired spec, creating or
// updating it as necessary. Events are recorded against the ClusterIngress.
func (c *Reconciler) reconcileVirtualService(ctx context.Context, ci *v1alpha1.ClusterIngress,
	desired *v1alpha3.VirtualService) error {
	logger := logging.FromContext(ctx)
	ns := desired.Namespace
	name := desired.Name

	vs, err := c.virtualServiceLister.VirtualServices(ns).Get(name)
	if apierrs.IsNotFound(err) {
		// No VirtualService yet: create it from the desired state.
		if _, err := c.SharedClientSet.NetworkingV1alpha3().VirtualServices(ns).Create(desired); err != nil {
			logger.Error("Failed to create VirtualService", zap.Error(err))
			c.Recorder.Eventf(ci, corev1.EventTypeWarning, "CreationFailed",
				"Failed to create VirtualService %q/%q: %v", ns, name, err)
			return err
		}
		c.Recorder.Eventf(ci, corev1.EventTypeNormal, "Created",
			"Created VirtualService %q", desired.Name)
	} else if err != nil {
		return err
	} else if !equality.Semantic.DeepEqual(vs.Spec, desired.Spec) {
		// The spec drifted from desired; overwrite it and push the update.
		vs.Spec = desired.Spec
		if _, err := c.SharedClientSet.NetworkingV1alpha3().VirtualServices(ns).Update(vs); err != nil {
			logger.Error("Failed to update VirtualService", zap.Error(err))
			return err
		}
		// Fix: record the event on the ClusterIngress (the original passed
		// `desired`, the VirtualService, inconsistent with the create branch),
		// and describe what actually changed (the spec, not the status).
		c.Recorder.Eventf(ci, corev1.EventTypeNormal, "Updated",
			"Updated VirtualService %q/%q", ns, name)
	}

	return nil
}

// enqueueOwnerIngress returns an informer event handler that resolves a
// VirtualService to the ClusterIngress that created it (via label) and
// enqueues that ingress for reconciliation.
func (c *Reconciler) enqueueOwnerIngress(impl *controller.Impl) func(obj interface{}) {
	return func(obj interface{}) {
		vs, isVS := obj.(*v1alpha3.VirtualService)
		if !isVS {
			c.Logger.Infof("Ignoring non-VirtualService objects %v", obj)
			return
		}
		// Ownership is expressed through a label rather than an
		// OwnerReference because ClusterIngress is cluster-scoped.
		owner, hasOwner := vs.Labels[networking.IngressLabelKey]
		if !hasOwner {
			c.Logger.Infof("VirtualService %s/%s does not have a referring ingress", vs.Namespace, vs.Name)
			return
		}
		impl.EnqueueKey(owner)
	}
}
Loading