Skip to content

Commit

Permalink
UPSTREAM: <carry>: wait for oauth-apiserver accessibility
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k authored and soltysh committed Sep 8, 2021
1 parent b85d5c1 commit 156317b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 23 deletions.
7 changes: 6 additions & 1 deletion openshift-kube-apiserver/openshiftkubeapiserver/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func OpenShiftKubeAPIServerConfigPatch(genericConfig *genericapiserver.Config, k
// END HANDLER CHAIN

openshiftAPIServiceReachabilityCheck := newOpenshiftAPIServiceReachabilityCheck()
genericConfig.ReadyzChecks = append(genericConfig.ReadyzChecks, openshiftAPIServiceReachabilityCheck)
oauthAPIServiceReachabilityCheck := newOAuthPIServiceReachabilityCheck()
genericConfig.ReadyzChecks = append(genericConfig.ReadyzChecks, openshiftAPIServiceReachabilityCheck, oauthAPIServiceReachabilityCheck)

genericConfig.AddPostStartHookOrDie("openshift.io-startkubeinformers", func(context genericapiserver.PostStartHookContext) error {
go openshiftInformers.Start(context.StopCh)
Expand All @@ -90,6 +91,10 @@ func OpenShiftKubeAPIServerConfigPatch(genericConfig *genericapiserver.Config, k
go openshiftAPIServiceReachabilityCheck.checkForConnection(context)
return nil
})
genericConfig.AddPostStartHookOrDie("openshift.io-oauth-apiserver-reachable", func(context genericapiserver.PostStartHookContext) error {
go oauthAPIServiceReachabilityCheck.checkForConnection(context)
return nil
})
enablement.AppendPostStartHooksOrDie(genericConfig)

return nil
Expand Down
64 changes: 42 additions & 22 deletions openshift-kube-apiserver/openshiftkubeapiserver/sdn_readyz_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,40 @@ import (
"k8s.io/klog/v2"
)

func newOpenshiftAPIServiceReachabilityCheck() *openshiftAPIServiceAvailabilityCheck {
return &openshiftAPIServiceAvailabilityCheck{done: make(chan struct{})}
func newOpenshiftAPIServiceReachabilityCheck() *aggregatedAPIServiceAvailabilityCheck {
return newAggregatedAPIServiceReachabilityCheck("openshift-apiserver", "api")
}

type openshiftAPIServiceAvailabilityCheck struct {
func newOAuthPIServiceReachabilityCheck() *aggregatedAPIServiceAvailabilityCheck {
return newAggregatedAPIServiceReachabilityCheck("openshift-oauth-apiserver", "api")
}

// if the API service is not found, then this check returns quickly.
// if the endpoint is not accessible within 60 seconds, we report ready no matter what
// otherwise, wait for up to 60 seconds to be able to reach the apiserver
func newAggregatedAPIServiceReachabilityCheck(namespace, service string) *aggregatedAPIServiceAvailabilityCheck {
return &aggregatedAPIServiceAvailabilityCheck{
done: make(chan struct{}),
namespace: namespace,
serviceName: service,
}
}

type aggregatedAPIServiceAvailabilityCheck struct {
// done indicates that this check is complete (success or failure) and the check should return true
done chan struct{}

// namespace is the namespace hosting the service for the aggregated api
namespace string
// serviceName is used to get a list of endpoints to directly dial
serviceName string
}

func (c *openshiftAPIServiceAvailabilityCheck) Name() string {
return "openshift-apiservices-available"
func (c *aggregatedAPIServiceAvailabilityCheck) Name() string {
return fmt.Sprintf("%s-%s-available", c.serviceName, c.namespace)
}

func (c *openshiftAPIServiceAvailabilityCheck) Check(req *http.Request) error {
func (c *aggregatedAPIServiceAvailabilityCheck) Check(req *http.Request) error {
select {
case <-c.done:
return nil
Expand All @@ -40,11 +60,11 @@ func (c *openshiftAPIServiceAvailabilityCheck) Check(req *http.Request) error {
}
}

func (c *openshiftAPIServiceAvailabilityCheck) checkForConnection(context genericapiserver.PostStartHookContext) {
func (c *aggregatedAPIServiceAvailabilityCheck) checkForConnection(context genericapiserver.PostStartHookContext) {
defer utilruntime.HandleCrash()

reachedOpenshiftAPIServer := make(chan struct{})
noOpenshiftAPIServer := make(chan struct{})
reachedAggregatedAPIServer := make(chan struct{})
noAggregatedAPIServer := make(chan struct{})
waitUntilCh := make(chan struct{})
defer func() {
close(waitUntilCh) // this stops the endpoint check
Expand All @@ -58,8 +78,8 @@ func (c *openshiftAPIServiceAvailabilityCheck) checkForConnection(context generi
panic(err)
}

// Start a thread which repeatedly tries to connect to any openshift-apiserver endpoint.
// 1. if the openshift-apiserver endpoint doesn't exist, logs a warning and reports ready
// Start a thread which repeatedly tries to connect to any aggregated apiserver endpoint.
// 1. if the aggregated apiserver endpoint doesn't exist, logs a warning and reports ready
// 2. if a connection cannot be made, after 60 seconds logs an error and reports ready -- this avoids a rebootstrapping cycle
// 3. as soon as a connection can be made, logs a time to be ready and reports ready.
go func() {
Expand All @@ -76,11 +96,11 @@ func (c *openshiftAPIServiceAvailabilityCheck) checkForConnection(context generi

wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
ctx := gocontext.TODO()
openshiftEndpoints, err := kubeClient.CoreV1().Endpoints("openshift-apiserver").Get(ctx, "api", metav1.GetOptions{})
openshiftEndpoints, err := kubeClient.CoreV1().Endpoints(c.namespace).Get(ctx, c.serviceName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
// if we have no openshift apiserver endpoint, we have no reason to wait
klog.Warning("api.openshift-apiserver.svc endpoints were not found")
close(noOpenshiftAPIServer)
// if we have no aggregated apiserver endpoint, we have no reason to wait
klog.Warningf("%s.%s.svc endpoints were not found", c.serviceName, c.namespace)
close(noAggregatedAPIServer)
return true, nil
}
if err != nil {
Expand All @@ -94,7 +114,7 @@ func (c *openshiftAPIServiceAvailabilityCheck) checkForConnection(context generi
if err == nil { // any http response is fine. it means that we made contact
response, dumpErr := httputil.DumpResponse(resp, true)
klog.V(4).Infof("reached to connect to %q: %v\n%v", url, dumpErr, string(response))
close(reachedOpenshiftAPIServer)
close(reachedAggregatedAPIServer)
resp.Body.Close()
return true, nil
}
Expand All @@ -109,18 +129,18 @@ func (c *openshiftAPIServiceAvailabilityCheck) checkForConnection(context generi
select {
case <-time.After(60 * time.Second):
// if we timeout, always return ok so that we can start from a case where all kube-apiservers are down and the SDN isn't coming up
utilruntime.HandleError(fmt.Errorf("openshift.io-openshift-apiserver-reachable never reached openshift apiservice"))
utilruntime.HandleError(fmt.Errorf("%s never reached apiserver", c.Name()))
return
case <-context.StopCh:
utilruntime.HandleError(fmt.Errorf("openshift.io-openshift-apiserver-reachable interrupted"))
utilruntime.HandleError(fmt.Errorf("%s interrupted", c.Name()))
return
case <-noOpenshiftAPIServer:
utilruntime.HandleError(fmt.Errorf("openshift.io-openshift-apiserver-reachable did not find an openshift-apiserver endpoint"))
case <-noAggregatedAPIServer:
utilruntime.HandleError(fmt.Errorf("%s did not find an %s endpoint", c.Name(), c.namespace))
return

case <-reachedOpenshiftAPIServer:
case <-reachedAggregatedAPIServer:
end := time.Now()
klog.Infof("reached openshift apiserver via SDN after %v milliseconds", end.Sub(start).Milliseconds())
klog.Infof("reached %s via SDN after %v milliseconds", c.namespace, end.Sub(start).Milliseconds())
return
}
}

0 comments on commit 156317b

Please sign in to comment.