Skip to content

Commit

Permalink
UPSTREAM: <carry>: patch aggregator to allow delegating resources
Browse files Browse the repository at this point in the history
Origin-commit: 14ba1f8ece9a7bb00ececb2a35b5f8f5fbeacc83

UPSTREAM: <carry>: prevent apiservice registration by CRD controller when delegating

Origin-commit: 3d216eab7adcbd8596606d72d31b6af621bfd350

UPSTREAM: <carry>: prevent CRD registration from fighting with APIServices

Origin-commit: c1c87eeade4730a2271cb98b4c6ea16af07e3e68

UPSTREAM: <carry>: always delegate namespaced resources

Origin-commit: 7f0815b5a88d57046a92fbdbc493bab2ad28a79c
  • Loading branch information
mfojtik authored and soltysh committed Sep 7, 2021
1 parent 3be1d66 commit 6299039
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/apiserver"
)

// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
Expand Down Expand Up @@ -193,6 +194,10 @@ func (c *crdRegistrationController) enqueueCRD(crd *apiextensionsv1.CustomResour
func (c *crdRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error {
apiServiceName := groupVersion.Version + "." + groupVersion.Group

if apiserver.APIServiceAlreadyExists(groupVersion) {
return nil
}

// check all CRDs. There shouldn't that many, but if we have problems later we can index them
crds, err := c.crdLister.List(labels.Everything())
if err != nil {
Expand Down
38 changes: 27 additions & 11 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type APIAggregator struct {
// handledGroups are the groups that already have routes
handledGroups sets.String

// handledAlwaysLocalDelegatePaths are the URL paths that already have routes registered
handledAlwaysLocalDelegatePaths sets.String

// lister is used to add group handling for /apis/<group> aggregator lookups based on
// controller state
lister listers.APIServiceLister
Expand Down Expand Up @@ -182,17 +185,18 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
)

s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransport: c.ExtraConfig.ProxyTransport,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: c.GenericConfig.OpenAPIConfig,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
proxyTransport: c.ExtraConfig.ProxyTransport,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
handledAlwaysLocalDelegatePaths: sets.String{},
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
serviceResolver: c.ExtraConfig.ServiceResolver,
openAPIConfig: c.GenericConfig.OpenAPIConfig,
egressSelector: c.GenericConfig.EgressSelector,
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
}

apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
Expand Down Expand Up @@ -436,6 +440,18 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
return nil
}

// For some resources we always want to delegate to local API server.
// These resources have to exists as CRD to be served locally.
for _, alwaysLocalDelegatePath := range alwaysLocalDelegatePathPrefixes.List() {
if s.handledAlwaysLocalDelegatePaths.Has(alwaysLocalDelegatePath) {
continue
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(alwaysLocalDelegatePath, proxyHandler.localDelegate)
// Always use local delegate for this prefix
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(alwaysLocalDelegatePath+"/", proxyHandler.localDelegate)
s.handledAlwaysLocalDelegatePaths.Insert(alwaysLocalDelegatePath)
}

// it's time to register the group aggregation endpoint
groupPath := "/apis/" + apiService.Spec.Group
groupDiscoveryHandler := &apiGroupHandler{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
utilnet "k8s.io/apimachinery/pkg/util/net"
Expand Down Expand Up @@ -122,6 +123,14 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

// some groupResources should always be delegated
if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok {
if alwaysLocalDelegateGroupResource[schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}] {
r.localDelegate.ServeHTTP(w, req)
return
}
}

if !handlingInfo.serviceAvailable {
proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package apiserver

import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
)

// alwaysLocalDelegatePrefixes specify a list of API paths that we want to delegate to Kubernetes API server
// instead of handling with OpenShift API server.
var alwaysLocalDelegatePathPrefixes = sets.NewString()

// AddAlwaysLocalDelegateForPrefix will cause the given URL prefix always be served by local API server (kube apiserver).
// This allows to move some resources from aggregated API server into CRD.
func AddAlwaysLocalDelegateForPrefix(prefix string) {
if alwaysLocalDelegatePathPrefixes.Has(prefix) {
return
}
alwaysLocalDelegatePathPrefixes.Insert(prefix)
}

var overlappingGroupVersion = map[schema.GroupVersion]bool{}

// AddOverlappingGroupVersion will stop the CRD registration controller from trying to manage an APIService.
func AddOverlappingGroupVersion(groupVersion schema.GroupVersion) {
overlappingGroupVersion[groupVersion] = true
}

var alwaysLocalDelegateGroupResource = map[schema.GroupResource]bool{}

func AddAlwaysLocalDelegateGroupResource(groupResource schema.GroupResource) {
alwaysLocalDelegateGroupResource[groupResource] = true
}

func APIServiceAlreadyExists(groupVersion schema.GroupVersion) bool {
if overlappingGroupVersion[groupVersion] {
return true
}

testPrefix := fmt.Sprintf("/apis/%s/%s/", groupVersion.Group, groupVersion.Version)
for _, prefix := range alwaysLocalDelegatePathPrefixes.List() {
if strings.HasPrefix(prefix, testPrefix) {
return true
}
}
return false
}

0 comments on commit 6299039

Please sign in to comment.