Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert the change that removed the public service creation in SKS #3786

Merged
merged 5 commits into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 40 additions & 58 deletions pkg/reconciler/v1alpha1/serverlessservice/serverlessservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"fmt"
"reflect"

"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
perrors "github.com/pkg/errors"
"go.uber.org/zap"

"github.com/knative/pkg/apis"
"github.com/knative/pkg/controller"
Expand All @@ -37,7 +37,7 @@ import (
"github.com/knative/serving/pkg/reconciler/v1alpha1/serverlessservice/resources"
"github.com/knative/serving/pkg/reconciler/v1alpha1/serverlessservice/resources/names"
presources "github.com/knative/serving/pkg/resources"
"go.uber.org/zap"

autoscalingapi "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -174,9 +174,11 @@ func (r *reconciler) reconcile(ctx context.Context, sks *netv1alpha1.ServerlessS
sks.SetDefaults(ctx)
sks.Status.InitializeConditions()

// TODO(#1997): implement: public service, proxy mode, activator probing and positive handoff.
// TODO(#1997): implement: proxy mode, activator probing and positive handoff.
for i, fn := range []func(context.Context, *netv1alpha1.ServerlessService) error{
r.reconcilePrivateService, // First make sure our data source is setup.
r.reconcilePublicService,
r.reconcilePublicEndpoints,
} {
if err := fn(ctx, sks); err != nil {
logger.Debugw(fmt.Sprintf("%d: reconcile failed", i), zap.Error(err))
Expand Down Expand Up @@ -206,39 +208,37 @@ func (r *reconciler) reconcilePublicService(ctx context.Context, sks *netv1alpha
sn := names.PublicService(sks.Name)
srv, err := r.serviceLister.Services(sks.Namespace).Get(sn)
if errors.IsNotFound(err) {
logger.Infof("K8s service %q does not exist; creating.", sn)
logger.Infof("K8s service %s does not exist; creating.", sn)
// We've just created the service, so it has no endpoints.
sks.Status.MarkEndpointsNotReady("CreatingPublicService")
srv = resources.MakePublicService(sks)
_, err := r.KubeClientSet.CoreV1().Services(sks.Namespace).Create(srv)
if err != nil {
logger.Errorw(fmt.Sprintf("Error creating K8s Service %s: ", sn), zap.Error(err))
logger.Errorw(fmt.Sprint("Error creating K8s Service:", sn), zap.Error(err))
return err
}
logger.Infof("Created K8s service: %q", sn)
logger.Info("Created K8s service:", sn)
} else if err != nil {
logger.Errorw(fmt.Sprintf("Error getting K8s Service %s: ", sn), zap.Error(err))
logger.Errorw(fmt.Sprint("Error getting K8s Service:", sn), zap.Error(err))
return err
} else if !metav1.IsControlledBy(srv, sks) {
sks.Status.MarkEndpointsNotOwned("Service", sn)
return fmt.Errorf("SKS: %q does not own Service: %q", sks.Name, sn)
} else {
tmpl := resources.MakePublicService(sks)
want := srv.DeepCopy()
want.Spec.Ports = tmpl.Spec.Ports
want.Spec.Selector = nil

sks.Status.MarkEndpointsNotReady("UpdatingPublicService")
if !equality.Semantic.DeepEqual(want.Spec, srv.Spec) {
logger.Infof("Public K8s Service changed; reconciling: %s", sn)
if _, err = r.KubeClientSet.CoreV1().Services(sks.Namespace).Update(want); err != nil {
logger.Errorw(fmt.Sprintf("Error updating public K8s Service %s: ", sn), zap.Error(err))
return err
}
return fmt.Errorf("SKS: %s does not own Service: %s", sks.Name, sn)
}
tmpl := resources.MakePublicService(sks)
want := srv.DeepCopy()
want.Spec.Ports = tmpl.Spec.Ports
want.Spec.Selector = nil

if !equality.Semantic.DeepEqual(want.Spec, srv.Spec) {
logger.Info("Public K8s Service changed; reconciling: ", sn)
if _, err = r.KubeClientSet.CoreV1().Services(sks.Namespace).Update(want); err != nil {
logger.Errorw(fmt.Sprint("Error updating public K8s Service:", sn), zap.Error(err))
return err
}
}
sks.Status.ServiceName = sn
logger.Debugf("Done reconciling public K8s service %s", sn)
logger.Debug("Done reconciling public K8s service: ", sn)
return nil
}

Expand All @@ -250,10 +250,9 @@ func (r *reconciler) reconcilePublicEndpoints(ctx context.Context, sks *netv1alp
psn := names.PrivateService(sks.Name)
srcEps, err := r.endpointsLister.Endpoints(sks.Namespace).Get(psn)
if err != nil {
logger.Error(fmt.Sprintf("Error obtaining private service endpoints: %s", psn), zap.Error(err))
logger.Errorw(fmt.Sprint("Error obtaining private service endpoints:", psn), zap.Error(err))
return err
}
logger.Debugf("Public endpoints: %s", spew.Sprint(srcEps))

sn := names.PublicService(sks.Name)
eps, err := r.endpointsLister.Endpoints(sks.Namespace).Get(sn)
Expand All @@ -263,35 +262,34 @@ func (r *reconciler) reconcilePublicEndpoints(ctx context.Context, sks *netv1alp
sks.Status.MarkEndpointsNotReady("CreatingPublicEndpoints")
eps, err = r.KubeClientSet.CoreV1().Endpoints(sks.Namespace).Create(resources.MakePublicEndpoints(sks, srcEps))
if err != nil {
logger.Errorw(fmt.Sprintf("Error creating K8s Endpoints %s: ", sn), zap.Error(err))
logger.Errorw(fmt.Sprint("Error creating K8s Endpoints:", sn), zap.Error(err))
return err
}
logger.Infof("Created K8s Endpoints: %q", sn)
logger.Info("Created K8s Endpoints: ", sn)
} else if err != nil {
logger.Errorw(fmt.Sprintf("Error getting K8s Endpoints %s: ", sn), zap.Error(err))
logger.Errorw(fmt.Sprint("Error getting K8s Endpoints:", sn), zap.Error(err))
return err
} else if !metav1.IsControlledBy(eps, sks) {
sks.Status.MarkEndpointsNotOwned("Endpoints", sn)
return fmt.Errorf("SKS: %q does not own Endpoints: %q", sks.Name, sn)
} else {
want := eps.DeepCopy()
want.Subsets = srcEps.Subsets

if !equality.Semantic.DeepEqual(want.Subsets, eps.Subsets) {
logger.Infof("Public K8s Endpoints changed; reconciling: %s", sn)
if _, err = r.KubeClientSet.CoreV1().Endpoints(sks.Namespace).Update(want); err != nil {
logger.Errorw(fmt.Sprintf("Error updating public K8s Endpoints %s: ", sn), zap.Error(err))
return err
}
return fmt.Errorf("SKS: %s does not own Endpoints: %s", sks.Name, sn)
}
want := eps.DeepCopy()
want.Subsets = srcEps.Subsets

if !equality.Semantic.DeepEqual(want.Subsets, eps.Subsets) {
logger.Info("Public K8s Endpoints changed; reconciling: ", sn)
if _, err = r.KubeClientSet.CoreV1().Endpoints(sks.Namespace).Update(want); err != nil {
logger.Errorw(fmt.Sprint("Error updating public K8s Endpoints:", sn), zap.Error(err))
return err
}
}
if r := presources.ReadyAddressCount(eps); r > 0 {
sks.Status.MarkEndpointsReady()
} else {
logger.Info("Endpoints %s/%s has no ready endpoints")
logger.Info("Endpoints %s has no ready endpoints", sn)
sks.Status.MarkEndpointsNotReady("NoHealthyBackends")
}
logger.Debugf("Done reconciling public K8s endpoints %s", sn)
logger.Debug("Done reconciling public K8s endpoints: ", sn)
return nil
}

Expand All @@ -314,7 +312,7 @@ func (r *reconciler) reconcilePrivateService(ctx context.Context, sks *netv1alph
logger.Errorw(fmt.Sprint("Error creating K8s Service:", sn), zap.Error(err))
return err
}
logger.Infof("Created K8s service: %s", sn)
logger.Info("Created K8s service: ", sn)
} else if err != nil {
logger.Errorw(fmt.Sprint("Error getting K8s Service:", sn), zap.Error(err))
return err
Expand All @@ -330,29 +328,13 @@ func (r *reconciler) reconcilePrivateService(ctx context.Context, sks *netv1alph

if !equality.Semantic.DeepEqual(svc.Spec, want.Spec) {
sks.Status.MarkEndpointsNotReady("UpdatingPrivateService")
logger.Info("Private K8s Service changed; reconciling:", sn)
logger.Info("Private K8s Service changed; reconciling: ", sn)
if _, err = r.KubeClientSet.CoreV1().Services(sks.Namespace).Update(want); err != nil {
logger.Errorw(fmt.Sprint("Error updating private K8s Service:", sn), zap.Error(err))
return err
}
}

// TODO(#1997): temporarily we have one service since istio cannot handle our load.
// So they are smudged together. In the end this has to go.
eps, err := presources.FetchReadyAddressCount(r.endpointsLister, sks.Namespace, sn)
switch {
case err != nil:
return perrors.Wrapf(err, "error fetching endpoints %s/%s", sks.Namespace, sn)
case eps > 0:
logger.Infof("Endpoints %s/%s has %d ready endpoints", sks.Namespace, sn, eps)
sks.Status.MarkEndpointsReady()
default:
logger.Infof("Endpoints %s/%s has no ready endpoints", sks.Namespace, sn)
sks.Status.MarkEndpointsNotReady("NoHealthyBackends")
}
sks.Status.ServiceName = sn
// End TODO.

sks.Status.PrivateServiceName = sn
logger.Debug("Done reconciling private K8s service", sn)
return nil
Expand Down
Loading