-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathzk_handler.go
96 lines (80 loc) · 2.34 KB
/
zk_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package drill
import (
"fmt"
"time"
"github.com/factset/go-drill/internal/log"
"github.com/factset/go-drill/internal/rpc/proto/exec"
"github.com/go-zookeeper/zk"
"google.golang.org/protobuf/proto"
)
// zkconn is the set of zookeeper connection methods that zkHandler calls.
// newZKHandler stores the connection returned by zk.Connect in a field of this
// type.
// NOTE(review): presumably this exists so a stand-in connection can be
// substituted in tests — confirm against the test files.
type zkconn interface {
// Get is used by GetEndpoint to fetch the data stored at a drillbit's znode.
Get(path string) ([]byte, *zk.Stat, error)
// Children is used by GetDrillBits to list the znodes under the handler's Path.
Children(path string) ([]string, *zk.Stat, error)
// Close is called by zkHandler.Close and by the session event callback
// installed in newZKHandler.
Close()
}
// zkHandler wraps a zookeeper connection and exposes the lookups needed to
// discover drillbits: listing the registered names and reading the endpoint
// stored under each name.
type zkHandler struct {
// conn is the connection obtained from zk.Connect in newZKHandler.
conn zkconn
// Nodes holds the server addresses after they were passed through
// zk.FormatServers.
Nodes []string
// Path is the znode whose children GetDrillBits lists and under which
// GetEndpoint looks up an individual drillbit.
Path string
// Connecting starts out true in newZKHandler and is set to false by the
// event callback as soon as any event is delivered.
Connecting bool
// Err holds the most recent error recorded by the event callback,
// GetDrillBits or GetEndpoint. Nothing in this file resets it to nil.
Err error
}
// newZKHandler opens a zookeeper session against the given nodes and returns a
// handler bound to path.
//
// path is the znode that GetDrillBits lists and GetEndpoint reads beneath; per
// the original note it is formed from the Drill cluster name. nodes are
// normalised with zk.FormatServers before connecting.
//
// An event callback is registered on the connection. For session events it
// records an auth failure or session expiry in Err and closes the connection;
// for every event it clears Connecting, and it logs a message whenever the
// reported state is zk.StateConnected.
//
// An error from zk.Connect is returned unchanged together with a nil handler.
//
// NOTE(review): the callback reads hdlr.conn, which is only assigned after
// zk.Connect returns — confirm that no event can be delivered before then.
func newZKHandler(path string, nodes ...string) (*zkHandler, error) {
	const sessionTimeout = 30 * time.Second

	hdlr := &zkHandler{
		Connecting: true,
		Nodes:      zk.FormatServers(nodes),
		Path:       path,
	}

	onEvent := func(ev zk.Event) {
		if ev.Type == zk.EventSession {
			// Only these two session states are treated as fatal.
			var fatal error
			switch ev.State {
			case zk.StateAuthFailed:
				fatal = fmt.Errorf("ZK Auth Failed: %w", zk.ErrAuthFailed)
			case zk.StateExpired:
				fatal = fmt.Errorf("ZK Session Expired: %w", zk.ErrSessionExpired)
			}
			if fatal != nil {
				// Record the error before closing, as callers inspect Err.
				hdlr.Err = fatal
				hdlr.conn.Close()
			}
		}

		hdlr.Connecting = false
		if ev.State == zk.StateConnected {
			log.Print("Connected to Zookeeper.")
		}
	}

	conn, _, err := zk.Connect(
		hdlr.Nodes,
		sessionTimeout,
		zk.WithLogger(&log.Logger),
		zk.WithEventCallback(onEvent),
	)
	hdlr.conn = conn
	if err != nil {
		return nil, err
	}
	return hdlr, nil
}
// GetDrillBits lists the children of z.Path; each returned name can be handed
// to GetEndpoint to obtain that drillbit's connection details.
//
// A failure from the zookeeper call is stored in z.Err rather than returned.
// The children value from the call is logged and returned as-is either way.
func (z *zkHandler) GetDrillBits() []string {
	names, nodeStat, listErr := z.conn.Children(z.Path)
	if listErr != nil {
		z.Err = listErr
	}

	log.Printf("%+v %+v", names, nodeStat)
	return names
}
// GetEndpoint looks up the znode named drillbit beneath z.Path, decodes its
// contents as an exec.DrillServiceInstance protobuf message and returns the
// endpoint carried by that message.
//
// If the read or the decode fails, the error is stored in z.Err and nil is
// returned.
func (z *zkHandler) GetEndpoint(drillbit string) Drillbit {
	znode := z.Path + "/" + drillbit

	raw, _, readErr := z.conn.Get(znode)
	if readErr != nil {
		z.Err = readErr
		return nil
	}

	instance := new(exec.DrillServiceInstance)
	if decodeErr := proto.Unmarshal(raw, instance); decodeErr != nil {
		z.Err = decodeErr
		return nil
	}

	log.Printf("%+v", instance.String())
	return instance.GetEndpoint()
}
// Close closes the zookeeper connection held by the handler by delegating to
// conn.Close. It should be called once the handler is no longer needed.
func (z *zkHandler) Close() {
z.conn.Close()
}