Skip to content

Commit

Permalink
Initial refactoring to include Azure status reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
mvbrock committed Jan 23, 2025
1 parent c86b4a3 commit d1bc0b5
Show file tree
Hide file tree
Showing 6 changed files with 772 additions and 390 deletions.
1,092 changes: 729 additions & 363 deletions api/gen/proto/go/usageevents/v1/usageevents.pb.go

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions api/proto/teleport/usageevents/v1/usageevents.proto
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,19 @@ message AccessGraphAWSScanEvent {
uint64 total_accounts = 11;
}

// AccessGraphAzureScanEvent is emitted when the Access Graph
// Azure scan is enabled.
message AccessGraphAzureScanEvent {
// total_vms is the total amount of virtual machines found in the Azure scan.
uint64 total_vms = 1;
// total_principals is the total amount of users found in the Azure scan.
uint64 total_principals = 2;
// total_role_definitions is the total amount of roles definitions found in the Azure scan.
uint64 total_role_definitions = 3;
// total_role_assignments is the total amount of roles assignments found in the Azure scan.
uint64 total_role_assignments = 4;
}

// UIAccessGraphCrownJewelDiffViewEvent is emitted when a user reviews the output of a Crown Jewel access path dff.
message UIAccessGraphCrownJewelDiffViewEvent {
// affected_resource_source is the source of the affected resource.
Expand Down Expand Up @@ -899,6 +912,7 @@ message UsageEventOneOf {
UIAccessGraphCrownJewelDiffViewEvent ui_access_graph_crown_jewel_diff_view = 59;
UserTaskStateEvent user_task_state_event = 60;
UIIntegrationEnrollStepEvent ui_integration_enroll_step_event = 61;
AccessGraphAzureScanEvent access_graph_azure_scan_event = 62;
}
reserved 2; //UIOnboardGetStartedClickEvent
reserved "ui_onboard_get_started_click";
Expand Down
4 changes: 2 additions & 2 deletions lib/srv/discovery/access_graph_aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
return trace.Wrap(errNoAccessGraphFetchers)
}

s.awsSyncStatus.iterationStarted(allFetchers, s.clock.Now())
s.awsSyncStatus.awsIterationStarted(allFetchers, s.clock.Now())
for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources *
upsert, toDel := aws_sync.ReconcileResults(currentTAGResources, result)
pushErr := push(stream, upsert, toDel)

s.awsSyncStatus.iterationFinished(allFetchers, pushErr, s.clock.Now())
s.awsSyncStatus.awsIterationFinished(allFetchers, pushErr, s.clock.Now())
for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() {
s.updateDiscoveryConfigStatus(discoveryConfigName)
}
Expand Down
6 changes: 3 additions & 3 deletions lib/srv/discovery/access_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,13 @@ func TestServer_updateDiscoveryConfigStatus(t *testing.T) {
AccessPoint: accessPoint,
clock: clock,
},
awsSyncStatus: awsSyncStatus{},
awsSyncStatus: tagSyncStatus{},
}

if tt.args.preRun {
s.awsSyncStatus.iterationStarted(tt.args.fetchers, s.clock.Now())
s.awsSyncStatus.awsIterationStarted(tt.args.fetchers, s.clock.Now())
} else {
s.awsSyncStatus.iterationFinished(tt.args.fetchers, tt.args.pushErr, s.clock.Now())
s.awsSyncStatus.awsIterationFinished(tt.args.fetchers, tt.args.pushErr, s.clock.Now())
}

for _, discoveryConfigName := range s.awsSyncStatus.discoveryConfigs() {
Expand Down
4 changes: 2 additions & 2 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ type Server struct {

dynamicDiscoveryConfig map[string]*discoveryconfig.DiscoveryConfig

awsSyncStatus awsSyncStatus
awsSyncStatus tagSyncStatus
awsEC2ResourcesStatus awsResourcesStatus
awsRDSResourcesStatus awsResourcesStatus
awsEKSResourcesStatus awsResourcesStatus
Expand Down Expand Up @@ -480,7 +480,7 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {
dynamicTAGAWSFetchers: make(map[string][]aws_sync.AWSSync),
dynamicTAGAzureFetchers: make(map[string][]*azure_sync.Fetcher),
dynamicDiscoveryConfig: make(map[string]*discoveryconfig.DiscoveryConfig),
awsSyncStatus: awsSyncStatus{},
awsSyncStatus: tagSyncStatus{},
awsEC2ResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEC2),
awsRDSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherRDS),
awsEKSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEKS),
Expand Down
42 changes: 22 additions & 20 deletions lib/srv/discovery/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func (s *Server) updateDiscoveryConfigStatus(discoveryConfigNames ...string) {
// Merge AWS Sync (TAG) status
discoveryConfigStatus = s.awsSyncStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus)

// Merge Azure Sync (TAG) status

// Merge AWS EC2 Instances (auto discovery) status
discoveryConfigStatus = s.awsEC2ResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus)

Expand All @@ -84,16 +86,16 @@ func (s *Server) updateDiscoveryConfigStatus(discoveryConfigNames ...string) {
}
}

// awsSyncStatus contains all the status for aws_sync Fetchers grouped by DiscoveryConfig.
type awsSyncStatus struct {
// tagSyncStatus contains all the status for both AWS and Azure fetchers grouped by DiscoveryConfig.
type tagSyncStatus struct {
mu sync.RWMutex
// awsSyncResults maps the DiscoveryConfig name to a aws_sync result.
// Each DiscoveryConfig might have multiple `aws_sync` matchers.
awsSyncResults map[string][]awsSyncResult
// syncResults maps the DiscoveryConfig name to a AWS or Azure fetcher result.
// Each DiscoveryConfig might have multiple AWS or Azure matchers.
syncResults map[string][]tagSyncResult
}

// awsSyncResult stores the result of the aws_sync Matchers for a given DiscoveryConfig.
type awsSyncResult struct {
// tagSyncResult stores the result of the aws_sync Matchers for a given DiscoveryConfig.
type tagSyncResult struct {
// state is the State for the DiscoveryConfigStatus.
// Allowed values are:
// - DISCOVERY_CONFIG_STATE_SYNCING
Expand All @@ -105,11 +107,11 @@ type awsSyncResult struct {
discoveredResources uint64
}

func (d *awsSyncStatus) iterationFinished(fetchers []aws_sync.AWSSync, pushErr error, lastUpdate time.Time) {
func (d *tagSyncStatus) awsIterationFinished(fetchers []aws_sync.AWSSync, pushErr error, lastUpdate time.Time) {
d.mu.Lock()
defer d.mu.Unlock()

d.awsSyncResults = make(map[string][]awsSyncResult)
d.syncResults = make(map[string][]tagSyncResult)
for _, fetcher := range fetchers {
// Only update the status for fetchers that are from the discovery config.
if !fetcher.IsFromDiscoveryConfig() {
Expand All @@ -119,7 +121,7 @@ func (d *awsSyncStatus) iterationFinished(fetchers []aws_sync.AWSSync, pushErr e
count, statusErr := fetcher.Status()
statusAndPushErr := trace.NewAggregate(statusErr, pushErr)

fetcherResult := awsSyncResult{
fetcherResult := tagSyncResult{
state: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_RUNNING.String(),
lastSyncTime: lastUpdate,
discoveredResources: count,
Expand All @@ -131,46 +133,46 @@ func (d *awsSyncStatus) iterationFinished(fetchers []aws_sync.AWSSync, pushErr e
fetcherResult.state = discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_ERROR.String()
}

d.awsSyncResults[fetcher.DiscoveryConfigName()] = append(d.awsSyncResults[fetcher.DiscoveryConfigName()], fetcherResult)
d.syncResults[fetcher.DiscoveryConfigName()] = append(d.syncResults[fetcher.DiscoveryConfigName()], fetcherResult)
}
}

func (d *awsSyncStatus) discoveryConfigs() []string {
func (d *tagSyncStatus) discoveryConfigs() []string {
d.mu.RLock()
defer d.mu.RUnlock()

ret := make([]string, 0, len(d.awsSyncResults))
for k := range d.awsSyncResults {
ret := make([]string, 0, len(d.syncResults))
for k := range d.syncResults {
ret = append(ret, k)
}
return ret
}

func (d *awsSyncStatus) iterationStarted(fetchers []aws_sync.AWSSync, lastUpdate time.Time) {
func (d *tagSyncStatus) awsIterationStarted(fetchers []aws_sync.AWSSync, lastUpdate time.Time) {
d.mu.Lock()
defer d.mu.Unlock()

d.awsSyncResults = make(map[string][]awsSyncResult)
d.syncResults = make(map[string][]tagSyncResult)
for _, fetcher := range fetchers {
// Only update the status for fetchers that are from the discovery config.
if !fetcher.IsFromDiscoveryConfig() {
continue
}

fetcherResult := awsSyncResult{
fetcherResult := tagSyncResult{
state: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_SYNCING.String(),
lastSyncTime: lastUpdate,
}

d.awsSyncResults[fetcher.DiscoveryConfigName()] = append(d.awsSyncResults[fetcher.DiscoveryConfigName()], fetcherResult)
d.syncResults[fetcher.DiscoveryConfigName()] = append(d.syncResults[fetcher.DiscoveryConfigName()], fetcherResult)
}
}

func (d *awsSyncStatus) mergeIntoGlobalStatus(discoveryConfigName string, existingStatus discoveryconfig.Status) discoveryconfig.Status {
func (d *tagSyncStatus) mergeIntoGlobalStatus(discoveryConfigName string, existingStatus discoveryconfig.Status) discoveryconfig.Status {
d.mu.RLock()
defer d.mu.RUnlock()

awsStatusFetchers, found := d.awsSyncResults[discoveryConfigName]
awsStatusFetchers, found := d.syncResults[discoveryConfigName]
if !found {
return existingStatus
}
Expand Down

0 comments on commit d1bc0b5

Please sign in to comment.