Skip to content

Commit

Permalink
Prevent unauthorized access to kube clusters by upserting kube_servers
Browse files Browse the repository at this point in the history
This PR changes the behavior of the kubernetes_service when validating access
to kubernetes clusters. Previously, the kubernetes_service would use the first
kubernetes cluster it found in the Auth server backend to validate access. This was
problematic because if the first kubernetes cluster was upserted with a
the same name as a kubernetes cluster the user was trying to access but
with different labels, the user would be able to access the cluster even
though they shouldn't be able to.

This PR changes the behavior of the kubernetes_service to use the
in memory kubernetes cluster representation used for heartbeats
instead of relying on the information received from the auth server. This would
block the user from accessing the cluster if the cluster was upserted
with a different set of labels since the kubernetes_service would not
have the updated labels in memory and would deny access.

Fixes #469
  • Loading branch information
tigrato authored and r0mant committed Mar 30, 2023
1 parent 93aa22b commit 393603d
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 47 deletions.
2 changes: 1 addition & 1 deletion lib/authz/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func definitionForBuiltinRole(clusterName string, recConfig types.SessionRecordi
types.NewRule(types.KindRole, services.RO()),
types.NewRule(types.KindNamespace, services.RO()),
types.NewRule(types.KindLock, services.RO()),
types.NewRule(types.KindKubernetesCluster, services.RW()),
types.NewRule(types.KindKubernetesCluster, services.RO()),
types.NewRule(types.KindSemaphore, services.RW()),
},
},
Expand Down
28 changes: 28 additions & 0 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,34 @@ func TestApplicationServers(t *testing.T) {
})
}

// TestKubernetesServers tests that CRUD operations on kube servers are
// replicated from the backend to the cache.
func TestKubernetesServers(t *testing.T) {
t.Parallel()

p := newTestPack(t, ForProxy)
t.Cleanup(p.Close)

testResources(t, p, testFuncs[types.KubeServer]{
newResource: func(name string) (types.KubeServer, error) {
app, err := types.NewKubernetesClusterV3(types.Metadata{Name: name}, types.KubernetesClusterSpecV3{})
require.NoError(t, err)
return types.NewKubernetesServerV3FromCluster(app, "host", uuid.New().String())
},
create: withKeepalive(p.presenceS.UpsertKubernetesServer),
list: func(ctx context.Context) ([]types.KubeServer, error) {
return p.presenceS.GetKubernetesServers(ctx)
},
cacheList: func(ctx context.Context) ([]types.KubeServer, error) {
return p.cache.GetKubernetesServers(ctx)
},
update: withKeepalive(p.presenceS.UpsertKubernetesServer),
deleteAll: func(ctx context.Context) error {
return p.presenceS.DeleteAllKubernetesServers(ctx)
},
})
}

// TestApps tests that CRUD operations on application resources are
// replicated from the backend to the cache.
func TestApps(t *testing.T) {
Expand Down
56 changes: 32 additions & 24 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,19 @@ type Forwarder struct {
sessions map[uuid.UUID]*session
// upgrades connections to websockets
upgrader websocket.Upgrader
// getKubernetesServersForKubeCluster is a function that returns a list of
// kubernetes servers for a given kube cluster but uses different methods
// depending on the service type.
// For example, if the service type is KubeService, it will use the
// local kubernetes clusters. If the service type is Proxy, it will
// use the heartbeat clusters.
getKubernetesServersForKubeCluster getKubeServersByNameFunc
}

// getKubeServersByNameFunc is a function that returns a list of
// kubernetes servers for a given kube cluster.
type getKubeServersByNameFunc = func(ctx context.Context, name string) ([]types.KubeServer, error)

// Close signals close to all outstanding or background operations
// to complete
func (f *Forwarder) Close() error {
Expand Down Expand Up @@ -396,6 +407,9 @@ type authContext struct {
kubeResource *types.KubernetesResource
// httpMethod is the request HTTP Method.
httpMethod string
// kubeServers are the registered agents for the kubernetes cluster the request
// is targeted to.
kubeServers []types.KubeServer
}

func (c authContext) String() string {
Expand Down Expand Up @@ -729,7 +743,9 @@ func (f *Forwarder) setupContext(ctx context.Context, authCtx authz.Context, req
}

kubeCluster := identity.KubernetesCluster
if !isRemoteCluster {
// Only set a default kube cluster if the user is not accessing a specific cluster.
// The check for kubeCluster != "" is happens in the next code section.
if !isRemoteCluster && kubeCluster == "" {
kc, err := kubeutils.CheckOrSetKubeCluster(ctx, f.cfg.CachingAuthClient, identity.KubernetesCluster, teleportClusterName)
if err != nil {
if !trace.IsNotFound(err) {
Expand All @@ -746,14 +762,20 @@ func (f *Forwarder) setupContext(ctx context.Context, authCtx authz.Context, req
var (
kubeUsers, kubeGroups []string
kubeLabels map[string]string
kubeServers []types.KubeServer
err error
)
// Only check k8s principals for local clusters.
//
// For remote clusters, everything will be remapped to new roles on the
// leaf and checked there.
if !isRemoteCluster {
kubeServers, err = f.getKubernetesServersForKubeCluster(ctx, kubeCluster)
if err != nil || len(kubeServers) == 0 {
return nil, trace.NotFound("cluster %q not found", kubeCluster)
}
// check signing TTL and return a list of allowed logins for local cluster based on Kubernetes service labels.
kubeAccessDetails, err := f.getKubeAccessDetails(roles, kubeCluster, sessionTTL, kubeResource)
kubeAccessDetails, err := f.getKubeAccessDetails(kubeServers, roles, kubeCluster, sessionTTL, kubeResource)
if err != nil && !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
// roles.CheckKubeGroupsAndUsers returns trace.NotFound if the user does
Expand Down Expand Up @@ -924,7 +946,8 @@ func (f *Forwarder) setupContext(ctx context.Context, authCtx authz.Context, req
isRemote: isRemoteCluster,
isRemoteClosed: isRemoteClosed,
},
httpMethod: req.Method,
httpMethod: req.Method,
kubeServers: kubeServers,
}, nil
}

Expand Down Expand Up @@ -1008,16 +1031,12 @@ type kubeAccessDetails struct {

// getKubeAccessDetails returns the allowed kube groups/users names and the cluster labels for a local kube cluster.
func (f *Forwarder) getKubeAccessDetails(
kubeServers []types.KubeServer,
roles services.AccessChecker,
kubeClusterName string,
sessionTTL time.Duration,
kubeResource *types.KubernetesResource,
) (kubeAccessDetails, error) {
kubeServers, err := f.cfg.CachingAuthClient.GetKubernetesServers(f.ctx)
if err != nil {
return kubeAccessDetails{}, trace.Wrap(err)
}

// Find requested kubernetes cluster name and get allowed kube users/groups names.
for _, s := range kubeServers {
c := s.GetCluster()
Expand Down Expand Up @@ -1123,10 +1142,7 @@ func (f *Forwarder) authorize(ctx context.Context, actx *authContext) error {
f.log.WithField("auth_context", actx.String()).Debug("Skipping authorization due to unknown kubernetes cluster name")
return nil
}
servers, err := f.cfg.CachingAuthClient.GetKubernetesServers(ctx)
if err != nil {
return trace.Wrap(err)
}

authPref, err := f.cfg.CachingAuthClient.GetAuthPreference(ctx)
if err != nil {
return trace.Wrap(err)
Expand All @@ -1153,7 +1169,7 @@ func (f *Forwarder) authorize(ctx context.Context, actx *authContext) error {
//
// We assume that users won't register two identically-named clusters with
// mis-matched labels. If they do, expect weirdness.
for _, s := range servers {
for _, s := range actx.kubeServers {
ks := s.GetCluster()
if ks.GetName() != actx.kubeClusterName {
continue
Expand Down Expand Up @@ -2281,11 +2297,7 @@ func (f *Forwarder) newClusterSessionSameCluster(ctx context.Context, authCtx au
return sess, nil
}

kubeServers, err := f.cfg.CachingAuthClient.GetKubernetesServers(f.ctx)
if err != nil && !trace.IsNotFound(err) {
return nil, trace.Wrap(err)
}

kubeServers := authCtx.kubeServers
if len(kubeServers) == 0 && authCtx.kubeClusterName == authCtx.teleportCluster.name {
return nil, trace.Wrap(localErr)
}
Expand Down Expand Up @@ -2314,12 +2326,8 @@ func (f *Forwarder) newClusterSessionSameCluster(ctx context.Context, authCtx au
}

func (f *Forwarder) newClusterSessionLocal(ctx authContext) (*clusterSession, error) {
if len(f.clusterDetails) == 0 {
return nil, trace.NotFound("this Teleport process is not configured for direct Kubernetes access; you likely need to 'tsh login' into a leaf cluster or 'tsh kube login' into a different kubernetes cluster")
}

details, ok := f.clusterDetails[ctx.kubeClusterName]
if !ok {
details, err := f.findKubeDetailsByClusterName(ctx.kubeClusterName)
if err != nil {
return nil, trace.NotFound("kubernetes cluster %q not found", ctx.kubeClusterName)
}

Expand Down
Loading

0 comments on commit 393603d

Please sign in to comment.