Skip to content

Commit

Permalink
add agent locality and replicate it across peer streams
Browse files Browse the repository at this point in the history
  • Loading branch information
erichaberkorn committed Mar 3, 2023
1 parent 9a485cd commit f19c05a
Show file tree
Hide file tree
Showing 21 changed files with 726 additions and 466 deletions.
1 change: 1 addition & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
cfg.RequestLimitsMode = runtimeCfg.RequestLimitsMode.String()
cfg.RequestLimitsReadRate = runtimeCfg.RequestLimitsReadRate
cfg.RequestLimitsWriteRate = runtimeCfg.RequestLimitsWriteRate
cfg.Locality = runtimeCfg.StructLocality()

enterpriseConsulConfig(cfg, runtimeCfg)
return cfg, nil
Expand Down
1 change: 1 addition & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
// gossip configuration
GossipLANGossipInterval: b.durationVal("gossip_lan..gossip_interval", c.GossipLAN.GossipInterval),
GossipLANGossipNodes: intVal(c.GossipLAN.GossipNodes),
Locality: c.Locality,
GossipLANProbeInterval: b.durationVal("gossip_lan..probe_interval", c.GossipLAN.ProbeInterval),
GossipLANProbeTimeout: b.durationVal("gossip_lan..probe_timeout", c.GossipLAN.ProbeTimeout),
GossipLANSuspicionMult: intVal(c.GossipLAN.SuspicionMult),
Expand Down
10 changes: 10 additions & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ type Config struct {
LeaveOnTerm *bool `mapstructure:"leave_on_terminate" json:"leave_on_terminate,omitempty"`
LicensePath *string `mapstructure:"license_path" json:"license_path,omitempty"`
Limits Limits `mapstructure:"limits" json:"-"`
Locality Locality `mapstructure:"locality" json:"-"`
LogLevel *string `mapstructure:"log_level" json:"log_level,omitempty"`
LogJSON *bool `mapstructure:"log_json" json:"log_json,omitempty"`
LogFile *string `mapstructure:"log_file" json:"log_file,omitempty"`
Expand Down Expand Up @@ -311,6 +312,15 @@ type GossipWANConfig struct {
RetransmitMult *int `mapstructure:"retransmit_mult"`
}

// Locality identifies where a given entity is running.
type Locality struct {
// Region is region the zone belongs to.
Region *string `mapstructure:"region"`

// Zone is the zone the entity is running in.
Zone *string `mapstructure:"zone"`
}

type Consul struct {
Coordinate struct {
UpdateBatchSize *int `mapstructure:"update_batch_size"`
Expand Down
9 changes: 9 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,8 @@ type RuntimeConfig struct {
// hcl: leave_on_terminate = (true|false)
LeaveOnTerm bool

Locality Locality

// Logging configuration used to initialize agent logging.
Logging logging.Config

Expand Down Expand Up @@ -1713,6 +1715,13 @@ func (c *RuntimeConfig) VersionWithMetadata() string {
return version
}

func (c *RuntimeConfig) StructLocality() structs.Locality {
return structs.Locality{
Region: stringVal(c.Locality.Region),
Zone: stringVal(c.Locality.Zone),
}
}

// Sanitized returns a JSON/HCL compatible representation of the runtime
// configuration where all fields with potential secrets had their
// values replaced by 'hidden'. In addition, network addresses and
Expand Down
1 change: 1 addition & 0 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7092,6 +7092,7 @@ func TestRuntimeConfig_Sanitize(t *testing.T) {
},
},
},
Locality: Locality{Region: strPtr("us-west-1"), Zone: strPtr("us-west-1a")},
}

b, err := json.MarshalIndent(rt.Sanitized(), "", " ")
Expand Down
4 changes: 4 additions & 0 deletions agent/config/testdata/TestRuntimeConfig_Sanitize.golden
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@
"LeaveDrainTime": "0s",
"LeaveOnTerm": false,
"LocalProxyConfigResyncInterval": "0s",
"Locality": {
"Region": "us-west-1",
"Zone": "us-west-1a"
},
"Logging": {
"EnableSyslog": false,
"LogFilePath": "",
Expand Down
2 changes: 2 additions & 0 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ type Config struct {

PeeringTestAllowPeerRegistrations bool

Locality structs.Locality

// Embedded Consul Enterprise specific configuration
*EnterpriseConfig
}
Expand Down
1 change: 1 addition & 0 deletions agent/consul/leader_peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ func (s *Server) establishStream(ctx context.Context,
Remote: &pbpeering.RemoteInfo{
Partition: peer.Partition,
Datacenter: s.config.Datacenter,
Locality: pbpeering.LocalityFromStruct(s.config.Locality),
},
},
},
Expand Down
14 changes: 14 additions & 0 deletions agent/consul/leader_peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,11 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
t.Skip("too slow for testing.Short")
}

acceptorLocality := structs.Locality{
Region: "us-west-2",
Zone: "us-west-2a",
}

ca := connect.TestCA(t, nil)
_, acceptingServer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "accepting-server"
Expand All @@ -676,13 +681,18 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
"RootCert": ca.RootCert,
},
}
c.Locality = acceptorLocality
})
testrpc.WaitForLeader(t, acceptingServer.RPC, "dc1")

// Create a peering by generating a token.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

dialerLocality := structs.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
}
conn, err := grpc.DialContext(ctx, acceptingServer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptingServer.config.RPCAddr.String())),
//nolint:staticcheck
Expand All @@ -705,13 +715,15 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
// Ensure that the token contains the correct partition and dc
require.Equal(t, "dc1", token.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, token.Remote.Partition)
require.Equal(t, acceptorLocality, token.Remote.Locality)

// Bring up dialingServer and store acceptingServer's token so that it attempts to dial.
_, dialingServer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialing-server"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
c.PeeringEnabled = true
c.Locality = dialerLocality
})
testrpc.WaitForLeader(t, dialingServer.RPC, "dc2")

Expand Down Expand Up @@ -743,6 +755,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "dc1", p.Peering.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)
require.Equal(t, pbpeering.LocalityFromStruct(acceptorLocality), p.Peering.Remote.Locality)

// Retry fetching the until the peering is active in the acceptor.
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
Expand All @@ -758,6 +771,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
require.NotNil(t, p)
require.Equal(t, "dc2", p.Peering.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)
require.Equal(t, pbpeering.LocalityFromStruct(dialerLocality), p.Peering.Remote.Locality)
}

// Test that the dialing peer attempts to reestablish connections when the accepting peer
Expand Down
1 change: 1 addition & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
Datacenter: config.Datacenter,
ConnectEnabled: config.ConnectEnabled,
PeeringEnabled: config.PeeringEnabled,
Locality: config.Locality,
})
s.peeringServer = p
o := operator.NewServer(operator.Config{
Expand Down
1 change: 1 addition & 0 deletions agent/consul/state/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
req.Peering.Remote = &pbpeering.RemoteInfo{
Partition: existing.Remote.Partition,
Datacenter: existing.Remote.Datacenter,
Locality: existing.Remote.Locality,
}
}

Expand Down
36 changes: 36 additions & 0 deletions agent/consul/state/peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
},
Expand All @@ -1272,6 +1276,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
secrets: &pbpeering.PeeringSecrets{
Expand Down Expand Up @@ -1303,6 +1311,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
secrets: &pbpeering.PeeringSecrets{
Expand Down Expand Up @@ -1332,6 +1344,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
secrets: &pbpeering.PeeringSecrets{
Expand Down Expand Up @@ -1361,6 +1377,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
// Secrets for baz should have been deleted
Expand Down Expand Up @@ -1389,6 +1409,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
// Meta should be unchanged.
Meta: nil,
Expand Down Expand Up @@ -1416,6 +1440,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
secrets: nil,
Expand Down Expand Up @@ -1443,6 +1471,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
// Secrets for baz should have been deleted
Expand All @@ -1469,6 +1501,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
Locality: &pbpeering.Locality{
Region: "us-west-1",
Zone: "us-west-1a",
},
},
},
// Secrets for baz should have been deleted
Expand Down
3 changes: 3 additions & 0 deletions agent/rpc/peering/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type Config struct {
Datacenter string
ConnectEnabled bool
PeeringEnabled bool
Locality structs.Locality
}

func NewServer(cfg Config) *Server {
Expand Down Expand Up @@ -327,6 +328,7 @@ func (s *Server) GenerateToken(
Remote: structs.PeeringTokenRemote{
Partition: req.PartitionOrDefault(),
Datacenter: s.Datacenter,
Locality: s.Config.Locality,
},
}

Expand Down Expand Up @@ -445,6 +447,7 @@ func (s *Server) Establish(
Remote: &pbpeering.RemoteInfo{
Partition: tok.Remote.Partition,
Datacenter: tok.Remote.Datacenter,
Locality: pbpeering.LocalityFromStruct(tok.Remote.Locality),
},
}

Expand Down
1 change: 1 addition & 0 deletions agent/structs/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type PeeringToken struct {
type PeeringTokenRemote struct {
Partition string
Datacenter string
Locality Locality
}

type IndexedExportedServiceList struct {
Expand Down
9 changes: 9 additions & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3021,3 +3021,12 @@ func TimeToProto(s time.Time) *timestamppb.Timestamp {
func IsZeroProtoTime(t *timestamppb.Timestamp) bool {
return t.Seconds == 0 && t.Nanos == 0
}

// Locality identifies where a given entity is running.
type Locality struct {
// Region is region the zone belongs to.
Region string `json:",omitempty"`

// Zone is the zone the entity is running in.
Zone string `json:",omitempty"`
}
10 changes: 10 additions & 0 deletions api/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ type PeeringRemoteInfo struct {
Partition string
// Datacenter is the remote peer's datacenter.
Datacenter string
Locality Locality
}

// Locality identifies where a given entity is running.
type Locality struct {
// Region is region the zone belongs to.
Region string

// Zone is the zone the entity is running in.
Zone string
}

type Peering struct {
Expand Down
22 changes: 22 additions & 0 deletions proto/private/pbpeering/peering.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f19c05a

Please sign in to comment.