Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of Various race condition and test fixes. into release/1.17.x #20216

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions agent/acl_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2217,7 +2217,7 @@ func TestACL_Authorize(t *testing.T) {
}

t.Parallel()
a1 := NewTestAgent(t, TestACLConfigWithParams(nil))
a1 := NewTestAgent(t, TestACLConfigWithParams(nil), TestAgentOpts{DisableACLBootstrapCheck: true})
defer a1.Shutdown()

testrpc.WaitForTestAgent(t, a1.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken))
Expand Down Expand Up @@ -2253,7 +2253,7 @@ func TestACL_Authorize(t *testing.T) {
secondaryParams.ReplicationToken = secondaryParams.InitialManagementToken
secondaryParams.EnableTokenReplication = true

a2 := NewTestAgent(t, `datacenter = "dc2" `+TestACLConfigWithParams(secondaryParams))
a2 := NewTestAgent(t, `datacenter = "dc2" `+TestACLConfigWithParams(secondaryParams), TestAgentOpts{DisableACLBootstrapCheck: true})
defer a2.Shutdown()

addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
Expand Down
4 changes: 2 additions & 2 deletions agent/grpc-external/services/peerstream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {

func TestStreamResources_Server_AckNackNonce(t *testing.T) {
srv, store := newTestServer(t, func(c *Config) {
c.incomingHeartbeatTimeout = 10 * time.Millisecond
c.incomingHeartbeatTimeout = 5 * time.Second
})

p := writePeeringToBeDialed(t, store, 1, "my-peer")
Expand Down Expand Up @@ -1222,7 +1222,7 @@ func TestStreamResources_Server_AckNackNonce(t *testing.T) {
})
// Add in a sleep to prevent the test from flaking.
// The mock client expects certain calls to be made.
time.Sleep(50 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
}

// Test that when the client doesn't send a heartbeat in time, the stream is disconnected.
Expand Down
2 changes: 1 addition & 1 deletion agent/grpc-external/services/peerstream/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (c *MockClient) Send(r *pbpeerstream.ReplicationMessage) error {
}

func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error) {
return c.RecvWithTimeout(10 * time.Millisecond)
return c.RecvWithTimeout(100 * time.Millisecond)
}

func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error) {
Expand Down
24 changes: 0 additions & 24 deletions agent/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,28 +150,6 @@ func assertMetricExistsWithValue(t *testing.T, respRec *httptest.ResponseRecorde
}
}

func assertMetricsWithLabelIsNonZero(t *testing.T, respRec *httptest.ResponseRecorder, label, labelValue string) {
if respRec.Body.String() == "" {
t.Fatalf("Response body is empty.")
}

metrics := respRec.Body.String()
labelWithValueTarget := label + "=" + "\"" + labelValue + "\""

for _, line := range strings.Split(metrics, "\n") {
if len(line) < 1 || line[0] == '#' {
continue
}

if strings.Contains(line, labelWithValueTarget) {
s := strings.SplitN(line, " ", 2)
if s[1] == "0" {
t.Fatalf("Metric with label provided \"%s:%s\" has the value 0", label, labelValue)
}
}
}
}

func assertMetricNotExists(t *testing.T, respRec *httptest.ResponseRecorder, metric string) {
if respRec.Body.String() == "" {
t.Fatalf("Response body is empty.")
Expand Down Expand Up @@ -241,8 +219,6 @@ func TestAgent_OneTwelveRPCMetrics(t *testing.T) {
assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type", "leader"})
// make sure we see 3 Status.Ping metrics corresponding to the calls we made above
assertLabelWithValueForMetricExistsNTime(t, respRec, metricsPrefix+"_rpc_server_call", "method", "Status.Ping", 3)
// make sure rpc calls with elapsed time below 1ms are reported as decimal
assertMetricsWithLabelIsNonZero(t, respRec, "method", "Status.Ping")
})
}

Expand Down
8 changes: 3 additions & 5 deletions agent/rpc/peering/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,16 +764,15 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
// -- ImportedServicesCount and ExportedServicesCount
// NOTE: we return a new peering with this additional data
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
cp := copyPeering(peering)
streamState, found := s.Tracker.StreamStatus(peering.ID)
if !found {
// TODO(peering): this may be noise on non-leaders
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
" exported services count or reconcile peering state", "peerID", peering.ID)
peering.StreamStatus = &pbpeering.StreamStatus{}
return peering
cp.StreamStatus = &pbpeering.StreamStatus{}
return cp
} else {
cp := copyPeering(peering)

// reconcile pbpeering.PeeringState_Active
if streamState.Connected {
cp.State = pbpeering.PeeringState_ACTIVE
Expand Down Expand Up @@ -1160,6 +1159,5 @@ func validatePeer(peering *pbpeering.Peering, shouldDial bool) error {
func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
var copyP pbpeering.Peering
proto.Merge(&copyP, p)

return &copyP
}
56 changes: 36 additions & 20 deletions agent/rpc/peering/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ func TestPeeringService_Read(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "test",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
require.NoError(t, err)
Expand Down Expand Up @@ -815,6 +816,7 @@ func TestPeeringService_Read_ACLEnforcement(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "test",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
require.NoError(t, err)
Expand Down Expand Up @@ -879,8 +881,10 @@ func TestPeeringService_Read_Blocking(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "test",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p})
toWrite := proto.Clone(p).(*pbpeering.Peering)
err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite})
require.NoError(t, err)

client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
Expand All @@ -891,37 +895,44 @@ func TestPeeringService_Read_Blocking(t *testing.T) {

options := structs.QueryOptions{
MinQueryIndex: lastIdx,
MaxQueryTime: 1 * time.Second,
MaxQueryTime: 10 * time.Second,
}
ctx, err = external.ContextWithQueryOptions(ctx, options)
require.NoError(t, err)

// Mutate the original peering
p = proto.Clone(p).(*pbpeering.Peering)
p.PeerServerAddresses = append(p.PeerServerAddresses, "addr2")

// Async change to trigger update
marker := time.Now()
recvChan := make(chan *pbpeering.PeeringReadResponse)
errChan := make(chan error)
var header metadata.MD
go func() {
time.Sleep(100 * time.Millisecond)
lastIdx++
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p}))
resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header))
if err != nil {
errChan <- err
return
}
recvChan <- resp
}()

var header metadata.MD
resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header))
require.NoError(t, err)

// The query should return after the async change, but before the timeout
require.True(t, time.Since(marker) >= 100*time.Millisecond)
require.True(t, time.Since(marker) < 1*time.Second)

// Verify query results
meta, err := external.QueryMetaFromGRPCMeta(header)
require.NoError(t, err)
require.Equal(t, lastIdx, meta.Index)
lastIdx++
toWrite = proto.Clone(p).(*pbpeering.Peering)
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite}))

prototest.AssertDeepEqual(t, p, resp.Peering)
select {
case err := <-errChan:
require.NoError(t, err)
case resp := <-recvChan:
meta, err := external.QueryMetaFromGRPCMeta(header)
require.NoError(t, err)
require.Equal(t, lastIdx, meta.Index)
resp.Peering.CreateIndex = 0
resp.Peering.ModifyIndex = 0
prototest.AssertDeepEqual(t, p, resp.Peering)
case <-time.After(2 * time.Second):
t.Error("blocking query timed out while waiting")
}
}

func TestPeeringService_Delete(t *testing.T) {
Expand Down Expand Up @@ -1064,6 +1075,7 @@ func TestPeeringService_List(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "fooservername",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: foo}))

Expand All @@ -1075,6 +1087,7 @@ func TestPeeringService_List(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "barservername",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: bar}))

Expand Down Expand Up @@ -1120,6 +1133,7 @@ func TestPeeringService_List(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "bazservername",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
go func() {
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -1166,6 +1180,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "fooservername",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
require.NoError(t, s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: foo}))
bar := &pbpeering.Peering{
Expand All @@ -1175,6 +1190,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "barservername",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
require.NoError(t, s.Server.FSM().State().PeeringWrite(15, &pbpeering.PeeringWriteRequest{Peering: bar}))

Expand Down
78 changes: 76 additions & 2 deletions agent/testagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ import (
"net/http/httptest"
"path/filepath"
"strconv"
"strings"
"testing"
"text/template"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/config"
Expand Down Expand Up @@ -86,15 +88,32 @@ type TestAgent struct {
// allows the BaseDeps to be modified before starting the embedded agent
OverrideDeps func(deps *BaseDeps)

// Skips asserting that the ACL bootstrap has occurred. This may be required
// for various tests where multiple servers are joined later.
disableACLBootstrapCheck bool

// Agent is the embedded consul agent.
// It is valid after Start().
*Agent
}

type TestAgentOpts struct {
// Skips asserting that the ACL bootstrap has occurred. This may be required
// for various tests where multiple servers are joined later.
DisableACLBootstrapCheck bool
}

// NewTestAgent returns a started agent with the given configuration. It fails
// the test if the Agent could not be started.
func NewTestAgent(t *testing.T, hcl string) *TestAgent {
a := StartTestAgent(t, TestAgent{HCL: hcl})
func NewTestAgent(t *testing.T, hcl string, opts ...TestAgentOpts) *TestAgent {
// This varargs approach is used so that we don't have to modify all of the `NewTestAgent()` calls
// in order to introduce more optional arguments.
require.LessOrEqual(t, len(opts), 1, "NewTestAgent cannot accept more than one opts argument")
ta := TestAgent{HCL: hcl}
if len(opts) == 1 {
ta.disableACLBootstrapCheck = opts[0].DisableACLBootstrapCheck
}
a := StartTestAgent(t, ta)
t.Cleanup(func() { a.Shutdown() })
return a
}
Expand Down Expand Up @@ -286,6 +305,16 @@ func (a *TestAgent) waitForUp() error {
continue // fail, try again
}
if a.Config.Bootstrap && a.Config.ServerMode {
if !a.disableACLBootstrapCheck {
if ok, err := a.isACLBootstrapped(); err != nil {
retErr = fmt.Errorf("error checking for acl bootstrap: %w", err)
continue // fail, try again
} else if !ok {
retErr = fmt.Errorf("acl system not bootstrapped yet")
continue // fail, try again
}
}

if a.baseDeps.UseV2Resources() {
args := structs.DCSpecificRequest{
Datacenter: "dc1",
Expand Down Expand Up @@ -337,11 +366,56 @@ func (a *TestAgent) waitForUp() error {
}
return nil // success
}

}

return fmt.Errorf("unavailable. last error: %v", retErr)
}

func (a *TestAgent) isACLBootstrapped() (bool, error) {
if a.config.ACLInitialManagementToken == "" {
logger := a.Agent.logger.Named("test")
logger.Warn("Skipping check for ACL bootstrapping")

return true, nil // We lie because we can't check.
}

const policyName = structs.ACLPolicyGlobalManagementName

req := httptest.NewRequest("GET", "/v1/acl/policy/name/"+policyName, nil)
req.Header.Add("X-Consul-Token", a.config.ACLInitialManagementToken)
resp := httptest.NewRecorder()

raw, err := a.srv.ACLPolicyReadByName(resp, req)
if err != nil {
if strings.Contains(err.Error(), "Unexpected response code: 403 (ACL not found)") {
return false, nil
} else if isACLNotBootstrapped(err) {
return false, nil
}
return false, err
}
if raw == nil {
return false, nil
}
policy, ok := raw.(*structs.ACLPolicy)
if !ok {
return false, fmt.Errorf("expected ACLPolicy got %T", raw)
}

return policy != nil, nil
}

func isACLNotBootstrapped(err error) bool {
switch {
case strings.Contains(err.Error(), "ACL system must be bootstrapped before making any requests that require authorization"):
return true
case strings.Contains(err.Error(), "The ACL system is currently in legacy mode"):
return true
}
return false
}

// Shutdown stops the agent and removes the data directory if it is
// managed by the test agent.
func (a *TestAgent) Shutdown() error {
Expand Down