Skip to content

Commit

Permalink
Merge pull request #9029 from hashicorp/b-tgs-updates
Browse files Browse the repository at this point in the history
consul/connect: trigger update as necessary on connect changes
  • Loading branch information
shoenig authored Oct 5, 2020
2 parents c195f93 + 37cd0ec commit 933e32a
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 2 deletions.
59 changes: 57 additions & 2 deletions nomad/structs/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func (c *ConsulConnect) Copy() *ConsulConnect {
}
}

// Equals returns true if the structs are recursively equal.
// Equals returns true if the connect blocks are deeply equal.
func (c *ConsulConnect) Equals(o *ConsulConnect) bool {
if c == nil || o == nil {
return c == o
Expand All @@ -710,7 +710,9 @@ func (c *ConsulConnect) Equals(o *ConsulConnect) bool {
return false
}

// todo(shoenig) task has never been compared, should it be?
if !c.SidecarTask.Equals(o.SidecarTask) {
return false
}

if !c.Gateway.Equals(o.Gateway) {
return false
Expand Down Expand Up @@ -864,6 +866,59 @@ type SidecarTask struct {
KillSignal string
}

func (t *SidecarTask) Equals(o *SidecarTask) bool {
if t == nil || o == nil {
return t == o
}

if t.Name != o.Name {
return false
}

if t.Driver != o.Driver {
return false
}

if t.User != o.User {
return false
}

// config compare
if !opaqueMapsEqual(t.Config, o.Config) {
return false
}

if !helper.CompareMapStringString(t.Env, o.Env) {
return false
}

if !t.Resources.Equals(o.Resources) {
return false
}

if !helper.CompareMapStringString(t.Meta, o.Meta) {
return false
}

if !helper.CompareTimePtrs(t.KillTimeout, o.KillTimeout) {
return false
}

if !t.LogConfig.Equals(o.LogConfig) {
return false
}

if !helper.CompareTimePtrs(t.ShutdownDelay, o.ShutdownDelay) {
return false
}

if t.KillSignal != o.KillSignal {
return false
}

return true
}

func (t *SidecarTask) Copy() *SidecarTask {
if t == nil {
return nil
Expand Down
79 changes: 79 additions & 0 deletions nomad/structs/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,85 @@ func TestSidecarTask_MergeIntoTask(t *testing.T) {
require.Exactly(t, expected, task)
}

func TestSidecarTask_Equals(t *testing.T) {
t.Parallel()

original := &SidecarTask{
Name: "sidecar-task-1",
Driver: "docker",
User: "nobody",
Config: map[string]interface{}{"foo": 1},
Env: map[string]string{"color": "blue"},
Resources: &Resources{MemoryMB: 300},
Meta: map[string]string{"index": "1"},
KillTimeout: helper.TimeToPtr(2 * time.Second),
LogConfig: &LogConfig{
MaxFiles: 2,
MaxFileSizeMB: 300,
},
ShutdownDelay: helper.TimeToPtr(10 * time.Second),
KillSignal: "SIGTERM",
}

t.Run("unmodified", func(t *testing.T) {
duplicate := original.Copy()
require.True(t, duplicate.Equals(original))
})

type st = SidecarTask
type tweaker = func(task *st)

try := func(t *testing.T, tweak tweaker) {
modified := original.Copy()
tweak(modified)
require.NotEqual(t, original, modified)
}

t.Run("mod name", func(t *testing.T) {
try(t, func(s *st) { s.Name = "sidecar-task-2" })
})

t.Run("mod driver", func(t *testing.T) {
try(t, func(s *st) { s.Driver = "exec" })
})

t.Run("mod user", func(t *testing.T) {
try(t, func(s *st) { s.User = "root" })
})

t.Run("mod config", func(t *testing.T) {
try(t, func(s *st) { s.Config = map[string]interface{}{"foo": 2} })
})

t.Run("mod env", func(t *testing.T) {
try(t, func(s *st) { s.Env = map[string]string{"color": "red"} })
})

t.Run("mod resources", func(t *testing.T) {
try(t, func(s *st) { s.Resources = &Resources{MemoryMB: 200} })
})

t.Run("mod meta", func(t *testing.T) {
try(t, func(s *st) { s.Meta = map[string]string{"index": "2"} })
})

t.Run("mod kill timeout", func(t *testing.T) {
try(t, func(s *st) { s.KillTimeout = helper.TimeToPtr(3 * time.Second) })
})

t.Run("mod log config", func(t *testing.T) {
try(t, func(s *st) { s.LogConfig = &LogConfig{MaxFiles: 3} })
})

t.Run("mod shutdown delay", func(t *testing.T) {
try(t, func(s *st) { s.ShutdownDelay = helper.TimeToPtr(20 * time.Second) })
})

t.Run("mod kill signal", func(t *testing.T) {
try(t, func(s *st) { s.KillSignal = "SIGHUP" })
})
}

func TestConsulUpstream_upstreamEquals(t *testing.T) {
t.Parallel()

Expand Down
16 changes: 16 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6231,6 +6231,22 @@ type LogConfig struct {
MaxFileSizeMB int
}

func (l *LogConfig) Equals(o *LogConfig) bool {
if l == nil || o == nil {
return l == o
}

if l.MaxFiles != o.MaxFiles {
return false
}

if l.MaxFileSizeMB != o.MaxFileSizeMB {
return false
}

return true
}

func (l *LogConfig) Copy() *LogConfig {
if l == nil {
return nil
Expand Down
32 changes: 32 additions & 0 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2123,6 +2123,38 @@ func TestTask_Validate_LogConfig(t *testing.T) {
}
}

func TestLogConfig_Equals(t *testing.T) {
t.Run("both nil", func(t *testing.T) {
a := (*LogConfig)(nil)
b := (*LogConfig)(nil)
require.True(t, a.Equals(b))
})

t.Run("one nil", func(t *testing.T) {
a := new(LogConfig)
b := (*LogConfig)(nil)
require.False(t, a.Equals(b))
})

t.Run("max files", func(t *testing.T) {
a := &LogConfig{MaxFiles: 1, MaxFileSizeMB: 200}
b := &LogConfig{MaxFiles: 2, MaxFileSizeMB: 200}
require.False(t, a.Equals(b))
})

t.Run("max file size", func(t *testing.T) {
a := &LogConfig{MaxFiles: 1, MaxFileSizeMB: 100}
b := &LogConfig{MaxFiles: 1, MaxFileSizeMB: 200}
require.False(t, a.Equals(b))
})

t.Run("same", func(t *testing.T) {
a := &LogConfig{MaxFiles: 1, MaxFileSizeMB: 200}
b := &LogConfig{MaxFiles: 1, MaxFileSizeMB: 200}
require.True(t, a.Equals(b))
})
}

func TestTask_Validate_CSIPluginConfig(t *testing.T) {
table := []struct {
name string
Expand Down
77 changes: 77 additions & 0 deletions scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool {
return true
}

// Check connect service(s) updated
if connectServiceUpdated(a.Services, b.Services) {
return true
}

// Check each task
for _, at := range a.Tasks {
bt := b.LookupTask(at.Name)
Expand Down Expand Up @@ -429,6 +434,78 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool {
return false
}

// connectServiceUpdated returns true if any services with a connect stanza have
// been changed in such a way that requires a destructive update.
//
// Ordinary services can be updated in-place by updating the service definition
// in Consul. Connect service changes mostly require destroying the task.
func connectServiceUpdated(servicesA, servicesB []*structs.Service) bool {
for _, serviceA := range servicesA {
if serviceA.Connect != nil {
for _, serviceB := range servicesB {
if serviceA.Name == serviceB.Name {
if connectUpdated(serviceA.Connect, serviceB.Connect) {
return true
}
// Part of the Connect plumbing is derived from port label,
// if that changes we need to destroy the task.
if serviceA.PortLabel != serviceB.PortLabel {
return true
}
break
}
}
}
}
return false
}

// connectUpdated returns true if the connect block has been updated in a manner
// that will require a destructive update.
//
// Fields that can be updated through consul-sync do not need a destructive
// update.
func connectUpdated(connectA, connectB *structs.ConsulConnect) bool {
if connectA == nil || connectB == nil {
return connectA == connectB
}

if connectA.Native != connectB.Native {
return true
}

if !connectA.Gateway.Equals(connectB.Gateway) {
return true
}

if !connectA.SidecarTask.Equals(connectB.SidecarTask) {
return true
}

// not everything in sidecar_service needs task destruction
if connectSidecarServiceUpdated(connectA.SidecarService, connectB.SidecarService) {
return true
}

return false
}

func connectSidecarServiceUpdated(ssA, ssB *structs.ConsulSidecarService) bool {
if ssA == nil || ssB == nil {
return ssA == ssB
}

if ssA.Port != ssB.Port {
return true
}

// sidecar_service.tags handled in-place (registration)

// sidecar_service.proxy handled in-place (registration + xDS)

return false
}

func networkUpdated(netA, netB []*structs.NetworkResource) bool {
if len(netA) != len(netB) {
return true
Expand Down
Loading

0 comments on commit 933e32a

Please sign in to comment.