Skip to content

Commit

Permalink
deduplicate peer server strings (#3)
Browse files Browse the repository at this point in the history
* deduplicate peer server strings

* consider removing members that are not started

* extend default startup grace period
  • Loading branch information
charless-splunk authored Mar 27, 2019
1 parent 9a8ae20 commit b944585
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
4 changes: 3 additions & 1 deletion embetcd/memberInfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func (m *Members) Get(member *membership.Member) (info *Member) {
info.Member = member

// update the member client endpoints
info.Client.SetEndpoints(member.ClientURLs...)
if member != nil && member.ClientURLs != nil {
info.Client.SetEndpoints(member.ClientURLs...)
}

return info
}
Expand Down
46 changes: 35 additions & 11 deletions embetcd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"os"
"path"
"strings"
"sync"
"time"

Expand All @@ -26,7 +27,7 @@ const (
DefaultCleanUpInterval = time.Second * 5
// DefaultStartUpGracePeriod is the graceperiod to wait for new cluster members to startup
// before they're subject to health checks
DefaultStartUpGracePeriod = time.Second * 10
DefaultStartUpGracePeriod = time.Second * 60
// DefaultShutdownTimeout is the default time to wait for the server to shutdown cleanly
DefaultShutdownTimeout = time.Minute * 1
// DefaultDialTimeout is the default etcd dial timeout
Expand Down Expand Up @@ -59,6 +60,31 @@ func ServerNameConflicts(ctx context.Context, client *Client, name string) (conf
return conflicts, err
}

// dedupPeerString take a peer string and deduplicates it
func dedupPeerString(peer string) (peers string) {
// map to keep track of already used substrings
set := make(map[string]struct{})

// break on , and check if the substring has already been used
for _, substr := range strings.Split(peer, ",") {
cleansubstr := strings.TrimSpace(substr)
if _, ok := set[cleansubstr]; !ok {
parts := strings.Split(cleansubstr, "=")
if len(parts) == 2 && parts[0] != "" && parts[1] != "" {
if len(set) == 0 {
// don't use a comma if first element
peers = cleansubstr
} else {
// use comma for everything after the first element
peers = fmt.Sprintf("%s,%s", peers, cleansubstr)
}
set[cleansubstr] = struct{}{}
}
}
}
return peers
}

// getServerPeers returns the peer urls for the cluster formatted for the initialCluster server configuration.
// The context that is passed in should have a configured timeout.
func getServerPeers(ctx context.Context, c *Client, initialCluster string) (peers string, err error) {
Expand Down Expand Up @@ -87,7 +113,7 @@ func getServerPeers(ctx context.Context, c *Client, initialCluster string) (peer
break
}
}

peers = dedupPeerString(peers)
return peers, err
}

Expand Down Expand Up @@ -372,17 +398,15 @@ func (s *Server) cleanCluster(ctx context.Context, members *Members, client *Cli
for _, cmember := range currentMembers {

// wait to check health until the member is listed as started
if cmember.IsStarted() {

currentMemberIDs[uint64(cmember.ID)] = struct{}{}
currentMemberIDs[uint64(cmember.ID)] = struct{}{}

// fetch the health of the member in a separate go routine
wg.Add(1)
go func(m *Member) {
m.Update(client)
wg.Done()
}(members.Get(cmember))
}
// fetch the health of the member in a separate go routine
wg.Add(1)
go func(m *Member) {
m.Update(client)
wg.Done()
}(members.Get(cmember))

}

Expand Down

0 comments on commit b944585

Please sign in to comment.