Skip to content

Commit

Permalink
Azure integration status reporting (gravitational#51391)
Browse files Browse the repository at this point in the history
* Protobuf and configuration for Access Graph Azure Discovery

* Fixing rebase after protobuf gen

* Updating to use existing msgraph client

* PR feedback

* Using variadic options

* Removing memberOf expansion

* Expanding memberships by calling memberOf on each user

* PR feedback

* Rebase go.sum stuff

* Go mod tidy

* Fixing go.mod

* Update lib/msgraph/paginated.go

Co-authored-by: Tiago Silva <tiago.silva@goteleport.com>

* PR feedback

* Protobuf and configuration for Access Graph Azure Discovery

* Adding Azure sync functionality which can be called by the Azure fetcher

* Protobuf update

* Linting

* PR feedback

* PR feedback

* Updating to use existing msgraph client

* PR feedback

* Using variadic options

* Removing memberOf expansion

* Expanding memberships by calling memberOf on each user

* PR feedback

* Rebase go.sum stuff

* PR feedback

* Protobuf and configuration for Access Graph Azure Discovery

* Invoking the Azure fetcher in the Discovery service

* Protobuf gen fix

* Rebase fixes

* More cleanup

* PR feedback

* Invoking token generation and returning the response

* Fetching the Azure OIDC token during fetcher creation and establishing a credential assertion approach

* PR feedback; restricting token requests to auth, discovery, and proxy roles.

* Lint

* Rebase fxes

* Adding back OIDC fetching, accidentally removed it during rebase

* Initial refactoring to include Azure status reporting

* Converging status sync between AWS and Azure

* Fixing test

* Sending usage stats

* Fix imports

* Add godocs and correct a few comments

* Removing the usage events for now

---------

Co-authored-by: Tiago Silva <tiago.silva@goteleport.com>
  • Loading branch information
2 people authored and carloscastrojumo committed Feb 19, 2025
1 parent cbae992 commit 9fe5c79
Show file tree
Hide file tree
Showing 19 changed files with 176 additions and 160 deletions.
23 changes: 13 additions & 10 deletions lib/srv/discovery/access_graph_aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
return trace.Wrap(errNoAccessGraphFetchers)
}

s.awsSyncStatus.iterationStarted(allFetchers, s.clock.Now())
for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() {
for _, fetcher := range allFetchers {
s.tagSyncStatus.syncStarted(fetcher, s.clock.Now())
}
for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}

Expand All @@ -92,7 +94,6 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
tokens := make(chan struct{}, 3)
accountIds := map[string]struct{}{}
for _, fetcher := range allFetchers {
fetcher := fetcher
accountIds[fetcher.GetAccountID()] = struct{}{}
tokens <- struct{}{}
go func() {
Expand Down Expand Up @@ -127,8 +128,10 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
upsert, toDel := aws_sync.ReconcileResults(currentTAGResources, result)
pushErr := push(stream, upsert, toDel)

s.awsSyncStatus.iterationFinished(allFetchers, pushErr, s.clock.Now())
for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() {
for _, fetcher := range allFetchers {
s.tagSyncStatus.syncFinished(fetcher, pushErr, s.clock.Now())
}
for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}

Expand All @@ -153,8 +156,8 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
}

// getAllAWSSyncFetchers returns all AWS sync fetchers.
func (s *Server) getAllAWSSyncFetchers() []aws_sync.AWSSync {
allFetchers := make([]aws_sync.AWSSync, 0, len(s.dynamicTAGAWSFetchers))
func (s *Server) getAllAWSSyncFetchers() []*aws_sync.Fetcher {
allFetchers := make([]*aws_sync.Fetcher, 0, len(s.dynamicTAGAWSFetchers))

s.muDynamicTAGAWSFetchers.RLock()
for _, fetcherSet := range s.dynamicTAGAWSFetchers {
Expand Down Expand Up @@ -483,8 +486,8 @@ func (s *Server) initTAGAWSWatchers(ctx context.Context, cfg *Config) error {
}

// accessGraphAWSFetchersFromMatchers converts Matchers into a set of AWS Sync Fetchers.
func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matchers Matchers, discoveryConfigName string) ([]aws_sync.AWSSync, error) {
var fetchers []aws_sync.AWSSync
func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matchers Matchers, discoveryConfigName string) ([]*aws_sync.Fetcher, error) {
var fetchers []*aws_sync.Fetcher
var errs []error
if matchers.AccessGraph == nil {
return fetchers, nil
Expand All @@ -498,7 +501,7 @@ func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matcher
ExternalID: awsFetcher.AssumeRole.ExternalID,
}
}
fetcher, err := aws_sync.NewAWSFetcher(
fetcher, err := aws_sync.NewFetcher(
ctx,
aws_sync.Config{
AWSConfigProvider: s.AWSConfigProvider,
Expand Down
14 changes: 14 additions & 0 deletions lib/srv/discovery/access_graph_azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ func (s *Server) reconcileAccessGraphAzure(
return trace.Wrap(errNoAccessGraphFetchers)
}

for _, fetcher := range allFetchers {
s.tagSyncStatus.syncStarted(fetcher, s.clock.Now())
}
for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}

// Fetch results concurrently
resultsC := make(chan fetcherResult, len(allFetchers))
// Restricts concurrently running fetchers to 3
Expand Down Expand Up @@ -107,6 +114,13 @@ func (s *Server) reconcileAccessGraphAzure(
upsert, toDel := azuresync.ReconcileResults(currentTAGResources, result)
pushErr := azurePush(stream, upsert, toDel)

for _, fetcher := range allFetchers {
s.tagSyncStatus.syncFinished(fetcher, pushErr, s.clock.Now())
}
for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}

if pushErr != nil {
s.Log.ErrorContext(ctx, "Error pushing TAGs", "error", pushErr)
return nil
Expand Down
52 changes: 28 additions & 24 deletions lib/srv/discovery/access_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
testErr := "test error"
clock := clockwork.NewFakeClock()
type args struct {
fetchers []aws_sync.AWSSync
fetchers []*fakeFetcher
pushErr error
preRun bool
}
Expand All @@ -48,8 +48,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "test updateDiscoveryConfigStatus",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
count: 1,
discoveryConfigName: "test",
},
Expand All @@ -71,8 +71,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "test updateDiscoveryConfigStatus with pushError",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
count: 1,
discoveryConfigName: "test",
},
Expand All @@ -94,8 +94,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "test updateDiscoveryConfigStatus with error",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
count: 1,
discoveryConfigName: "test",
err: errors.New(testErr),
Expand All @@ -117,8 +117,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "discar reports for non-discovery config results",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
count: 1,
},
},
Expand All @@ -128,8 +128,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "test updateDiscoveryConfigStatus pre-run",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
discoveryConfigName: "test",
},
},
Expand All @@ -150,16 +150,16 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "test multiple aws sync fetchers",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
discoveryConfigName: "test1",
count: 1,
},
&fakeFetcher{
{
discoveryConfigName: "test1",
count: 1,
},
&fakeFetcher{
{
discoveryConfigName: "test2",
count: 1,
},
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "merge two errors",
args: args{
fetchers: []aws_sync.AWSSync{
fetchers: []*fakeFetcher{
&fakeFetcher{
discoveryConfigName: "test1",
err: fmt.Errorf("error in fetcher 1"),
Expand All @@ -214,12 +214,12 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "reports error if at least one fetcher fails",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
discoveryConfigName: "test1",
err: fmt.Errorf("error in fetcher 1"),
},
&fakeFetcher{
{
discoveryConfigName: "test1",
count: 2,
},
Expand Down Expand Up @@ -247,16 +247,20 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
AccessPoint: accessPoint,
clock: clock,
},
awsSyncStatus: awsSyncStatus{},
tagSyncStatus: newTagSyncStatus(),
}

if tt.args.preRun {
s.awsSyncStatus.iterationStarted(tt.args.fetchers, s.clock.Now())
for _, fetcher := range tt.args.fetchers {
s.tagSyncStatus.syncStarted(fetcher, s.clock.Now())
}
} else {
s.awsSyncStatus.iterationFinished(tt.args.fetchers, tt.args.pushErr, s.clock.Now())
for _, fetcher := range tt.args.fetchers {
s.tagSyncStatus.syncFinished(fetcher, tt.args.pushErr, s.clock.Now())
}
}

for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() {
for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}

Expand All @@ -270,7 +274,7 @@ func stringPointer(s string) *string {
}

type fakeFetcher struct {
aws_sync.AWSSync
aws_sync.Fetcher
err error
count uint64
discoveryConfigName string
Expand Down
10 changes: 5 additions & 5 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,9 @@ type Server struct {

// dynamicTAGAWSFetchers holds the current TAG Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
dynamicTAGAWSFetchers map[string][]aws_sync.AWSSync
dynamicTAGAWSFetchers map[string][]*aws_sync.Fetcher
muDynamicTAGAWSFetchers sync.RWMutex
staticTAGAWSFetchers []aws_sync.AWSSync
staticTAGAWSFetchers []*aws_sync.Fetcher

// dynamicTAGAzureFetchers holds the current TAG Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
Expand All @@ -440,7 +440,7 @@ type Server struct {

dynamicDiscoveryConfig map[string]*discoveryconfig.DiscoveryConfig

awsSyncStatus awsSyncStatus
tagSyncStatus *tagSyncStatus
awsEC2ResourcesStatus awsResourcesStatus
awsRDSResourcesStatus awsResourcesStatus
awsEKSResourcesStatus awsResourcesStatus
Expand Down Expand Up @@ -477,10 +477,10 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {
dynamicServerAWSFetchers: make(map[string][]server.Fetcher),
dynamicServerAzureFetchers: make(map[string][]server.Fetcher),
dynamicServerGCPFetchers: make(map[string][]server.Fetcher),
dynamicTAGAWSFetchers: make(map[string][]aws_sync.AWSSync),
dynamicTAGAWSFetchers: make(map[string][]*aws_sync.Fetcher),
dynamicTAGAzureFetchers: make(map[string][]*azure_sync.Fetcher),
dynamicDiscoveryConfig: make(map[string]*discoveryconfig.DiscoveryConfig),
awsSyncStatus: awsSyncStatus{},
tagSyncStatus: newTagSyncStatus(),
awsEC2ResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEC2),
awsRDSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherRDS),
awsEKSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEKS),
Expand Down
44 changes: 15 additions & 29 deletions lib/srv/discovery/fetchers/aws-sync/aws-sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,28 +138,14 @@ type AssumeRole struct {
ExternalID string
}

// awsFetcher is a fetcher that fetches AWS resources.
type awsFetcher struct {
// Fetcher is a fetcher that fetches AWS resources.
type Fetcher struct {
Config
lastError error
lastDiscoveredResources uint64
lastResult *Resources
}

// AWSSync is the interface for fetching AWS resources.
type AWSSync interface {
// Poll polls all AWS resources and returns the result.
Poll(context.Context, Features) (*Resources, error)
// Status reports the last known status of the fetcher.
Status() (uint64, error)
// DiscoveryConfigName returns the name of the Discovery Config.
DiscoveryConfigName() string
// IsFromDiscoveryConfig returns true if the fetcher is associated with a Discovery Config.
IsFromDiscoveryConfig() bool
// GetAccountID returns the AWS account ID.
GetAccountID() string
}

// Resources is a collection of polled AWS resources.
type Resources struct {
// Users is the list of AWS users.
Expand Down Expand Up @@ -249,12 +235,12 @@ func (r *Resources) UsageReport(numberAccounts int) *usageeventsv1.AccessGraphAW
}
}

// NewAWSFetcher creates a new AWS fetcher.
func NewAWSFetcher(ctx context.Context, cfg Config) (AWSSync, error) {
// NewFetcher creates a new AWS fetcher.
func NewFetcher(ctx context.Context, cfg Config) (*Fetcher, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
a := &awsFetcher{
a := &Fetcher{
Config: cfg,
lastResult: &Resources{},
}
Expand All @@ -270,14 +256,14 @@ func NewAWSFetcher(ctx context.Context, cfg Config) (AWSSync, error) {
// Poll is a blocking call and will return when all resources have been fetched.
// It's possible that the call returns Resources and an error at the same time
// if some resources were fetched successfully and some were not.
func (a *awsFetcher) Poll(ctx context.Context, features Features) (*Resources, error) {
func (a *Fetcher) Poll(ctx context.Context, features Features) (*Resources, error) {
result, err := a.poll(ctx, features)
deduplicateResources(result)
a.storeReport(result, err)
return result, trace.Wrap(err)
}

func (a *awsFetcher) storeReport(rec *Resources, err error) {
func (a *Fetcher) storeReport(rec *Resources, err error) {
a.lastError = err
if rec == nil {
return
Expand All @@ -286,11 +272,11 @@ func (a *awsFetcher) storeReport(rec *Resources, err error) {
a.lastDiscoveredResources = uint64(rec.count())
}

func (a *awsFetcher) GetAccountID() string {
func (a *Fetcher) GetAccountID() string {
return a.AccountID
}

func (a *awsFetcher) poll(ctx context.Context, features Features) (*Resources, error) {
func (a *Fetcher) poll(ctx context.Context, features Features) (*Resources, error) {
eGroup, ctx := errgroup.WithContext(ctx)
// Set the limit for the number of concurrent pollers running in parallel.
// This is to prevent the number of concurrent pollers from growing too large
Expand Down Expand Up @@ -371,7 +357,7 @@ func (a *awsFetcher) poll(ctx context.Context, features Features) (*Resources, e

// getAWSOptions returns a list of AWSAssumeRoleOptionFn to be used when
// creating AWS clients.
func (a *awsFetcher) getAWSOptions() []cloud.AWSOptionsFn {
func (a *Fetcher) getAWSOptions() []cloud.AWSOptionsFn {
opts := []cloud.AWSOptionsFn{
cloud.WithCredentialsMaybeIntegration(a.Config.Integration),
}
Expand All @@ -398,7 +384,7 @@ func (a *awsFetcher) getAWSOptions() []cloud.AWSOptionsFn {

// getAWSV2Options returns a list of options to be used when
// creating AWS clients with the v2 sdk.
func (a *awsFetcher) getAWSV2Options() []awsconfig.OptionsFn {
func (a *Fetcher) getAWSV2Options() []awsconfig.OptionsFn {
opts := []awsconfig.OptionsFn{
awsconfig.WithCredentialsMaybeIntegration(a.Config.Integration),
}
Expand All @@ -417,7 +403,7 @@ func (a *awsFetcher) getAWSV2Options() []awsconfig.OptionsFn {
return opts
}

func (a *awsFetcher) getAccountId(ctx context.Context) (string, error) {
func (a *Fetcher) getAccountId(ctx context.Context) (string, error) {
stsClient, err := a.CloudClients.GetAWSSTSClient(
ctx,
"", /* region is empty because groups are global */
Expand All @@ -436,14 +422,14 @@ func (a *awsFetcher) getAccountId(ctx context.Context) (string, error) {
return aws.ToString(req.Account), nil
}

func (a *awsFetcher) DiscoveryConfigName() string {
func (a *Fetcher) DiscoveryConfigName() string {
return a.Config.DiscoveryConfigName
}

func (a *awsFetcher) IsFromDiscoveryConfig() bool {
func (a *Fetcher) IsFromDiscoveryConfig() bool {
return a.Config.DiscoveryConfigName != ""
}

func (a *awsFetcher) Status() (uint64, error) {
func (a *Fetcher) Status() (uint64, error) {
return a.lastDiscoveredResources, a.lastError
}
Loading

0 comments on commit 9fe5c79

Please sign in to comment.