Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Replace byte hash with string hash
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Jul 1, 2021
1 parent 3c0006e commit 95920ea
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 26 deletions.
12 changes: 6 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (c *Config) SetDefaults() error {
setter.SetDefault(&c.Behaviors.MultiRegionBatchLimit, maxBatchSize)
setter.SetDefault(&c.Behaviors.MultiRegionSyncWait, time.Second)

setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, DefaultReplicas))
setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, defaultReplicas))
setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil))
setter.SetDefault(&c.Cache, NewLRUCache(0))

Expand Down Expand Up @@ -335,12 +335,12 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig,

switch pp {
case "replicated-hash":
setter.SetDefault(&replicas, getEnvInteger(log, "GUBER_REPLICATED_HASH_REPLICAS"), DefaultReplicas)
setter.SetDefault(&replicas, getEnvInteger(log, "GUBER_REPLICATED_HASH_REPLICAS"), defaultReplicas)
conf.Picker = NewReplicatedConsistentHash(nil, replicas)
setter.SetDefault(&hash, os.Getenv("GUBER_PEER_PICKER_HASH"), "fnv1a")
hashFuncs := map[string]HashFunc64{
"fnv1a": fnv1a.HashBytes64,
"fnv1": fnv1.HashBytes64,
hashFuncs := map[string]HashString64{
"fnv1a": fnv1a.HashString64,
"fnv1": fnv1.HashString64,
}
fn, ok := hashFuncs[hash]
if !ok {
Expand Down Expand Up @@ -528,7 +528,7 @@ func validClientAuthTypes(m map[string]tls.ClientAuthType) string {
return strings.Join(rs, ",")
}

func validHash64Keys(m map[string]HashFunc64) string {
func validHash64Keys(m map[string]HashString64) string {
var rs []string
for k, _ := range m {
rs = append(rs, k)
Expand Down
4 changes: 2 additions & 2 deletions region_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ type RegionPicker struct {
reqQueue chan *RateLimitReq
}

func NewRegionPicker(fn HashFunc64) *RegionPicker {
func NewRegionPicker(fn HashString64) *RegionPicker {
rp := &RegionPicker{
regions: make(map[string]PeerPicker),
reqQueue: make(chan *RateLimitReq, 0),
ReplicatedConsistentHash: NewReplicatedConsistentHash(fn, DefaultReplicas),
ReplicatedConsistentHash: NewReplicatedConsistentHash(fn, defaultReplicas),
}
return rp
}
Expand Down
16 changes: 8 additions & 8 deletions replicated_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ import (
"github.com/segmentio/fasthash/fnv1"
)

const DefaultReplicas = 512
const defaultReplicas = 512

type HashFunc64 func(data []byte) uint64
type HashString64 func(data string) uint64

var DefaultHash64 HashFunc64 = fnv1.HashBytes64
var defaultHashString64 HashString64 = fnv1.HashString64

// Implements PeerPicker
type ReplicatedConsistentHash struct {
hashFunc HashFunc64
hashFunc HashString64
peerKeys []peerInfo
peers map[string]*PeerClient
replicas int
Expand All @@ -47,15 +47,15 @@ type peerInfo struct {
peer *PeerClient
}

func NewReplicatedConsistentHash(fn HashFunc64, replicas int) *ReplicatedConsistentHash {
func NewReplicatedConsistentHash(fn HashString64, replicas int) *ReplicatedConsistentHash {
ch := &ReplicatedConsistentHash{
hashFunc: fn,
peers: make(map[string]*PeerClient),
replicas: replicas,
}

if ch.hashFunc == nil {
ch.hashFunc = DefaultHash64
ch.hashFunc = defaultHashString64
}
return ch
}
Expand All @@ -82,7 +82,7 @@ func (ch *ReplicatedConsistentHash) Add(peer *PeerClient) {

key := fmt.Sprintf("%x", md5.Sum([]byte(peer.Info().GRPCAddress)))
for i := 0; i < ch.replicas; i++ {
hash := ch.hashFunc(strToBytesUnsafe(strconv.Itoa(i) + key))
hash := ch.hashFunc(strconv.Itoa(i) + key)
ch.peerKeys = append(ch.peerKeys, peerInfo{
hash: hash,
peer: peer,
Expand All @@ -107,7 +107,7 @@ func (ch *ReplicatedConsistentHash) Get(key string) (*PeerClient, error) {
if ch.Size() == 0 {
return nil, errors.New("unable to pick a peer; pool is empty")
}
hash := ch.hashFunc(strToBytesUnsafe(key))
hash := ch.hashFunc(key)

// Binary search for appropriate peer
idx := sort.Search(len(ch.peerKeys), func(i int) bool { return ch.peerKeys[i].hash >= hash })
Expand Down
20 changes: 10 additions & 10 deletions replicated_hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestReplicatedConsistentHash(t *testing.T) {
hosts := []string{"a.svc.local", "b.svc.local", "c.svc.local"}

t.Run("Size", func(t *testing.T) {
hash := NewReplicatedConsistentHash(nil, DefaultReplicas)
hash := NewReplicatedConsistentHash(nil, defaultReplicas)

for _, h := range hosts {
hash.Add(&PeerClient{conf: PeerConfig{Info: PeerInfo{GRPCAddress: h}}})
Expand All @@ -23,7 +23,7 @@ func TestReplicatedConsistentHash(t *testing.T) {
})

t.Run("Host", func(t *testing.T) {
hash := NewReplicatedConsistentHash(nil, DefaultReplicas)
hash := NewReplicatedConsistentHash(nil, defaultReplicas)
hostMap := map[string]*PeerClient{}

for _, h := range hosts {
Expand All @@ -46,7 +46,7 @@ func TestReplicatedConsistentHash(t *testing.T) {

for _, tc := range []struct {
name string
inHashFunc HashFunc64
inHashFunc HashString64
outDistribution map[string]int
}{{
name: "default",
Expand All @@ -55,19 +55,19 @@ func TestReplicatedConsistentHash(t *testing.T) {
},
}, {
name: "fasthash/fnv1a",
inHashFunc: fnv1a.HashBytes64,
inHashFunc: fnv1a.HashString64,
outDistribution: map[string]int{
"a.svc.local": 3110, "b.svc.local": 3856, "c.svc.local": 3034,
},
}, {
name: "fasthash/fnv1",
inHashFunc: fnv1.HashBytes64,
inHashFunc: fnv1.HashString64,
outDistribution: map[string]int{
"a.svc.local": 2948, "b.svc.local": 3592, "c.svc.local": 3460,
},
}} {
t.Run(tc.name, func(t *testing.T) {
hash := NewReplicatedConsistentHash(tc.inHashFunc, DefaultReplicas)
hash := NewReplicatedConsistentHash(tc.inHashFunc, defaultReplicas)
distribution := make(map[string]int)

for _, h := range hosts {
Expand All @@ -87,9 +87,9 @@ func TestReplicatedConsistentHash(t *testing.T) {
}

func BenchmarkReplicatedConsistantHash(b *testing.B) {
hashFuncs := map[string]HashFunc64{
"fasthash/fnv1a": fnv1a.HashBytes64,
"fasthash/fnv1": fnv1.HashBytes64,
hashFuncs := map[string]HashString64{
"fasthash/fnv1a": fnv1a.HashString64,
"fasthash/fnv1": fnv1.HashString64,
}

for name, hashFunc := range hashFuncs {
Expand All @@ -99,7 +99,7 @@ func BenchmarkReplicatedConsistantHash(b *testing.B) {
ips[i] = net.IPv4(byte(i>>24), byte(i>>16), byte(i>>8), byte(i)).String()
}

hash := NewReplicatedConsistentHash(hashFunc, DefaultReplicas)
hash := NewReplicatedConsistentHash(hashFunc, defaultReplicas)
hosts := []string{"a.svc.local", "b.svc.local", "c.svc.local"}
for _, h := range hosts {
hash.Add(&PeerClient{conf: PeerConfig{Info: PeerInfo{GRPCAddress: h}}})
Expand Down

0 comments on commit 95920ea

Please sign in to comment.