Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure integration status reporting #51391

Merged
merged 46 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
72ff0fa
Protobuf and configuration for Access Graph Azure Discovery
mvbrock Dec 17, 2024
46c787e
Fixing rebase after protobuf gen
mvbrock Dec 18, 2024
661004f
Updating to use existing msgraph client
mvbrock Dec 19, 2024
84e968e
PR feedback
mvbrock Dec 20, 2024
43436cf
Using variadic options
mvbrock Jan 6, 2025
e128cc3
Removing memberOf expansion
mvbrock Jan 6, 2025
d901a23
Expanding memberships by calling memberOf on each user
mvbrock Jan 7, 2025
63fb21f
PR feedback
mvbrock Jan 9, 2025
722ac9e
Rebase go.sum stuff
mvbrock Jan 9, 2025
4cd36e8
Go mod tidy
mvbrock Jan 9, 2025
70f0aa7
Fixing go.mod
mvbrock Jan 9, 2025
4c75f29
Update lib/msgraph/paginated.go
mvbrock Jan 10, 2025
b45cd7d
PR feedback
mvbrock Jan 10, 2025
39fce41
Protobuf and configuration for Access Graph Azure Discovery
mvbrock Dec 17, 2024
a11a168
Adding Azure sync functionality which can be called by the Azure fetcher
mvbrock Dec 17, 2024
eaa6246
Protobuf update
mvbrock Dec 18, 2024
bbde601
Linting
mvbrock Jan 13, 2025
54fa6d2
PR feedback
mvbrock Jan 15, 2025
0ee708c
PR feedback
mvbrock Jan 16, 2025
f8a77ef
Updating to use existing msgraph client
mvbrock Dec 19, 2024
da8b568
PR feedback
mvbrock Dec 20, 2024
a842609
Using variadic options
mvbrock Jan 6, 2025
2e0b291
Removing memberOf expansion
mvbrock Jan 6, 2025
8aa64cf
Expanding memberships by calling memberOf on each user
mvbrock Jan 7, 2025
60b1810
PR feedback
mvbrock Jan 9, 2025
bb160be
Rebase go.sum stuff
mvbrock Jan 9, 2025
0b5214c
PR feedback
mvbrock Jan 10, 2025
0009b19
Protobuf and configuration for Access Graph Azure Discovery
mvbrock Dec 17, 2024
9e3866b
Invoking the Azure fetcher in the Discovery service
mvbrock Dec 17, 2024
8d22ecb
Protobuf gen fix
mvbrock Dec 18, 2024
af01ecc
Rebase fixes
mvbrock Jan 22, 2025
908a6ca
More cleanup
mvbrock Jan 22, 2025
7e45d6d
PR feedback
mvbrock Jan 23, 2025
fa1fcb6
Invoking token generation and returning the response
mvbrock Jan 18, 2025
3870241
Fetching the Azure OIDC token during fetcher creation and establishin…
mvbrock Jan 19, 2025
cd059ed
PR feedback; restricting token requests to auth, discovery, and proxy…
mvbrock Jan 21, 2025
2dde89d
Lint
mvbrock Jan 22, 2025
e766cc2
Rebase fxes
mvbrock Jan 23, 2025
2d91689
Adding back OIDC fetching, accidentally removed it during rebase
mvbrock Jan 23, 2025
3d97967
Initial refactoring to include Azure status reporting
mvbrock Jan 23, 2025
cb43463
Converging status sync between AWS and Azure
mvbrock Jan 23, 2025
ddc55e3
Fixing test
mvbrock Jan 23, 2025
dd7dfd8
Sending usage stats
mvbrock Jan 23, 2025
9400025
Fix imports
mvbrock Jan 23, 2025
dff9837
Add godocs and correct a few comments
mvbrock Jan 24, 2025
26f89ae
Removing the usage events for now
mvbrock Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions lib/srv/discovery/access_graph_aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
return trace.Wrap(errNoAccessGraphFetchers)
}

s.awsSyncStatus.iterationStarted(allFetchers, s.clock.Now())
for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() {
for _, fetcher := range allFetchers {
s.tagSyncStatus.syncStarted(fetcher, s.clock.Now())
}
for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}

Expand All @@ -92,7 +94,6 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
tokens := make(chan struct{}, 3)
accountIds := map[string]struct{}{}
for _, fetcher := range allFetchers {
fetcher := fetcher
accountIds[fetcher.GetAccountID()] = struct{}{}
tokens <- struct{}{}
go func() {
Expand Down Expand Up @@ -127,8 +128,10 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
upsert, toDel := aws_sync.ReconcileResults(currentTAGResources, result)
pushErr := push(stream, upsert, toDel)

s.awsSyncStatus.iterationFinished(allFetchers, pushErr, s.clock.Now())
for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() {
for _, fetcher := range allFetchers {
s.tagSyncStatus.syncFinished(fetcher, pushErr, s.clock.Now())
}
for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}

Expand All @@ -153,8 +156,8 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
}

// getAllAWSSyncFetchers returns all AWS sync fetchers.
func (s *Server) getAllAWSSyncFetchers() []aws_sync.AWSSync {
allFetchers := make([]aws_sync.AWSSync, 0, len(s.dynamicTAGAWSFetchers))
func (s *Server) getAllAWSSyncFetchers() []*aws_sync.Fetcher {
allFetchers := make([]*aws_sync.Fetcher, 0, len(s.dynamicTAGAWSFetchers))

s.muDynamicTAGAWSFetchers.RLock()
for _, fetcherSet := range s.dynamicTAGAWSFetchers {
Expand Down Expand Up @@ -483,8 +486,8 @@ func (s *Server) initTAGAWSWatchers(ctx context.Context, cfg *Config) error {
}

// accessGraphAWSFetchersFromMatchers converts Matchers into a set of AWS Sync Fetchers.
func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matchers Matchers, discoveryConfigName string) ([]aws_sync.AWSSync, error) {
var fetchers []aws_sync.AWSSync
func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matchers Matchers, discoveryConfigName string) ([]*aws_sync.Fetcher, error) {
var fetchers []*aws_sync.Fetcher
var errs []error
if matchers.AccessGraph == nil {
return fetchers, nil
Expand All @@ -498,7 +501,7 @@ func (s *Server) accessGraphAWSFetchersFromMatchers(ctx context.Context, matcher
ExternalID: awsFetcher.AssumeRole.ExternalID,
}
}
fetcher, err := aws_sync.NewAWSFetcher(
fetcher, err := aws_sync.NewFetcher(
ctx,
aws_sync.Config{
AWSConfigProvider: s.AWSConfigProvider,
Expand Down
14 changes: 14 additions & 0 deletions lib/srv/discovery/access_graph_azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ func (s *Server) reconcileAccessGraphAzure(
return trace.Wrap(errNoAccessGraphFetchers)
}

for _, fetcher := range allFetchers {
s.tagSyncStatus.syncStarted(fetcher, s.clock.Now())
}
for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}

// Fetch results concurrently
resultsC := make(chan fetcherResult, len(allFetchers))
// Restricts concurrently running fetchers to 3
Expand Down Expand Up @@ -107,6 +114,13 @@ func (s *Server) reconcileAccessGraphAzure(
upsert, toDel := azuresync.ReconcileResults(currentTAGResources, result)
pushErr := azurePush(stream, upsert, toDel)

for _, fetcher := range allFetchers {
s.tagSyncStatus.syncFinished(fetcher, pushErr, s.clock.Now())
}
for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}

if pushErr != nil {
s.Log.ErrorContext(ctx, "Error pushing TAGs", "error", pushErr)
return nil
Expand Down
52 changes: 28 additions & 24 deletions lib/srv/discovery/access_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
testErr := "test error"
clock := clockwork.NewFakeClock()
type args struct {
fetchers []aws_sync.AWSSync
fetchers []*fakeFetcher
pushErr error
preRun bool
}
Expand All @@ -48,8 +48,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "test updateDiscoveryConfigStatus",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
count: 1,
discoveryConfigName: "test",
},
Expand All @@ -71,8 +71,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "test updateDiscoveryConfigStatus with pushError",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
count: 1,
discoveryConfigName: "test",
},
Expand All @@ -94,8 +94,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "test updateDiscoveryConfigStatus with error",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
count: 1,
discoveryConfigName: "test",
err: errors.New(testErr),
Expand All @@ -117,8 +117,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "discar reports for non-discovery config results",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
count: 1,
},
},
Expand All @@ -128,8 +128,8 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "test updateDiscoveryConfigStatus pre-run",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
discoveryConfigName: "test",
},
},
Expand All @@ -150,16 +150,16 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "test multiple aws sync fetchers",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
discoveryConfigName: "test1",
count: 1,
},
&fakeFetcher{
{
discoveryConfigName: "test1",
count: 1,
},
&fakeFetcher{
{
discoveryConfigName: "test2",
count: 1,
},
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "merge two errors",
args: args{
fetchers: []aws_sync.AWSSync{
fetchers: []*fakeFetcher{
&fakeFetcher{
discoveryConfigName: "test1",
err: fmt.Errorf("error in fetcher 1"),
Expand All @@ -214,12 +214,12 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
{
name: "reports error if at least one fetcher fails",
args: args{
fetchers: []aws_sync.AWSSync{
&fakeFetcher{
fetchers: []*fakeFetcher{
{
discoveryConfigName: "test1",
err: fmt.Errorf("error in fetcher 1"),
},
&fakeFetcher{
{
discoveryConfigName: "test1",
count: 2,
},
Expand Down Expand Up @@ -247,16 +247,20 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
AccessPoint: accessPoint,
clock: clock,
},
awsSyncStatus: awsSyncStatus{},
tagSyncStatus: newTagSyncStatus(),
}

if tt.args.preRun {
s.awsSyncStatus.iterationStarted(tt.args.fetchers, s.clock.Now())
for _, fetcher := range tt.args.fetchers {
s.tagSyncStatus.syncStarted(fetcher, s.clock.Now())
}
} else {
s.awsSyncStatus.iterationFinished(tt.args.fetchers, tt.args.pushErr, s.clock.Now())
for _, fetcher := range tt.args.fetchers {
s.tagSyncStatus.syncFinished(fetcher, tt.args.pushErr, s.clock.Now())
}
}

for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() {
for _, discoveryConfigName := range s.tagSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}

Expand All @@ -270,7 +274,7 @@ func stringPointer(s string) *string {
}

type fakeFetcher struct {
aws_sync.AWSSync
aws_sync.Fetcher
err error
count uint64
discoveryConfigName string
Expand Down
10 changes: 5 additions & 5 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,9 @@ type Server struct {

// dynamicTAGAWSFetchers holds the current TAG Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
dynamicTAGAWSFetchers map[string][]aws_sync.AWSSync
dynamicTAGAWSFetchers map[string][]*aws_sync.Fetcher
muDynamicTAGAWSFetchers sync.RWMutex
staticTAGAWSFetchers []aws_sync.AWSSync
staticTAGAWSFetchers []*aws_sync.Fetcher

// dynamicTAGAzureFetchers holds the current TAG Fetchers for the Dynamic Matchers (those coming from DiscoveryConfig resource).
// The key is the DiscoveryConfig name.
Expand All @@ -440,7 +440,7 @@ type Server struct {

dynamicDiscoveryConfig map[string]*discoveryconfig.DiscoveryConfig

awsSyncStatus awsSyncStatus
tagSyncStatus *tagSyncStatus
awsEC2ResourcesStatus awsResourcesStatus
awsRDSResourcesStatus awsResourcesStatus
awsEKSResourcesStatus awsResourcesStatus
Expand Down Expand Up @@ -477,10 +477,10 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {
dynamicServerAWSFetchers: make(map[string][]server.Fetcher),
dynamicServerAzureFetchers: make(map[string][]server.Fetcher),
dynamicServerGCPFetchers: make(map[string][]server.Fetcher),
dynamicTAGAWSFetchers: make(map[string][]aws_sync.AWSSync),
dynamicTAGAWSFetchers: make(map[string][]*aws_sync.Fetcher),
dynamicTAGAzureFetchers: make(map[string][]*azure_sync.Fetcher),
dynamicDiscoveryConfig: make(map[string]*discoveryconfig.DiscoveryConfig),
awsSyncStatus: awsSyncStatus{},
tagSyncStatus: newTagSyncStatus(),
awsEC2ResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEC2),
awsRDSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherRDS),
awsEKSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEKS),
Expand Down
44 changes: 15 additions & 29 deletions lib/srv/discovery/fetchers/aws-sync/aws-sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,28 +138,14 @@ type AssumeRole struct {
ExternalID string
}

// awsFetcher is a fetcher that fetches AWS resources.
type awsFetcher struct {
// Fetcher is a fetcher that fetches AWS resources.
type Fetcher struct {
Config
lastError error
lastDiscoveredResources uint64
lastResult *Resources
}

// AWSSync is the interface for fetching AWS resources.
type AWSSync interface {
// Poll polls all AWS resources and returns the result.
Poll(context.Context, Features) (*Resources, error)
// Status reports the last known status of the fetcher.
Status() (uint64, error)
// DiscoveryConfigName returns the name of the Discovery Config.
DiscoveryConfigName() string
// IsFromDiscoveryConfig returns true if the fetcher is associated with a Discovery Config.
IsFromDiscoveryConfig() bool
// GetAccountID returns the AWS account ID.
GetAccountID() string
}

// Resources is a collection of polled AWS resources.
type Resources struct {
// Users is the list of AWS users.
Expand Down Expand Up @@ -249,12 +235,12 @@ func (r *Resources) UsageReport(numberAccounts int) *usageeventsv1.AccessGraphAW
}
}

// NewAWSFetcher creates a new AWS fetcher.
func NewAWSFetcher(ctx context.Context, cfg Config) (AWSSync, error) {
// NewFetcher creates a new AWS fetcher.
func NewFetcher(ctx context.Context, cfg Config) (*Fetcher, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
a := &awsFetcher{
a := &Fetcher{
Config: cfg,
lastResult: &Resources{},
}
Expand All @@ -270,14 +256,14 @@ func NewAWSFetcher(ctx context.Context, cfg Config) (AWSSync, error) {
// Poll is a blocking call and will return when all resources have been fetched.
// It's possible that the call returns Resources and an error at the same time
// if some resources were fetched successfully and some were not.
func (a *awsFetcher) Poll(ctx context.Context, features Features) (*Resources, error) {
func (a *Fetcher) Poll(ctx context.Context, features Features) (*Resources, error) {
result, err := a.poll(ctx, features)
deduplicateResources(result)
a.storeReport(result, err)
return result, trace.Wrap(err)
}

func (a *awsFetcher) storeReport(rec *Resources, err error) {
func (a *Fetcher) storeReport(rec *Resources, err error) {
a.lastError = err
if rec == nil {
return
Expand All @@ -286,11 +272,11 @@ func (a *awsFetcher) storeReport(rec *Resources, err error) {
a.lastDiscoveredResources = uint64(rec.count())
}

func (a *awsFetcher) GetAccountID() string {
func (a *Fetcher) GetAccountID() string {
return a.AccountID
}

func (a *awsFetcher) poll(ctx context.Context, features Features) (*Resources, error) {
func (a *Fetcher) poll(ctx context.Context, features Features) (*Resources, error) {
eGroup, ctx := errgroup.WithContext(ctx)
// Set the limit for the number of concurrent pollers running in parallel.
// This is to prevent the number of concurrent pollers from growing too large
Expand Down Expand Up @@ -371,7 +357,7 @@ func (a *awsFetcher) poll(ctx context.Context, features Features) (*Resources, e

// getAWSOptions returns a list of AWSAssumeRoleOptionFn to be used when
// creating AWS clients.
func (a *awsFetcher) getAWSOptions() []cloud.AWSOptionsFn {
func (a *Fetcher) getAWSOptions() []cloud.AWSOptionsFn {
opts := []cloud.AWSOptionsFn{
cloud.WithCredentialsMaybeIntegration(a.Config.Integration),
}
Expand All @@ -398,7 +384,7 @@ func (a *awsFetcher) getAWSOptions() []cloud.AWSOptionsFn {

// getAWSV2Options returns a list of options to be used when
// creating AWS clients with the v2 sdk.
func (a *awsFetcher) getAWSV2Options() []awsconfig.OptionsFn {
func (a *Fetcher) getAWSV2Options() []awsconfig.OptionsFn {
opts := []awsconfig.OptionsFn{
awsconfig.WithCredentialsMaybeIntegration(a.Config.Integration),
}
Expand All @@ -417,7 +403,7 @@ func (a *awsFetcher) getAWSV2Options() []awsconfig.OptionsFn {
return opts
}

func (a *awsFetcher) getAccountId(ctx context.Context) (string, error) {
func (a *Fetcher) getAccountId(ctx context.Context) (string, error) {
stsClient, err := a.CloudClients.GetAWSSTSClient(
ctx,
"", /* region is empty because groups are global */
Expand All @@ -436,14 +422,14 @@ func (a *awsFetcher) getAccountId(ctx context.Context) (string, error) {
return aws.ToString(req.Account), nil
}

func (a *awsFetcher) DiscoveryConfigName() string {
func (a *Fetcher) DiscoveryConfigName() string {
return a.Config.DiscoveryConfigName
}

func (a *awsFetcher) IsFromDiscoveryConfig() bool {
func (a *Fetcher) IsFromDiscoveryConfig() bool {
return a.Config.DiscoveryConfigName != ""
}

func (a *awsFetcher) Status() (uint64, error) {
func (a *Fetcher) Status() (uint64, error) {
return a.lastDiscoveredResources, a.lastError
}
Loading
Loading