From 02d11330343e89646336c910aa3cc8745ab2a8d2 Mon Sep 17 00:00:00 2001 From: SudhanshuBawane Date: Thu, 29 Feb 2024 20:58:45 +0530 Subject: [PATCH] WIP Signed-off-by: SudhanshuBawane --- backend/agentd/agentd.go | 9 +- backend/agentd/session.go | 140 +++++++++++++++++++++++++++++-- backend/agentd/watcher.go | 9 +- backend/messaging/message_bus.go | 6 ++ backend/store/store.go | 7 +- 5 files changed, 153 insertions(+), 18 deletions(-) diff --git a/backend/agentd/agentd.go b/backend/agentd/agentd.go index 0af79565c9..beb7e84b6b 100644 --- a/backend/agentd/agentd.go +++ b/backend/agentd/agentd.go @@ -324,8 +324,13 @@ func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error { if event.User == nil { return errors.New("nil entry received from the user config watcher") } - a.bus.Publish("userChanges", event.User.Username) - + topic := messaging.UserConfigTopic(event.User.GetMetadata().Namespace, event.User.GetMetadata().Name) + if err := a.bus.Publish(topic, &event); err != nil { + return err + } + //a.bus.Publish("userChanges", event.User.Username) + logger.WithField("topic", topic). + Debug("successfully published an user config update to the bus") return nil } diff --git a/backend/agentd/session.go b/backend/agentd/session.go index 3f96bb3361..aea91dbc77 100644 --- a/backend/agentd/session.go +++ b/backend/agentd/session.go @@ -25,6 +25,8 @@ import ( ) const ( + UserNotFound = "not found" + deletedEventSentinel = -1 // Time to wait before force close on connection. @@ -227,6 +229,10 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) { subscriptions: make(chan messaging.Subscription, 1), updatesChannel: make(chan interface{}, 10), }, + userConfig: &userConfig{ + subscription: make(chan messaging.Subscription, 1), + updatesChannel: make(chan interface{}, 10), + }, } // Optionally subscribe to burial notifications @@ -356,17 +362,65 @@ func (s *Session) sender() { for { var msg *transport.Message select { - //2608 ---- user ----- + //---- user -----// case u := <-s.userReceiver.ch: - user, ok := u.(corev2.User) + user, ok := u.(*corev2.User) if !ok { - + logger.WithField("key", ok) } + if user.Disabled && user.Username == s.user { return } - // -----entity ------- + //case u := <-s.userConfig.updatesChannel: + // watchEvent, ok := u.(*store.WatchEventUserConfig) + // fmt.Println("========== usrConfig Updates ========", watchEvent) + // if !ok { + // logger.Errorf("session received unexoected struct : %T", u) + // continue + // } + // + // if watchEvent.User.Disabled && watchEvent.User.Username == s.user { + // return + // } + // //fmt.Println("========== usrConfig Updates ========", watchEvent) + ////// Handle the delete/disable event + ////switch watchEvent.Action { + ////case store.WatchDelete: + //// return + ////} + // + //if watchEvent.User == nil { + // logger.Error("session received nil user in watch event") + //} + //// + //lagger := logger.WithFields(logrus.Fields{ + // "action": watchEvent.Action.String(), + // "user": watchEvent.User.GetMetadata().Name, + // "namespace": watchEvent.User.GetMetadata().Namespace, + //}) + //lagger.Debug("user update received") + // + //configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User) + //wrapper, err := storev2.WrapResource(watchEvent.User) + //if err != nil { + // lagger.WithError(err).Error("could not warp the user config") + // continue + //} + // + //if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil { + // sessionErrorCounter.WithLabelValues(err.Error()).Inc() + // lagger.WithError(err).Error("could not update the user config") + //} + + //bytes, err := s.marshal(watchEvent.User) + //if err != nil { + // lagger.WithError(err).Error("session failed to serialize user config") + //} + //msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes) + + // ---- entity ----// case e := <-s.entityConfig.updatesChannel: watchEvent, ok := e.(*store.WatchEventEntityConfig) if !ok { @@ -495,6 +549,8 @@ func (s *Session) sender() { // 3. Start goroutine that waits for context cancellation, and shuts down service. func (s *Session) Start() (err error) { defer close(s.entityConfig.subscriptions) + defer close(s.userConfig.subscription) + sessionCounter.WithLabelValues(s.cfg.Namespace).Inc() s.wg = &sync.WaitGroup{} s.wg.Add(2) @@ -518,21 +574,84 @@ func (s *Session) Start() (err error) { "namespace": s.cfg.Namespace, }) - // Subscribe the agent to its entity_config topic + // Subscribe the agent to its entity_config and user_config topic topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName) + userTopic := messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName) lager.WithField("topic", topic).Debug("subscribing to topic") + logger.WithField("topic", userTopic).Debug("subscribing to topic") // Get a unique name for the agent, which will be used as the consumer of the // bus, in order to avoid problems with an agent reconnecting before its // session is ended agentName := agentUUID(s.cfg.Namespace, s.cfg.AgentName) + + // Determine if user already exits + userSubscription, usrErr := s.bus.Subscribe(userTopic, agentName, s.userConfig) + if usrErr != nil { + lager.WithError(err).Error("error starting subscription") + return err + } + s.userConfig.subscription <- userSubscription + usrReq := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev2.User{}).StoreName()) + usrWrapper, err := s.storev2.Get(usrReq) + if err != nil { + // Just exit but don't send error about absence of user config + var errNotFound *store.ErrNotFound + if !errors.As(err, &errNotFound) { + lager.WithError(err).Error("error querying the user config") + return err + } + lager.Debug("no user config found") + + // Indicate to the agent that this user does not exist + meta := corev2.NewObjectMeta(UserNotFound, s.cfg.Namespace) + watchEvent := &store.WatchEventUserConfig{ + User: &corev2.User{ + Username: s.user, + }, + Action: store.WatchCreate, + Metadata: &meta, + } + err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) + if err != nil { + lager.WithError(err).Error("error publishing user config") + return err + } + } else { + // A user config already exists, therefore we should the stored user subscriptions + // rather than what the agent provided us for the subscriptions + lager.Debug("an user config was found") + + var storedUserConfig corev2.User + err = usrWrapper.UnwrapInto(&storedUserConfig) + if err != nil { + lager.WithError(err).Error("error unwrapping user config") + return err + } + + // Remove the managed_by label if the value is sensu-agent, in case of disabled user + if storedUserConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { + delete(storedUserConfig.GetMetadata().Labels, corev2.ManagedByLabel) + } + + // Send back this user config to the agent so it uses that rather than it's local config + watchEvent := &store.WatchEventUserConfig{ + Action: store.WatchUpdate, + User: &storedUserConfig, + } + err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent) + if err != nil { + lager.WithError(err).Error("error publishing user config") + return err + } + } + + // Determine if the entity already exists subscription, err := s.bus.Subscribe(topic, agentName, s.entityConfig) if err != nil { lager.WithError(err).Error("error starting subscription") return err } s.entityConfig.subscriptions <- subscription - - // Determine if the entity already exists req := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev3.EntityConfig{}).StoreName()) wrapper, err := s.storev2.Get(req) if err != nil { @@ -624,6 +743,7 @@ func (s *Session) stop() { } }() defer close(s.entityConfig.updatesChannel) + defer close(s.userConfig.updatesChannel) defer close(s.checkChannel) sessionCounter.WithLabelValues(s.cfg.Namespace).Dec() @@ -648,6 +768,12 @@ func (s *Session) stop() { } } + for sub := range s.userConfig.subscription { + if err := sub.Cancel(); err != nil { + logger.WithError(err).Error("unable to unsubscribe from message bus") + } + } + // Unsubscribe the session from every configured check subscriptions s.unsubscribe(s.cfg.Subscriptions) } diff --git a/backend/agentd/watcher.go b/backend/agentd/watcher.go index ee2e413307..68b703b02e 100644 --- a/backend/agentd/watcher.go +++ b/backend/agentd/watcher.go @@ -6,7 +6,6 @@ import ( "github.com/gogo/protobuf/proto" corev2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" - "github.com/sensu/sensu-go/agent" "github.com/sensu/sensu-go/backend/store" etcdstore "github.com/sensu/sensu-go/backend/store/etcd" storev2 "github.com/sensu/sensu-go/backend/store/v2" @@ -110,12 +109,8 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s } // Remove the managed_by label if the value is sensu-agent, in case the user is disabled - //if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { - // delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) - //} - - if userConfig.Disabled { - agent.GracefulShutdown(cancel) + if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" { + delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel) } ch <- store.WatchEventUserConfig{ diff --git a/backend/messaging/message_bus.go b/backend/messaging/message_bus.go index 07823b25d2..5e42ff99ec 100644 --- a/backend/messaging/message_bus.go +++ b/backend/messaging/message_bus.go @@ -14,6 +14,8 @@ const ( // to agents TopicEntityConfig = "sensu:entity-config" + TopicUserConfig = "sensu:user-config" + // TopicEvent is the topic for events that have been written to Etcd and // normalized by eventd. TopicEvent = "sensu:event" @@ -104,6 +106,10 @@ func EntityConfigTopic(namespace, name string) string { return fmt.Sprintf("%s:%s:%s", TopicEntityConfig, namespace, name) } +func UserConfigTopic(namespace, name string) string { + return fmt.Sprintf("%s:%s:%s", TopicUserConfig, namespace, name) +} + // SubscriptionTopic is a helper to determine the proper topic name for a // subscription based on the namespace func SubscriptionTopic(namespace, sub string) string { diff --git a/backend/store/store.go b/backend/store/store.go index b7e222c68f..fb01ae130b 100644 --- a/backend/store/store.go +++ b/backend/store/store.go @@ -6,6 +6,7 @@ import ( "fmt" corev2 "github.com/sensu/core/v2" + v2 "github.com/sensu/core/v2" corev3 "github.com/sensu/core/v3" "github.com/sensu/sensu-go/backend/store/patch" "github.com/sensu/sensu-go/types" @@ -159,8 +160,10 @@ type WatchEventEntityConfig struct { // WatchEventUserConfig contains and updated entity config and the action that // occurred during this modification type WatchEventUserConfig struct { - User *corev2.User - Action WatchActionType + User *corev2.User + Action WatchActionType + Metadata *v2.ObjectMeta `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata"` + //Subscriptions []string `protobuf:"bytes,4,rep,name=subscriptions,proto3" json:"subscriptions"` } // Store is used to abstract the durable storage used by the Sensu backend