Skip to content

Commit

Permalink
Merge branch 'branch/v17' into bot/backport-51678-branch/v17
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenGravy authored Jan 31, 2025
2 parents 6f9215e + 4af89a9 commit a9f7b23
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 66 deletions.
14 changes: 4 additions & 10 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (s *Server) startDatabaseWatchers() error {

go func() {
for {
discoveryConfigsChanged := map[string]struct{}{}
resourcesFoundByGroup := make(map[awsResourceGroup]int)

select {
Expand All @@ -101,7 +100,6 @@ func (s *Server) startDatabaseWatchers() error {

resourceGroup := awsResourceGroupFromLabels(db.GetStaticLabels())
resourcesFoundByGroup[resourceGroup] += 1
discoveryConfigsChanged[resourceGroup.discoveryConfigName] = struct{}{}

dbs = append(dbs, db)

Expand Down Expand Up @@ -138,9 +136,6 @@ func (s *Server) startDatabaseWatchers() error {
return
}

for dc := range discoveryConfigsChanged {
s.updateDiscoveryConfigStatus(dc)
}
s.upsertTasksForAWSRDSFailedEnrollments()
}
}()
Expand Down Expand Up @@ -205,16 +200,15 @@ func (s *Server) databaseWatcherIterationStarted() {
},
)

for _, g := range awsResultGroups {
s.awsRDSResourcesStatus.iterationStarted(g)
}

discoveryConfigs := slices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) {
return g.discoveryConfigName, true
})
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsRDSResourcesStatus.reset()
for _, g := range awsResultGroups {
s.awsRDSResourcesStatus.iterationStarted(g)
}

s.awsRDSTasks.reset()
}

Expand Down
29 changes: 6 additions & 23 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
server.WithPollInterval(s.PollInterval),
server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()),
server.WithPreFetchHookFn(s.ec2WatcherIterationStarted),
server.WithClock(s.clock),
)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -543,15 +544,14 @@ func (s *Server) ec2WatcherIterationStarted() {
return resourceGroup, include
},
)
for _, g := range awsResultGroups {
s.awsEC2ResourcesStatus.iterationStarted(g)
}

discoveryConfigs := libslices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) {
return g.discoveryConfigName, true
})
s.updateDiscoveryConfigStatus(discoveryConfigs...)
s.awsEC2ResourcesStatus.reset()
for _, g := range awsResultGroups {
s.awsEC2ResourcesStatus.iterationStarted(g)
}

s.awsEC2Tasks.reset()
}
Expand Down Expand Up @@ -699,15 +699,7 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa
s.ctx, s.getAllAzureServerFetchers,
server.WithPollInterval(s.PollInterval),
server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()),
server.WithPreFetchHookFn(func() {
discoveryConfigs := libslices.FilterMapUnique(
s.getAllAzureServerFetchers(),
func(f server.Fetcher) (s string, include bool) {
return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)
}),
server.WithClock(s.clock),
)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -766,15 +758,7 @@ func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GC
s.ctx, s.getAllGCPServerFetchers,
server.WithPollInterval(s.PollInterval),
server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()),
server.WithPreFetchHookFn(func() {
discoveryConfigs := libslices.FilterMapUnique(
s.getAllGCPServerFetchers(),
func(f server.Fetcher) (s string, include bool) {
return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)
}),
server.WithClock(s.clock),
)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -1202,7 +1186,6 @@ func (s *Server) handleEC2Discovery() {
s.logHandleInstancesErr(err)
}

s.updateDiscoveryConfigStatus(instances.EC2.DiscoveryConfigName)
s.upsertTasksForAWSEC2FailedEnrollments()
case <-s.ctx.Done():
s.ec2Watcher.Stop()
Expand Down
55 changes: 38 additions & 17 deletions lib/srv/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -970,12 +970,22 @@ func TestDiscoveryServer(t *testing.T) {
if tc.wantDiscoveryConfigStatus != nil {
// It can take a while for the status to be updated.
require.Eventually(t, func() bool {
fakeClock.Advance(server.PollInterval)
storedDiscoveryConfig, err := tlsServer.Auth().DiscoveryConfigs.GetDiscoveryConfig(ctx, tc.discoveryConfig.GetName())
require.NoError(t, err)
if len(storedDiscoveryConfig.Status.IntegrationDiscoveredResources) == 0 {
return false
}
require.Equal(t, *tc.wantDiscoveryConfigStatus, storedDiscoveryConfig.Status)
want := *tc.wantDiscoveryConfigStatus
got := storedDiscoveryConfig.Status

require.Equal(t, want.State, got.State)
require.Equal(t, want.DiscoveredResources, got.DiscoveredResources)
require.Equal(t, want.ErrorMessage, got.ErrorMessage)
for expectedKey, expectedValue := range want.IntegrationDiscoveredResources {
require.Contains(t, got.IntegrationDiscoveredResources, expectedKey)
require.Equal(t, expectedValue, got.IntegrationDiscoveredResources[expectedKey])
}
return true
}, 500*time.Millisecond, 50*time.Millisecond)
}
Expand Down Expand Up @@ -2141,16 +2151,17 @@ func TestDiscoveryDatabase(t *testing.T) {
}

tcs := []struct {
name string
existingDatabases []types.Database
integrationsOnlyCredentials bool
awsMatchers []types.AWSMatcher
azureMatchers []types.AzureMatcher
expectDatabases []types.Database
discoveryConfigs func(*testing.T) []*discoveryconfig.DiscoveryConfig
discoveryConfigStatusCheck func(*testing.T, discoveryconfig.Status)
userTasksCheck func(*testing.T, []*usertasksv1.UserTask)
wantEvents int
name string
existingDatabases []types.Database
integrationsOnlyCredentials bool
awsMatchers []types.AWSMatcher
azureMatchers []types.AzureMatcher
expectDatabases []types.Database
discoveryConfigs func(*testing.T) []*discoveryconfig.DiscoveryConfig
discoveryConfigStatusCheck func(*testing.T, discoveryconfig.Status)
discoveryConfigStatusExpectedResources int
userTasksCheck func(*testing.T, []*usertasksv1.UserTask)
wantEvents int
}{
{
name: "discover AWS database",
Expand Down Expand Up @@ -2349,11 +2360,11 @@ func TestDiscoveryDatabase(t *testing.T) {
},
wantEvents: 1,
discoveryConfigStatusCheck: func(t *testing.T, s discoveryconfig.Status) {
require.Equal(t, uint64(1), s.DiscoveredResources)
require.Equal(t, uint64(1), s.IntegrationDiscoveredResources[integrationName].AwsRds.Enrolled)
require.Equal(t, uint64(1), s.IntegrationDiscoveredResources[integrationName].AwsRds.Found)
require.Zero(t, s.IntegrationDiscoveredResources[integrationName].AwsRds.Failed)
},
discoveryConfigStatusExpectedResources: 1,
},
{
name: "running in integrations-only-mode with a matcher without an integration, must find 1 database",
Expand Down Expand Up @@ -2383,10 +2394,10 @@ func TestDiscoveryDatabase(t *testing.T) {
expectDatabases: []types.Database{},
wantEvents: 0,
discoveryConfigStatusCheck: func(t *testing.T, s discoveryconfig.Status) {
require.Equal(t, uint64(1), s.DiscoveredResources)
require.Equal(t, uint64(1), s.IntegrationDiscoveredResources[integrationName].AwsEks.Found)
require.Zero(t, s.IntegrationDiscoveredResources[integrationName].AwsEks.Enrolled)
},
discoveryConfigStatusExpectedResources: 1,
},
{
name: "discovery config status must be updated even when there are no resources",
Expand All @@ -2405,9 +2416,9 @@ func TestDiscoveryDatabase(t *testing.T) {
expectDatabases: []types.Database{},
wantEvents: 0,
discoveryConfigStatusCheck: func(t *testing.T, s discoveryconfig.Status) {
require.Equal(t, uint64(0), s.DiscoveredResources)
require.Equal(t, "DISCOVERY_CONFIG_STATE_SYNCING", s.State)
},
discoveryConfigStatusExpectedResources: 0,
},
{
name: "discover-rds user task must be created when database is not configured to allow IAM DB Authentication",
Expand Down Expand Up @@ -2452,6 +2463,7 @@ func TestDiscoveryDatabase(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
fakeClock := clockwork.NewFakeClock()

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand Down Expand Up @@ -2499,6 +2511,7 @@ func TestDiscoveryDatabase(t *testing.T) {
waitForReconcile <- struct{}{}
},
DiscoveryGroup: mainDiscoveryGroup,
clock: fakeClock,
})

require.NoError(t, err)
Expand Down Expand Up @@ -2548,10 +2561,18 @@ func TestDiscoveryDatabase(t *testing.T) {
}

if tc.discoveryConfigStatusCheck != nil {
dc, err := tlsServer.Auth().GetDiscoveryConfig(ctx, discoveryConfigName)
require.NoError(t, err)
require.Eventually(t, func() bool {
fakeClock.Advance(srv.PollInterval * 2)
dc, err := tlsServer.Auth().GetDiscoveryConfig(ctx, discoveryConfigName)
require.NoError(t, err)
if tc.discoveryConfigStatusExpectedResources != int(dc.Status.DiscoveredResources) {
return false
}

tc.discoveryConfigStatusCheck(t, dc.Status)
return true
}, time.Second, 100*time.Millisecond)

tc.discoveryConfigStatusCheck(t, dc.Status)
}
if tc.userTasksCheck != nil {
var userTasks []*usertasksv1.UserTask
Expand Down
16 changes: 5 additions & 11 deletions lib/srv/discovery/kube_integration_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (s *Server) startKubeIntegrationWatchers() error {
Origin: types.OriginCloud,
TriggerFetchC: s.newDiscoveryConfigChangedSub(),
PreFetchHookFn: s.kubernetesIntegrationWatcherIterationStarted,
Clock: s.clock,
})
if err != nil {
return trace.Wrap(err)
Expand All @@ -86,7 +87,6 @@ func (s *Server) startKubeIntegrationWatchers() error {

go func() {
for {
discoveryConfigsChanged := map[string]struct{}{}
resourcesFoundByGroup := make(map[awsResourceGroup]int)
resourcesEnrolledByGroup := make(map[awsResourceGroup]int)

Expand Down Expand Up @@ -124,7 +124,6 @@ func (s *Server) startKubeIntegrationWatchers() error {

resourceGroup := awsResourceGroupFromLabels(newCluster.GetStaticLabels())
resourcesFoundByGroup[resourceGroup] += 1
discoveryConfigsChanged[resourceGroup.discoveryConfigName] = struct{}{}

if enrollingClusters[newCluster.GetAWSConfig().Name] ||
slices.ContainsFunc(existingServers, func(c types.KubeServer) bool { return c.GetName() == newCluster.GetName() }) ||
Expand Down Expand Up @@ -175,10 +174,6 @@ func (s *Server) startKubeIntegrationWatchers() error {
for group, count := range resourcesEnrolledByGroup {
s.awsEKSResourcesStatus.incrementEnrolled(group, count)
}

for dc := range discoveryConfigsChanged {
s.updateDiscoveryConfigStatus(dc)
}
}
}()
return nil
Expand All @@ -203,16 +198,16 @@ func (s *Server) kubernetesIntegrationWatcherIterationStarted() {
return resourceGroup, include
},
)
for _, g := range awsResultGroups {
s.awsEKSResourcesStatus.iterationStarted(g)
}

discoveryConfigs := libslices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) {
return g.discoveryConfigName, true
})
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsEKSResourcesStatus.reset()
for _, g := range awsResultGroups {
s.awsEKSResourcesStatus.iterationStarted(g)
}

s.awsEKSTasks.reset()
}

Expand All @@ -232,7 +227,6 @@ func (s *Server) enrollEKSClusters(region, integration, discoveryConfigName stri
}
mu.Unlock()

s.updateDiscoveryConfigStatus(discoveryConfigName)
s.upsertTasksForAWSEKSFailedEnrollments()
}()

Expand Down
2 changes: 0 additions & 2 deletions lib/srv/discovery/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,6 @@ func (s *Server) ReportEC2SSMInstallationResult(ctx context.Context, result *ser
integration: result.IntegrationName,
}, 1)

s.updateDiscoveryConfigStatus(result.DiscoveryConfigName)

s.awsEC2Tasks.addFailedEnrollment(
awsEC2TaskKey{
integration: result.IntegrationName,
Expand Down
2 changes: 2 additions & 0 deletions lib/srv/server/azure_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v3"
"github.com/aws/aws-sdk-go/aws"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"

usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -87,6 +88,7 @@ func NewAzureWatcher(ctx context.Context, fetchersFn func() []Fetcher, opts ...O
ctx: cancelCtx,
cancel: cancelFn,
pollInterval: time.Minute,
clock: clockwork.NewRealClock(),
triggerFetchC: make(<-chan struct{}),
InstancesC: make(chan Instances),
}
Expand Down
2 changes: 2 additions & 0 deletions lib/srv/server/ec2_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"

usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1"
Expand Down Expand Up @@ -177,6 +178,7 @@ func NewEC2Watcher(ctx context.Context, fetchersFn func() []Fetcher, missedRotat
fetchersFn: fetchersFn,
ctx: cancelCtx,
cancel: cancelFn,
clock: clockwork.NewRealClock(),
pollInterval: time.Minute,
triggerFetchC: make(<-chan struct{}),
InstancesC: make(chan Instances),
Expand Down
2 changes: 2 additions & 0 deletions lib/srv/server/gcp_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"

usageeventsv1 "github.com/gravitational/teleport/api/gen/proto/go/usageevents/v1"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -78,6 +79,7 @@ func NewGCPWatcher(ctx context.Context, fetchersFn func() []Fetcher, opts ...Opt
fetchersFn: fetchersFn,
ctx: cancelCtx,
cancel: cancelFn,
clock: clockwork.NewRealClock(),
pollInterval: time.Minute,
triggerFetchC: make(<-chan struct{}),
InstancesC: make(chan Instances),
Expand Down
Loading

0 comments on commit a9f7b23

Please sign in to comment.