Skip to content

Commit

Permalink
Prevent unauthorized access to kube clusters by upserting kube_servers (
Browse files Browse the repository at this point in the history
#470)

This PR changes the behavior of the kubernetes_service when validating access
to kubernetes clusters. Previously, the kubernetes_service would use the first
kubernetes cluster it found in the Auth server backend to validate access. This was
problematic because if the first kubernetes cluster was upserted with a
the same name as a kubernetes cluster the user was trying to access but
with different labels, the user would be able to access the cluster even
though they shouldn't be able to.

This PR changes the behavior of the kubernetes_service to use the
in memory kubernetes cluster representation used for heartbeats
instead of relying on the information received from the auth server. This would
block the user from accessing the cluster if the cluster was upserted
with a different set of labels since the kubernetes_service would not
have the updated labels in memory and would deny access.

Fixes #469
  • Loading branch information
tigrato authored and r0mant committed Mar 30, 2023
1 parent b58759b commit de657ce
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 3 deletions.
17 changes: 14 additions & 3 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,19 @@ type Forwarder struct {
sessions map[uuid.UUID]*session
// upgrades connections to websockets
upgrader websocket.Upgrader
// getKubernetesServersForKubeCluster is a function that returns a list of
// kubernetes services for a given kube cluster but uses different methods
// depending on the service type.
// For example, if the service type is KubeService, it will use the
// local kubernetes clusters. If the service type is Proxy, it will
// use the heartbeat clusters.
getKubernetesServersForKubeCluster getKubeServicesByNameFunc
}

// getKubeServicesByNameFunc is a function that returns a list of
// kubernetes services that might contain the given kube cluster.
type getKubeServicesByNameFunc = func(ctx context.Context, name string) ([]types.Server, error)

// Close signals close to all outstanding or background operations
// to complete
func (f *Forwarder) Close() error {
Expand Down Expand Up @@ -714,7 +725,7 @@ func (f *Forwarder) getKubeAccessDetails(
kubeClusterName string,
sessionTTL time.Duration,
) (kubeAccessDetails, error) {
kubeServices, err := f.cfg.CachingAuthClient.GetKubeServices(f.ctx)
kubeServices, err := f.getKubernetesServersForKubeCluster(f.ctx, kubeClusterName)
if err != nil {
return kubeAccessDetails{}, trace.Wrap(err)
}
Expand Down Expand Up @@ -761,7 +772,7 @@ func (f *Forwarder) authorize(ctx context.Context, actx *authContext) error {
f.log.WithField("auth_context", actx.String()).Debug("Skipping authorization due to unknown kubernetes cluster name")
return nil
}
servers, err := f.cfg.CachingAuthClient.GetKubeServices(ctx)
servers, err := f.getKubernetesServersForKubeCluster(f.ctx, actx.kubeCluster)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -1818,7 +1829,7 @@ func (f *Forwarder) newClusterSessionSameCluster(ctx authContext) (*clusterSessi
return sess, nil
}

kubeServices, err := f.cfg.CachingAuthClient.GetKubeServices(f.ctx)
kubeServices, err := f.getKubernetesServersForKubeCluster(f.ctx, ctx.kubeCluster)
if err != nil && !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
}
Expand Down
11 changes: 11 additions & 0 deletions lib/kube/proxy/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func TestAuthenticate(t *testing.T) {
},
}

f.getKubernetesServersForKubeCluster = func(ctx context.Context, kubeCluster string) ([]types.Server, error) {
return f.cfg.CachingAuthClient.GetKubeServices(ctx)
}

const remoteAddr = "user.example.com"
activeAccessRequests := []string{uuid.NewString(), uuid.NewString()}
tests := []struct {
Expand Down Expand Up @@ -805,6 +809,10 @@ func TestNewClusterSessionLocal(t *testing.T) {
},
}

f.getKubernetesServersForKubeCluster = func(ctx context.Context, kubeCluster string) ([]types.Server, error) {
return f.cfg.CachingAuthClient.GetKubeServices(ctx)
}

// Fail when kubeCluster is not specified
authCtx.kubeCluster = ""
_, err := f.newClusterSession(authCtx)
Expand Down Expand Up @@ -863,6 +871,9 @@ func TestNewClusterSessionRemote(t *testing.T) {
func TestNewClusterSessionDirect(t *testing.T) {
ctx := context.Background()
f := newMockForwader(ctx, t)
f.getKubernetesServersForKubeCluster = func(ctx context.Context, kubeCluster string) ([]types.Server, error) {
return f.cfg.CachingAuthClient.GetKubeServices(ctx)
}
authCtx := mockAuthCtx(ctx, t, "kube-cluster", false)

// helper function to create kube services
Expand Down
62 changes: 62 additions & 0 deletions lib/kube/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package proxy

import (
"context"
"crypto/tls"
"net"
"net/http"
Expand Down Expand Up @@ -184,6 +185,10 @@ func NewTLSServer(cfg TLSServerConfig) (*TLSServer, error) {
log.Debug("No local kube credentials on proxy, will not start kubernetes_service heartbeats")
}

fwd.getKubernetesServersForKubeCluster, err = server.getKubernetesServiceFunc()
if err != nil {
return nil, trace.Wrap(err)
}
return server, nil
}

Expand Down Expand Up @@ -280,3 +285,60 @@ func (t *TLSServer) GetServerInfo() (types.Resource, error) {

return srv, nil
}

// getKubernetesServiceFunc returns a function that returns the kubernetes services
func (t *TLSServer) getKubernetesServiceFunc() (getKubeServicesByNameFunc, error) {
switch t.KubeServiceType {
case KubeService:
return func(_ context.Context, name string) ([]types.Server, error) {
resource, err := t.GetServerInfo()
if err != nil {
return nil, trace.Wrap(err)
}
srv, ok := resource.(types.Server)
if !ok {
return nil, trace.BadParameter("unexpected type %T", resource)
}
return []types.Server{srv}, nil
}, nil
case ProxyService:
return t.getAuthKubeServices, nil
case LegacyProxyService:
return func(ctx context.Context, name string) ([]types.Server, error) {
servers, err := t.getLocalKubeServiceForCluster(name)
if err != nil {
servers, err := t.getAuthKubeServices(ctx, name)
return servers, trace.Wrap(err)
}
return servers, nil
}, nil
default:
return nil, trace.BadParameter("unknown kubernetes service type %q", t.KubeServiceType)
}
}

// getAuthKubeServers returns the kubernetes servers for a given kube cluster
// using the Auth server client.
func (t *TLSServer) getAuthKubeServices(ctx context.Context, name string) ([]types.Server, error) {
servers, err := t.CachingAuthClient.GetKubeServices(ctx)
return servers, trace.Wrap(err)
}

// getLocalKubeServiceForCluster returns the local kubernetes service if it
// includes the given cluster.
func (t *TLSServer) getLocalKubeServiceForCluster(clusterName string) ([]types.Server, error) {
resource, err := t.GetServerInfo()
if err != nil {
return nil, trace.Wrap(err)
}
srv, ok := resource.(types.Server)
if !ok {
return nil, trace.BadParameter("unexpected type %T", resource)
}
for _, cluster := range srv.GetKubernetesClusters() {
if cluster.Name == clusterName {
return []types.Server{srv}, nil
}
}
return nil, trace.NotFound("kubernetes cluster %q not found", clusterName)
}

0 comments on commit de657ce

Please sign in to comment.