Skip to content

Commit

Permalink
Ensure Cache types.WatchKinds and proto.WatchEvents are in sync (#…
Browse files Browse the repository at this point in the history
…11692) (#11925)

This adds a test, `TestCacheWatchKindExistsInEvents`, that ensures
all `types.WatchKind` registered for all Cache components have a
corresponding `proto.Event` defined. By doing this we can ensure
that for a release there will be no `resource type X is not
suppported` errors causing the Cache to be unhealthy. Unfortunately
this does not prevent issues across releases which may have different
resources being emitted.

(cherry picked from commit f5d2542)

# Conflicts:
#	api/client/streamwatcher.go
#	lib/auth/grpcserver.go
  • Loading branch information
rosstimothy authored Apr 19, 2022
1 parent 85af18c commit d3e54f0
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 241 deletions.
258 changes: 258 additions & 0 deletions api/client/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
// Copyright 2022 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/trace"
)

// EventToGRPC converts types.Event to proto.Event
func EventToGRPC(in types.Event) (*proto.Event, error) {
eventType, err := EventTypeToGRPC(in.Type)
if err != nil {
return nil, trace.Wrap(err)
}
out := proto.Event{
Type: eventType,
}
if in.Type == types.OpInit {
return &out, nil
}
switch r := in.Resource.(type) {
case *types.ResourceHeader:
out.Resource = &proto.Event_ResourceHeader{
ResourceHeader: r,
}
case *types.CertAuthorityV2:
out.Resource = &proto.Event_CertAuthority{
CertAuthority: r,
}
case *types.StaticTokensV2:
out.Resource = &proto.Event_StaticTokens{
StaticTokens: r,
}
case *types.ProvisionTokenV2:
out.Resource = &proto.Event_ProvisionToken{
ProvisionToken: r,
}
case *types.ClusterNameV2:
out.Resource = &proto.Event_ClusterName{
ClusterName: r,
}
case *types.ClusterConfigV3:
out.Resource = &proto.Event_ClusterConfig{
ClusterConfig: r,
}
case *types.UserV2:
out.Resource = &proto.Event_User{
User: r,
}
case *types.RoleV4:
out.Resource = &proto.Event_Role{
Role: r,
}
case *types.Namespace:
out.Resource = &proto.Event_Namespace{
Namespace: r,
}
case *types.ServerV2:
out.Resource = &proto.Event_Server{
Server: r,
}
case *types.ReverseTunnelV2:
out.Resource = &proto.Event_ReverseTunnel{
ReverseTunnel: r,
}
case *types.TunnelConnectionV2:
out.Resource = &proto.Event_TunnelConnection{
TunnelConnection: r,
}
case *types.AccessRequestV3:
out.Resource = &proto.Event_AccessRequest{
AccessRequest: r,
}
case *types.WebSessionV2:
switch r.GetSubKind() {
case types.KindAppSession:
out.Resource = &proto.Event_AppSession{
AppSession: r,
}
case types.KindWebSession:
out.Resource = &proto.Event_WebSession{
WebSession: r,
}
default:
return nil, trace.BadParameter("only %q supported", types.WebSessionSubKinds)
}
case *types.WebTokenV3:
out.Resource = &proto.Event_WebToken{
WebToken: r,
}
case *types.RemoteClusterV3:
out.Resource = &proto.Event_RemoteCluster{
RemoteCluster: r,
}
case *types.DatabaseServerV3:
out.Resource = &proto.Event_DatabaseServer{
DatabaseServer: r,
}
case *types.ClusterAuditConfigV2:
out.Resource = &proto.Event_ClusterAuditConfig{
ClusterAuditConfig: r,
}
case *types.ClusterNetworkingConfigV2:
out.Resource = &proto.Event_ClusterNetworkingConfig{
ClusterNetworkingConfig: r,
}
case *types.SessionRecordingConfigV2:
out.Resource = &proto.Event_SessionRecordingConfig{
SessionRecordingConfig: r,
}
case *types.AuthPreferenceV2:
out.Resource = &proto.Event_AuthPreference{
AuthPreference: r,
}
case *types.LockV2:
out.Resource = &proto.Event_Lock{
Lock: r,
}
case *types.NetworkRestrictionsV4:
out.Resource = &proto.Event_NetworkRestrictions{
NetworkRestrictions: r,
}
default:
return nil, trace.BadParameter("resource type %T is not supported", in.Resource)
}
return &out, nil
}

// EventTypeToGRPC converts types.OpType to proto.Operation
func EventTypeToGRPC(in types.OpType) (proto.Operation, error) {
switch in {
case types.OpInit:
return proto.Operation_INIT, nil
case types.OpPut:
return proto.Operation_PUT, nil
case types.OpDelete:
return proto.Operation_DELETE, nil
default:
return -1, trace.BadParameter("event type %v is not supported", in)
}
}

// EventFromGRPC converts proto.Event to types.Event
func EventFromGRPC(in proto.Event) (*types.Event, error) {
eventType, err := EventTypeFromGRPC(in.Type)
if err != nil {
return nil, trace.Wrap(err)
}
out := types.Event{
Type: eventType,
}
if eventType == types.OpInit {
return &out, nil
}
if r := in.GetResourceHeader(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetCertAuthority(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetStaticTokens(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetProvisionToken(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterName(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetUser(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetRole(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetNamespace(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetServer(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetReverseTunnel(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetTunnelConnection(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAccessRequest(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAppSession(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWebSession(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWebToken(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetRemoteCluster(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetDatabaseServer(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterAuditConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterNetworkingConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetSessionRecordingConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAuthPreference(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetLock(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetNetworkRestrictions(); r != nil {
out.Resource = r
return &out, nil
} else {
return nil, trace.BadParameter("received unsupported resource %T", in.Resource)
}
}

// EventTypeFromGRPC converts proto.Operation to types.OpType
func EventTypeFromGRPC(in proto.Operation) (types.OpType, error) {
switch in {
case proto.Operation_INIT:
return types.OpInit, nil
case proto.Operation_PUT:
return types.OpPut, nil
case proto.Operation_DELETE:
return types.OpDelete, nil
default:
return types.OpInvalid, trace.BadParameter("unsupported operation type: %v", in)
}
}
104 changes: 1 addition & 103 deletions api/client/streamwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (w *streamWatcher) receiveEvents() {
w.closeWithError(trail.FromGRPC(err))
return
}
out, err := eventFromGRPC(*event)
out, err := EventFromGRPC(*event)
if err != nil {
w.closeWithError(trail.FromGRPC(err))
return
Expand All @@ -100,108 +100,6 @@ func (w *streamWatcher) receiveEvents() {
}
}

// eventFromGRPC converts an proto.Event to a types.Event
func eventFromGRPC(in proto.Event) (*types.Event, error) {
eventType, err := eventTypeFromGRPC(in.Type)
if err != nil {
return nil, trace.Wrap(err)
}
out := types.Event{
Type: eventType,
}
if eventType == types.OpInit {
return &out, nil
}
if r := in.GetResourceHeader(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetCertAuthority(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetStaticTokens(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetProvisionToken(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterName(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetUser(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetRole(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetNamespace(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetServer(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetReverseTunnel(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetTunnelConnection(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAccessRequest(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAppSession(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWebSession(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetWebToken(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetRemoteCluster(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetDatabaseServer(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterAuditConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetClusterNetworkingConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetSessionRecordingConfig(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetAuthPreference(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetLock(); r != nil {
out.Resource = r
return &out, nil
} else if r := in.GetNetworkRestrictions(); r != nil {
out.Resource = r
return &out, nil
} else {
return nil, trace.BadParameter("received unsupported resource %T", in.Resource)
}
}

func eventTypeFromGRPC(in proto.Operation) (types.OpType, error) {
switch in {
case proto.Operation_INIT:
return types.OpInit, nil
case proto.Operation_PUT:
return types.OpPut, nil
case proto.Operation_DELETE:
return types.OpDelete, nil
default:
return types.OpInvalid, trace.BadParameter("unsupported operation type: %v", in)
}
}

// Done returns a channel that closes once the streamWatcher is Closed
func (w *streamWatcher) Done() <-chan struct{} {
return w.ctx.Done()
Expand Down
Loading

0 comments on commit d3e54f0

Please sign in to comment.