forked from secondbit/wendy
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnode.go
141 lines (127 loc) · 4.12 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package wendy
import (
"strconv"
"sync"
"sync/atomic"
"time"
)
// Node describes a single machine participating in the cluster.
type Node struct {
	// LocalIP is the address through which other Nodes in the same Region
	// should reach this Node.
	LocalIP string
	// GlobalIP is the address through which Nodes in a different Region
	// should reach this Node.
	GlobalIP string
	// Port is the port the Node is listening on.
	Port int
	// Region is a label used to choose between local and global addressing,
	// e.g. an EC2 region.
	Region string
	// ID is the Node's NodeID.
	ID NodeID

	// proximity is the raw proximity score; NewNode initialises it to -1.
	proximity int64
	// mutex locks and unlocks the Node for concurrency safety.
	mutex *sync.RWMutex
	// lastHeardFrom is the last time we heard from this Node.
	lastHeardFrom time.Time

	// Version numbers of the leaf set, the routing table and the
	// neighborhood set, in that order.
	// NOTE(review): these counters are updated through sync/atomic; 64-bit
	// atomics need 64-bit alignment on 32-bit platforms — confirm the field
	// offsets if such targets matter.
	leafsetVersion, routingTableVersion, neighborhoodSetVersion uint64
}
// NewNode builds a Node from the given ID, addresses, region and port, and
// equips it with a fresh mutex. It does *not* measure or update the Node's
// proximity, which starts at -1. lastHeardFrom is set to the time of the call
// and the three version numbers start at zero.
func NewNode(id NodeID, local, global, region string, port int) *Node {
	node := new(Node)

	// Caller-supplied identity and addressing.
	node.ID = id
	node.LocalIP = local
	node.GlobalIP = global
	node.Port = port
	node.Region = region

	// Internal bookkeeping; the version counters keep their zero values.
	node.proximity = -1
	node.mutex = &sync.RWMutex{}
	node.lastHeardFrom = time.Now()

	return node
}
// IsZero reports whether the Node is an empty Node struct rather than one
// that has been initialised. It returns true when LocalIP and GlobalIP are
// both empty and Port is 0, and false as soon as any of those three fields
// has been set. No other field (ID, Region, ...) is consulted.
func (self Node) IsZero() bool {
	return self.LocalIP == "" && self.GlobalIP == "" && self.Port == 0
}
// GetIP returns the "ip:port" address that should be used when communicating
// with other, respecting Regions: if both Nodes share a Region the other
// Node's LocalIP is used, otherwise its GlobalIP is used. The port is always
// other.Port.
//
// Either Node may be a zero-value Node whose mutex is nil; the corresponding
// lock is then skipped instead of dereferencing a nil pointer.
func (self Node) GetIP(other Node) string {
	if self.mutex != nil {
		self.mutex.RLock()
		defer self.mutex.RUnlock()
	}
	// self and other are copies and may share a single mutex (a Node asking
	// for its own address). sync.RWMutex prohibits recursive read-locking, as
	// it can deadlock once a writer is waiting, so other's lock is only taken
	// when it is a different mutex.
	if other.mutex != nil && other.mutex != self.mutex {
		other.mutex.RLock()
		defer other.mutex.RUnlock()
	}

	ip := other.GlobalIP
	if self.Region == other.Region {
		ip = other.LocalIP
	}
	return ip + ":" + strconv.Itoa(other.Port)
}
// Proximity returns the proximity score stored on n, adjusted for the Region.
// The proximity score of a Node reflects how close it is to the current Node;
// a lower proximity score means a closer Node. When n is in a different
// Region than the receiver, the raw score is multiplied by 5 as a penalty.
//
// It returns -1 when n is nil. The multiplier is applied to whatever raw
// score is stored, so a Node still carrying the initial -1 from NewNode
// yields -5 when it is in another Region.
//
// A nil mutex on either Node is replaced with a new one before locking.
func (self *Node) Proximity(n *Node) int64 {
	const crossRegionPenalty = 5

	if n == nil {
		return -1
	}
	if self.mutex == nil {
		self.mutex = new(sync.RWMutex)
	}
	// The lock taken below belongs to n, not to the receiver, so n's mutex is
	// the one that must exist before RLock is called.
	if n.mutex == nil {
		n.mutex = new(sync.RWMutex)
	}
	n.mutex.RLock()
	defer n.mutex.RUnlock()

	if n.Region != self.Region {
		return n.proximity * crossRegionPenalty
	}
	return n.proximity
}
// getRawProximity returns the proximity value stored on the Node exactly as
// recorded, read under the Node's read lock. A missing mutex is created first.
func (self *Node) getRawProximity() int64 {
	if self.mutex == nil {
		self.mutex = &sync.RWMutex{}
	}

	self.mutex.RLock()
	raw := self.proximity
	self.mutex.RUnlock()

	return raw
}
// setProximity stores proximity as the Node's raw proximity value while
// holding the Node's write lock. A missing mutex is created first.
func (self *Node) setProximity(proximity int64) {
	if self.mutex == nil {
		self.mutex = &sync.RWMutex{}
	}

	self.mutex.Lock()
	self.proximity = proximity
	self.mutex.Unlock()
}
// updateLastHeardFrom records the current time as the last moment this Node
// was heard from, while holding the Node's write lock. A missing mutex is
// created first.
func (self *Node) updateLastHeardFrom() {
	if self.mutex == nil {
		self.mutex = &sync.RWMutex{}
	}

	self.mutex.Lock()
	// The timestamp is taken while the lock is held.
	self.lastHeardFrom = time.Now()
	self.mutex.Unlock()
}
// LastHeardFrom returns the time this Node was last heard from, read under
// the Node's read lock. A missing mutex is created first.
func (self *Node) LastHeardFrom() time.Time {
	if self.mutex == nil {
		self.mutex = &sync.RWMutex{}
	}

	self.mutex.RLock()
	heard := self.lastHeardFrom
	self.mutex.RUnlock()

	return heard
}
// incrementLSVersion atomically adds one to the Node's leaf set version.
func (self *Node) incrementLSVersion() {
	version := &self.leafsetVersion
	atomic.AddUint64(version, 1)
}
// incrementRTVersion atomically adds one to the Node's routing table version.
func (self *Node) incrementRTVersion() {
	version := &self.routingTableVersion
	atomic.AddUint64(version, 1)
}
// incrementNSVersion atomically adds one to the Node's neighborhood set
// version.
func (self *Node) incrementNSVersion() {
	version := &self.neighborhoodSetVersion
	atomic.AddUint64(version, 1)
}
// updateVersions raises the Node's routing table, leaf set and neighborhood
// set version numbers to at least RTVersion, LSVersion and NSVersion
// respectively. A version that already equals or exceeds its target is left
// untouched, so versions never move backwards.
//
// The counters are modified elsewhere through sync/atomic, so they are read
// and written atomically here as well; each one is moved to its target in a
// single compare-and-swap rather than one increment at a time.
func (self *Node) updateVersions(RTVersion, LSVersion, NSVersion uint64) {
	// raise sets *version to target unless it is already that high. The loop
	// only repeats when another goroutine changed the counter between the
	// load and the swap.
	raise := func(version *uint64, target uint64) {
		for {
			current := atomic.LoadUint64(version)
			if current >= target {
				return
			}
			if atomic.CompareAndSwapUint64(version, current, target) {
				return
			}
		}
	}

	raise(&self.routingTableVersion, RTVersion)
	raise(&self.leafsetVersion, LSVersion)
	raise(&self.neighborhoodSetVersion, NSVersion)
}