Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor various utility functions into a consul/lib package #1666

Merged
merged 6 commits into from
Feb 2, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
Expand Down Expand Up @@ -600,8 +601,8 @@ func (a *Agent) sendCoordinate() {
for {
rate := a.config.SyncCoordinateRateTarget
min := a.config.SyncCoordinateIntervalMin
intv := rateScaledInterval(rate, min, len(a.LANMembers()))
intv = intv + randomStagger(intv)
intv := lib.RateScaledInterval(rate, min, len(a.LANMembers()))
intv = intv + lib.RandomStagger(intv)

select {
case <-time.After(intv):
Expand Down
9 changes: 5 additions & 4 deletions command/agent/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-cleanhttp"
)

Expand Down Expand Up @@ -131,7 +132,7 @@ func (c *CheckMonitor) Stop() {
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckMonitor) run() {
// Get the randomized initial pause time
initialPauseTime := randomStagger(c.Interval)
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s", initialPauseTime, c.Script)
next := time.After(initialPauseTime)
for {
Expand Down Expand Up @@ -366,7 +367,7 @@ func (c *CheckHTTP) Stop() {
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckHTTP) run() {
// Get the randomized initial pause time
initialPauseTime := randomStagger(c.Interval)
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first HTTP request of %s", initialPauseTime, c.HTTP)
next := time.After(initialPauseTime)
for {
Expand Down Expand Up @@ -482,7 +483,7 @@ func (c *CheckTCP) Stop() {
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckTCP) run() {
// Get the randomized initial pause time
initialPauseTime := randomStagger(c.Interval)
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first socket connection of %s", initialPauseTime, c.TCP)
next := time.After(initialPauseTime)
for {
Expand Down Expand Up @@ -580,7 +581,7 @@ func (c *CheckDocker) Stop() {
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckDocker) run() {
// Get the randomized initial pause time
initialPauseTime := randomStagger(c.Interval)
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerID)
next := time.After(initialPauseTime)
for {
Expand Down
3 changes: 2 additions & 1 deletion command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/armon/go-metrics"
"github.com/armon/go-metrics/datadog"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-checkpoint"
"github.com/hashicorp/go-reap"
Expand Down Expand Up @@ -424,7 +425,7 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log

// Do an immediate check within the next 30 seconds
go func() {
time.Sleep(randomStagger(30 * time.Second))
time.Sleep(lib.RandomStagger(30 * time.Second))
c.checkpointResults(checkpoint.Check(updateParams))
}()
}
Expand Down
3 changes: 2 additions & 1 deletion command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/watch"
"github.com/mitchellh/mapstructure"
)
Expand Down Expand Up @@ -634,7 +635,7 @@ func DecodeConfig(r io.Reader) (*Config, error) {
allowedKeys := []string{"service", "services", "check", "checks"}
var unused []string
for _, field := range md.Unused {
if !strContains(allowedKeys, field) {
if !lib.StrContains(allowedKeys, field) {
unused = append(unused, field)
}
}
Expand Down
4 changes: 3 additions & 1 deletion command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strings"
"testing"
"time"

"github.com/hashicorp/consul/lib"
)

func TestConfigEncryptBytes(t *testing.T) {
Expand Down Expand Up @@ -1103,7 +1105,7 @@ func TestDecodeConfig_Service(t *testing.T) {
t.Fatalf("bad: %v", serv)
}

if !strContains(serv.Tags, "master") {
if !lib.StrContains(serv.Tags, "master") {
t.Fatalf("bad: %v", serv)
}

Expand Down
9 changes: 5 additions & 4 deletions command/agent/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
)

const (
Expand Down Expand Up @@ -252,7 +253,7 @@ func (l *localState) UpdateCheck(checkID, status, output string) {
if l.config.CheckUpdateInterval > 0 && check.Status == status {
check.Output = output
if _, ok := l.deferCheck[checkID]; !ok {
intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + randomStagger(l.config.CheckUpdateInterval)
intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + lib.RandomStagger(l.config.CheckUpdateInterval)
deferSync := time.AfterFunc(intv, func() {
l.Lock()
if _, ok := l.checkStatus[checkID]; ok {
Expand Down Expand Up @@ -302,11 +303,11 @@ SYNC:
case <-l.consulCh:
// Stagger the retry on leader election, avoid a thundering heard
select {
case <-time.After(randomStagger(aeScale(syncStaggerIntv, len(l.iface.LANMembers())))):
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.iface.LANMembers())))):
case <-shutdownCh:
return
}
case <-time.After(syncRetryIntv + randomStagger(aeScale(syncRetryIntv, len(l.iface.LANMembers())))):
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.iface.LANMembers())))):
case <-shutdownCh:
return
}
Expand All @@ -317,7 +318,7 @@ SYNC:

// Schedule the next full sync, with a random stagger
aeIntv := aeScale(l.config.AEInterval, len(l.iface.LANMembers()))
aeIntv = aeIntv + randomStagger(aeIntv)
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
aeTimer := time.After(aeIntv)

// Wait for sync events
Expand Down
9 changes: 9 additions & 0 deletions command/agent/remote_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,17 @@ import (

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/go-uuid"
)

func generateUUID() (ret string) {
var err error
if ret, err = uuid.GenerateUUID(); err != nil {
return "DEADC0DE-BADD-CAFE-D00D-FEEDFACECAFE"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These might be better done as a panic, since this could cause weirdness in the tests if we hit this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done.

}
return ret
}

func TestRexecWriter(t *testing.T) {
writer := &rexecWriter{
BufCh: make(chan []byte, 16),
Expand Down
6 changes: 5 additions & 1 deletion command/agent/user_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"regexp"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-uuid"
)

const (
Expand Down Expand Up @@ -78,7 +79,10 @@ func (a *Agent) UserEvent(dc, token string, params *UserEvent) error {
}

// Format message
params.ID = generateUUID()
var err error
if params.ID, err = uuid.GenerateUUID(); err != nil {
return fmt.Errorf("UUID generation failed: %v", err)
}
params.Version = userEventMaxVersion
payload, err := encodeMsgPack(&params)
if err != nil {
Expand Down
43 changes: 0 additions & 43 deletions command/agent/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package agent
import (
"bytes"
"crypto/md5"
crand "crypto/rand"
"fmt"
"math"
"math/rand"
"os"
"os/exec"
"os/user"
Expand Down Expand Up @@ -39,32 +37,6 @@ func aeScale(interval time.Duration, n int) time.Duration {
return time.Duration(multiplier) * interval
}

// rateScaledInterval is used to choose an interval to perform an action in order
// to target an aggregate number of actions per second across the whole cluster.
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
interval := time.Duration(float64(time.Second) * float64(n) / rate)
if interval < min {
return min
}

return interval
}

// Returns a random stagger interval between 0 and the duration
func randomStagger(intv time.Duration) time.Duration {
return time.Duration(uint64(rand.Int63()) % uint64(intv))
}

// strContains checks if a list contains a string
func strContains(l []string, s string) bool {
for _, v := range l {
if v == s {
return true
}
}
return false
}

// ExecScript returns a command to execute a script
func ExecScript(script string) (*exec.Cmd, error) {
var shell, flag string
Expand All @@ -82,21 +54,6 @@ func ExecScript(script string) (*exec.Cmd, error) {
return cmd, nil
}

// generateUUID is used to generate a random UUID
func generateUUID() string {
buf := make([]byte, 16)
if _, err := crand.Read(buf); err != nil {
panic(fmt.Errorf("failed to read random bytes: %v", err))
}

return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
buf[0:4],
buf[4:6],
buf[6:8],
buf[8:10],
buf[10:16])
}

// decodeMsgPack is used to decode a MsgPack encoded object
func decodeMsgPack(buf []byte, out interface{}) error {
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
Expand Down
33 changes: 0 additions & 33 deletions command/agent/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,6 @@ func TestAEScale(t *testing.T) {
}
}

func TestRateScaledInterval(t *testing.T) {
min := 1 * time.Second
rate := 200.0
if v := rateScaledInterval(rate, min, 0); v != min {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 100); v != min {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 200); v != 1*time.Second {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 1000); v != 5*time.Second {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 5000); v != 25*time.Second {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 10000); v != 50*time.Second {
t.Fatalf("Bad: %v", v)
}
}

func TestRandomStagger(t *testing.T) {
intv := time.Minute
for i := 0; i < 10; i++ {
stagger := randomStagger(intv)
if stagger < 0 || stagger >= intv {
t.Fatalf("Bad: %v", stagger)
}
}
}

func TestStringHash(t *testing.T) {
in := "hello world"
expected := "5eb63bbbe01eeed093cb22bb8f5acdc3"
Expand Down
7 changes: 6 additions & 1 deletion consul/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-uuid"
)

// ACL endpoint is used to manipulate ACLs
Expand Down Expand Up @@ -62,7 +63,11 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
if args.ACL.ID == "" {
state := a.srv.fsm.State()
for {
args.ACL.ID = generateUUID()
if args.ACL.ID, err = uuid.GenerateUUID(); err != nil {
a.srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err)
return err
}

_, acl, err := state.ACLGet(args.ACL.ID)
if err != nil {
a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion consul/acl_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
Expand Down Expand Up @@ -436,7 +437,7 @@ func TestACLEndpoint_List(t *testing.T) {
if s.ID == anonymousToken || s.ID == "root" {
continue
}
if !strContains(ids, s.ID) {
if !lib.StrContains(ids, s.ID) {
t.Fatalf("bad: %v", s)
}
if s.Name != "User token" {
Expand Down
3 changes: 2 additions & 1 deletion consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
Expand Down Expand Up @@ -978,7 +979,7 @@ func TestCatalogNodeServices(t *testing.T) {
t.Fatalf("bad: %v", out)
}
services := out.NodeServices.Services
if !strContains(services["db"].Tags, "primary") || services["db"].Port != 5000 {
if !lib.StrContains(services["db"].Tags, "primary") || services["db"].Port != 5000 {
t.Fatalf("bad: %v", out)
}
if len(services["web"].Tags) != 0 || services["web"].Port != 80 {
Expand Down
12 changes: 11 additions & 1 deletion consul/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
)

Expand Down Expand Up @@ -38,6 +40,14 @@ func makeLog(buf []byte) *raft.Log {
}
}

func generateUUID() (ret string) {
var err error
if ret, err = uuid.GenerateUUID(); err != nil {
return "DEADC0DE-BADD-CAFE-D00D-FEEDFACECAFE"
}
return ret
}

func TestFSM_RegisterNode(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
Expand Down Expand Up @@ -452,7 +462,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
if len(fooSrv.Services) != 2 {
t.Fatalf("Bad: %v", fooSrv)
}
if !strContains(fooSrv.Services["db"].Tags, "primary") {
if !lib.StrContains(fooSrv.Services["db"].Tags, "primary") {
t.Fatalf("Bad: %v", fooSrv)
}
if fooSrv.Services["db"].Port != 5000 {
Expand Down
Loading