Skip to content

Commit

Permalink
Simplify RA pausing unit tests (#7868)
Browse files Browse the repository at this point in the history
Greatly simplify the two RA unit tests covering failed validations and
account+identifier pausing. Most importantly, directly manipulate the
ratelimit backing store during test setup, to avoid having to "perform"
extra validations.

Fixes #7812
  • Loading branch information
aarongable authored Dec 4, 2024
1 parent d962c61 commit aac7c22
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 244 deletions.
311 changes: 71 additions & 240 deletions ra/ra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,145 +773,53 @@ func TestPerformValidationSuccess(t *testing.T) {
test.Assert(t, *challenge.Validated == expectedValidated, "Validated timestamp incorrect or missing")
}

type mockSAPaused struct {
sync.RWMutex
// mockSAWithSyncPause is a mock sapb.StorageAuthorityClient that forwards all
// method calls to an inner SA, but also performs a blocking write to a channel
// when PauseIdentifiers is called to allow the tests to synchronize.
type mockSAWithSyncPause struct {
sapb.StorageAuthorityClient
authorizationsForRegID map[int64]*corepb.Authorization
identifiersForRegID map[int64][]*corepb.Identifier
registrationsForRegID map[int64]*corepb.Registration
out chan<- *sapb.PauseRequest
}

func newMockSAPaused(sa sapb.StorageAuthorityClient) *mockSAPaused {
return &mockSAPaused{
StorageAuthorityClient: sa,
authorizationsForRegID: make(map[int64]*corepb.Authorization),
identifiersForRegID: make(map[int64][]*corepb.Identifier),
registrationsForRegID: make(map[int64]*corepb.Registration),
}
}

func (msa *mockSAPaused) GetRegistration(ctx context.Context, req *sapb.RegistrationID, _ ...grpc.CallOption) (*corepb.Registration, error) {
msa.Lock()
defer msa.Unlock()
regPB, ok := msa.registrationsForRegID[req.Id]
if !ok {
return nil, fmt.Errorf("Unable to find registration for regID %d", req.Id)
}
return regPB, nil
}

func (msa *mockSAPaused) PauseIdentifiers(ctx context.Context, req *sapb.PauseRequest, _ ...grpc.CallOption) (*sapb.PauseIdentifiersResponse, error) {
msa.Lock()
defer msa.Unlock()
if len(req.Identifiers) <= 0 {
return nil, fmt.Errorf("No identifiers found to pause")
}
msa.identifiersForRegID[req.RegistrationID] = req.Identifiers

counts := make(map[int64]int64)
for range msa.identifiersForRegID {
counts[req.RegistrationID]++
}
return &sapb.PauseIdentifiersResponse{Paused: counts[req.RegistrationID]}, nil
}

func (msa *mockSAPaused) GetPausedIdentifiers(ctx context.Context, req *sapb.RegistrationID, _ ...grpc.CallOption) (*sapb.Identifiers, error) {
msa.Lock()
defer msa.Unlock()
_, ok := msa.registrationsForRegID[req.Id]
if !ok {
return nil, fmt.Errorf("Unable to find registration for regID %d", req.Id)
}
idents, ok := msa.identifiersForRegID[req.Id]
if !ok {
return nil, fmt.Errorf("No identifiers paused yet")
}
return &sapb.Identifiers{Identifiers: idents}, nil
}

func (msa *mockSAPaused) FinalizeAuthorization2(ctx context.Context, req *sapb.FinalizeAuthorizationRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) {
return &emptypb.Empty{}, nil
func (msa mockSAWithSyncPause) PauseIdentifiers(ctx context.Context, req *sapb.PauseRequest, _ ...grpc.CallOption) (*sapb.PauseIdentifiersResponse, error) {
res, err := msa.StorageAuthorityClient.PauseIdentifiers(ctx, req)
msa.out <- req
return res, err
}

func TestPerformValidation_FailedValidationsTriggerPauseIdentifiersRatelimit(t *testing.T) {
va, sa, ra, redisSrc, fc, cleanUp := initAuthorities(t)
va, sa, ra, rl, fc, cleanUp := initAuthorities(t)
defer cleanUp()

if ra.limiter == nil {
t.Skip("no redis limiter configured")
}

features.Set(features.Config{AutomaticallyPauseZombieClients: true})
defer features.Reset()

mockSA := newMockSAPaused(sa)
ra.SA = mockSA
// Replace the SA with one that will block when PauseIdentifiers is called.
pauseChan := make(chan *sapb.PauseRequest)
defer close(pauseChan)
ra.SA = mockSAWithSyncPause{
StorageAuthorityClient: ra.SA,
out: pauseChan,
}

// Override the default ratelimits to only allow one failed validation.
// Override the default ratelimits to only allow one failed validation per 24 hours.
txnBuilder, err := ratelimits.NewTransactionBuilder("testdata/one-failed-validation-before-pausing.yml", "")
test.AssertNotError(t, err, "making transaction composer")
ra.txnBuilder = txnBuilder

// We know this is OK because of TestNewAuthorization
// Set up a fake domain, authz, and bucket key to care about.
domain := randomDomain()
authzPB := createPendingAuthorization(t, sa, domain, fc.Now().Add(12*time.Hour))
mockSA.registrationsForRegID[authzPB.RegistrationID] = Registration
mockSA.authorizationsForRegID[authzPB.RegistrationID] = authzPB

// We induce the bad path by setting a problem. This will consume all
// available capacity in the rate limit bucket.
va.PerformValidationRequestResultReturn = &vapb.ValidationResult{
Records: []*corepb.ValidationRecord{
{
AddressUsed: []byte("192.168.0.1"),
Hostname: domain,
Port: "8080",
Url: fmt.Sprintf("http://%s/", domain),
ResolverAddrs: []string{"rebound"},
},
},
Problem: &corepb.ProblemDetails{
Detail: fmt.Sprintf("CAA invalid for %s", domain),
},
}

challIdx := dnsChallIdx(t, authzPB.Challenges)
authzPB, err = ra.PerformValidation(ctx, &rapb.PerformValidationRequest{
Authz: authzPB,
ChallengeIndex: challIdx,
})
test.AssertNotError(t, err, "PerformValidation failed")

select {
case r := <-va.performValidationRequest:
_ = r
case <-time.After(time.Second):
t.Fatal("Timed out waiting for DummyValidationAuthority.PerformValidation to complete")
}

// Sleep so the RA has a chance to write to the SA
time.Sleep(100 * time.Millisecond)

got, err := ra.SA.GetPausedIdentifiers(ctx, &sapb.RegistrationID{Id: authzPB.RegistrationID}, nil)
test.AssertError(t, err, "Should not have any paused identifiers yet, but found some")
test.AssertBoxedNil(t, got, "Should have received nil response, but did not")
test.AssertMetricWithLabelsEquals(t, ra.pauseCounter, prometheus.Labels{"paused": "false", "repaused": "false", "grace": "false"}, 0)

// We need the bucket key to scan for in Redis
bucketKey, err := ratelimits.NewRegIdDomainBucketKey(ratelimits.FailedAuthorizationsForPausingPerDomainPerAccount, authzPB.RegistrationID, domain)
test.AssertNotError(t, err, "Should have been able to construct bucket key, but could not")
test.AssertNotError(t, err, "constructing test bucket key")

// Verify that a redis entry exists for this accountID:identifier.
tat, err := redisSrc.Get(ctx, bucketKey)
test.AssertNotError(t, err, "Should not have errored, but did")

// There is no more capacity and the next failed validation will effectively
// pause issuance attempts. The ratelimit file is written to increment
// capacity every 24 hours, so we can check that the TAT states that, not
// that it particularly matters in this context.
test.AssertEquals(t, tat, fc.Now().Add(24*time.Hour))
// Set the stored TAT to indicate that this bucket has exhausted its quota.
err = rl.BatchSet(context.Background(), map[string]time.Time{
bucketKey: fc.Now().Add(25 * time.Hour),
})
test.AssertNotError(t, err, "updating rate limit bucket")

// A second failed validation should result in the identifier being paused
// Now a failed validation should result in the identifier being paused
// due to the strict ratelimit.
va.PerformValidationRequestResultReturn = &vapb.ValidationResult{
Records: []*corepb.ValidationRecord{
Expand All @@ -928,129 +836,63 @@ func TestPerformValidation_FailedValidationsTriggerPauseIdentifiersRatelimit(t *
},
}

challIdx = dnsChallIdx(t, authzPB.Challenges)
authzPB, err = ra.PerformValidation(ctx, &rapb.PerformValidationRequest{
_, err = ra.PerformValidation(ctx, &rapb.PerformValidationRequest{
Authz: authzPB,
ChallengeIndex: challIdx,
ChallengeIndex: dnsChallIdx(t, authzPB.Challenges),
})
test.AssertNotError(t, err, "PerformValidation failed")

select {
case r := <-va.performValidationRequest:
_ = r
case <-time.After(time.Second):
t.Fatal("Timed out waiting for DummyValidationAuthority.PerformValidation to complete")
}

// Sleep so the RA has a chance to write to the SA
time.Sleep(100 * time.Millisecond)
// Wait for the RA to finish processing the validation, and ensure that the paused
// account+identifier is what we expect.
paused := <-pauseChan
test.AssertEquals(t, len(paused.Identifiers), 1)
test.AssertEquals(t, paused.Identifiers[0].Value, domain)
}

// Ensure the identifier:account:domain we expect to be paused actually is.
got, err = ra.SA.GetPausedIdentifiers(ctx, &sapb.RegistrationID{Id: authzPB.RegistrationID}, nil)
test.AssertNotError(t, err, "Should not have errored getting paused identifiers")
test.AssertEquals(t, len(got.Identifiers), 1)
test.AssertEquals(t, got.Identifiers[0].Value, domain)
test.AssertMetricWithLabelsEquals(t, ra.pauseCounter, prometheus.Labels{"paused": "true", "repaused": "false", "grace": "false"}, 1)
// mockRLSourceWithSyncDelete is a mock ratelimits.Source that forwards all
// method calls to an inner Source, but also performs a blocking write to a
// channel when Delete is called to allow the tests to synchronize.
type mockRLSourceWithSyncDelete struct {
ratelimits.Source
out chan<- string
}

err = ra.limiter.Reset(ctx, bucketKey)
test.AssertNotError(t, err, "Failed cleaning up redis")
func (rl mockRLSourceWithSyncDelete) Delete(ctx context.Context, bucketKey string) error {
err := rl.Source.Delete(ctx, bucketKey)
rl.out <- bucketKey
return err
}

func TestPerformValidation_FailedThenSuccessfulValidationResetsPauseIdentifiersRatelimit(t *testing.T) {
va, sa, ra, redisSrc, fc, cleanUp := initAuthorities(t)
va, sa, ra, rl, fc, cleanUp := initAuthorities(t)
defer cleanUp()

if ra.limiter == nil {
t.Skip("no redis limiter configured")
}

features.Set(features.Config{AutomaticallyPauseZombieClients: true})
defer features.Reset()

// Because we're testing with a real Redis backend, we choose a different account ID
// than other tests to make we don't get interference from other tests using the same
// registration ID.
registration, err := sa.NewRegistration(ctx, &corepb.Registration{
Key: AccountKeyJSONC,
Status: string(core.StatusValid),
})
test.AssertNotError(t, err, "Failed to create registration")

mockSA := newMockSAPaused(sa)
ra.SA = mockSA

// Override the default ratelimits to only allow one failed validation.
txnBuilder, err := ratelimits.NewTransactionBuilder("testdata/two-failed-validations-before-pausing.yml", "")
test.AssertNotError(t, err, "making transaction composer")
ra.txnBuilder = txnBuilder

// We know this is OK because of TestNewAuthorization
// Replace the rate limit source with one that will block when Delete is called.
keyChan := make(chan string)
defer close(keyChan)
limiter, err := ratelimits.NewLimiter(fc, mockRLSourceWithSyncDelete{
Source: rl,
out: keyChan,
}, metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating mock limiter")
ra.limiter = limiter

// Set up a fake domain, authz, and bucket key to care about.
domain := randomDomain()
authzPB := createPendingAuthorization(t, sa, domain, fc.Now().Add(12*time.Hour))
authzPB.RegistrationID = registration.Id
mockSA.registrationsForRegID[authzPB.RegistrationID] = Registration
mockSA.authorizationsForRegID[authzPB.RegistrationID] = authzPB

// We induce the bad path by setting a problem. This will consume all
// available capacity in the rate limit bucket.
va.PerformValidationRequestResultReturn = &vapb.ValidationResult{
Records: []*corepb.ValidationRecord{
{
AddressUsed: []byte("192.168.0.1"),
Hostname: domain,
Port: "8080",
Url: fmt.Sprintf("http://%s/", domain),
ResolverAddrs: []string{"rebound"},
},
},
Problem: &corepb.ProblemDetails{
Detail: fmt.Sprintf("CAA invalid for %s", domain),
},
}

challIdx := dnsChallIdx(t, authzPB.Challenges)
authzPB, err = ra.PerformValidation(ctx, &rapb.PerformValidationRequest{
Authz: authzPB,
ChallengeIndex: challIdx,
})
test.AssertNotError(t, err, "PerformValidation failed")

select {
case r := <-va.performValidationRequest:
_ = r
case <-time.After(time.Second):
t.Fatal("Timed out waiting for DummyValidationAuthority.PerformValidation to complete")
}

// Sleep so the RA has a chance to write to the SA
time.Sleep(100 * time.Millisecond)

got, err := ra.SA.GetPausedIdentifiers(ctx, &sapb.RegistrationID{Id: authzPB.RegistrationID}, nil)
test.AssertError(t, err, "Should not have any paused identifiers yet, but found some")
test.AssertBoxedNil(t, got, "Should have received nil response, but did not")
test.AssertMetricWithLabelsEquals(t, ra.pauseCounter, prometheus.Labels{"paused": "false", "repaused": "false", "grace": "false"}, 0)

// We need the bucket key to scan for in Redis
bucketKey, err := ratelimits.NewRegIdDomainBucketKey(ratelimits.FailedAuthorizationsForPausingPerDomainPerAccount, authzPB.RegistrationID, domain)
test.AssertNotError(t, err, "Should have been able to construct bucket key, but could not")

// Verify that a redis entry exists for this accountID:identifier
tat, err := redisSrc.Get(ctx, bucketKey)
test.AssertNotError(t, err, "Should not have errored, but did")
test.AssertNotError(t, err, "constructing test bucket key")

// We should have capacity for 1 more failed validation, the next TAT should
// be immediately (despite the fact that this clearly says now + 12 hours).
test.AssertEquals(t, tat, fc.Now().Add(12*time.Hour))

//
// Now the goal is to perform a successful validation which should reset the
// FailedAuthorizationsForPausingPerDomainPerAccount ratelimit.
//

// We know this is OK because of TestNewAuthorization
authzPB = createPendingAuthorization(t, sa, domain, fc.Now().Add(12*time.Hour))
authzPB.RegistrationID = registration.Id
// Set a stored TAT so that we can tell when it's been reset.
err = rl.BatchSet(context.Background(), map[string]time.Time{
bucketKey: fc.Now().Add(25 * time.Hour),
})
test.AssertNotError(t, err, "updating rate limit bucket")

// Now a successful validation should reset the rate limit bucket.
va.PerformValidationRequestResultReturn = &vapb.ValidationResult{
Records: []*corepb.ValidationRecord{
{
Expand All @@ -1064,33 +906,22 @@ func TestPerformValidation_FailedThenSuccessfulValidationResetsPauseIdentifiersR
Problem: nil,
}

challIdx = dnsChallIdx(t, authzPB.Challenges)
authzPB, err = ra.PerformValidation(ctx, &rapb.PerformValidationRequest{
_, err = ra.PerformValidation(ctx, &rapb.PerformValidationRequest{
Authz: authzPB,
ChallengeIndex: challIdx,
ChallengeIndex: dnsChallIdx(t, authzPB.Challenges),
})
test.AssertNotError(t, err, "PerformValidation failed")
mockSA.authorizationsForRegID[authzPB.RegistrationID] = authzPB

select {
case r := <-va.performValidationRequest:
_ = r
case <-time.After(time.Second):
t.Fatal("Timed out waiting for DummyValidationAuthority.PerformValidation to complete")
}

// We need the bucket key to scan for in Redis
bucketKey, err = ratelimits.NewRegIdDomainBucketKey(ratelimits.FailedAuthorizationsForPausingPerDomainPerAccount, authzPB.RegistrationID, domain)
test.AssertNotError(t, err, "Should have been able to construct bucket key, but could not")
// Wait for the RA to finish processesing the validation, and ensure that
// the reset bucket key is what we expect.
reset := <-keyChan
test.AssertEquals(t, reset, bucketKey)

// Verify that the bucket no longer exists (because the limiter reset has
// deleted it). This indicates the accountID:identifier bucket has regained
// capacity avoiding being inadvertently paused.
_, err = redisSrc.Get(ctx, bucketKey)
_, err = rl.Get(ctx, bucketKey)
test.AssertErrorIs(t, err, ratelimits.ErrBucketNotFound)

err = ra.limiter.Reset(ctx, bucketKey)
test.AssertNotError(t, err, "Failed cleaning up redis")
}

func TestPerformValidationVAError(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions ra/testdata/two-failed-validations-before-pausing.yml

This file was deleted.

4 changes: 4 additions & 0 deletions test/inmem/sa/sa.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (sa SA) FQDNSetExists(ctx context.Context, req *sapb.FQDNSetExistsRequest,
return sa.Impl.FQDNSetExists(ctx, req)
}

func (sa SA) PauseIdentifiers(ctx context.Context, req *sapb.PauseRequest, _ ...grpc.CallOption) (*sapb.PauseIdentifiersResponse, error) {
return sa.Impl.PauseIdentifiers(ctx, req)
}

type mockStreamResult[T any] struct {
val T
err error
Expand Down

0 comments on commit aac7c22

Please sign in to comment.