From 5d33bf9af51168639be16656523b63aa1a82dc3d Mon Sep 17 00:00:00 2001 From: Victor Agababov Date: Wed, 10 Apr 2019 14:11:00 -0700 Subject: [PATCH] SKS use one service (#3711) * Use generic pkg/resources helpers for endpoint counting * Move SKS helpers to a shared place. For #1997. This will be reused in HPA and KPA changes. * some transient state * remove public service and endpoint reconciliation * remove public service and endpoint reconciliation * formatting unification --- .../serverlessservice/serverlessservice.go | 40 ++-- .../serverlessservice_test.go | 174 +++--------------- 2 files changed, 50 insertions(+), 164 deletions(-) diff --git a/pkg/reconciler/v1alpha1/serverlessservice/serverlessservice.go b/pkg/reconciler/v1alpha1/serverlessservice/serverlessservice.go index e0e7f1e2349e..d0f4c1c4e7d2 100644 --- a/pkg/reconciler/v1alpha1/serverlessservice/serverlessservice.go +++ b/pkg/reconciler/v1alpha1/serverlessservice/serverlessservice.go @@ -23,6 +23,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/google/go-cmp/cmp" + perrors "github.com/pkg/errors" "github.com/knative/pkg/controller" "github.com/knative/pkg/logging" @@ -166,11 +167,9 @@ func (r *reconciler) reconcile(ctx context.Context, sks *netv1alpha1.ServerlessS sks.SetDefaults(ctx) sks.Status.InitializeConditions() - // TODO(#1997): implement: proxy mode, activator probing and positive handoff. + // TODO(#1997): implement: public service, proxy mode, activator probing and positive handoff. for i, fn := range []func(context.Context, *netv1alpha1.ServerlessService) error{ - r.reconcilePrivateService, // First make sure our data source is setup. - r.reconcilePublicService, // Make sure the service is setup. - r.reconcilePublicEndpoints, // Now populate endpoints, if there are healthy ones. + r.reconcilePrivateService, // First make sure our data source is setup. } { if err := fn(ctx, sks); err != nil { logger.Debugw(fmt.Sprintf("%d: reconcile failed", i), zap.Error(err)) @@ -294,21 +293,21 @@ func (r *reconciler) reconcilePrivateService(ctx context.Context, sks *netv1alph sn := names.PrivateService(sks) svc, err := r.serviceLister.Services(sks.Namespace).Get(sn) if errors.IsNotFound(err) { - logger.Infof("K8s service %q does not exist; creating.", sn) + logger.Infof("K8s service %s does not exist; creating.", sn) sks.Status.MarkEndpointsNotReady("CreatingPrivateService") svc = resources.MakePrivateService(sks) _, err := r.KubeClientSet.CoreV1().Services(sks.Namespace).Create(svc) if err != nil { - logger.Errorw(fmt.Sprintf("Error creating K8s Service %s: ", sn), zap.Error(err)) + logger.Errorw(fmt.Sprint("Error creating K8s Service:", sn), zap.Error(err)) return err } - logger.Infof("Created K8s service: %q", sn) + logger.Infof("Created K8s service: %s", sn) } else if err != nil { - logger.Errorw(fmt.Sprintf("Error getting K8s Service %s: ", sn), zap.Error(err)) + logger.Errorw(fmt.Sprint("Error getting K8s Service:", sn), zap.Error(err)) return err } else if !metav1.IsControlledBy(svc, sks) { sks.Status.MarkEndpointsNotOwned("Service", sn) - return fmt.Errorf("SKS: %q does not own Service: %q", sks.Name, sn) + return fmt.Errorf("SKS: %s does not own Service: %s", sks.Name, sn) } tmpl := resources.MakePrivateService(sks) want := svc.DeepCopy() @@ -318,13 +317,30 @@ func (r *reconciler) reconcilePrivateService(ctx context.Context, sks *netv1alph if !equality.Semantic.DeepEqual(svc.Spec, want.Spec) { sks.Status.MarkEndpointsNotReady("UpdatingPrivateService") - logger.Infof("Private K8s Service changed; reconciling: %s", sn) + logger.Info("Private K8s Service changed; reconciling:", sn) if _, err = r.KubeClientSet.CoreV1().Services(sks.Namespace).Update(want); err != nil { - logger.Errorw(fmt.Sprintf("Error updating private K8s Service %s: ", sn), zap.Error(err)) + logger.Errorw(fmt.Sprint("Error updating private K8s Service:", sn), zap.Error(err)) return err } } + + // TODO(#1997): temporarily we have one service since istio cannot handle our load. + // So they are smudged together. In the end this has to go. + eps, err := presources.FetchReadyAddressCount(r.endpointsLister, sks.Namespace, sn) + switch { + case err != nil: + return perrors.Wrapf(err, "error fetching endpoints %s/%s", sks.Namespace, sn) + case eps > 0: + logger.Infof("Endpoints %s/%s has %d ready endpoints", sks.Namespace, sn, eps) + sks.Status.MarkEndpointsReady() + default: + logger.Infof("Endpoints %s/%s has no ready endpoints", sks.Namespace, sn) + sks.Status.MarkEndpointsNotReady("NoHealthyBackends") + } + sks.Status.ServiceName = sn + // End TODO. + sks.Status.PrivateServiceName = sn - logger.Debugf("Done reconciling private K8s service %s", sn) + logger.Debug("Done reconciling private K8s service", sn) return nil } diff --git a/pkg/reconciler/v1alpha1/serverlessservice/serverlessservice_test.go b/pkg/reconciler/v1alpha1/serverlessservice/serverlessservice_test.go index 95a7eb5d4ec9..79567b8b1f51 100644 --- a/pkg/reconciler/v1alpha1/serverlessservice/serverlessservice_test.go +++ b/pkg/reconciler/v1alpha1/serverlessservice/serverlessservice_test.go @@ -27,6 +27,7 @@ import ( informers "github.com/knative/serving/pkg/client/informers/externalversions" rpkg "github.com/knative/serving/pkg/reconciler" "github.com/knative/serving/pkg/reconciler/v1alpha1/serverlessservice/resources" + "github.com/knative/serving/pkg/reconciler/v1alpha1/serverlessservice/resources/names" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -80,50 +81,24 @@ func TestReconcile(t *testing.T) { Name: "steady state", Key: "steady/state", Objects: []runtime.Object{ - SKS("steady", "state", markHappy, WithPubService, WithPrivateService), + SKS("steady", "state", markHappy, withTempPubService, WithPrivateService), svcpub("steady", "state"), svcpriv("steady", "state"), endpointspub("steady", "state", WithSubsets), endpointspriv("steady", "state", WithSubsets), }, - }, { - Name: "pod change", - Key: "pod/change", - Objects: []runtime.Object{ - SKS("pod", "change", markHappy, WithPubService, WithPrivateService), - svcpub("pod", "change"), - svcpriv("pod", "change"), - endpointspub("pod", "change", WithSubsets), - endpointspriv("pod", "change", withOtherSubsets), - }, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: endpointspub("pod", "change", withOtherSubsets), - }}, }, { Name: "user changes priv svc", Key: "private/svc-change", Objects: []runtime.Object{ - SKS("private", "svc-change", markHappy, WithPubService, WithPrivateService), + SKS("private", "svc-change", markHappy, withTempPubService, WithPrivateService), svcpub("private", "svc-change"), svcpriv("private", "svc-change", withTimeSelector), endpointspub("private", "svc-change", withOtherSubsets), endpointspriv("private", "svc-change", WithSubsets), }, WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: svcpriv("private", "svc-change")}, { - Object: endpointspub("private", "svc-change", WithSubsets)}}, - }, { - Name: "user changes public svc", - Key: "public/svc-change", - Objects: []runtime.Object{ - SKS("public", "svc-change", markHappy, WithPubService, WithPrivateService), - svcpub("public", "svc-change", withTimeSelector), - svcpriv("public", "svc-change"), - endpointspub("public", "svc-change", WithSubsets), - endpointspriv("public", "svc-change", WithSubsets), - }, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: svcpub("public", "svc-change"), + Object: svcpriv("private", "svc-change"), }}, }, { Name: "OnCreate-deployment-exists", @@ -135,11 +110,9 @@ func TestReconcile(t *testing.T) { }, WantCreates: []metav1.Object{ svcpriv("on", "cde"), - svcpub("on", "cde"), - endpointspub("on", "cde", WithSubsets), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: SKS("on", "cde", markHappy, WithPubService, WithPrivateService), + Object: SKS("on", "cde", markHappy, withTempPubService, WithPrivateService), }}, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "Updated", `Successfully updated ServerlessService "on/cde"`), @@ -153,11 +126,9 @@ func TestReconcile(t *testing.T) { }, WantCreates: []metav1.Object{ svcpriv("on", "cneps"), - svcpub("on", "cneps"), - endpointspub("on", "cneps"), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: SKS("on", "cneps", markNoEndpoints, WithPubService, WithPrivateService), + Object: SKS("on", "cneps", markNoEndpoints, withTempPubService, WithPrivateService), }}, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "Updated", `Successfully updated ServerlessService "on/cneps"`), @@ -179,49 +150,12 @@ func TestReconcile(t *testing.T) { WantEvents: []string{ Eventf(corev1.EventTypeWarning, "UpdateFailed", "InternalError: inducing failure for create services"), }, - }, { - Name: "svc-fail-pub", - Key: "svc/fail2", - WantErr: true, - Objects: []runtime.Object{ - SKS("svc", "fail2"), - svcpriv("svc", "fail2"), - endpointspriv("svc", "fail2"), - }, - WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("create", "services"), - }, - WantCreates: []metav1.Object{ - svcpub("svc", "fail2"), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "UpdateFailed", "InternalError: inducing failure for create services"), - }, - }, { - Name: "eps-fail-pub", - Key: "eps/fail3", - WantErr: true, - Objects: []runtime.Object{ - SKS("eps", "fail3"), - svcpriv("eps", "fail3"), - endpointspriv("eps", "fail3"), - }, - WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("create", "endpoints"), - }, - WantCreates: []metav1.Object{ - svcpub("eps", "fail3"), - endpointspub("eps", "fail3"), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "UpdateFailed", "InternalError: inducing failure for create endpoints"), - }, }, { Name: "update-sks-fail", Key: "update-sks/fail4", WantErr: true, Objects: []runtime.Object{ - SKS("update-sks", "fail4", WithPubService, WithPrivateService), + SKS("update-sks", "fail4", withTempPubService, WithPrivateService), svcpub("update-sks", "fail4"), svcpriv("update-sks", "fail4"), endpointspub("update-sks", "fail4", WithSubsets), @@ -232,79 +166,31 @@ func TestReconcile(t *testing.T) { }, // We still record update, but it fails. WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: SKS("update-sks", "fail4", markHappy, WithPubService, WithPrivateService), + Object: SKS("update-sks", "fail4", markHappy, withTempPubService, WithPrivateService), }}, WantEvents: []string{ Eventf(corev1.EventTypeWarning, "UpdateFailed", "Failed to update status: inducing failure for update serverlessservices"), }, }, { - Name: "ronin-pub-service/fail5", - Key: "ronin-pub-service/fail5", - WantErr: true, - Objects: []runtime.Object{ - SKS("ronin-pub-service", "fail5", WithPubService, WithPrivateService), - svcpub("ronin-pub-service", "fail5", WithK8sSvcOwnersRemoved), - svcpriv("ronin-pub-service", "fail5"), - endpointspub("ronin-pub-service", "fail5", WithSubsets), - endpointspriv("ronin-pub-service", "fail5", WithSubsets), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "UpdateFailed", `InternalError: SKS: "fail5" does not own Service: "fail5-pub"`), - }, - }, { - Name: "ronin-priv-service/fail6", - Key: "ronin-priv-service/fail6", - WantErr: true, - Objects: []runtime.Object{ - SKS("ronin-priv-service", "fail6", WithPubService, WithPrivateService), - svcpub("ronin-priv-service", "fail6"), - svcpriv("ronin-priv-service", "fail6", WithK8sSvcOwnersRemoved), - endpointspub("ronin-priv-service", "fail6", WithSubsets), - endpointspriv("ronin-priv-service", "fail6", WithSubsets), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "UpdateFailed", `InternalError: SKS: "fail6" does not own Service: "fail6-priv"`), - }, - }, { - Name: "ronin-pub-eps/fail7", - Key: "ronin-pub-eps/fail7", + Name: "ronin-priv-service/fail5", + Key: "ronin-priv-service/fail5", WantErr: true, Objects: []runtime.Object{ - SKS("ronin-pub-eps", "fail7", WithPubService), - svcpub("ronin-pub-eps", "fail7"), - svcpriv("ronin-pub-eps", "fail7"), - endpointspub("ronin-pub-eps", "fail7", WithSubsets, WithEndpointsOwnersRemoved), - endpointspriv("ronin-pub-eps", "fail7", WithSubsets), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "UpdateFailed", `InternalError: SKS: "fail7" does not own Endpoints: "fail7-pub"`), - }, - }, { - Name: "update-svc-fail-pub", - Key: "update-svc/fail8", - WantErr: true, - Objects: []runtime.Object{ - SKS("update-svc", "fail8", WithPubService), - svcpub("update-svc", "fail8", withTimeSelector), - svcpriv("update-svc", "fail8"), - endpointspub("update-svc", "fail8", WithSubsets), - endpointspriv("update-svc", "fail8", WithSubsets), - }, - WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("update", "services"), + SKS("ronin-priv-service", "fail5", withTempPubService, WithPrivateService), + svcpub("ronin-priv-service", "fail5"), + svcpriv("ronin-priv-service", "fail5", WithK8sSvcOwnersRemoved), + endpointspub("ronin-priv-service", "fail5", WithSubsets), + endpointspriv("ronin-priv-service", "fail5", WithSubsets), }, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: svcpub("update-svc", "fail8"), - }}, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "UpdateFailed", "InternalError: inducing failure for update services"), + Eventf(corev1.EventTypeWarning, "UpdateFailed", `InternalError: SKS: fail5 does not own Service: fail5-priv`), }, }, { Name: "update-svc-fail-priv", Key: "update-svc/fail9", WantErr: true, Objects: []runtime.Object{ - SKS("update-svc", "fail9", WithPubService, WithPrivateService), + SKS("update-svc", "fail9", withTempPubService, WithPrivateService), svcpub("update-svc", "fail9"), svcpriv("update-svc", "fail9", withTimeSelector), endpointspub("update-svc", "fail9"), @@ -320,27 +206,6 @@ func TestReconcile(t *testing.T) { WantEvents: []string{ Eventf(corev1.EventTypeWarning, "UpdateFailed", "InternalError: inducing failure for update services"), }, - }, { - Name: "update-eps-fail", - Key: "update-eps/failA", - WantErr: true, - Objects: []runtime.Object{ - SKS("update-eps", "failA", WithPubService, WithPrivateService), - svcpub("update-eps", "failA"), - svcpriv("update-eps", "failA"), - endpointspub("update-eps", "failA"), - endpointspriv("update-eps", "failA", WithSubsets), - }, - WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("update", "endpoints"), - }, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: endpointspub("update-eps", "failA", WithSubsets), // The attempted update. - }}, - - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "UpdateFailed", "InternalError: inducing failure for update endpoints"), - }, }} defer ClearAllLoggers() @@ -415,3 +280,8 @@ func endpointspub(namespace, name string, eo ...EndpointsOption) *corev1.Endpoin func withTimeSelector(svc *corev1.Service) { svc.Spec.Selector = map[string]string{"pod-x": fmt.Sprintf("a-%d", time.Now().UnixNano())} } + +// TODO(vagababov): temp while we don't create separate public service. +func withTempPubService(sks *nv1a1.ServerlessService) { + sks.Status.ServiceName = names.PrivateService(sks) +}