-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathentropy.go
316 lines (258 loc) · 8.17 KB
/
entropy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
package honu
import (
"fmt"
"time"
"google.golang.org/grpc"
pb "github.com/bbengfort/honu/rpc"
"github.com/bbengfort/x/stats"
"golang.org/x/net/context"
)
// AntiEntropy performs a pairwise, bilateral synchronization with a remote
// peer selected by the server's bandit. It first sends our version vector in
// a pull request and stores any later entries the peer returns, then pushes
// back the entries the peer asked for in its reply.
//
// The outcome of the session is reported to the bandit as a reward: -1.0 for
// selecting ourselves, otherwise the sum of the credits earned for a
// successful pull, a successful push, low latency exchanges, and multi-item
// transfers. Per-peer counters in s.syncs are updated along the way and the
// next session is scheduled after s.delay whenever this one returns.
//
// NOTE: the view specified is the view at the start of anti-entropy.
func (s *Server) AntiEntropy() {
	// Schedule the next anti-entropy session once this one returns.
	defer time.AfterFunc(s.delay, s.AntiEntropy)

	// Ask the bandit which peer to use for pairwise anti-entropy.
	reward := 0.0
	arm := s.bandit.Select()
	peer := s.peers[arm]

	// Ensure we update the reward for the bandit however we exit.
	defer func() { s.bandit.Update(arm, reward) }()

	// Per-peer statistics for the selected peer.
	metrics := s.syncs[peer]

	// TODO: do better at ignoring self-connections
	if peer == s.addr {
		// Penalize self selection by a lot
		reward = -1.0
		metrics.Misses++
		return
	}

	// Create a connection to the peer; WithBlock makes Dial wait until the
	// connection is up or the timeout expires.
	conn, err := grpc.Dial(
		peer, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(timeout),
	)
	if err != nil {
		metrics.Misses++
		warn(err.Error())
		return
	}
	defer conn.Close()

	// Create a gossip client
	client := pb.NewGossipClient(conn)
	debug("connected to anti-entropy peer at %s", peer)

	// Build the pull request from the current version of every object.
	vector := s.store.View()
	req := &pb.PullRequest{
		Versions: make(map[string]*pb.Version, len(vector)),
	}
	for key, version := range vector {
		req.Versions[key] = version.topb()
	}

	// Send the pull request and time the round trip.
	pullStart := time.Now()
	rep, err := client.Pull(context.Background(), req)
	if err != nil {
		metrics.Misses++
		warn(err.Error())
		return
	}
	pullLatency := time.Since(pullStart)
	metrics.Update(pullLatency, "pull")

	// Handle the pull response
	if !rep.Success {
		metrics.Misses++
		debug("no synchronization occurred")
		return
	}

	reward += 0.25                       // add reward for a successful pull request
	reward += latencyReward(pullLatency) // add reward for low latency pull requests
	metrics.Pulls++

	// Store every entry the peer sent; items counts the ones actually
	// accepted by the store.
	var items uint64
	for key, pbentry := range rep.Entries {
		entry := new(Entry)
		entry.frompb(pbentry)

		if !s.store.PutEntry(key, entry) {
			continue
		}
		items++

		// Track visibility if requested
		if s.visibility != nil && entry.TrackVisibility {
			s.visibility.Log(key, entry.Version.String())
			if err := s.visibility.Error(); err != nil {
				warne(err)
			}
		}
	}

	if items > 1 {
		// add reward for multi-items
		reward += 0.05
	}

	// Send the push request (bilateral) with the entries the peer asked for.
	// Can be fire and forget if needed. A reply without an embedded pull
	// request is treated as "nothing requested" rather than dereferenced.
	if rep.Pull != nil && len(rep.Pull.Versions) > 0 {
		debug("pushing %d versions back to %s", len(rep.Pull.Versions), peer)

		push := &pb.PushRequest{
			Entries: make(map[string]*pb.Entry, len(rep.Pull.Versions)),
		}
		for key := range rep.Pull.Versions {
			// GetEntry can return nil; skip such keys instead of
			// dereferencing a nil entry.
			if entry := s.store.GetEntry(key); entry != nil {
				push.Entries[key] = entry.topb()
			}
		}

		if pushed := len(push.Entries); pushed > 0 {
			pushStart := time.Now()
			_, err = client.Push(context.Background(), push)
			pushLatency := time.Since(pushStart)

			if err != nil {
				// The push failed: record a miss and give no credit for it,
				// so a quickly failing RPC cannot earn a latency reward.
				metrics.Misses++
				warn(err.Error())
			} else {
				metrics.Pushes++
				metrics.Update(pushLatency, "push")
				items += uint64(pushed)

				reward += 0.25 // add reward for a push request
				if pushed > 1 {
					// add reward for multi-items
					reward += 0.05
				}
				reward += latencyReward(pushLatency) // add reward for low latency push requests
			}
		}
	}

	// Log anti-entropy success and metrics
	metrics.Syncs++
	metrics.Versions += items
	info("synchronized %d items to %s", items, peer)
}

// latencyReward returns the bandit credit earned by an exchange that took the
// given time: 0.20 below 5ms (local latencies), 0.10 up to and including
// 100ms (close by links that don't span the globe), and nothing otherwise.
func latencyReward(latency time.Duration) float64 {
	switch {
	case latency < 5*time.Millisecond:
		return 0.20
	case latency <= 100*time.Millisecond:
		return 0.10
	default:
		return 0
	}
}
//===========================================================================
// Server Gossip RPC methods
//===========================================================================
// Pull handles incoming pull requests. Every version sent by the remote is
// compared with the local entry for the same key:
//
//   - if the key is unknown locally or the remote version is later, the key
//     is added to the pull request embedded in the reply (carrying our local
//     version, or nil when we have none) so the remote pushes its entry back;
//   - if the local version is later, the local entry is returned in the reply;
//   - if the versions are equal nothing is done for that key.
//
// Success is set on the reply when at least one key falls in either of the
// first two cases. The store is read locked for the whole call.
//
// NOTE(review): s.store.Update is invoked while only the read lock is held —
// confirm that Update is safe to call under RLock.
func (s *Server) Pull(ctx context.Context, in *pb.PullRequest) (*pb.PullReply, error) {
	s.store.RLock()
	defer s.store.RUnlock()

	wanted := make(map[string]*pb.Version)
	newer := make(map[string]*pb.Entry)

	for key, pbvers := range in.Versions {
		// Decode the remote's version and look up what we hold for the key.
		remote := new(Version)
		remote.frompb(pbvers)
		local := s.store.GetEntry(key)

		switch {
		case local == nil || remote.Greater(local.Version):
			// Remote is ahead of us: capture our version (left nil when the
			// key is unknown here) before updating the version scalar.
			var ours *pb.Version
			if local != nil {
				ours = local.Version.topb()
			}
			s.store.Update(key, remote)
			wanted[key] = ours

		case local.Version.Greater(remote):
			// We are ahead of the remote: send our entry along.
			newer[key] = local.topb()
		}
	}

	reply := &pb.PullReply{
		Entries: newer,
		Pull:    &pb.PullRequest{Versions: wanted},
	}

	// Synchronization has occurred if anything is sent or requested.
	reply.Success = len(newer) > 0 || len(wanted) > 0
	return reply, nil
}
// Push handles incoming push requests by offering every entry in the request
// to the store. The reply reports success if the store accepted at least one
// entry and false otherwise (a late push). Accepted entries that ask for
// visibility tracking are logged to the visibility tracker when one is set.
func (s *Server) Push(ctx context.Context, in *pb.PushRequest) (*pb.PushReply, error) {
	accepted := false

	for key, msg := range in.Entries {
		entry := new(Entry)
		entry.frompb(msg)

		// Entries the store rejects need no further handling.
		if !s.store.PutEntry(key, entry) {
			continue
		}
		accepted = true

		// Only track visibility when a tracker exists and the entry asks for it.
		if s.visibility == nil || !entry.TrackVisibility {
			continue
		}

		s.visibility.Log(key, entry.Version.String())
		if err := s.visibility.Error(); err != nil {
			warne(err)
		}
	}

	return &pb.PushReply{Success: accepted}, nil
}
//===========================================================================
// Per-peer metrics for synchronization
//===========================================================================
// Syncs associates each peer's hostname with the synchronization statistics
// collected for that peer.
type Syncs map[string]*SyncStats

// Serialize converts the statistics of every peer with SyncStats.Serialize
// and returns them keyed by peer so they can be saved in JSON format.
func (s Syncs) Serialize() map[string]interface{} {
	out := make(map[string]interface{}, len(s))
	for host, metrics := range s {
		out[host] = metrics.Serialize()
	}
	return out
}
// SyncStats represents per-peer pairwise metrics of synchronization. The
// latency benchmarks are allocated lazily: Update and Serialize call Init
// when the struct has not been initialized yet.
type SyncStats struct {
Syncs uint64 // Total number of anti-entropy sessions between peers
Pulls uint64 // Number of successful pull exchanges between peers
Pushes uint64 // Number of successful push exchanges between peers
Misses uint64 // Number of unsuccessful exchanges between peers
Versions uint64 // The total number of object versions exchanged
PullLatency *stats.Benchmark // Latencies recorded by Update for the "pull" method
PushLatency *stats.Benchmark // Latencies recorded by Update for the "push" method
initialized bool // Set by Init once both latency benchmarks are allocated
}
// Init allocates fresh pull and push latency benchmarks and marks the
// SyncStats as initialized so it is ready for updating.
func (s *SyncStats) Init() {
	s.PullLatency, s.PushLatency = new(stats.Benchmark), new(stats.Benchmark)
	s.initialized = true
}
// Update records latency in the benchmark selected by method, which must be
// either "pull" or "push"; any other method yields an error and records
// nothing. The benchmarks are initialized first if that has not happened yet.
func (s *SyncStats) Update(latency time.Duration, method string) error {
	if !s.initialized {
		s.Init()
	}

	// Pick the benchmark that tracks the requested kind of exchange.
	var bench *stats.Benchmark
	switch method {
	case "pull":
		bench = s.PullLatency
	case "push":
		bench = s.PushLatency
	default:
		return fmt.Errorf("no method '%s'", method)
	}

	bench.Update(latency)
	return nil
}
// Serialize returns the counters and the serialized pull and push latency
// benchmarks as a generic map suitable for writing to disk. The benchmarks
// are initialized first if that has not happened yet.
func (s *SyncStats) Serialize() map[string]interface{} {
	if !s.initialized {
		s.Init()
	}

	return map[string]interface{}{
		"Syncs":       s.Syncs,
		"Pulls":       s.Pulls,
		"Pushes":      s.Pushes,
		"Misses":      s.Misses,
		"Versions":    s.Versions,
		"PullLatency": s.PullLatency.Serialize(),
		"PushLatency": s.PushLatency.Serialize(),
	}
}