From aeb8da5d0280e38b43f131ab158c209bc73878cb Mon Sep 17 00:00:00 2001 From: Marco Dinis Date: Tue, 15 Oct 2024 09:20:11 +0100 Subject: [PATCH] wip --- api/types/constants.go | 5 ++++ lib/auth/authclient/api.go | 9 ++++++ lib/authz/permissions.go | 1 + lib/services/notifications.go | 53 +++++++++++++++++++++++++++++++++++ lib/srv/discovery/status.go | 40 ++++++++++++++++++-------- 5 files changed, 97 insertions(+), 11 deletions(-) diff --git a/api/types/constants.go b/api/types/constants.go index 087914dbdd045..cc3bdf5eb016e 100644 --- a/api/types/constants.go +++ b/api/types/constants.go @@ -1093,6 +1093,11 @@ const ( // NotificationUserCreatedWarningSubKind is the subkind for a user-created warning notification. NotificationUserCreatedWarningSubKind = "user-created-warning" + // NotificationUserTaskIntegrationSubKind is the subkind for a notification that warns the user about pending User Tasks for a given integration. + NotificationUserTaskIntegrationSubKind = "user-task-integration" + // NotificationIntegrationLabel is the label which contains the name of the integration. + NotificationIntegrationLabel = TeleportInternalLabelPrefix + "integration-name" + // NotificationAccessRequestPendingSubKind is the subkind for a notification for an access request pending review. NotificationAccessRequestPendingSubKind = "access-request-pending" // NotificationAccessRequestApprovedSubKind is the subkind for a notification for a user's access request being approved. diff --git a/lib/auth/authclient/api.go b/lib/auth/authclient/api.go index 80b25dc26d2b6..37542a478fd36 100644 --- a/lib/auth/authclient/api.go +++ b/lib/auth/authclient/api.go @@ -34,6 +34,7 @@ import ( integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1" machineidv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1" + notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1" userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v2" userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1" usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1" @@ -809,6 +810,9 @@ type DiscoveryAccessPoint interface { // UpsertUserTask creates or updates an User Task UpsertUserTask(ctx context.Context, req *usertasksv1.UserTask) (*usertasksv1.UserTask, error) + + // UpsertGlobalNotification upserts a global notification. + UpsertGlobalNotification(ctx context.Context, globalNotification *notificationsv1.GlobalNotification) (*notificationsv1.GlobalNotification, error) } // ReadOktaAccessPoint is a read only API interface to be @@ -1459,6 +1463,11 @@ func (w *DiscoveryWrapper) UpsertUserTask(ctx context.Context, req *usertasksv1. return w.NoCache.UpsertUserTask(ctx, req) } +// UpsertGlobalNotification upserts a global notification. +func (w *DiscoveryWrapper) UpsertGlobalNotification(ctx context.Context, globalNotification *notificationsv1.GlobalNotification) (*notificationsv1.GlobalNotification, error) { + return w.NoCache.UpsertGlobalNotification(ctx, globalNotification) +} + // Close closes all associated resources func (w *DiscoveryWrapper) Close() error { err := w.NoCache.Close() diff --git a/lib/authz/permissions.go b/lib/authz/permissions.go index f8b5587018608..c6fdecba6fa9c 100644 --- a/lib/authz/permissions.go +++ b/lib/authz/permissions.go @@ -1199,6 +1199,7 @@ func definitionForBuiltinRole(clusterName string, recConfig readonly.SessionReco types.NewRule(types.KindIntegration, append(services.RO(), types.VerbUse)), types.NewRule(types.KindSemaphore, services.RW()), types.NewRule(types.KindUserTask, services.RW()), + types.NewRule(types.KindNotification, services.RW()), }, // Discovery service should only access kubes/apps/dbs that originated from discovery. KubernetesLabels: types.Labels{types.OriginLabel: []string{types.OriginCloud}}, diff --git a/lib/services/notifications.go b/lib/services/notifications.go index 47b16bbf35126..6c43fb7915865 100644 --- a/lib/services/notifications.go +++ b/lib/services/notifications.go @@ -20,9 +20,14 @@ package services import ( "context" + "encoding/binary" + "time" + "github.com/google/uuid" "github.com/gravitational/trace" + "google.golang.org/protobuf/types/known/timestamppb" + headerv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1" notificationsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1" "github.com/gravitational/teleport/api/types" ) @@ -106,6 +111,54 @@ func ValidateGlobalNotification(globalNotification *notificationsv1.GlobalNotifi return nil } +// notificationNameForIntegration returns a deterministic name for a Notification of type "user-task-integration" for a given Integration name. +// This method is used to ensure a single Notification is created to report issues related to User Task Integrations for a given integration. +func notificationNameForIntegration(integration string) string { + var bs []byte + bs = append(bs, binary.LittleEndian.AppendUint64(nil, uint64(len(integration)))...) + bs = append(bs, []byte(integration)...) + return uuid.NewSHA1(notificationNameForIntegrationNamespace, bs).String() +} + +// notificationNameForIntegrationNamespace is an UUID that represents the name space to be used for generating UUIDs for DiscoverEC2 User Task names. +var notificationNameForIntegrationNamespace = uuid.Must(uuid.Parse("6ba7b815-9dad-11d1-80b4-00c04fd430c9")) + +// NotificationForIntegrationPendingTasks builds a GlobalNotification for notifying users about UserTasks for an Integration. +func NotificationForIntegrationPendingTasks(integrationName string, expiration *timestamppb.Timestamp, createdTime time.Time) *notificationsv1.GlobalNotification { + return ¬ificationsv1.GlobalNotification{ + Metadata: &headerv1.Metadata{}, + Spec: ¬ificationsv1.GlobalNotificationSpec{ + Matcher: ¬ificationsv1.GlobalNotificationSpec_ByPermissions{ + ByPermissions: ¬ificationsv1.ByPermissions{ + RoleConditions: []*types.RoleConditions{{ + Rules: []types.Rule{{ + Resources: []string{types.KindIntegration}, + Verbs: []string{types.VerbList, types.VerbRead}, + }}, + }}, + }, + }, + Notification: ¬ificationsv1.Notification{ + Kind: types.KindGlobalNotification, + Version: types.V1, + Spec: ¬ificationsv1.NotificationSpec{ + Created: timestamppb.New(createdTime), + }, + SubKind: types.NotificationUserTaskIntegrationSubKind, + Metadata: &headerv1.Metadata{ + Name: notificationNameForIntegration(integrationName), + Labels: map[string]string{ + types.NotificationTitleLabel: "Your integration needs attention.", + types.NotificationIntegrationLabel: integrationName, + types.NotificationScope: "global", + }, + Expires: expiration, + }, + }, + }, + } +} + // MarshalGlobalNotification marshals a GlobalNotification resource to JSON. func MarshalGlobalNotification(globalNotification *notificationsv1.GlobalNotification, opts ...MarshalOption) ([]byte, error) { if err := ValidateGlobalNotification(globalNotification); err != nil { diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index 8d6960e557e6d..f743b3b5206c0 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -429,7 +429,7 @@ func (s *Server) acquireSemaphoreForUserTask(userTaskName string) (releaseFn fun // merges them against the ones that exist in the cluster. // // All of this flow is protected by a lock to ensure there's no race between this and other DiscoveryServices. -func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) error { +func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInstances map[string]*usertasksv1.DiscoverEC2Instance) (*usertasksv1.UserTask, error) { userTaskName := usertasks.TaskNameForDiscoverEC2(usertasks.TaskNameForDiscoverEC2Parts{ Integration: taskGroup.integration, IssueType: taskGroup.issueType, @@ -439,7 +439,7 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta releaseFn, ctxWithLease, err := s.acquireSemaphoreForUserTask(userTaskName) if err != nil { - return trace.Wrap(err) + return nil, trace.Wrap(err) } defer releaseFn() @@ -448,7 +448,7 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta switch { case trace.IsNotFound(err): case err != nil: - return trace.Wrap(err) + return nil, trace.Wrap(err) default: failedInstances = s.discoverEC2UserTaskAddExistingInstances(currentUserTask, failedInstances) } @@ -472,14 +472,11 @@ func (s *Server) mergeUpsertDiscoverEC2Task(taskGroup awsEC2TaskKey, failedInsta usertasks.WithExpiration(taskExpiration), ) if err != nil { - return trace.Wrap(err) - } - - if _, err := s.AccessPoint.UpsertUserTask(ctxWithLease, task); err != nil { - return trace.Wrap(err) + return nil, trace.Wrap(err) } - return nil + taskUpserted, err := s.AccessPoint.UpsertUserTask(ctxWithLease, task) + return taskUpserted, trace.Wrap(err) } // discoverEC2UserTaskAddExistingInstances takes the UserTask stored in the cluster and merges it into the existing map of failed instances. @@ -506,6 +503,16 @@ func (s *Server) discoverEC2UserTaskAddExistingInstances(currentUserTask *userta return failedInstances } +func (s *Server) upsertUserNotificationForIntegration(integrationName string, expiration *timestamppb.Timestamp) error { + notificationForIntegrationPendingTasks := services.NotificationForIntegrationPendingTasks(integrationName, expiration, s.clock.Now()) + + s.Log.WithField("Notification", notificationForIntegrationPendingTasks).Warn("Notification upsert will fail") + if _, err := s.AccessPoint.UpsertGlobalNotification(s.ctx, notificationForIntegrationPendingTasks); err != nil { + return trace.Wrap(err) + } + return nil +} + func (s *Server) upsertTasksForAWSEC2FailedEnrollments() { s.awsEC2Tasks.mu.Lock() defer s.awsEC2Tasks.mu.Unlock() @@ -515,17 +522,28 @@ func (s *Server) upsertTasksForAWSEC2FailedEnrollments() { continue } - if err := s.mergeUpsertDiscoverEC2Task(g, instancesIssueByID); err != nil { + upsertedTask, err := s.mergeUpsertDiscoverEC2Task(g, instancesIssueByID) + if err != nil { s.Log.WithError(err).WithFields(logrus.Fields{ "integration": g.integration, "issue_type": g.issueType, "aws_account_id": g.accountID, "aws_region": g.region, }, - ).Warning("Failed to create discover ec2 user task.", g.integration, g.issueType, g.accountID, g.region) + ).Warning("Failed to create discover ec2 user task.") continue } delete(s.awsEC2Tasks.issuesSyncQueue, g) + + if err := s.upsertUserNotificationForIntegration(g.integration, upsertedTask.GetMetadata().GetExpires()); err != nil { + s.Log.WithError(err).WithFields(logrus.Fields{ + "integration": g.integration, + "issue_type": g.issueType, + "aws_account_id": g.accountID, + "aws_region": g.region, + }, + ).Warning("Failed to notify user about new task for integration.") + } } }