Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SKS use one service #3711

Merged
merged 9 commits into from
Apr 10, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions pkg/reconciler/v1alpha1/revision/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,6 @@ func getBuildDoneCondition(build *duckv1alpha1.KResource) *duckv1alpha1.Conditio
return nil
}

func isServiceReady(e *corev1.Endpoints) bool {
for _, es := range e.Subsets {
if len(es.Addresses) > 0 {
return true
}
}
return false
}

func hasDeploymentTimedOut(deployment *appsv1.Deployment) bool {
// as per https://kubernetes.io/docs/concepts/workloads/controllers/deployment
for _, cond := range deployment.Status.Conditions {
Expand Down
37 changes: 0 additions & 37 deletions pkg/reconciler/v1alpha1/revision/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,43 +92,6 @@ func TestGetBuildDoneCondition(t *testing.T) {

}

func TestGetIsServiceReady(t *testing.T) {
tests := []struct {
description string
endpoints *corev1.Endpoints
ready bool
}{{
description: "no subsets",
endpoints: &corev1.Endpoints{},
}, {
description: "subset no address",
endpoints: &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{},
}},
},
}, {
description: "subset with address",
endpoints: &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
IP: "127.0.0.1",
}},
}},
},
ready: true,
}}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ready := isServiceReady(test.endpoints)
if ready != test.ready {
t.Errorf("getIsServiceReady(%v) = %v, want %v", test.endpoints, ready, test.ready)
}
})
}
}

func TestGetDeploymentProgressCondition(t *testing.T) {
tests := []struct {
description string
Expand Down
7 changes: 5 additions & 2 deletions pkg/reconciler/v1alpha1/revision/reconcile_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"time"

"go.uber.org/zap"

"github.com/knative/pkg/kmp"
"github.com/knative/pkg/logging"
"github.com/knative/pkg/logging/logkey"
Expand All @@ -29,7 +31,8 @@ import (
"github.com/knative/serving/pkg/reconciler/v1alpha1/revision/config"
"github.com/knative/serving/pkg/reconciler/v1alpha1/revision/resources"
resourcenames "github.com/knative/serving/pkg/reconciler/v1alpha1/revision/resources/names"
"go.uber.org/zap"
presources "github.com/knative/serving/pkg/resources"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -237,7 +240,7 @@ func (c *Reconciler) reconcileService(ctx context.Context, rev *v1alpha1.Revisio
// If the endpoints resource indicates that the Service it sits in front of is ready,
// then surface this in our Revision status as resources available (pods were scheduled)
// and container healthy (endpoints should be gated by any provided readiness checks).
if isServiceReady(endpoints) {
if presources.ReadyAddressCount(endpoints) > 0 {
rev.Status.MarkResourcesAvailable()
rev.Status.MarkContainerHealthy()
} else if !rev.Status.IsActivationRequired() {
Expand Down
131 changes: 22 additions & 109 deletions pkg/reconciler/v1alpha1/serverlessservice/serverlessservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"fmt"
"reflect"

"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
perrors "github.com/pkg/errors"

"github.com/knative/pkg/controller"
"github.com/knative/pkg/logging"
"github.com/knative/pkg/system"
Expand All @@ -33,6 +34,7 @@ import (
rbase "github.com/knative/serving/pkg/reconciler"
"github.com/knative/serving/pkg/reconciler/v1alpha1/serverlessservice/resources"
"github.com/knative/serving/pkg/reconciler/v1alpha1/serverlessservice/resources/names"
presources "github.com/knative/serving/pkg/resources"
"go.uber.org/zap"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -167,11 +169,9 @@ func (r *reconciler) reconcile(ctx context.Context, sks *netv1alpha1.ServerlessS
sks.SetDefaults(ctx)
sks.Status.InitializeConditions()

// TODO(#1997): implement: proxy mode, activator probing and positive handoff.
// TODO(#1997): implement: public service, proxy mode, activator probing and positive handoff.
for i, fn := range []func(context.Context, *netv1alpha1.ServerlessService) error{
r.reconcilePrivateService, // First make sure our data source is setup.
r.reconcilePublicService, // Make sure the service is setup.
r.reconcilePublicEndpoints, // Now populate endpoints, if there are healthy ones.
r.reconcilePrivateService, // First make sure our data source is setup.
} {
if err := fn(ctx, sks); err != nil {
logger.Debugw(fmt.Sprintf("%d: reconcile failed", i), zap.Error(err))
Expand All @@ -196,110 +196,6 @@ func (r *reconciler) updateStatus(sks *netv1alpha1.ServerlessService) (*netv1alp
return r.ServingClientSet.NetworkingV1alpha1().ServerlessServices(sks.Namespace).UpdateStatus(original)
}

func (r *reconciler) reconcilePublicService(ctx context.Context, sks *netv1alpha1.ServerlessService) error {
logger := logging.FromContext(ctx)
sn := names.PublicService(sks)
srv, err := r.serviceLister.Services(sks.Namespace).Get(sn)
if errors.IsNotFound(err) {
logger.Infof("K8s service %q does not exist; creating.", sn)
// We've just created the service, so it has no endpoints.
sks.Status.MarkEndpointsNotReady("CreatingPublicService")
srv = resources.MakePublicService(sks)
_, err := r.KubeClientSet.CoreV1().Services(sks.Namespace).Create(srv)
if err != nil {
logger.Errorw(fmt.Sprintf("Error creating K8s Service %s: ", sn), zap.Error(err))
return err
}
logger.Infof("Created K8s service: %q", sn)
} else if err != nil {
logger.Errorw(fmt.Sprintf("Error getting K8s Service %s: ", sn), zap.Error(err))
return err
} else if !metav1.IsControlledBy(srv, sks) {
sks.Status.MarkEndpointsNotOwned("Service", sn)
return fmt.Errorf("SKS: %q does not own Service: %q", sks.Name, sn)
} else {
tmpl := resources.MakePublicService(sks)
want := srv.DeepCopy()
want.Spec.Ports = tmpl.Spec.Ports
want.Spec.Selector = nil

sks.Status.MarkEndpointsNotReady("UpdatingPublicService")
if !equality.Semantic.DeepEqual(want.Spec, srv.Spec) {
logger.Infof("Public K8s Service changed; reconciling: %s", sn)
if _, err = r.KubeClientSet.CoreV1().Services(sks.Namespace).Update(want); err != nil {
logger.Errorw(fmt.Sprintf("Error updating public K8s Service %s: ", sn), zap.Error(err))
return err
}
}
}
sks.Status.ServiceName = sn
logger.Debugf("Done reconciling public K8s service %s", sn)
return nil
}

func (r *reconciler) reconcilePublicEndpoints(ctx context.Context, sks *netv1alpha1.ServerlessService) error {
logger := logging.FromContext(ctx)

// Service and Endpoints have the same name.
// Get private endpoints first, since if they are not available there's nothing we can do.
psn := names.PrivateService(sks)
srcEps, err := r.endpointsLister.Endpoints(sks.Namespace).Get(psn)
if err != nil {
logger.Error(fmt.Sprintf("Error obtaining private service endpoints: %s", psn), zap.Error(err))
return err
}
logger.Debugf("Public endpoints: %s", spew.Sprint(srcEps))

sn := names.PublicService(sks)
eps, err := r.endpointsLister.Endpoints(sks.Namespace).Get(sn)

if errors.IsNotFound(err) {
logger.Infof("K8s endpoints %q does not exist; creating.", sn)
sks.Status.MarkEndpointsNotReady("CreatingPublicEndpoints")
eps, err = r.KubeClientSet.CoreV1().Endpoints(sks.Namespace).Create(resources.MakePublicEndpoints(sks, srcEps))
if err != nil {
logger.Errorw(fmt.Sprintf("Error creating K8s Endpoints %s: ", sn), zap.Error(err))
return err
}
logger.Infof("Created K8s Endpoints: %q", sn)
} else if err != nil {
logger.Errorw(fmt.Sprintf("Error getting K8s Endpoints %s: ", sn), zap.Error(err))
return err
} else if !metav1.IsControlledBy(eps, sks) {
sks.Status.MarkEndpointsNotOwned("Endpoints", sn)
return fmt.Errorf("SKS: %q does not own Endpoints: %q", sks.Name, sn)
} else {
want := eps.DeepCopy()
want.Subsets = srcEps.Subsets

if !equality.Semantic.DeepEqual(want.Subsets, eps.Subsets) {
logger.Infof("Public K8s Endpoints changed; reconciling: %s", sn)
if _, err = r.KubeClientSet.CoreV1().Endpoints(sks.Namespace).Update(want); err != nil {
logger.Errorw(fmt.Sprintf("Error updating public K8s Endpoints %s: ", sn), zap.Error(err))
return err
}
}
}
if hasEndpoints(eps) {
sks.Status.MarkEndpointsReady()
} else {
sks.Status.MarkEndpointsNotReady("NoHealthyBackends")
}
logger.Debugf("Done reconciling public K8s endpoints %s", sn)
return nil
}

// hasEndpoints returns true if Endpoints resource has at least one endpoint.
func hasEndpoints(eps *corev1.Endpoints) bool {
for _, ss := range eps.Subsets {
if len(ss.Addresses) > 0 {
return true
}

}
return false
}

func (r *reconciler) reconcilePrivateService(ctx context.Context, sks *netv1alpha1.ServerlessService) error {
logger := logging.FromContext(ctx)
sn := names.PrivateService(sks)
Expand Down Expand Up @@ -335,6 +231,23 @@ func (r *reconciler) reconcilePrivateService(ctx context.Context, sks *netv1alph
return err
}
}

// TODO(#1997): temporarily we have one service since istio cannot handle our load.
// So they are smudged together. In the end this has to go.
eps, err := presources.FetchReadyAddressCount(r.endpointsLister, sks.Namespace, sn)
switch {
case err != nil:
return perrors.Wrapf(err, "error fetching endpoints %s/%s", sks.Namespace, sn)
case eps > 0:
logger.Infof("Endpoints %s/%s has %d ready endpoints", sks.Namespace, sn, eps)
sks.Status.MarkEndpointsReady()
default:
logger.Infof("Endpoints %s/%s has no ready endpoints", sks.Namespace, sn)
sks.Status.MarkEndpointsNotReady("NoHealthyBackends")
}
sks.Status.ServiceName = sn
// End TODO.

sks.Status.PrivateServiceName = sn
logger.Debugf("Done reconciling private K8s service %s", sn)
return nil
Expand Down
Loading