Skip to content

Commit

Permalink
Fix race condition in reconcilePeering.
Browse files Browse the repository at this point in the history
This resolves an issue where a peering object in the state store was
incorrectly mutated by a function, resulting in the test being flagged as
failing when the -race flag was used.
  • Loading branch information
hashi-derek committed Jan 16, 2024
1 parent 91dfaad commit dc04075
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 25 deletions.
8 changes: 3 additions & 5 deletions agent/rpc/peering/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,16 +764,15 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
// -- ImportedServicesCount and ExportedServicesCount
// NOTE: we return a new peering with this additional data
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
cp := copyPeering(peering)
streamState, found := s.Tracker.StreamStatus(peering.ID)
if !found {
// TODO(peering): this may be noise on non-leaders
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
" exported services count or reconcile peering state", "peerID", peering.ID)
peering.StreamStatus = &pbpeering.StreamStatus{}
return peering
cp.StreamStatus = &pbpeering.StreamStatus{}
return cp
} else {
cp := copyPeering(peering)

// reconcile pbpeering.PeeringState_Active
if streamState.Connected {
cp.State = pbpeering.PeeringState_ACTIVE
Expand Down Expand Up @@ -1160,6 +1159,5 @@ func validatePeer(peering *pbpeering.Peering, shouldDial bool) error {
func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
var copyP pbpeering.Peering
proto.Merge(&copyP, p)

return &copyP
}
56 changes: 36 additions & 20 deletions agent/rpc/peering/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ func TestPeeringService_Read(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "test",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
require.NoError(t, err)
Expand Down Expand Up @@ -815,6 +816,7 @@ func TestPeeringService_Read_ACLEnforcement(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "test",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
err := s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: p})
require.NoError(t, err)
Expand Down Expand Up @@ -879,8 +881,10 @@ func TestPeeringService_Read_Blocking(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "test",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p})
toWrite := proto.Clone(p).(*pbpeering.Peering)
err := s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite})
require.NoError(t, err)

client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
Expand All @@ -891,37 +895,44 @@ func TestPeeringService_Read_Blocking(t *testing.T) {

options := structs.QueryOptions{
MinQueryIndex: lastIdx,
MaxQueryTime: 1 * time.Second,
MaxQueryTime: 10 * time.Second,
}
ctx, err = external.ContextWithQueryOptions(ctx, options)
require.NoError(t, err)

// Mutate the original peering
p = proto.Clone(p).(*pbpeering.Peering)
p.PeerServerAddresses = append(p.PeerServerAddresses, "addr2")

// Async change to trigger update
marker := time.Now()
recvChan := make(chan *pbpeering.PeeringReadResponse)
errChan := make(chan error)
var header metadata.MD
go func() {
time.Sleep(100 * time.Millisecond)
lastIdx++
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: p}))
resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header))
if err != nil {
errChan <- err
return
}
recvChan <- resp
}()

var header metadata.MD
resp, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "foo"}, gogrpc.Header(&header))
require.NoError(t, err)

// The query should return after the async change, but before the timeout
require.True(t, time.Since(marker) >= 100*time.Millisecond)
require.True(t, time.Since(marker) < 1*time.Second)

// Verify query results
meta, err := external.QueryMetaFromGRPCMeta(header)
require.NoError(t, err)
require.Equal(t, lastIdx, meta.Index)
lastIdx++
toWrite = proto.Clone(p).(*pbpeering.Peering)
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: toWrite}))

prototest.AssertDeepEqual(t, p, resp.Peering)
select {
case err := <-errChan:
require.NoError(t, err)
case resp := <-recvChan:
meta, err := external.QueryMetaFromGRPCMeta(header)
require.NoError(t, err)
require.Equal(t, lastIdx, meta.Index)
resp.Peering.CreateIndex = 0
resp.Peering.ModifyIndex = 0
prototest.AssertDeepEqual(t, p, resp.Peering)
case <-time.After(2 * time.Second):
t.Error("blocking query timed out while waiting")
}
}

func TestPeeringService_Delete(t *testing.T) {
Expand Down Expand Up @@ -1064,6 +1075,7 @@ func TestPeeringService_List(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "fooservername",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: foo}))

Expand All @@ -1075,6 +1087,7 @@ func TestPeeringService_List(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "barservername",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.PeeringWriteRequest{Peering: bar}))

Expand Down Expand Up @@ -1120,6 +1133,7 @@ func TestPeeringService_List(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "bazservername",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
go func() {
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -1166,6 +1180,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "fooservername",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
require.NoError(t, s.Server.FSM().State().PeeringWrite(10, &pbpeering.PeeringWriteRequest{Peering: foo}))
bar := &pbpeering.Peering{
Expand All @@ -1175,6 +1190,7 @@ func TestPeeringService_List_ACLEnforcement(t *testing.T) {
PeerCAPems: nil,
PeerServerName: "barservername",
PeerServerAddresses: []string{"addr1"},
StreamStatus: &pbpeering.StreamStatus{},
}
require.NoError(t, s.Server.FSM().State().PeeringWrite(15, &pbpeering.PeeringWriteRequest{Peering: bar}))

Expand Down

0 comments on commit dc04075

Please sign in to comment.