-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathconnection.go
179 lines (156 loc) · 3.59 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package gremtune
import (
"net/http"
"time"
"sync"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
)
// dialer abstracts the transport a client uses to talk to the server.
// Ws is the implementation provided in this file.
type dialer interface {
	// Lifecycle: open the connection and shut it down again.
	connect() error
	close() error

	// State: flags describing the current condition of the connection.
	IsConnected() bool
	IsDisposed() bool

	// Message I/O: send one raw message, or receive the next one together
	// with its message type.
	write([]byte) error
	read() (int, []byte, error)

	// getAuth returns the credentials attached to the dialer.
	getAuth() *auth

	// ping runs the keep-alive loop; failures are reported on errs.
	ping(errs chan error)
}
/////
/*
WebSocket Connection
*/
/////
// Ws is the dialer for a WebSocket connection.
// It bundles the gorilla/websocket connection with the settings used to dial
// it and the state flags reported by IsConnected and IsDisposed.
type Ws struct {
	// host is the URL handed to the websocket dialer; connect retries with
	// host + "/gremlin" when the first dial attempt fails.
	host string
	// conn is the underlying websocket connection, assigned by connect.
	conn *websocket.Conn
	// auth holds the credentials returned by getAuth; getAuth panics when it is nil.
	auth *auth
	// disposed is set to true by close.
	disposed bool
	// connected is set to true after a successful dial and whenever a pong
	// arrives; ping updates it according to whether the ping write succeeded.
	connected bool
	// pingInterval is the period of the ticker that drives ping.
	pingInterval time.Duration
	// writingWait is added to time.Now() to form the deadline of each ping write.
	writingWait time.Duration
	// readingWait is not referenced by any method visible in this part of the file.
	// NOTE(review): presumably a read deadline — confirm where it is applied.
	readingWait time.Duration
	// timeout is used as the websocket handshake timeout when dialing.
	timeout time.Duration
	// readBufSize is passed to the dialer as ReadBufferSize.
	readBufSize int
	// writeBufSize is passed to the dialer as WriteBufferSize.
	writeBufSize int
	// quit is closed by close to stop the ping loop.
	quit chan struct{}
	// The embedded mutex is taken by ping while it updates connected.
	sync.RWMutex
}
// auth is the container for the authentication data of a dialer.
type auth struct {
	// username is the user name part of the credentials.
	username string
	// password is the password part of the credentials.
	password string
}
// connect dials the websocket endpoint configured in ws.host.
//
// If the first attempt fails, a second attempt is made against
// ws.host + "/gremlin" (as of 3.2.2 the URL has changed, see
// https://groups.google.com/forum/#!msg/gremlin-users/x4hiHsmTsHM/Xe4GcPtRCAAJ).
// The suffixed URL is stored in ws.host only when that fallback dial
// succeeds, so a failed connect leaves the configured host untouched and a
// later retry starts from the same URL again instead of accumulating
// "/gremlin" suffixes.
//
// On success the connection is marked as connected and a pong handler is
// installed that re-marks it as connected whenever a pong arrives. The error
// of the last dial attempt is returned on failure; ws.conn then holds
// whatever that attempt returned.
func (ws *Ws) connect() error {
	d := websocket.Dialer{
		WriteBufferSize:  ws.writeBufSize,
		ReadBufferSize:   ws.readBufSize,
		HandshakeTimeout: ws.timeout, // Timeout or else we'll hang forever and never fail on bad hosts.
	}

	host := ws.host
	conn, _, err := d.Dial(host, http.Header{})
	if err != nil {
		host = ws.host + "/gremlin"
		conn, _, err = d.Dial(host, http.Header{})
	}
	ws.conn = conn
	if err != nil {
		return err
	}

	// Only remember the fallback URL once it has proven to work.
	ws.host = host

	// The connected flag is also written by ping under the mutex, so every
	// write here takes the same lock.
	conn.SetPongHandler(func(appData string) error {
		ws.Lock()
		ws.connected = true
		ws.Unlock()
		return nil
	})

	ws.Lock()
	ws.connected = true
	ws.Unlock()
	return nil
}
// IsConnected returns whether the underlying websocket is connected.
//
// The flag is updated by the ping loop while holding the write lock, so it is
// read here under the read lock to avoid a data race with that goroutine.
func (ws *Ws) IsConnected() bool {
	ws.RLock()
	defer ws.RUnlock()
	return ws.connected
}
// IsDisposed reports whether the dialer has been disposed, i.e. whether
// close has already run on it.
func (ws *Ws) IsDisposed() bool {
	disposed := ws.disposed
	return disposed
}
// write sends msg to the server as one websocket message and returns the
// error reported by the connection, if any.
func (ws *Ws) write(msg []byte) (err error) {
	// Message type handed to WriteMessage; 2 is the binary message type in
	// gorilla/websocket.
	const binaryFrame = 2
	return ws.conn.WriteMessage(binaryFrame, msg)
}
// read blocks until the next message arrives on the connection and returns
// its message type, its payload and any error, exactly as reported by the
// underlying websocket connection.
func (ws *Ws) read() (msgType int, msg []byte, err error) {
	return ws.conn.ReadMessage()
}
// close shuts the connection down: it sends a normal-closure close frame to
// the server, stops the ping loop by closing ws.quit and closes the
// underlying connection.
//
// close is idempotent. The first call marks the dialer as disposed and does
// the actual work; any later call returns nil immediately. Without this
// guard a second call would panic when closing the already closed quit
// channel.
//
// The returned error is the one from writing the close frame.
func (ws *Ws) close() error {
	ws.Lock()
	alreadyDisposed := ws.disposed
	ws.disposed = true
	ws.Unlock()
	if alreadyDisposed {
		return nil
	}

	defer func() {
		close(ws.quit)
		// Best effort: the connection is being torn down regardless, and the
		// close-frame write error is the one reported to the caller.
		_ = ws.conn.Close()
	}()

	// Cleanly close the connection with the server.
	closeFrame := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
	return ws.conn.WriteMessage(websocket.CloseMessage, closeFrame)
}
// getAuth returns the credentials attached to this dialer.
// It panics when no credentials were configured, because the dialer
// interface offers no error return for this case.
func (ws *Ws) getAuth() *auth {
	if credentials := ws.auth; credentials != nil {
		return credentials
	}
	panic("You must create a Secure Dialer for authenticate with the server")
}
// ping is the keep-alive loop of the connection. Every ws.pingInterval it
// writes a websocket ping control frame, using ws.writingWait as the write
// deadline, and records the outcome in ws.connected under the mutex.
//
// A failed ping is reported on errs. The connected flag is updated before the
// error is reported, and the report itself gives up as soon as ws.quit is
// closed, so a caller that does not drain errs can neither delay the state
// update nor keep this goroutine alive after close.
//
// The loop keeps pinging after a failure and returns only once ws.quit is
// closed.
func (ws *Ws) ping(errs chan error) {
	ticker := time.NewTicker(ws.pingInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			deadline := time.Now().Add(ws.writingWait)
			err := ws.conn.WriteControl(websocket.PingMessage, []byte{}, deadline)

			ws.Lock()
			ws.connected = err == nil
			ws.Unlock()

			if err == nil {
				continue
			}
			select {
			case errs <- err:
			case <-ws.quit:
				return
			}
		case <-ws.quit:
			return
		}
	}
}
// writeWorker works on a loop and dispatches messages as soon as it receives
// them on c.requests. It returns when quit is closed.
//
// Each message is written while holding the client lock. When a write fails,
// c.Errored is set under that lock and the error is then reported on errs
// after the lock has been released. Reporting stops waiting as soon as quit
// is closed. This way a caller that is slow to drain errs cannot keep the
// client lock held or prevent the worker from shutting down.
//
// A write error does not stop the worker; it keeps serving c.requests until
// quit is closed.
func (c *Client) writeWorker(errs chan error, quit chan struct{}) {
	for {
		select {
		case msg := <-c.requests:
			c.Lock()
			err := c.conn.write(msg)
			if err != nil {
				c.Errored = true
			}
			c.Unlock()

			if err == nil {
				continue
			}
			select {
			case errs <- err:
			case <-quit:
				return
			}
		case <-quit:
			return
		}
	}
}
// readWorker works on a loop and sorts messages as soon as it receives them:
// every non-nil payload read from the connection is passed to
// c.handleResponse.
//
// The worker returns when
//   - the connection reports message type -1 (noFrame, treated as the
//     connection having been closed); nothing is reported in that case,
//   - a read fails with any other message type; c.Errored is set and the
//     wrapped error is reported on errs, or
//   - quit is found closed after a message has been processed.
//
// c.Errored is set before the error is reported, and the report stops waiting
// once quit is closed, so a caller that does not drain errs cannot keep this
// goroutine alive after shutdown.
//
// NOTE(review): gorilla/websocket appears to return -1 for every failed
// NextReader call, not only for clean closes, which would make most read
// errors exit silently here — confirm whether that is intended.
// NOTE(review): c.Errored is written here without the client lock that
// writeWorker holds for the same field — confirm whether it needs guarding.
func (c *Client) readWorker(errs chan error, quit chan struct{}) {
	for {
		msgType, msg, err := c.conn.read()
		if msgType == -1 { // msgType == -1 is noFrame (close connection)
			return
		}
		if err != nil {
			c.Errored = true
			select {
			case errs <- errors.Wrapf(err, "Receive message type: %d", msgType):
			case <-quit:
			}
			return
		}
		if msg != nil {
			c.handleResponse(msg)
		}

		// Non-blocking check so the loop goes straight back to reading
		// unless shutdown has been requested.
		select {
		case <-quit:
			return
		default:
		}
	}
}